From 4488c077a736d05b181c78377a7fdaaa90b8175a Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Tue, 19 Sep 2023 22:47:15 +0200
Subject: [PATCH 01/26] files cache: add chunk size information

the files cache used to have only the chunk ids, so it had to rely on the
chunks index having the size information - which is problematic with e.g. the
AdHocCache (it has size==0 for all chunks that are not new) and blocked using
the files cache there.
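For illustration, a minimal sketch of the entry layout before/after this
change (namedtuple shapes as used by this patch; the concrete values below
are made up):

    from collections import namedtuple

    ChunkListEntry = namedtuple("ChunkListEntry", "id size")
    FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")

    # before: entries carried bare chunk ids (chunk_ids=[b"id0", ...]),
    # so chunk sizes had to be looked up in the chunks index.
    # now: every chunk reference carries its own uncompressed size:
    entry = FileCacheEntry(
        age=0, inode=12345, size=42, cmtime=0, chunks=[ChunkListEntry(id=b"id0", size=42)]
    )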
---
 src/borg/archive.py | 18 ++++++++++--------
 src/borg/cache.py   | 19 ++++++++++---------
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 4270fc386..f058e6ce8 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -1552,25 +1552,27 @@ class FilesystemObjectProcessors:
                     started_hashing = time.monotonic()
                     path_hash = self.key.id_hash(hashed_path)
                     self.stats.hashing_time += time.monotonic() - started_hashing
-                    known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
+                    known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
                 else:
                     # in --read-special mode, we may be called for special files.
                     # there should be no information in the cache about special files processed in
                     # read-special mode, but we better play safe as this was wrong in the past:
                     hashed_path = path_hash = None
-                    known, ids = False, None
-                if ids is not None:
+                    known, chunks = False, None
+                if chunks is not None:
                     # Make sure all ids are available
-                    for id_ in ids:
-                        if not cache.seen_chunk(id_):
+                    for chunk in chunks:
+                        if not cache.seen_chunk(chunk.id):
                             # cache said it is unmodified, but we lost a chunk: process file like modified
                             status = "M"
                             break
                     else:
                         item.chunks = []
-                        for chunk_id in ids:
+                        for chunk in chunks:
                             # process one-by-one, so we will know in item.chunks how far we got
-                            chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                            chunk_entry = cache.chunk_incref(chunk.id, self.stats)
+                            # chunk.size is from files cache, chunk_entry.size from index:
+                            assert chunk == chunk_entry
                             item.chunks.append(chunk_entry)
                         status = "U"  # regular file, unchanged
                 else:
@@ -1606,7 +1608,7 @@ class FilesystemObjectProcessors:
                         # block or char device will change without its mtime/size/inode changing.
                         # also, we must not memorize a potentially inconsistent/corrupt file that
                         # changed while we backed it up.
-                        cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
+                        cache.memorize_file(hashed_path, path_hash, st, item.chunks)
             self.stats.files_stats[status] += 1  # must be done late
             if not changed_while_backup:
                 status = None  # we already called print_file_status
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 5394a2133..5ad9d38c4 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -35,8 +35,8 @@ from .platform import SaveFile
 from .remote import cache_if_remote
 from .repository import LIST_SCAN_LIMIT

-# note: cmtime might me either a ctime or a mtime timestamp
-FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids")
+# note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry
+FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")


 class SecurityManager:
@@ -1030,8 +1030,8 @@ class LocalCache(CacheStatsMixin):
         :param hashed_path: the file's path as we gave it to hash(hashed_path)
         :param path_hash: hash(hashed_path), to save some memory in the files cache
         :param st: the file's stat() result
-        :return: known, ids (known is True if we have infos about this file in the cache,
-                 ids is the list of chunk ids IF the file has not changed, otherwise None).
+        :return: known, chunks (known is True if we have infos about this file in the cache,
+                 chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
         """
         if not stat.S_ISREG(st.st_mode):
             return False, None
@@ -1072,9 +1072,10 @@ class LocalCache(CacheStatsMixin):
         # again at that time), we need to update the inode number in the cache with what
         # we see in the filesystem.
         self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
-        return True, entry.chunk_ids
+        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
+        return True, chunks

-    def memorize_file(self, hashed_path, path_hash, st, ids):
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
         if not stat.S_ISREG(st.st_mode):
             return
         cache_mode = self.cache_mode
@@ -1092,13 +1093,13 @@ class LocalCache(CacheStatsMixin):
             cmtime_type = "ctime"
             cmtime_ns = safe_ns(st.st_ctime_ns)
         entry = FileCacheEntry(
-            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids
+            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
         )
         self.files[path_hash] = msgpack.packb(entry)
         self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
         files_cache_logger.debug(
             "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
-            entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)),
+            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
             cmtime_type,
             hashed_path,
         )
@@ -1149,7 +1150,7 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
         files_cache_logger.debug("UNKNOWN: files cache not implemented")
         return False, None

-    def memorize_file(self, hashed_path, path_hash, st, ids):
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass

     def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM):

From 17fce18b443134842cf803527706f849e3c63140 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Thu, 21 Sep 2023 21:38:09 +0200
Subject: [PATCH 02/26] always give id and size to chunk_incref/chunk_decref

incref: returns (id, size), so it needs the size if it can't get it from the
chunks index. the size is also needed for updating the stats.

decref: the caller does not always have the chunk size (e.g. for metadata
chunks). as we consider 0 an invalid size, we call with size == 1 in that
case. thus, stats might be slightly off.
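For illustration, a small self-contained model of the new call convention
(toy code, not borg's real cache classes; only the signatures and the
size == 1 placeholder convention mirror this patch):

    from collections import namedtuple

    ChunkListEntry = namedtuple("ChunkListEntry", "id size")

    class ToyCache:
        """toy model of the new incref/decref signatures"""

        def __init__(self):
            self.refcount = {}

        def chunk_incref(self, id, size, stats):
            assert isinstance(size, int) and size > 0  # size is mandatory now
            self.refcount[id] = self.refcount.get(id, 0) + 1
            return ChunkListEntry(id, size)

        def chunk_decref(self, id, size, stats):
            assert isinstance(size, int) and size > 0  # pass 1 when the true size is unknown
            self.refcount[id] -= 1

    cache = ToyCache()
    cache.chunk_incref(b"chunk-id", 42, stats=None)  # data chunk: real size known
    cache.chunk_decref(b"chunk-id", 1, stats=None)   # caller without size info: fake size 1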
---
 src/borg/archive.py               | 39 +++++++++++++-------------
 src/borg/archiver/transfer_cmd.py |  2 +-
 src/borg/cache.py                 | 46 ++++++++++++++++++-------------
 src/borg/testsuite/cache.py       |  8 +++---
 src/borg/upgrade.py               |  4 +--
 5 files changed, 53 insertions(+), 46 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index f058e6ce8..891a16222 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -643,14 +643,14 @@ Duration: {0.duration}
         # so we can already remove it here, the next .save() will then commit this cleanup.
         # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
         del self.manifest.archives[self.checkpoint_name]
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         for id in metadata.item_ptrs:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)
         # also get rid of that part item, we do not want to have it in next checkpoint or final archive
         tail_chunks = self.items_buffer.restore_chunks_state()
         # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
         for id in tail_chunks:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)  # TODO can we have real size here?

     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
@@ -1024,7 +1024,7 @@ Duration: {0.duration}
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         self.id = new_id

     def rename(self, name):
@@ -1052,9 +1052,9 @@ Duration: {0.duration}
                 error = True
                 return exception_ignored  # must not return None here

-        def chunk_decref(id, stats):
+        def chunk_decref(id, size, stats):
             try:
-                self.cache.chunk_decref(id, stats, wait=False)
+                self.cache.chunk_decref(id, size, stats, wait=False)
             except KeyError:
                 cid = bin_to_hex(id)
                 raise ChunksIndexError(cid)
@@ -1073,13 +1073,13 @@ Duration: {0.duration}
                     pi.show(i)
                 _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
                 unpacker.feed(data)
-                chunk_decref(items_id, stats)
+                chunk_decref(items_id, 1, stats)
                 try:
                     for item in unpacker:
                         item = Item(internal_dict=item)
                         if "chunks" in item:
                             for chunk_id, size in item.chunks:
-                                chunk_decref(chunk_id, stats)
+                                chunk_decref(chunk_id, size, stats)
                 except (TypeError, ValueError):
                     # if items metadata spans multiple chunks and one chunk got dropped somehow,
                     # it could be that unpacker yields bad types
@@ -1096,12 +1096,12 @@ Duration: {0.duration}

         # delete the blocks that store all the references that end up being loaded into metadata.items:
         for id in self.metadata.item_ptrs:
-            chunk_decref(id, stats)
+            chunk_decref(id, 1, stats)

         # in forced delete mode, we try hard to delete at least the manifest entry,
         # if possible also the archive superblock, even if processing the items raises
         # some harmless exception.
-        chunk_decref(self.id, stats)
+        chunk_decref(self.id, 1, stats)
         del self.manifest.archives[self.name]
         while fetch_async_response(wait=True) is not None:
             # we did async deletes, process outstanding results (== exceptions),
@@ -1510,7 +1510,7 @@ class FilesystemObjectProcessors:
         except BackupOSError:
             # see comments in process_file's exception handler, same issue here.
             for chunk in item.get("chunks", []):
-                cache.chunk_decref(chunk.id, self.stats, wait=False)
+                cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
             raise
         else:
             item.get_size(memorize=True)
@@ -1544,7 +1544,7 @@ class FilesystemObjectProcessors:
                 item.chunks = []
                 for chunk_id, chunk_size in hl_chunks:
                     # process one-by-one, so we will know in item.chunks how far we got
-                    chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                    chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
                     item.chunks.append(chunk_entry)
             else:  # normal case, no "2nd+" hardlink
                 if not is_special_file:
@@ -1570,10 +1570,8 @@ class FilesystemObjectProcessors:
                     item.chunks = []
                     for chunk in chunks:
                         # process one-by-one, so we will know in item.chunks how far we got
-                        chunk_entry = cache.chunk_incref(chunk.id, self.stats)
-                        # chunk.size is from files cache, chunk_entry.size from index:
-                        assert chunk == chunk_entry
-                        item.chunks.append(chunk_entry)
+                        cache.chunk_incref(chunk.id, chunk.size, self.stats)
+                        item.chunks.append(chunk)
                     status = "U"  # regular file, unchanged
                 else:
                     status = "M" if known else "A"  # regular file, modified or added
@@ -1622,7 +1620,7 @@ class FilesystemObjectProcessors:
                 # but we will not add an item (see add_item in create_helper) and thus
                 # they would be orphaned chunks in case that we commit the transaction.
                 for chunk in item.get("chunks", []):
-                    cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                 # Now that we have cleaned up the chunk references, we can re-raise the exception.
                 # This will skip processing of this file, but might retry or continue with the next one.
                 raise
@@ -1733,7 +1731,7 @@ class TarfileObjectProcessors:
         except BackupOSError:
             # see comment in FilesystemObjectProcessors.process_file, same issue here.
             for chunk in item.get("chunks", []):
-                self.cache.chunk_decref(chunk.id, self.stats, wait=False)
+                self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
             raise

@@ -2446,7 +2444,7 @@ class ArchiveRecreater:
     def process_chunks(self, archive, target, item):
         if not target.recreate_rechunkify:
             for chunk_id, size in item.chunks:
-                self.cache.chunk_incref(chunk_id, target.stats)
+                self.cache.chunk_incref(chunk_id, size, target.stats)
             return item.chunks
         chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
         chunk_processor = partial(self.chunk_processor, target)
@@ -2454,8 +2452,9 @@ class ArchiveRecreater:

     def chunk_processor(self, target, chunk):
         chunk_id, data = cached_hash(chunk, self.key.id_hash)
+        size = len(data)
         if chunk_id in self.seen_chunks:
-            return self.cache.chunk_incref(chunk_id, target.stats)
+            return self.cache.chunk_incref(chunk_id, size, target.stats)
         chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
         self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)
diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py
index 1922cf1cf..1ba8ed3c8 100644
--- a/src/borg/archiver/transfer_cmd.py
+++ b/src/borg/archiver/transfer_cmd.py
@@ -143,7 +143,7 @@ class TransferMixIn:
                                 transfer_size += size
                             else:
                                 if not dry_run:
-                                    chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
+                                    chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats)
                                     chunks.append(chunk_entry)
                                 present_size += size
                         if not dry_run:
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 5ad9d38c4..5cd3cc497 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -979,11 +979,14 @@ class LocalCache(CacheStatsMixin):
         assert ro_type is not None
         if not self.txn_active:
             self.begin_txn()
-        if size is None and compress:
-            size = len(data)  # data is still uncompressed
+        if size is None:
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         refcount = self.seen_chunk(id, size)
         if refcount:
-            return self.chunk_incref(id, stats)
+            return self.chunk_incref(id, size, stats)
         if size is None:
             raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
         cdata = self.repo_objs.format(
@@ -1004,17 +1007,21 @@ class LocalCache(CacheStatsMixin):
         )
         return refcount

-    def chunk_incref(self, id, stats, size=None):
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
         if not self.txn_active:
             self.begin_txn()
         count, _size = self.chunks.incref(id)
-        stats.update(_size, False)
-        return ChunkListEntry(id, _size)
+        assert size == _size
+        stats.update(size, False)
+        return ChunkListEntry(id, size)

-    def chunk_decref(self, id, stats, wait=True):
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
         if not self.txn_active:
             self.begin_txn()
-        count, size = self.chunks.decref(id)
+        count, _size = self.chunks.decref(id)
+        assert size == 1 or size == _size  # don't check if caller gave fake size 1
         if count == 0:
             del self.chunks[id]
             self.repository.delete(id, wait=wait)
@@ -1157,13 +1164,14 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
         assert ro_type is not None
         if not self._txn_active:
             self.begin_txn()
-        if size is None and compress:
-            size = len(data)  # data is still uncompressed
         if size is None:
-            raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         refcount = self.seen_chunk(id, size)
         if refcount:
-            return self.chunk_incref(id, stats, size=size)
+            return self.chunk_incref(id, size, stats)
         cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
         self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
@@ -1181,21 +1189,21 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
             self.chunks[id] = entry._replace(size=size)
         return entry.refcount

-    def chunk_incref(self, id, stats, size=None):
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
         if not self._txn_active:
             self.begin_txn()
         count, _size = self.chunks.incref(id)
-        # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
-        # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
-        size = _size or size
-        assert size
+        assert size == _size
         stats.update(size, False)
         return ChunkListEntry(id, size)

-    def chunk_decref(self, id, stats, wait=True):
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
         if not self._txn_active:
             self.begin_txn()
-        count, size = self.chunks.decref(id)
+        count, _size = self.chunks.decref(id)
+        assert size == 1 or size == _size  # don't check if caller gave fake size 1
         if count == 0:
             del self.chunks[id]
             self.repository.delete(id, wait=wait)
diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py
index 571a483d2..60cb870e3 100644
--- a/src/borg/testsuite/cache.py
+++ b/src/borg/testsuite/cache.py
@@ -189,7 +189,7 @@ class TestAdHocCache:

     def test_does_not_delete_existing_chunks(self, repository, cache):
         assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
-        cache.chunk_decref(H(1), Statistics())
+        cache.chunk_decref(H(1), 1, Statistics())
         assert repository.get(H(1)) == b"1234"

     def test_seen_chunk_add_chunk_size(self, cache):
@@ -199,7 +199,7 @@ class TestAdHocCache:
         """E.g. checkpoint archives"""
         cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
         assert cache.seen_chunk(H(5)) == 1
-        cache.chunk_decref(H(5), Statistics())
+        cache.chunk_decref(H(5), 1, Statistics())
         assert not cache.seen_chunk(H(5))
         with pytest.raises(Repository.ObjectNotFound):
             repository.get(H(5))
@@ -220,9 +220,9 @@ class TestAdHocCache:

     def test_incref_after_add_chunk(self, cache):
         assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
-        assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
+        assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)

     def test_existing_incref_after_add_chunk(self, cache):
         """This case occurs with part files, see Archive.chunk_file."""
         assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
-        assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)
+        assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4)
diff --git a/src/borg/upgrade.py b/src/borg/upgrade.py
index a8d17ac41..22a27c18c 100644
--- a/src/borg/upgrade.py
+++ b/src/borg/upgrade.py
@@ -84,8 +84,8 @@ class UpgraderFrom12To20:
             chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
             if chunks is not None:
                 item.chunks = chunks
-                for chunk_id, _ in chunks:
-                    self.cache.chunk_incref(chunk_id, self.archive.stats)
+                for chunk_id, chunk_size in chunks:
+                    self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats)
             if chunks_healthy is not None:
                 item.chunks_healthy = chunks
             del item.source  # not used for hardlinks any more, replaced by hlid

From de342581d6b7fa93a5623be82b45e07317eff585 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Sat, 23 Sep 2023 01:01:07 +0200
Subject: [PATCH 03/26] fix AdHocCache.add_chunk signature (ctype, clevel
 kwargs)

---
 src/borg/cache.py | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 5cd3cc497..831a46ee0 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -1160,7 +1160,20 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
     def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass

-    def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM):
+    def add_chunk(
+        self,
+        id,
+        meta,
+        data,
+        *,
+        stats,
+        wait=True,
+        compress=True,
+        size=None,
+        ctype=None,
+        clevel=None,
+        ro_type=ROBJ_FILE_STREAM,
+    ):
         assert ro_type is not None
         if not self._txn_active:
             self.begin_txn()
@@ -1172,7 +1185,9 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
         refcount = self.seen_chunk(id, size)
         if refcount:
             return self.chunk_incref(id, size, stats)
-        cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
+        cdata = self.repo_objs.format(
+            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
+        )
         self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
         stats.update(size, not refcount)

From 98162fbb42a4eb89e2f3fcacbe51d352b4599e05 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Thu, 21 Sep 2023 23:28:30 +0200
Subject: [PATCH 04/26] create --no-cache-sync-forced option

when given, force using the AdHocCache.
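An illustrative sketch of the resulting dispatch (toy function, not borg's
real code; `local`/`adhoc` stand for the factory closures in Cache.__new__,
everything else here is made up):

    def select_cache_impl(local, adhoc, permit_adhoc_cache=False, force_adhoc_cache=False):
        # toy model of Cache.__new__'s dispatch; local/adhoc are zero-arg factories
        if force_adhoc_cache:
            return adhoc()  # new: --no-cache-sync-forced always uses the AdHocCache
        if not permit_adhoc_cache:
            return local()  # default: build/sync the persistent LocalCache
        # with --no-cache-sync, the existing heuristic decides (unchanged, not shown)
        return adhoc()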
---
 src/borg/archiver/create_cmd.py | 7 +++++++
 src/borg/cache.py               | 4 ++++
 2 files changed, 11 insertions(+)

diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py
index 5b0547ca0..d87c1bd1e 100644
--- a/src/borg/archiver/create_cmd.py
+++ b/src/borg/archiver/create_cmd.py
@@ -225,6 +225,7 @@ class CreateMixIn:
                 progress=args.progress,
                 lock_wait=self.lock_wait,
                 permit_adhoc_cache=args.no_cache_sync,
+                force_adhoc_cache=args.no_cache_sync_forced,
                 cache_mode=args.files_cache_mode,
                 iec=args.iec,
             ) as cache:
@@ -803,6 +804,12 @@ class CreateMixIn:
             action="store_true",
             help="experimental: do not synchronize the cache. Implies not using the files cache.",
         )
+        subparser.add_argument(
+            "--no-cache-sync-forced",
+            dest="no_cache_sync_forced",
+            action="store_true",
+            help="experimental: do not synchronize the cache (forced). Implies not using the files cache.",
+        )
         subparser.add_argument(
             "--stdin-name",
             metavar="NAME",
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 831a46ee0..3338c111b 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -413,6 +413,7 @@ class Cache:
         progress=False,
         lock_wait=None,
         permit_adhoc_cache=False,
+        force_adhoc_cache=False,
         cache_mode=FILES_CACHE_MODE_DISABLED,
         iec=False,
     ):
@@ -431,6 +432,9 @@ class Cache:
         def adhoc():
             return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)

+        if force_adhoc_cache:
+            return adhoc()
+
         if not permit_adhoc_cache:
             return local()

From d466005682c0a5347a26500758a04e94119042eb Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 22 Sep 2023 20:28:17 +0200
Subject: [PATCH 05/26] refactor files cache code into FilesCacheMixin class

---
 src/borg/cache.py | 355 ++++++++++++++++++++++++----------------------
 1 file changed, 187 insertions(+), 168 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 3338c111b..9944dac71 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
     return path or os.path.join(get_cache_dir(), repository.id_str)


-def files_cache_name():
-    suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
-    return "files." + suffix if suffix else "files"
-
-
-def discover_files_cache_name(path):
-    return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
-
-
 class CacheConfig:
     def __init__(self, repository, path=None, lock_wait=None):
         self.repository = repository
@@ -493,7 +484,183 @@ Total chunks: {0.total_chunks}
         return self.Summary(**stats)


-class LocalCache(CacheStatsMixin):
+class FilesCacheMixin:
+    """
+    Massively accelerate processing of unchanged files by caching their chunks list.
+    With that, we can avoid having to read and chunk them to get their chunks list.
+    """
+
+    FILES_CACHE_NAME = "files"
+
+    def __init__(self, cache_mode):
+        self.cache_mode = cache_mode
+        self.files = None
+        self._newest_cmtime = None
+
+    def files_cache_name(self):
+        suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
+        return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
+
+    def discover_files_cache_name(self, path):
+        return [
+            fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
+        ][0]
+
+    def _create_empty_files_cache(self, path):
+        with IntegrityCheckedFile(path=os.path.join(path, self.files_cache_name()), write=True) as fd:
+            pass  # empty file
+        return fd.integrity_data
+
+    def _read_files_cache(self):
+        if "d" in self.cache_mode:  # d(isabled)
+            return
+
+        self.files = {}
+        logger.debug("Reading files cache ...")
+        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
+        msg = None
+        try:
+            with IntegrityCheckedFile(
+                path=os.path.join(self.path, self.files_cache_name()),
+                write=False,
+                integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
+            ) as fd:
+                u = msgpack.Unpacker(use_list=True)
+                while True:
+                    data = fd.read(64 * 1024)
+                    if not data:
+                        break
+                    u.feed(data)
+                    try:
+                        for path_hash, item in u:
+                            entry = FileCacheEntry(*item)
+                            # in the end, this takes about 240 Bytes per file
+                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                    except (TypeError, ValueError) as exc:
+                        msg = "The files cache seems invalid. [%s]" % str(exc)
+                        break
+        except OSError as exc:
+            msg = "The files cache can't be read. [%s]" % str(exc)
+        except FileIntegrityError as fie:
+            msg = "The files cache is corrupted. [%s]" % str(fie)
+        if msg is not None:
+            logger.warning(msg)
+            logger.warning("Continuing without files cache - expect lower performance.")
+            self.files = {}
+        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
+
+    def _write_files_cache(self):
+        if self._newest_cmtime is None:
+            # was never set because no files were modified/added
+            self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
+        ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
+        files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
+        with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
+            entry_count = 0
+            for path_hash, item in self.files.items():
+                # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
+                # this is to avoid issues with filesystem snapshots and cmtime granularity.
+                # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
+                entry = FileCacheEntry(*msgpack.unpackb(item))
+                if (
+                    entry.age == 0
+                    and timestamp_to_int(entry.cmtime) < self._newest_cmtime
+                    or entry.age > 0
+                    and entry.age < ttl
+                ):
+                    msgpack.pack((path_hash, entry), fd)
+                    entry_count += 1
+        files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
+        files_cache_logger.debug(
+            "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
+        )
+        files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
+        return fd.integrity_data
+
+    def file_known_and_unchanged(self, hashed_path, path_hash, st):
+        """
+        Check if we know the file that has this path_hash (know == it is in our files cache) and
+        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
+
+        :param hashed_path: the file's path as we gave it to hash(hashed_path)
+        :param path_hash: hash(hashed_path), to save some memory in the files cache
+        :param st: the file's stat() result
+        :return: known, chunks (known is True if we have infos about this file in the cache,
+                 chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
+        """
+        if not stat.S_ISREG(st.st_mode):
+            return False, None
+        cache_mode = self.cache_mode
+        if "d" in cache_mode:  # d(isabled)
+            files_cache_logger.debug("UNKNOWN: files cache disabled")
+            return False, None
+        # note: r(echunk) does not need the files cache in this method, but the files cache will
+        # be updated and saved to disk to memorize the files. To preserve previous generations in
+        # the cache, this means that it also needs to get loaded from disk first.
+        if "r" in cache_mode:  # r(echunk)
+            files_cache_logger.debug("UNKNOWN: rechunking enforced")
+            return False, None
+        entry = self.files.get(path_hash)
+        if not entry:
+            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
+            return False, None
+        # we know the file!
+        entry = FileCacheEntry(*msgpack.unpackb(entry))
+        if "s" in cache_mode and entry.size != st.st_size:
+            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
+            return True, None
+        if "i" in cache_mode and entry.inode != st.st_ino:
+            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
+            return True, None
+        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
+            return True, None
+        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
+            return True, None
+        # we ignored the inode number in the comparison above or it is still same.
+        # if it is still the same, replacing it in the tuple doesn't change it.
+        # if we ignored it, a reason for doing that is that files were moved to a new
+        # disk / new fs (so a one-time change of inode number is expected) and we wanted
+        # to avoid everything getting chunked again. to be able to re-enable the inode
+        # number comparison in a future backup run (and avoid chunking everything
+        # again at that time), we need to update the inode number in the cache with what
+        # we see in the filesystem.
+        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
+        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
+        return True, chunks
+
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
+        if not stat.S_ISREG(st.st_mode):
+            return
+        cache_mode = self.cache_mode
+        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
+        if "d" in cache_mode:
+            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
+            return
+        if "c" in cache_mode:
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        elif "m" in cache_mode:
+            cmtime_type = "mtime"
+            cmtime_ns = safe_ns(st.st_mtime_ns)
+        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        entry = FileCacheEntry(
+            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
+        )
+        self.files[path_hash] = msgpack.packb(entry)
+        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
+        files_cache_logger.debug(
+            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
+            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
+            cmtime_type,
+            hashed_path,
+        )
+
+
+class LocalCache(CacheStatsMixin, FilesCacheMixin):
     """
     Persistent, local (client-side) cache.
     """
@@ -516,13 +683,13 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
         """
         CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
         self.progress = progress
-        self.cache_mode = cache_mode
         self.timestamp = None
         self.txn_active = False
         self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"]
@@ -571,8 +738,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.cache_config.create()
         ChunkIndex().write(os.path.join(self.path, "chunks"))
         os.makedirs(os.path.join(self.path, "chunks.archive.d"))
-        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
-            pass  # empty file
+        self._create_empty_files_cache(self.path)

     def _do_open(self):
         self.cache_config.load()
@@ -582,10 +748,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
             integrity_data=self.cache_config.integrity.get("chunks"),
         ) as fd:
             self.chunks = ChunkIndex.read(fd)
-        if "d" in self.cache_mode:  # d(isabled)
-            self.files = None
-        else:
-            self._read_files()
+        self._read_files_cache()

     def open(self):
         if not os.path.isdir(self.path):
@@ -598,42 +761,6 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.cache_config.close()
         self.cache_config = None

-    def _read_files(self):
-        self.files = {}
-        self._newest_cmtime = None
-        logger.debug("Reading files cache ...")
-        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
-        msg = None
-        try:
-            with IntegrityCheckedFile(
-                path=os.path.join(self.path, files_cache_name()),
-                write=False,
-                integrity_data=self.cache_config.integrity.get(files_cache_name()),
-            ) as fd:
-                u = msgpack.Unpacker(use_list=True)
-                while True:
-                    data = fd.read(64 * 1024)
-                    if not data:
-                        break
-                    u.feed(data)
-                    try:
-                        for path_hash, item in u:
-                            entry = FileCacheEntry(*item)
-                            # in the end, this takes about 240 Bytes per file
-                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
-                    except (TypeError, ValueError) as exc:
-                        msg = "The files cache seems invalid. [%s]" % str(exc)
-                        break
-        except OSError as exc:
-            msg = "The files cache can't be read. [%s]" % str(exc)
-        except FileIntegrityError as fie:
-            msg = "The files cache is corrupted. [%s]" % str(fie)
-        if msg is not None:
-            logger.warning(msg)
-            logger.warning("Continuing without files cache - expect lower performance.")
-            self.files = {}
-        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
-
     def begin_txn(self):
         # Initialize transaction snapshot
         pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@@ -645,10 +772,9 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
         pi.output("Initializing cache transaction: Reading files")
         try:
-            shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
         except FileNotFoundError:
-            with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
-                pass  # empty file
+            self._create_empty_files_cache(txn_dir)
         os.replace(txn_dir, os.path.join(self.path, "txn.active"))
         self.txn_active = True
         pi.finish()
@@ -660,33 +786,9 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
     def commit(self):
         """Commit transaction"""
         if not self.txn_active:
             return
         self.security_manager.save(self.manifest, self.key)
         pi = ProgressIndicatorMessage(msgid="cache.commit")
         if self.files is not None:
-            if self._newest_cmtime is None:
-                # was never set because no files were modified/added
-                self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
-            ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
             pi.output("Saving files cache")
-            files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
-            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-                entry_count = 0
-                for path_hash, item in self.files.items():
-                    # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
-                    # this is to avoid issues with filesystem snapshots and cmtime granularity.
-                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
-                    entry = FileCacheEntry(*msgpack.unpackb(item))
-                    if (
-                        entry.age == 0
-                        and timestamp_to_int(entry.cmtime) < self._newest_cmtime
-                        or entry.age > 0
-                        and entry.age < ttl
-                    ):
-                        msgpack.pack((path_hash, entry), fd)
-                        entry_count += 1
-            files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
-            files_cache_logger.debug(
-                "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
-            )
-            files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
-            self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
         pi.output("Saving chunks cache")
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
@@ -708,7 +810,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, "config"), self.path)
             shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
-            shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
             txn_tmp = os.path.join(self.path, "txn.tmp")
             os.replace(txn_dir, txn_tmp)
             if os.path.exists(txn_tmp):
@@ -939,9 +1041,8 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
         self.cache_config.integrity["chunks"] = fd.integrity_data
-        with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-            pass  # empty file
-        self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+        integrity_data = self._create_empty_files_cache(self.path)
+        self.cache_config.integrity[self.files_cache_name()] = integrity_data
         self.cache_config.manifest_id = ""
         self.cache_config._config.set("cache", "manifest", "")
         if not self.cache_config._config.has_section("integrity"):
@@ -1033,88 +1134,6 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         else:
             stats.update(-size, False)

-    def file_known_and_unchanged(self, hashed_path, path_hash, st):
-        """
-        Check if we know the file that has this path_hash (know == it is in our files cache) and
-        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
-
-        :param hashed_path: the file's path as we gave it to hash(hashed_path)
-        :param path_hash: hash(hashed_path), to save some memory in the files cache
-        :param st: the file's stat() result
-        :return: known, chunks (known is True if we have infos about this file in the cache,
-                 chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
-        """
-        if not stat.S_ISREG(st.st_mode):
-            return False, None
-        cache_mode = self.cache_mode
-        if "d" in cache_mode:  # d(isabled)
-            files_cache_logger.debug("UNKNOWN: files cache disabled")
-            return False, None
-        # note: r(echunk) does not need the files cache in this method, but the files cache will
-        # be updated and saved to disk to memorize the files. To preserve previous generations in
-        # the cache, this means that it also needs to get loaded from disk first.
-        if "r" in cache_mode:  # r(echunk)
-            files_cache_logger.debug("UNKNOWN: rechunking enforced")
-            return False, None
-        entry = self.files.get(path_hash)
-        if not entry:
-            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
-            return False, None
-        # we know the file!
-        entry = FileCacheEntry(*msgpack.unpackb(entry))
-        if "s" in cache_mode and entry.size != st.st_size:
-            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
-            return True, None
-        if "i" in cache_mode and entry.inode != st.st_ino:
-            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
-            return True, None
-        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
-            return True, None
-        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
-            return True, None
-        # we ignored the inode number in the comparison above or it is still same.
-        # if it is still the same, replacing it in the tuple doesn't change it.
-        # if we ignored it, a reason for doing that is that files were moved to a new
-        # disk / new fs (so a one-time change of inode number is expected) and we wanted
-        # to avoid everything getting chunked again. to be able to re-enable the inode
-        # number comparison in a future backup run (and avoid chunking everything
-        # again at that time), we need to update the inode number in the cache with what
-        # we see in the filesystem.
-        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
-        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
-        return True, chunks
-
-    def memorize_file(self, hashed_path, path_hash, st, chunks):
-        if not stat.S_ISREG(st.st_mode):
-            return
-        cache_mode = self.cache_mode
-        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
-        if "d" in cache_mode:
-            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
-            return
-        if "c" in cache_mode:
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        elif "m" in cache_mode:
-            cmtime_type = "mtime"
-            cmtime_ns = safe_ns(st.st_mtime_ns)
-        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        entry = FileCacheEntry(
-            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
-        )
-        self.files[path_hash] = msgpack.packb(entry)
-        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
-        files_cache_logger.debug(
-            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
-            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
-            cmtime_type,
-            hashed_path,
-        )

From 876c08f654de57276846db56a1f95929f73a0c01 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Sat, 23 Sep 2023 13:41:46 +0200
Subject: [PATCH 06/26] tolerate missing chunks with delete --force

if a chunk is missing in the repo, it will also be missing in an ad-hoc
built chunks index.
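For illustration, a toy model of the tolerated failure mode (not borg's real
code; the inner chunk_decref helper in Archive.delete does the equivalent):

    class ToyIndex(dict):
        def decref(self, id):
            self[id] -= 1  # raises KeyError if the chunk is unknown/missing

    def decref_tolerant(index, id, forced):
        """return True if decref worked, False if the chunk was missing (tolerated when forced)"""
        try:
            index.decref(id)
        except KeyError:
            if not forced:
                raise  # normal mode: a missing chunk is a hard error
            return False  # forced mode: remember the error, keep deleting
        return True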
---
 src/borg/archive.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 891a16222..8da51b22d 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -1056,8 +1056,11 @@ Duration: {0.duration}
             try:
                 self.cache.chunk_decref(id, size, stats, wait=False)
             except KeyError:
-                cid = bin_to_hex(id)
-                raise ChunksIndexError(cid)
+                nonlocal error
+                if forced == 0:
+                    cid = bin_to_hex(id)
+                    raise ChunksIndexError(cid)
+                error = True
             else:
                 fetch_async_response(wait=False)

From cb8d6f8e4a34e2986b03da23ea389d2cf8b56765 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Thu, 28 Sep 2023 00:16:15 +0200
Subject: [PATCH 07/26] AdHocCache has no cache persistence

thus:
- no cache.path
- skip on-disk cache corruption tests for AdHocCache

---
 src/borg/archiver/rinfo_cmd.py            | 13 +++----------
 src/borg/testsuite/archiver/corruption.py | 14 +++++++++++++-
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/src/borg/archiver/rinfo_cmd.py b/src/borg/archiver/rinfo_cmd.py
index 221d05fc0..bba038117 100644
--- a/src/borg/archiver/rinfo_cmd.py
+++ b/src/borg/archiver/rinfo_cmd.py
@@ -59,16 +59,9 @@ class RInfoMixIn:
                 output += f" out of {format_file_size(storage_quota, iec=args.iec)}"
             output += "\n"

-            output += (
-                textwrap.dedent(
-                    """
-                    Cache: {cache.path}
-                    Security dir: {security_dir}
-                    """
-                )
-                .strip()
-                .format(**info)
-            )
+            if hasattr(info["cache"], "path"):
+                output += "Cache: {cache.path}\n".format(**info)
+            output += "Security dir: {security_dir}\n".format(**info)
             print(output)

             print(str(cache))
diff --git a/src/borg/testsuite/archiver/corruption.py b/src/borg/testsuite/archiver/corruption.py
index cb8b55417..0df305e00 100644
--- a/src/borg/testsuite/archiver/corruption.py
+++ b/src/borg/testsuite/archiver/corruption.py
@@ -34,7 +34,7 @@ def test_check_corrupted_repository(archiver):
 def corrupt_archiver(archiver):
     create_test_files(archiver.input_path)
     cmd(archiver, "rcreate", RK_ENCRYPTION)
-    archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"]["path"]
+    archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"].get("path")


 def corrupt(file, amount=1):
@@ -48,6 +48,9 @@ def corrupt(file, amount=1):
 @pytest.mark.allow_cache_wipe
 def test_cache_chunks(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     create_src_archive(archiver, "test")
     chunks_path = os.path.join(archiver.cache_path, "chunks")
     chunks_before_corruption = set(ChunkIndex(path=chunks_path).iteritems())
@@ -74,6 +77,9 @@ def test_cache_chunks(archiver):

 def test_cache_files(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     cmd(archiver, "create", "test", "input")
     corrupt(os.path.join(archiver.cache_path, "files"))
     out = cmd(archiver, "create", "test1", "input")
@@ -83,6 +89,9 @@ def test_cache_files(archiver):

 def test_chunks_archive(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     cmd(archiver, "create", "test1", "input")
     # Find ID of test1, so we can corrupt it later :)
     target_id = cmd(archiver, "rlist", "--format={id}{NL}").strip()
@@ -114,6 +123,9 @@ def test_chunks_archive(archiver):

 def test_old_version_interfered(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     # Modify the main manifest ID without touching the manifest ID in the integrity section.
     # This happens if a version without integrity checking modifies the cache.
     config_path = os.path.join(archiver.cache_path, "config")

From e2a1999c594c873def8525704e8a1ed2b25d9a2e Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 22 Sep 2023 23:40:42 +0200
Subject: [PATCH 08/26] implement NewCache

Also:
- move common code to ChunksMixin
- always use ._txn_active (not .txn_active)

Some tests are still failing.
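For orientation, the class composition this patch arrives at (sketch with
method bodies elided; mixin contents as described above):

    class CacheStatsMixin: ...    # stats formatting (pre-existing)
    class FilesCacheMixin: ...    # files cache load/save/lookup/memorize
    class ChunksMixin: ...        # add_chunk/seen_chunk/chunk_incref/chunk_decref

    class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
        """persistent chunks index (synced from the repo) + persistent files cache"""

    class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
        """chunks index built ad hoc from the repo + persistent files cache"""

    class AdHocCache(CacheStatsMixin, ChunksMixin):
        """chunks index built ad hoc from the repo, no files cache"""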
---
 src/borg/cache.py               | 468 ++++++++++++++++++++------------
 src/borg/helpers/parseformat.py |   4 +-
 2 files changed, 298 insertions(+), 174 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 9944dac71..bf6d2bdcb 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -420,6 +420,17 @@ class Cache:
                 cache_mode=cache_mode,
             )

+        def newcache():
+            return NewCache(
+                manifest=manifest,
+                path=path,
+                warn_if_unencrypted=warn_if_unencrypted,
+                progress=progress,
+                iec=iec,
+                lock_wait=lock_wait,
+                cache_mode=cache_mode,
+            )
+
         def adhoc():
             return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)

@@ -660,7 +671,127 @@ class FilesCacheMixin:
         )


-class LocalCache(CacheStatsMixin, FilesCacheMixin):
+class ChunksMixin:
+    """
+    Chunks index related code for misc. Cache implementations.
+    """
+
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
+        if not self._txn_active:
+            self.begin_txn()
+        count, _size = self.chunks.incref(id)
+        stats.update(size, False)
+        return ChunkListEntry(id, size)
+
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
+        if not self._txn_active:
+            self.begin_txn()
+        count, _size = self.chunks.decref(id)
+        if count == 0:
+            del self.chunks[id]
+            self.repository.delete(id, wait=wait)
+            stats.update(-size, True)
+        else:
+            stats.update(-size, False)
+
+    def seen_chunk(self, id, size=None):
+        if not self._txn_active:
+            self.begin_txn()
+        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
+        if entry.refcount and size is not None:
+            assert isinstance(entry.size, int)
+            if entry.size:
+                # LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
+                if size != entry.size:
+                    # we already have a chunk with that id, but different size.
+                    # this is either a hash collision (unlikely) or corruption or a bug.
+                    raise Exception(
+                        "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
+                    )
+            else:
+                # NewCache / AdHocCache:
+                # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
+                self.chunks[id] = entry._replace(size=size)
+        return entry.refcount
+
+    def add_chunk(
+        self,
+        id,
+        meta,
+        data,
+        *,
+        stats,
+        wait=True,
+        compress=True,
+        size=None,
+        ctype=None,
+        clevel=None,
+        ro_type=ROBJ_FILE_STREAM,
+    ):
+        assert ro_type is not None
+        if not self._txn_active:
+            self.begin_txn()
+        if size is None:
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
+        refcount = self.seen_chunk(id, size)
+        if refcount:
+            return self.chunk_incref(id, size, stats)
+        cdata = self.repo_objs.format(
+            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
+        )
+        self.repository.put(id, cdata, wait=wait)
+        self.chunks.add(id, 1, size)
+        stats.update(size, not refcount)
+        return ChunkListEntry(id, size)
+
+    def _load_chunks_from_repo(self):
+        # Explicitly set the initial usable hash table capacity to avoid performance issues
+        # due to hash table "resonance".
+        # Since we're creating an archive, add 10 % from the start.
+        num_chunks = len(self.repository)
+        chunks = ChunkIndex(usable=num_chunks * 1.1)
+        pi = ProgressIndicatorPercent(
+            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
+        )
+        t0 = perf_counter()
+        num_requests = 0
+        marker = None
+        while True:
+            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
+            num_requests += 1
+            if not result:
+                break
+            pi.show(increase=len(result))
+            marker = result[-1]
+            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
+            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
+            # (e.g. checkpoint archives) are tracked correctly.
+            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+            for id_ in result:
+                chunks[id_] = init_entry
+        assert len(chunks) == num_chunks
+        # LocalCache does not contain the manifest, either.
+        del chunks[self.manifest.MANIFEST_ID]
+        duration = perf_counter() - t0 or 0.01
+        pi.finish()
+        logger.debug(
+            "Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
+            num_chunks,
+            duration,
+            num_requests,
+            format_file_size(num_chunks * 34 / duration),
+        )
+        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
+        # Protocol overhead is neglected in this calculation.
+        return chunks
+
+
+class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
     """
     Persistent, local (client-side) cache.
     """
@@ -691,7 +822,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.repo_objs = manifest.repo_objs
         self.progress = progress
         self.timestamp = None
-        self.txn_active = False
+        self._txn_active = False
         self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"]

         self.path = cache_dir(self.repository, path)
@@ -776,12 +907,12 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         except FileNotFoundError:
             self._create_empty_files_cache(txn_dir)
         os.replace(txn_dir, os.path.join(self.path, "txn.active"))
-        self.txn_active = True
+        self._txn_active = True
         pi.finish()

     def commit(self):
         """Commit transaction"""
-        if not self.txn_active:
+        if not self._txn_active:
             return
         self.security_manager.save(self.manifest, self.key)
         pi = ProgressIndicatorMessage(msgid="cache.commit")
@@ -797,7 +928,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.cache_config.save(self.manifest, self.key)
         os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
         shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        self.txn_active = False
+        self._txn_active = False
         pi.finish()

     def rollback(self):
@@ -815,7 +946,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
             os.replace(txn_dir, txn_tmp)
             if os.path.exists(txn_tmp):
                 shutil.rmtree(txn_tmp)
-        self.txn_active = False
+        self._txn_active = False
         self._do_open()

     def sync(self):
@@ -1067,75 +1198,171 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.cache_config.ignored_features.update(repo_features - my_features)
         self.cache_config.mandatory_features.update(repo_features & my_features)

-    def add_chunk(
+
+class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
+    """
+    Like AdHocCache, but with a files cache.
+    """
+
+    def __init__(
         self,
-        id,
-        meta,
-        data,
-        *,
-        stats,
-        wait=True,
-        compress=True,
-        size=None,
-        ctype=None,
-        clevel=None,
-        ro_type=ROBJ_FILE_STREAM,
+        manifest,
+        path=None,
+        warn_if_unencrypted=True,
+        progress=False,
+        lock_wait=None,
+        cache_mode=FILES_CACHE_MODE_DISABLED,
+        iec=False,
     ):
-        assert ro_type is not None
-        if not self.txn_active:
-            self.begin_txn()
-        if size is None:
-            if compress:
-                size = len(data)  # data is still uncompressed
-            else:
-                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
-        refcount = self.seen_chunk(id, size)
-        if refcount:
-            return self.chunk_incref(id, size, stats)
-        if size is None:
-            raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
-        cdata = self.repo_objs.format(
-            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
-        )
-        self.repository.put(id, cdata, wait=wait)
-        self.chunks.add(id, 1, size)
-        stats.update(size, not refcount)
-        return ChunkListEntry(id, size)
+        """
+        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
+        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
+        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
+        """
+        CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
+        assert isinstance(manifest, Manifest)
+        self.manifest = manifest
+        self.repository = manifest.repository
+        self.key = manifest.key
+        self.repo_objs = manifest.repo_objs
+        self.progress = progress
+        self.timestamp = None
+        self._txn_active = False

-    def seen_chunk(self, id, size=None):
-        refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
-        if size is not None and stored_size is not None and size != stored_size:
-            # we already have a chunk with that id, but different size.
-            # this is either a hash collision (unlikely) or corruption or a bug.
-            raise Exception(
-                "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
-            )
-        return refcount
+        self.path = cache_dir(self.repository, path)
+        self.security_manager = SecurityManager(self.repository)
+        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)

-    def chunk_incref(self, id, size, stats):
-        assert isinstance(size, int) and size > 0
-        if not self.txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.incref(id)
-        assert size == _size
-        stats.update(size, False)
-        return ChunkListEntry(id, size)
+        # Warn user before sending data to a never seen before unencrypted repository
+        if not os.path.exists(self.path):
+            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
+            self.create()

-    def chunk_decref(self, id, size, stats, wait=True):
-        assert isinstance(size, int) and size > 0
-        if not self.txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.decref(id)
-        assert size == 1 or size == _size  # don't check if caller gave fake size 1
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
+        self.open()
+        try:
+            self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)
+
+            if not self.check_cache_compatibility():
+                self.wipe_cache()
+
+            self.update_compatibility()
+        except:  # noqa
+            self.close()
+            raise
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def create(self):
+        """Create a new empty cache at `self.path`"""
+        os.makedirs(self.path)
+        with open(os.path.join(self.path, "README"), "w") as fd:
+            fd.write(CACHE_README)
+        self.cache_config.create()
+        self._create_empty_files_cache(self.path)
+
+    def _do_open(self):
+        self.cache_config.load()
+        self.chunks = self._load_chunks_from_repo()
+        self._read_files_cache()
+
+    def open(self):
+        if not os.path.isdir(self.path):
+            raise Exception("%s Does not look like a Borg cache" % self.path)
+        self.cache_config.open()
+        self.rollback()
+
+    def close(self):
+        if self.cache_config is not None:
+            self.cache_config.close()
+            self.cache_config = None
+
+    def begin_txn(self):
+        # Initialize transaction snapshot
+        pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
+        txn_dir = os.path.join(self.path, "txn.tmp")
+        os.mkdir(txn_dir)
+        pi.output("Initializing cache transaction: Reading config")
+        shutil.copy(os.path.join(self.path, "config"), txn_dir)
+        pi.output("Initializing cache transaction: Reading files")
+        try:
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
+        except FileNotFoundError:
+            self._create_empty_files_cache(txn_dir)
+        os.replace(txn_dir, os.path.join(self.path, "txn.active"))
+        pi.finish()
+        self._txn_active = True
+
+    def commit(self):
+        if not self._txn_active:
+            return
+        self.security_manager.save(self.manifest, self.key)
+        pi = ProgressIndicatorMessage(msgid="cache.commit")
+        if self.files is not None:
+            pi.output("Saving files cache")
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
+        pi.output("Saving cache config")
+        self.cache_config.save(self.manifest, self.key)
+        os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
+        shutil.rmtree(os.path.join(self.path, "txn.tmp"))
+        self._txn_active = False
+        pi.finish()
+
+    def rollback(self):
+        # Remove partial transaction
+        if os.path.exists(os.path.join(self.path, "txn.tmp")):
+            shutil.rmtree(os.path.join(self.path, "txn.tmp"))
+        # Roll back active transaction
+        txn_dir = os.path.join(self.path, "txn.active")
+        if os.path.exists(txn_dir):
+            shutil.copy(os.path.join(txn_dir, "config"), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
+            txn_tmp = os.path.join(self.path, "txn.tmp")
+            os.replace(txn_dir, txn_tmp)
+            if os.path.exists(txn_tmp):
+                shutil.rmtree(txn_tmp)
+        self._txn_active = False
+        self._do_open()
+
+    def check_cache_compatibility(self):
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        if self.cache_config.ignored_features & my_features:
+            # The cache might not contain references of chunks that need a feature that is mandatory for some operation
+            # and which this version supports. To avoid corruption while executing that operation force rebuild.
+            return False
+        if not self.cache_config.mandatory_features <= my_features:
+            # The cache was built with consideration to at least one feature that this version does not understand.
+            # This client might misinterpret the cache. Thus force a rebuild.
+            return False
+        return True
+
+    def wipe_cache(self):
+        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
+        self.chunks = ChunkIndex()
+        self._create_empty_files_cache(self.path)
+        self.cache_config.manifest_id = ""
+        self.cache_config._config.set("cache", "manifest", "")
+
+        self.cache_config.ignored_features = set()
+        self.cache_config.mandatory_features = set()
+
+    def update_compatibility(self):
+        operation_to_features_map = self.manifest.get_all_mandatory_features()
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        repo_features = set()
+        for operation, features in operation_to_features_map.items():
+            repo_features.update(features)
+
+        self.cache_config.ignored_features.update(repo_features - my_features)
+        self.cache_config.mandatory_features.update(repo_features & my_features)


-class AdHocCache(CacheStatsMixin):
+class AdHocCache(CacheStatsMixin, ChunksMixin):
     """
     Ad-hoc, non-persistent cache.
@@ -1183,72 +1410,6 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
     def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass

-    def add_chunk(
-        self,
-        id,
-        meta,
-        data,
-        *,
-        stats,
-        wait=True,
-        compress=True,
-        size=None,
-        ctype=None,
-        clevel=None,
-        ro_type=ROBJ_FILE_STREAM,
-    ):
-        assert ro_type is not None
-        if not self._txn_active:
-            self.begin_txn()
-        if size is None:
-            if compress:
-                size = len(data)  # data is still uncompressed
-            else:
-                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
-        refcount = self.seen_chunk(id, size)
-        if refcount:
-            return self.chunk_incref(id, size, stats)
-        cdata = self.repo_objs.format(
-            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
-        )
-        self.repository.put(id, cdata, wait=wait)
-        self.chunks.add(id, 1, size)
-        stats.update(size, not refcount)
-        return ChunkListEntry(id, size)
-
-    def seen_chunk(self, id, size=None):
-        if not self._txn_active:
-            self.begin_txn()
-        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
-        if entry.refcount and size and not entry.size:
-            # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
-            # This is of course not possible for the AdHocCache.
-            # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
-            self.chunks[id] = entry._replace(size=size)
-        return entry.refcount
-
-    def chunk_incref(self, id, size, stats):
-        assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.incref(id)
-        assert size == _size
-        stats.update(size, False)
-        return ChunkListEntry(id, size)
-
-    def chunk_decref(self, id, size, stats, wait=True):
-        assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.decref(id)
-        assert size == 1 or size == _size  # don't check if caller gave fake size 1
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
-
     def commit(self):
         if not self._txn_active:
             return
@@ -1261,41 +1422,4 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
     def begin_txn(self):
         self._txn_active = True
-        # Explicitly set the initial usable hash table capacity to avoid performance issues
-        # due to hash table "resonance".
-        # Since we're creating an archive, add 10 % from the start.
-        num_chunks = len(self.repository)
-        self.chunks = ChunkIndex(usable=num_chunks * 1.1)
-        pi = ProgressIndicatorPercent(
-            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
-        )
-        t0 = perf_counter()
-        num_requests = 0
-        marker = None
-        while True:
-            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            num_requests += 1
-            if not result:
-                break
-            pi.show(increase=len(result))
-            marker = result[-1]
-            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
-            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
-            # (e.g. checkpoint archives) are tracked correctly.
-            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
-            for id_ in result:
-                self.chunks[id_] = init_entry
-        assert len(self.chunks) == num_chunks
-        # LocalCache does not contain the manifest, either.
-        del self.chunks[self.manifest.MANIFEST_ID]
-        duration = perf_counter() - t0 or 0.01
-        pi.finish()
-        logger.debug(
-            "AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
-            num_chunks,
-            duration,
-            num_requests,
-            format_file_size(num_chunks * 34 / duration),
-        )
-        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
-        # Protocol overhead is neglected in this calculation.
+        self.chunks = self._load_chunks_from_repo()
diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py
index c82769730..7e906547a 100644
--- a/src/borg/helpers/parseformat.py
+++ b/src/borg/helpers/parseformat.py
@@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder):
         from ..repository import Repository
         from ..remote import RemoteRepository
         from ..archive import Archive
-        from ..cache import LocalCache, AdHocCache
+        from ..cache import LocalCache, AdHocCache, NewCache

         if isinstance(o, Repository) or isinstance(o, RemoteRepository):
             return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
         if isinstance(o, Archive):
             return o.info()
-        if isinstance(o, LocalCache):
+        if isinstance(o, (LocalCache, NewCache)):
             return {"path": o.path, "stats": o.stats()}
         if isinstance(o, AdHocCache):
             return {"stats": o.stats()}

From 4cd9bc8a6be3ed165a95b6fe0ae6eb7c480d71c8 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Sat, 30 Sep 2023 23:32:03 +0200
Subject: [PATCH 09/26] check: do not consider orphan chunks a problem

if we use AdHocCache or NewCache, we do not have precise refcounting.
thus, we do not delete repo objects as their refcount does not go to zero.
check --repair will just remove the orphans.

---
 src/borg/archive.py                      | 4 ++--
 src/borg/testsuite/archiver/check_cmd.py | 7 ++++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 8da51b22d..13cac5fc5 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -2331,10 +2331,10 @@ class ArchiveChecker:
         unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
         orphaned = unused - self.possibly_superseded
         if orphaned:
-            logger.error(f"{len(orphaned)} orphaned objects found!")
+            logger.info(f"{len(orphaned)} orphaned (unused) objects found.")
             for chunk_id in orphaned:
                 logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
-            self.error_found = True
+            # To support working with AdHocCache or NewCache, we do not set self.error_found = True.
         if self.repair and unused:
             logger.info(
                 "Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded))
diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py
index cf4c6baf2..16b768073 100644
--- a/src/borg/testsuite/archiver/check_cmd.py
+++ b/src/borg/testsuite/archiver/check_cmd.py
@@ -338,10 +338,11 @@ def test_extra_chunks(archivers, request):
     with Repository(archiver.repository_location, exclusive=True) as repository:
         repository.put(b"01234567890123456789012345678901", b"xxxx")
         repository.commit(compact=False)
-    cmd(archiver, "check", exit_code=1)
-    cmd(archiver, "check", exit_code=1)
+    output = cmd(archiver, "check", "-v", exit_code=0)  # orphans are not considered warnings anymore
+    assert "1 orphaned (unused) objects found." in output
     cmd(archiver, "check", "--repair", exit_code=0)
-    cmd(archiver, "check", exit_code=0)
+    output = cmd(archiver, "check", "-v", exit_code=0)
+    assert "orphaned (unused) objects found."
not in output cmd(archiver, "extract", "archive1", "--dry-run", exit_code=0) From 25a7a1443af414d7d36b2870bfc7354c9058c071 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 Sep 2023 23:37:52 +0200 Subject: [PATCH 10/26] skip tests requiring the chunks index (archive) Only LocalCache implements these. --- src/borg/testsuite/archiver/corruption.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/borg/testsuite/archiver/corruption.py b/src/borg/testsuite/archiver/corruption.py index 0df305e00..965d2d1d2 100644 --- a/src/borg/testsuite/archiver/corruption.py +++ b/src/borg/testsuite/archiver/corruption.py @@ -54,6 +54,11 @@ def test_cache_chunks(archiver): create_src_archive(archiver, "test") chunks_path = os.path.join(archiver.cache_path, "chunks") chunks_before_corruption = set(ChunkIndex(path=chunks_path).iteritems()) + + chunks_index = os.path.join(archiver.cache_path, "chunks") + if not os.path.exists(chunks_index): + pytest.skip("Only works with LocalCache.") + corrupt(chunks_path) assert not archiver.FORK_DEFAULT # test does not support forking @@ -102,6 +107,8 @@ def test_chunks_archive(archiver): cmd(archiver, "rinfo", "--json") chunks_archive = os.path.join(archiver.cache_path, "chunks.archive.d") + if not os.path.exists(chunks_archive): + pytest.skip("Only LocalCache has a per-archive chunks index cache.") assert len(os.listdir(chunks_archive)) == 4 # two archives, one chunks cache and one .integrity file each corrupt(os.path.join(chunks_archive, target_id + ".compact")) From 5136fa8fe3ab974ba9a128ce7c3d7f729ee43e7c Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 30 Sep 2023 23:52:02 +0200 Subject: [PATCH 11/26] fix test part that only works with LocalCache --- src/borg/testsuite/archiver/checks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index 51b3fe60e..1de88d4e1 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -277,6 +277,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): repository._location = Location(archiver.repository_location) manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, manifest) as cache: + is_localcache = isinstance(cache, LocalCache) cache.begin_txn() cache.cache_config.mandatory_features = {"unknown-feature"} cache.commit() @@ -295,7 +296,8 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): with patch.object(LocalCache, "wipe_cache", wipe_wrapper): cmd(archiver, "create", "test", "input") - assert called + if is_localcache: + assert called with Repository(archiver.repository_path, exclusive=True) as repository: if remote_repo: From 86adc04da4f2bbc960cc8f8f8a622b2248de264a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 1 Oct 2023 00:34:35 +0200 Subject: [PATCH 12/26] fix test_debug_refcount_obj for misc. 
refcounts --- src/borg/testsuite/archiver/debug_cmds.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/borg/testsuite/archiver/debug_cmds.py b/src/borg/testsuite/archiver/debug_cmds.py index 4518a3ccf..0dd4ea139 100644 --- a/src/borg/testsuite/archiver/debug_cmds.py +++ b/src/borg/testsuite/archiver/debug_cmds.py @@ -168,7 +168,12 @@ def test_debug_refcount_obj(archivers, request): create_json = json.loads(cmd(archiver, "create", "--json", "test", "input")) archive_id = create_json["archive"]["id"] output = cmd(archiver, "debug", "refcount-obj", archive_id).strip() - assert output == f"object {archive_id} has 1 referrers [info from chunks cache]." + # LocalCache does precise refcounting, so we'll get 1 reference for the archive. + # AdHocCache or NewCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount. + assert ( + output == f"object {archive_id} has 1 referrers [info from chunks cache]." + or output == f"object {archive_id} has 4294966271 referrers [info from chunks cache]." + ) # Invalid IDs do not abort or return an error output = cmd(archiver, "debug", "refcount-obj", "124", "xyza").strip() From c73f6d4ff31f3f21a06210b12e79b182eb9b35fe Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 31 May 2024 19:03:46 +0200 Subject: [PATCH 13/26] fix test_delete NewCache does not do precise refcounting, thus chunks won't be deleted from the repo at "borg delete" time. "borg check --repair" would remove such chunks IF they are orphans. --- src/borg/testsuite/archiver/delete_cmd.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/borg/testsuite/archiver/delete_cmd.py b/src/borg/testsuite/archiver/delete_cmd.py index 30727cac2..25c35e931 100644 --- a/src/borg/testsuite/archiver/delete_cmd.py +++ b/src/borg/testsuite/archiver/delete_cmd.py @@ -25,9 +25,8 @@ def test_delete(archivers, request): cmd(archiver, "extract", "test.2", "--dry-run") output = cmd(archiver, "delete", "-a", "test.2", "--stats") assert "Original size: -" in output # negative size == deleted data - # Make sure all data except the manifest has been deleted - with Repository(archiver.repository_path) as repository: - assert len(repository) == 1 + output = cmd(archiver, "rlist") + assert output == "" # no archives left! def test_delete_multiple(archivers, request): From ab6049e269bbaede2ab6c76927a94ad3a0ff0451 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 31 May 2024 19:07:54 +0200 Subject: [PATCH 14/26] fix test_list_chunk_counts NewCache does not do precise refcounting, thus it does not know about unique chunks. For now, let's just test num_chunks, but not unique_chunks. 
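To see why unique_chunks cannot be computed here, consider a minimal sketch (plain Python, not borg code) of a chunks index as the repo-derived caches build it: chunks that already exist in the repository carry the sticky ChunkIndex.MAX_VALUE refcount and size 0, so a "refcount == 1 means unique" interpretation only ever matches chunks added in the current transaction.

    # minimal sketch, not borg code; MAX_VALUE matches the 4294966271
    # refcount asserted in test_debug_refcount_obj above
    MAX_VALUE = 4294966271

    chunk_index = {
        b"preexisting-id": (MAX_VALUE, 0),  # already in the repo: fake refcount and size
        b"new-id": (1, 4096),               # added in this transaction: precise
    }

    unique_chunks = sum(1 for refcount, _size in chunk_index.values() if refcount == 1)
    print(unique_chunks)  # 1, even if "preexisting-id" is in fact referenced only once

num_chunks, by contrast, is counted from the archive items themselves and stays reliable with any cache implementation.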
--- src/borg/testsuite/archiver/list_cmd.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/borg/testsuite/archiver/list_cmd.py b/src/borg/testsuite/archiver/list_cmd.py index 6d90a41cb..33c9a483b 100644 --- a/src/borg/testsuite/archiver/list_cmd.py +++ b/src/borg/testsuite/archiver/list_cmd.py @@ -40,9 +40,9 @@ def test_list_chunk_counts(archivers, request): fd.write(b"baab" * 2000000) cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "create", "test", "input") - output = cmd(archiver, "list", "test", "--format", "{num_chunks} {unique_chunks} {path}{NL}") - assert "0 0 input/empty_file" in output - assert "2 2 input/two_chunks" in output + output = cmd(archiver, "list", "test", "--format", "{num_chunks} {path}{NL}") + assert "0 input/empty_file" in output + assert "2 input/two_chunks" in output def test_list_size(archivers, request): From e3a0c4f3758ae8bb2cf573a49408bded72362173 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 31 May 2024 20:04:00 +0200 Subject: [PATCH 15/26] fix check_cache and test_check_cache NewCache and AdHocCache do not have a persistent chunks index, so both check_cache and test_check_cache are pointless. --- src/borg/testsuite/archiver/__init__.py | 8 +++++++- src/borg/testsuite/archiver/checks.py | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index bb5a7e169..abfadf6ce 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -18,7 +18,7 @@ import pytest from ... import xattr, platform from ...archive import Archive from ...archiver import Archiver, PURE_PYTHON_MSGPACK_WARNING -from ...cache import Cache +from ...cache import Cache, LocalCache from ...constants import * # NOQA from ...helpers import Location, umount from ...helpers import EXIT_SUCCESS @@ -356,9 +356,15 @@ def check_cache(archiver): manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, manifest, sync=False) as cache: original_chunks = cache.chunks + # the LocalCache implementation has an on-disk chunks cache, + # but NewCache and AdHocCache don't have persistent chunks cache. + persistent = isinstance(cache, LocalCache) Cache.destroy(repository) with Cache(repository, manifest) as cache: correct_chunks = cache.chunks + if not persistent: + # there is no point in doing the checks + return assert original_chunks is not correct_chunks seen = set() for id, (refcount, size) in correct_chunks.iteritems(): diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index 1de88d4e1..60fc48f7e 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -317,6 +317,9 @@ def test_check_cache(archivers, request): cache.begin_txn() cache.chunks.incref(list(cache.chunks.iteritems())[0][0]) cache.commit() + persistent = isinstance(cache, LocalCache) + if not persistent: + pytest.skip("check_cache is pointless if we do not have a persistent chunks cache") with pytest.raises(AssertionError): check_cache(archiver) From fbfa7cf7bf0d1cc3dade6b8e0a67832b433e49ce Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 1 Jun 2024 14:27:42 +0200 Subject: [PATCH 16/26] fix test_recreate_rechunkify We can not use unique_chunks counter with NewCache, thus we use a simpler (and weaker) assertion. 
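The TODO added to the test below already names the stronger check: after rechunking, both archives should reference identical chunk IDs for the file, not merely equal chunk counts. A rough sketch of such a comparison (hypothetical helpers, not part of borg's test suite; chunk lists are assumed to be (id, size) pairs as in item.chunks):

    # hypothetical helper, not part of borg's test suite
    def same_chunk_ids(chunks1, chunks2):
        # comparing the IDs is enough; equal IDs imply equal sizes
        return [id_ for id_, _size in chunks1] == [id_ for id_, _size in chunks2]

    # full deduplication after "recreate --chunker-params default" would then mean:
    # assert same_chunk_ids(chunks_of("test1", "input/large_file"),
    #                       chunks_of("test2", "input/large_file"))
    # with chunks_of() being a hypothetical lookup of an item's chunk list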
--- src/borg/testsuite/archiver/recreate_cmd.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/borg/testsuite/archiver/recreate_cmd.py b/src/borg/testsuite/archiver/recreate_cmd.py index 078ec1ed0..fc7f8d1b9 100644 --- a/src/borg/testsuite/archiver/recreate_cmd.py +++ b/src/borg/testsuite/archiver/recreate_cmd.py @@ -153,15 +153,18 @@ def test_recreate_rechunkify(archivers, request): cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "create", "test1", "input", "--chunker-params", "7,9,8,128") cmd(archiver, "create", "test2", "input", "--files-cache=disabled") - chunks_list = cmd(archiver, "list", "test1", "input/large_file", "--format", "{num_chunks} {unique_chunks}") - num_chunks, unique_chunks = map(int, chunks_list.split(" ")) - # test1 and test2 do not deduplicate - assert num_chunks == unique_chunks + num_chunks1 = int(cmd(archiver, "list", "test1", "input/large_file", "--format", "{num_chunks}")) + num_chunks2 = int(cmd(archiver, "list", "test2", "input/large_file", "--format", "{num_chunks}")) + # right now, the file is chunked differently + assert num_chunks1 != num_chunks2 cmd(archiver, "recreate", "--chunker-params", "default") check_cache(archiver) - # test1 and test2 do deduplicate after recreate - assert int(cmd(archiver, "list", "test1", "input/large_file", "--format={size}")) - assert not int(cmd(archiver, "list", "test1", "input/large_file", "--format", "{unique_chunks}")) + num_chunks1 = int(cmd(archiver, "list", "test1", "input/large_file", "--format", "{num_chunks}")) + num_chunks2 = int(cmd(archiver, "list", "test2", "input/large_file", "--format", "{num_chunks}")) + # now the files are chunked in the same way + # TODO: this is a rather weak test, it could be improved by comparing the IDs in the chunk lists, + # to make sure that everything is completely deduplicated now (both files have identical chunks). + assert num_chunks1 == num_chunks2 def test_recreate_fixed_rechunkify(archivers, request): From cf8c3a3ae78dc99c312712c21a68feeca05c7bb5 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 1 Jun 2024 16:53:59 +0200 Subject: [PATCH 17/26] keep previous repo location only in security dir removed some code borg had for backwards compatibility with old borg versions (that had previous_location only in the cache). now the repo location is only checked against the location file in the security dir, simplifying the code and also fixing a related test failure with NewCache. also improved test_repository_move to test for aborting in case the repo location changed unexpectedly. 
--- src/borg/archiver/config_cmd.py | 8 ++--- src/borg/cache.py | 42 ++------------------------- src/borg/testsuite/archiver/checks.py | 23 +++++++-------- 3 files changed, 15 insertions(+), 58 deletions(-) diff --git a/src/borg/archiver/config_cmd.py b/src/borg/archiver/config_cmd.py index 733a0a8c2..f92baf4f3 100644 --- a/src/borg/archiver/config_cmd.py +++ b/src/borg/archiver/config_cmd.py @@ -5,7 +5,6 @@ from ._common import with_repository from ..cache import Cache, assert_secure from ..constants import * # NOQA from ..helpers import Error, CommandError -from ..helpers import Location from ..helpers import parse_file_size, hex_to_bin from ..manifest import Manifest @@ -52,11 +51,8 @@ class ConfigMixIn: def cache_validate(section, name, value=None, check_value=True): if section not in ["cache"]: raise ValueError("Invalid section") - if name in ["previous_location"]: - if check_value: - Location(value) - else: - raise ValueError("Invalid name") + # currently, we do not support setting anything in the cache via borg config. + raise ValueError("Invalid name") def list_config(config): default_values = { diff --git a/src/borg/cache.py b/src/borg/cache.py index bf6d2bdcb..af2cbe37b 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -13,7 +13,6 @@ files_cache_logger = create_logger("borg.debug.files_cache") from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer -from .helpers import Location from .helpers import Error from .helpers import get_cache_dir, get_security_dir from .helpers import bin_to_hex, hex_to_bin, parse_stringified_list @@ -100,7 +99,7 @@ class SecurityManager: with SaveFile(self.manifest_ts_file) as fd: fd.write(manifest.timestamp) - def assert_location_matches(self, cache_config=None): + def assert_location_matches(self): # Warn user before sending data to a relocated repository try: with open(self.location_file) as fd: @@ -112,10 +111,6 @@ class SecurityManager: except OSError as exc: logger.warning("Could not read previous location file: %s", exc) previous_location = None - if cache_config and cache_config.previous_location and previous_location != cache_config.previous_location: - # Reconcile cache and security dir; we take the cache location. 
- previous_location = cache_config.previous_location - logger.debug("security: using previous_location of cache: %r", previous_location) repository_location = self.repository._location.canonical_path() if previous_location and previous_location != repository_location: @@ -134,11 +129,9 @@ class SecurityManager: ): raise Cache.RepositoryAccessAborted() # adapt on-disk config immediately if the new location was accepted - logger.debug("security: updating location stored in cache and security dir") + logger.debug("security: updating location stored in security dir") with SaveFile(self.location_file) as fd: fd.write(repository_location) - if cache_config: - cache_config.save() def assert_no_manifest_replay(self, manifest, key, cache_config=None): try: @@ -184,7 +177,7 @@ class SecurityManager: logger.debug("security: repository checks ok, allowing access") def _assert_secure(self, manifest, key, cache_config=None): - self.assert_location_matches(cache_config) + self.assert_location_matches() self.assert_key_type(key, cache_config) self.assert_no_manifest_replay(manifest, key, cache_config) if not self.known(): @@ -221,29 +214,6 @@ def assert_secure(repository, manifest, lock_wait): sm.assert_secure(manifest, manifest.key, lock_wait=lock_wait) -def recanonicalize_relative_location(cache_location, repository): - # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741). - repo_location = repository._location.canonical_path() - rl = Location(repo_location) - cl = Location(cache_location) - if ( - cl.proto == rl.proto - and cl.user == rl.user - and cl.host == rl.host - and cl.port == rl.port - and cl.path - and rl.path - and cl.path.startswith("/~/") - and rl.path.startswith("/./") - and cl.path[3:] == rl.path[3:] - ): - # everything is same except the expected change in relative path canonicalization, - # update previous_location to avoid warning / user query about changed location: - return repo_location - else: - return cache_location - - def cache_dir(repository, path=None): return path or os.path.join(get_cache_dir(), repository.id_str) @@ -310,12 +280,6 @@ class CacheConfig: except configparser.NoSectionError: logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.") self.integrity = {} - previous_location = self._config.get("cache", "previous_location", fallback=None) - if previous_location: - self.previous_location = recanonicalize_relative_location(previous_location, self.repository) - else: - self.previous_location = None - self._config.set("cache", "previous_location", self.repository._location.canonical_path()) def save(self, manifest=None, key=None): if manifest: diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index 60fc48f7e..3b5e65e1d 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -153,32 +153,29 @@ def test_repository_move(archivers, request, monkeypatch): security_dir = get_security_directory(archiver.repository_path) os.replace(archiver.repository_path, archiver.repository_path + "_new") archiver.repository_location += "_new" + # borg should notice that the repository location changed and abort. + if archiver.FORK_DEFAULT: + cmd(archiver, "rinfo", exit_code=EXIT_ERROR) + else: + with pytest.raises(Cache.RepositoryAccessAborted): + cmd(archiver, "rinfo") + # if we explicitly allow relocated repos, it should work fine. 
monkeypatch.setenv("BORG_RELOCATED_REPO_ACCESS_IS_OK", "yes") cmd(archiver, "rinfo") monkeypatch.delenv("BORG_RELOCATED_REPO_ACCESS_IS_OK") with open(os.path.join(security_dir, "location")) as fd: location = fd.read() assert location == Location(archiver.repository_location).canonical_path() - # Needs no confirmation anymore - cmd(archiver, "rinfo") - shutil.rmtree(archiver.cache_path) + # after new repo location was confirmed once, it needs no further confirmation anymore. cmd(archiver, "rinfo") shutil.rmtree(security_dir) + # it also needs no confirmation if we have no knowledge about the previous location. cmd(archiver, "rinfo") + # it will re-create security-related infos in the security dir: for file in ("location", "key-type", "manifest-timestamp"): assert os.path.exists(os.path.join(security_dir, file)) -def test_security_dir_compat(archivers, request): - archiver = request.getfixturevalue(archivers) - cmd(archiver, "rcreate", RK_ENCRYPTION) - with open(os.path.join(get_security_directory(archiver.repository_path), "location"), "w") as fd: - fd.write("something outdated") - # This is fine, because the cache still has the correct information. security_dir and cache can disagree - # if older versions are used to confirm a renamed repository. - cmd(archiver, "rinfo") - - def test_unknown_unencrypted(archivers, request, monkeypatch): archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", "--encryption=none") From 89d867ea305afea3efe6e57e6337737e4e3136f5 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 1 Jun 2024 19:36:02 +0200 Subject: [PATCH 18/26] keep key_type only in security dir removed some code borg had for backwards compatibility with old borg versions (that had key_type only in the cache). now the repo key_type is only checked against the key-type file in the security dir, simplifying the code. 
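The check that remains is intentionally small; a condensed sketch (standalone function and generic exception for illustration, not the actual SecurityManager method):

    class EncryptionMethodMismatch(Exception):
        """Raised when an encrypted repo seems to have been swapped out."""

    def assert_key_type(stored_key_type, key):
        # stored_key_type comes from the key-type file in the security dir,
        # which is now the only record of the previously seen key type
        if stored_key_type is not None and stored_key_type != str(key.TYPE):
            raise EncryptionMethodMismatch()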
--- src/borg/cache.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index af2cbe37b..99dc2dce3 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -154,10 +154,8 @@ class SecurityManager: else: raise Cache.RepositoryReplay() - def assert_key_type(self, key, cache_config=None): + def assert_key_type(self, key): # Make sure an encrypted repository has not been swapped for an unencrypted repository - if cache_config and cache_config.key_type is not None and cache_config.key_type != str(key.TYPE): - raise Cache.EncryptionMethodMismatch() if self.known() and not self.key_matches(key): raise Cache.EncryptionMethodMismatch() @@ -178,7 +176,7 @@ class SecurityManager: def _assert_secure(self, manifest, key, cache_config=None): self.assert_location_matches() - self.assert_key_type(key, cache_config) + self.assert_key_type(key) self.assert_no_manifest_replay(manifest, key, cache_config) if not self.known(): logger.debug("security: remembering previously unknown repository") @@ -261,7 +259,6 @@ class CacheConfig: self.id = self._config.get("cache", "repository") self.manifest_id = hex_to_bin(self._config.get("cache", "manifest")) self.timestamp = self._config.get("cache", "timestamp", fallback=None) - self.key_type = self._config.get("cache", "key_type", fallback=None) self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback=""))) self.mandatory_features = set( parse_stringified_list(self._config.get("cache", "mandatory_features", fallback="")) @@ -281,7 +278,7 @@ class CacheConfig: logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.") self.integrity = {} - def save(self, manifest=None, key=None): + def save(self, manifest=None): if manifest: self._config.set("cache", "manifest", manifest.id_str) self._config.set("cache", "timestamp", manifest.timestamp) @@ -292,8 +289,6 @@ class CacheConfig: for file, integrity_data in self.integrity.items(): self._config.set("integrity", file, integrity_data) self._config.set("integrity", "manifest", manifest.id_str) - if key: - self._config.set("cache", "key_type", str(key.TYPE)) with SaveFile(self.config_path) as fd: self._config.write(fd) @@ -889,7 +884,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): self.chunks.write(fd) self.cache_config.integrity["chunks"] = fd.integrity_data pi.output("Saving cache config") - self.cache_config.save(self.manifest, self.key) + self.cache_config.save(self.manifest) os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp")) shutil.rmtree(os.path.join(self.path, "txn.tmp")) self._txn_active = False @@ -1271,7 +1266,7 @@ class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): integrity_data = self._write_files_cache() self.cache_config.integrity[self.files_cache_name()] = integrity_data pi.output("Saving cache config") - self.cache_config.save(self.manifest, self.key) + self.cache_config.save(self.manifest) os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp")) shutil.rmtree(os.path.join(self.path, "txn.tmp")) self._txn_active = False From 85688e7543d0a325eaf9f2d32904ae2b770ef484 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 1 Jun 2024 20:19:05 +0200 Subject: [PATCH 19/26] keep timestamp only in security dir removed some code borg had for backwards compatibility with old borg versions (that had timestamp only in the cache). 
now the manifest timestamp is only checked against the manifest-timestamp file in the security dir, simplifying the code. --- src/borg/cache.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 99dc2dce3..20e7a3903 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -133,7 +133,7 @@ class SecurityManager: with SaveFile(self.location_file) as fd: fd.write(repository_location) - def assert_no_manifest_replay(self, manifest, key, cache_config=None): + def assert_no_manifest_replay(self, manifest, key): try: with open(self.manifest_ts_file) as fd: timestamp = fd.read() @@ -144,8 +144,6 @@ class SecurityManager: except OSError as exc: logger.warning("Could not read previous location file: %s", exc) timestamp = "" - if cache_config: - timestamp = max(timestamp, cache_config.timestamp or "") logger.debug("security: determined newest manifest timestamp as %s", timestamp) # If repository is older than the cache or security dir something fishy is going on if timestamp and timestamp > manifest.timestamp: @@ -159,25 +157,17 @@ class SecurityManager: if self.known() and not self.key_matches(key): raise Cache.EncryptionMethodMismatch() - def assert_secure(self, manifest, key, *, cache_config=None, warn_if_unencrypted=True, lock_wait=None): + def assert_secure(self, manifest, key, *, warn_if_unencrypted=True, lock_wait=None): # warn_if_unencrypted=False is only used for initializing a new repository. # Thus, avoiding asking about a repository that's currently initializing. self.assert_access_unknown(warn_if_unencrypted, manifest, key) - if cache_config: - self._assert_secure(manifest, key, cache_config) - else: - cache_config = CacheConfig(self.repository, lock_wait=lock_wait) - if cache_config.exists(): - with cache_config: - self._assert_secure(manifest, key, cache_config) - else: - self._assert_secure(manifest, key) + self._assert_secure(manifest, key) logger.debug("security: repository checks ok, allowing access") - def _assert_secure(self, manifest, key, cache_config=None): + def _assert_secure(self, manifest, key): self.assert_location_matches() self.assert_key_type(key) - self.assert_no_manifest_replay(manifest, key, cache_config) + self.assert_no_manifest_replay(manifest, key) if not self.known(): logger.debug("security: remembering previously unknown repository") self.save(manifest, key) @@ -258,7 +248,6 @@ class CacheConfig: self._check_upgrade(self.config_path) self.id = self._config.get("cache", "repository") self.manifest_id = hex_to_bin(self._config.get("cache", "manifest")) - self.timestamp = self._config.get("cache", "timestamp", fallback=None) self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback=""))) self.mandatory_features = set( parse_stringified_list(self._config.get("cache", "mandatory_features", fallback="")) @@ -281,7 +270,6 @@ class CacheConfig: def save(self, manifest=None): if manifest: self._config.set("cache", "manifest", manifest.id_str) - self._config.set("cache", "timestamp", manifest.timestamp) self._config.set("cache", "ignored_features", ",".join(self.ignored_features)) self._config.set("cache", "mandatory_features", ",".join(self.mandatory_features)) if not self._config.has_section("integrity"): @@ -780,7 +768,6 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): self.key = manifest.key self.repo_objs = manifest.repo_objs self.progress = progress - self.timestamp = None self._txn_active = False 
self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"] @@ -800,7 +787,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): self.open() try: - self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config) + self.security_manager.assert_secure(manifest, self.key) if not self.check_cache_compatibility(): self.wipe_cache() @@ -1186,7 +1173,6 @@ class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): self.key = manifest.key self.repo_objs = manifest.repo_objs self.progress = progress - self.timestamp = None self._txn_active = False self.path = cache_dir(self.repository, path) @@ -1200,7 +1186,7 @@ class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): self.open() try: - self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config) + self.security_manager.assert_secure(manifest, self.key) if not self.check_cache_compatibility(): self.wipe_cache() From 561dcc8abf7c8b5fc4be90888fb097f0d8ecdcbb Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 1 Jun 2024 18:13:59 +0200 Subject: [PATCH 20/26] Refactor cache sync options and introduce new cache preference Add new borg create option '--prefer-adhoc-cache' to prefer the AdHocCache over the NewCache implementation. Adjust a test to match the previous default behaviour (== use the AdHocCache) with --no-cache-sync. --- src/borg/archiver/create_cmd.py | 15 +++++++++++---- src/borg/cache.py | 23 ++++++++++++++--------- src/borg/testsuite/archiver/create_cmd.py | 6 +++--- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index d87c1bd1e..785c7ea8a 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -224,8 +224,9 @@ class CreateMixIn: manifest, progress=args.progress, lock_wait=self.lock_wait, - permit_adhoc_cache=args.no_cache_sync, - force_adhoc_cache=args.no_cache_sync_forced, + no_cache_sync_permitted=args.no_cache_sync, + no_cache_sync_forced=args.no_cache_sync_forced, + prefer_adhoc_cache=args.prefer_adhoc_cache, cache_mode=args.files_cache_mode, iec=args.iec, ) as cache: @@ -802,13 +803,19 @@ class CreateMixIn: "--no-cache-sync", dest="no_cache_sync", action="store_true", - help="experimental: do not synchronize the cache. Implies not using the files cache.", + help="experimental: do not synchronize the chunks cache.", ) subparser.add_argument( "--no-cache-sync-forced", dest="no_cache_sync_forced", action="store_true", - help="experimental: do not synchronize the cache (forced). 
Implies not using the files cache.", + help="experimental: do not synchronize the chunks cache (forced).", + ) + subparser.add_argument( + "--prefer-adhoc-cache", + dest="prefer_adhoc_cache", + action="store_true", + help="experimental: prefer AdHocCache (w/o files cache) over NewCache (with files cache).", ) subparser.add_argument( "--stdin-name", diff --git a/src/borg/cache.py b/src/borg/cache.py index 20e7a3903..a7f6bc53c 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -350,8 +350,9 @@ class Cache: warn_if_unencrypted=True, progress=False, lock_wait=None, - permit_adhoc_cache=False, - force_adhoc_cache=False, + no_cache_sync_permitted=False, + no_cache_sync_forced=False, + prefer_adhoc_cache=False, cache_mode=FILES_CACHE_MODE_DISABLED, iec=False, ): @@ -381,14 +382,14 @@ class Cache: def adhoc(): return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec) - if force_adhoc_cache: - return adhoc() + if no_cache_sync_forced: + return adhoc() if prefer_adhoc_cache else newcache() - if not permit_adhoc_cache: + if not no_cache_sync_permitted: return local() - # ad-hoc cache may be permitted, but if the local cache is in sync it'd be stupid to invalidate - # it by needlessly using the ad-hoc cache. + # no cache sync may be permitted, but if the local cache is in sync it'd be stupid to invalidate + # it by needlessly using the AdHocCache or the NewCache. # Check if the local cache exists and is in sync. cache_config = CacheConfig(repository, path, lock_wait) @@ -400,8 +401,12 @@ class Cache: # Local cache is in sync, use it logger.debug("Cache: choosing local cache (in sync)") return local() - logger.debug("Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)") - return adhoc() + if prefer_adhoc_cache: + logger.debug("Cache: choosing AdHocCache (local cache does not exist or is not in sync)") + return adhoc() + else: + logger.debug("Cache: choosing NewCache (local cache does not exist or is not in sync)") + return newcache() class CacheStatsMixin: diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index a064f4635..82b7c7af7 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -540,20 +540,20 @@ def test_create_pattern_intermediate_folders_first(archivers, request): assert out_list.index("d x/b") < out_list.index("- x/b/foo_b") -def test_create_no_cache_sync(archivers, request): +def test_create_no_cache_sync_adhoc(archivers, request): # TODO: add test for NewCache archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "rdelete", "--cache-only") create_json = json.loads( - cmd(archiver, "create", "--no-cache-sync", "--json", "--error", "test", "input") + cmd(archiver, "create", "--no-cache-sync", "--prefer-adhoc-cache", "--json", "--error", "test", "input") ) # ignore experimental warning info_json = json.loads(cmd(archiver, "info", "-a", "test", "--json")) create_stats = create_json["cache"]["stats"] info_stats = info_json["cache"]["stats"] assert create_stats == info_stats cmd(archiver, "rdelete", "--cache-only") - cmd(archiver, "create", "--no-cache-sync", "test2", "input") + cmd(archiver, "create", "--no-cache-sync", "--prefer-adhoc-cache", "test2", "input") cmd(archiver, "rinfo") cmd(archiver, "check") From c7249583e700af84023d4e56487b358c25350218 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 12 Jul 2024 18:57:49 +0200 Subject: [PATCH 21/26] fix 
test_cache_chunks - skip test_cache_chunks if there is no persistent chunks cache file - init self.chunks for AdHocCache - remove warning output from AdHocCache.__init__, it gets mixed with JSON output and fails the JSON decoder. --- src/borg/cache.py | 5 +++-- src/borg/testsuite/archiver/corruption.py | 7 +++---- src/borg/testsuite/archiver/create_cmd.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index a7f6bc53c..99709d172 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -428,6 +428,9 @@ Total chunks: {0.total_chunks} def stats(self): from .archive import Archive + if isinstance(self, AdHocCache) and getattr(self, "chunks", None) is None: + self.chunks = self._load_chunks_from_repo() # AdHocCache usually only has .chunks after begin_txn. + # XXX: this should really be moved down to `hashindex.pyx` total_size, unique_size, total_unique_chunks, total_chunks = self.chunks.summarize() # since borg 1.2 we have new archive metadata telling the total size per archive, @@ -1340,8 +1343,6 @@ Chunk index: {0.total_unique_chunks:20d} unknown""" self.security_manager = SecurityManager(self.repository) self.security_manager.assert_secure(manifest, self.key, lock_wait=lock_wait) - logger.warning("Note: --no-cache-sync is an experimental feature.") - # Public API def __enter__(self): diff --git a/src/borg/testsuite/archiver/corruption.py b/src/borg/testsuite/archiver/corruption.py index 965d2d1d2..65804eaca 100644 --- a/src/borg/testsuite/archiver/corruption.py +++ b/src/borg/testsuite/archiver/corruption.py @@ -53,11 +53,10 @@ def test_cache_chunks(archiver): create_src_archive(archiver, "test") chunks_path = os.path.join(archiver.cache_path, "chunks") - chunks_before_corruption = set(ChunkIndex(path=chunks_path).iteritems()) + if not os.path.exists(chunks_path): + pytest.skip("no persistent chunks index for this kind of Cache implementation") - chunks_index = os.path.join(archiver.cache_path, "chunks") - if not os.path.exists(chunks_index): - pytest.skip("Only works with LocalCache.") + chunks_before_corruption = set(ChunkIndex(path=chunks_path).iteritems()) corrupt(chunks_path) diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index 82b7c7af7..a19490041 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -546,8 +546,8 @@ def test_create_no_cache_sync_adhoc(archivers, request): # TODO: add test for N cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "rdelete", "--cache-only") create_json = json.loads( - cmd(archiver, "create", "--no-cache-sync", "--prefer-adhoc-cache", "--json", "--error", "test", "input") - ) # ignore experimental warning + cmd(archiver, "create", "--no-cache-sync", "--prefer-adhoc-cache", "--json", "test", "input") + ) info_json = json.loads(cmd(archiver, "info", "-a", "test", "--json")) create_stats = create_json["cache"]["stats"] info_stats = info_json["cache"]["stats"] From 72be8eff15228e2557666bfdb61f0cd697c1fa8a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 12 Jul 2024 21:11:03 +0200 Subject: [PATCH 22/26] add comments to test_unknown_feature_on_cache_sync --- src/borg/testsuite/archiver/checks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index 3b5e65e1d..b78b27cdb 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -205,8 +205,10 @@ def 
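The wiring in cache.py (next diff) boils down to an environment-variable dispatch that runs ahead of the option-driven selection; roughly (simplified sketch with the three cache factories passed in as parameters, not the actual code):

    import os

    def choose_cache_impl(local, newcache, adhoc):
        methods = dict(local=local, newcache=newcache, adhoc=adhoc)
        impl = os.environ.get("BORG_CACHE_IMPL")
        if impl is None:
            return None  # fall through to the cli-option driven selection
        try:
            return methods[impl]()
        except KeyError:
            raise RuntimeError("Unknown BORG_CACHE_IMPL value: %s" % impl)

In practice that means e.g. exporting BORG_CACHE_IMPL=adhoc forces the AdHocCache no matter which --no-cache-sync options are given.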
test_unknown_feature_on_create(archivers, request): def test_unknown_feature_on_cache_sync(archivers, request): + # LocalCache.sync checks repo compat archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", RK_ENCRYPTION) + # delete the cache to trigger a cache sync later in borg create cmd(archiver, "rdelete", "--cache-only") add_unknown_feature(archiver.repository_path, Manifest.Operation.READ) cmd_raises_unknown_feature(archiver, ["create", "test", "input"]) From 616af8daa8ab2d0aea606f463035587739fd2eec Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 12 Jul 2024 22:57:26 +0200 Subject: [PATCH 23/26] BORG_CACHE_IMPL environment variable added BORG_CACHE_IMPL allows users to choose the client-side cache implementation from 'local', 'newcache' and 'adhoc'. --- docs/usage/general/environment.rst.inc | 10 ++++++++++ src/borg/cache.py | 9 +++++++++ src/borg/testsuite/conftest.py | 1 + 3 files changed, 20 insertions(+) diff --git a/docs/usage/general/environment.rst.inc b/docs/usage/general/environment.rst.inc index 6cabf3032..917db5c7c 100644 --- a/docs/usage/general/environment.rst.inc +++ b/docs/usage/general/environment.rst.inc @@ -84,6 +84,16 @@ General: - ``pyfuse3``: only try to load pyfuse3 - ``llfuse``: only try to load llfuse - ``none``: do not try to load an implementation + BORG_CACHE_IMPL + Choose the implementation for the clientside cache, choose one of: + + - ``local``: uses a persistent chunks cache and keeps it in a perfect state (precise refcounts and + sizes), requiring a potentially resource expensive cache sync in multi-client scenarios. + Also has a persistent files cache. Default implementation. + - ``adhoc``: builds a non-persistent chunks cache by querying the repo. Chunks cache contents + are somewhat sloppy for already existing chunks, concerning their refcount ("infinite") and + size (0). No files cache (slow, will chunk all input files). DEPRECATED. + - ``newcache``: Like ``adhoc``, but with a persistent files cache. BORG_SELFTEST This can be used to influence borg's builtin self-tests. The default is to execute the tests at the beginning of each borg command invocation. 
diff --git a/src/borg/cache.py b/src/borg/cache.py index 99709d172..38a6ce215 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -382,6 +382,15 @@ class Cache: def adhoc(): return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec) + impl = os.environ.get("BORG_CACHE_IMPL", None) + if impl is not None: + methods = dict(local=local, newcache=newcache, adhoc=adhoc) + try: + method = methods[impl] + except KeyError: + raise RuntimeError("Unknown BORG_CACHE_IMPL value: %s" % impl) + return method() + if no_cache_sync_forced: return adhoc() if prefer_adhoc_cache else newcache() diff --git a/src/borg/testsuite/conftest.py b/src/borg/testsuite/conftest.py index aa45da4ea..d17f1dc7a 100644 --- a/src/borg/testsuite/conftest.py +++ b/src/borg/testsuite/conftest.py @@ -127,6 +127,7 @@ def archiver(tmp_path, set_env_variables): archiver.patterns_file_path = os.fspath(tmp_path / "patterns") os.environ["BORG_KEYS_DIR"] = archiver.keys_path os.environ["BORG_CACHE_DIR"] = archiver.cache_path + # os.environ["BORG_CACHE_IMPL"] = "newcache" os.mkdir(archiver.input_path) os.chmod(archiver.input_path, 0o777) # avoid troubles with fakeroot / FUSE os.mkdir(archiver.output_path) From 78954b648790eb32e478a916d38f1333073f1a18 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 14 Jul 2024 19:52:17 +0200 Subject: [PATCH 24/26] skip tests not working with specific cache implementations --- src/borg/testsuite/archiver/checks.py | 2 ++ src/borg/testsuite/archiver/create_cmd.py | 1 + 2 files changed, 3 insertions(+) diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index b78b27cdb..60c87db6f 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -204,6 +204,7 @@ def test_unknown_feature_on_create(archivers, request): cmd_raises_unknown_feature(archiver, ["create", "test", "input"]) +@pytest.mark.skipif(os.environ.get("BORG_CACHE_IMPL") in ("newcache", "adhoc"), reason="only works with LocalCache") def test_unknown_feature_on_cache_sync(archivers, request): # LocalCache.sync checks repo compat archiver = request.getfixturevalue(archivers) @@ -323,6 +324,7 @@ def test_check_cache(archivers, request): check_cache(archiver) +@pytest.mark.skipif(os.environ.get("BORG_CACHE_IMPL") in ("newcache", "adhoc"), reason="only works with LocalCache") def test_env_use_chunks_archive(archivers, request, monkeypatch): archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index a19490041..50c02aab2 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -540,6 +540,7 @@ def test_create_pattern_intermediate_folders_first(archivers, request): assert out_list.index("d x/b") < out_list.index("- x/b/foo_b") +@pytest.mark.skipif(os.environ.get("BORG_CACHE_IMPL") in ("newcache", "local"), reason="only works with AdHocCache") def test_create_no_cache_sync_adhoc(archivers, request): # TODO: add test for NewCache archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) From 5a500cddf85853e0d511750ffe74154d7509b755 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 18 Jul 2024 22:14:00 +0200 Subject: [PATCH 25/26] rename NewCache -> AdHocWithFilesCache --- docs/usage/general/environment.rst.inc | 2 +- src/borg/archive.py | 2 +- src/borg/archiver/create_cmd.py | 2 +- src/borg/cache.py | 20 ++++++++++---------- 
src/borg/helpers/parseformat.py | 4 ++-- src/borg/testsuite/archiver/__init__.py | 2 +- src/borg/testsuite/archiver/checks.py | 8 ++++++-- src/borg/testsuite/archiver/create_cmd.py | 6 ++++-- src/borg/testsuite/archiver/debug_cmds.py | 2 +- src/borg/testsuite/conftest.py | 2 +- 10 files changed, 28 insertions(+), 22 deletions(-) diff --git a/docs/usage/general/environment.rst.inc b/docs/usage/general/environment.rst.inc index 917db5c7c..cd32e09b9 100644 --- a/docs/usage/general/environment.rst.inc +++ b/docs/usage/general/environment.rst.inc @@ -93,7 +93,7 @@ General: - ``adhoc``: builds a non-persistent chunks cache by querying the repo. Chunks cache contents are somewhat sloppy for already existing chunks, concerning their refcount ("infinite") and size (0). No files cache (slow, will chunk all input files). DEPRECATED. - - ``newcache``: Like ``adhoc``, but with a persistent files cache. + - ``adhocwithfiles``: Like ``adhoc``, but with a persistent files cache. BORG_SELFTEST This can be used to influence borg's builtin self-tests. The default is to execute the tests at the beginning of each borg command invocation. diff --git a/src/borg/archive.py b/src/borg/archive.py index 13cac5fc5..eda4ec4c6 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2334,7 +2334,7 @@ class ArchiveChecker: logger.info(f"{len(orphaned)} orphaned (unused) objects found.") for chunk_id in orphaned: logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.") - # To support working with AdHocCache or NewCache, we do not set self.error_found = True. + # To support working with AdHocCache or AdHocWithFilesCache, we do not set self.error_found = True. if self.repair and unused: logger.info( "Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded)) diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index 785c7ea8a..40160f641 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -815,7 +815,7 @@ class CreateMixIn: "--prefer-adhoc-cache", dest="prefer_adhoc_cache", action="store_true", - help="experimental: prefer AdHocCache (w/o files cache) over NewCache (with files cache).", + help="experimental: prefer AdHocCache (w/o files cache) over AdHocWithFilesCache (with files cache).", ) subparser.add_argument( "--stdin-name", diff --git a/src/borg/cache.py b/src/borg/cache.py index 38a6ce215..4836314d4 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -368,8 +368,8 @@ class Cache: cache_mode=cache_mode, ) - def newcache(): - return NewCache( + def adhocwithfiles(): + return AdHocWithFilesCache( manifest=manifest, path=path, warn_if_unencrypted=warn_if_unencrypted, @@ -384,7 +384,7 @@ class Cache: impl = os.environ.get("BORG_CACHE_IMPL", None) if impl is not None: - methods = dict(local=local, newcache=newcache, adhoc=adhoc) + methods = dict(local=local, adhocwithfiles=adhocwithfiles, adhoc=adhoc) try: method = methods[impl] except KeyError: @@ -392,13 +392,13 @@ class Cache: return method() if no_cache_sync_forced: - return adhoc() if prefer_adhoc_cache else newcache() + return adhoc() if prefer_adhoc_cache else adhocwithfiles() if not no_cache_sync_permitted: return local() # no cache sync may be permitted, but if the local cache is in sync it'd be stupid to invalidate - # it by needlessly using the AdHocCache or the NewCache. + # it by needlessly using the AdHocCache or the AdHocWithFilesCache. # Check if the local cache exists and is in sync. 
cache_config = CacheConfig(repository, path, lock_wait) @@ -410,12 +410,12 @@ class Cache: # Local cache is in sync, use it logger.debug("Cache: choosing local cache (in sync)") return local() - if prefer_adhoc_cache: + if prefer_adhoc_cache: # adhoc cache, without files cache logger.debug("Cache: choosing AdHocCache (local cache does not exist or is not in sync)") return adhoc() else: - logger.debug("Cache: choosing NewCache (local cache does not exist or is not in sync)") - return newcache() + logger.debug("Cache: choosing AdHocWithFilesCache (local cache does not exist or is not in sync)") + return adhocwithfiles() class CacheStatsMixin: @@ -675,7 +675,7 @@ class ChunksMixin: "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size) ) else: - # NewCache / AdHocCache: + # AdHocWithFilesCache / AdHocCache: # Here *size* is used to update the chunk's size information, which will be zero for existing chunks. self.chunks[id] = entry._replace(size=size) return entry.refcount @@ -1162,7 +1162,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): self.cache_config.mandatory_features.update(repo_features & my_features) -class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): +class AdHocWithFilesCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): """ Like AdHocCache, but with a files cache. """ diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index 7e906547a..c69889b18 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder): from ..repository import Repository from ..remote import RemoteRepository from ..archive import Archive - from ..cache import LocalCache, AdHocCache, NewCache + from ..cache import LocalCache, AdHocCache, AdHocWithFilesCache if isinstance(o, Repository) or isinstance(o, RemoteRepository): return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()} if isinstance(o, Archive): return o.info() - if isinstance(o, (LocalCache, NewCache)): + if isinstance(o, (LocalCache, AdHocWithFilesCache)): return {"path": o.path, "stats": o.stats()} if isinstance(o, AdHocCache): return {"stats": o.stats()} diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index abfadf6ce..9d7a5db42 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -357,7 +357,7 @@ def check_cache(archiver): with Cache(repository, manifest, sync=False) as cache: original_chunks = cache.chunks # the LocalCache implementation has an on-disk chunks cache, - # but NewCache and AdHocCache don't have persistent chunks cache. + # but AdHocWithFilesCache and AdHocCache don't have persistent chunks cache. 
persistent = isinstance(cache, LocalCache) Cache.destroy(repository) with Cache(repository, manifest) as cache: diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index 60c87db6f..c095762e6 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -204,7 +204,9 @@ def test_unknown_feature_on_create(archivers, request): cmd_raises_unknown_feature(archiver, ["create", "test", "input"]) -@pytest.mark.skipif(os.environ.get("BORG_CACHE_IMPL") in ("newcache", "adhoc"), reason="only works with LocalCache") +@pytest.mark.skipif( + os.environ.get("BORG_CACHE_IMPL") in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache" +) def test_unknown_feature_on_cache_sync(archivers, request): # LocalCache.sync checks repo compat archiver = request.getfixturevalue(archivers) @@ -324,7 +326,9 @@ def test_check_cache(archivers, request): check_cache(archiver) -@pytest.mark.skipif(os.environ.get("BORG_CACHE_IMPL") in ("newcache", "adhoc"), reason="only works with LocalCache") +@pytest.mark.skipif( + os.environ.get("BORG_CACHE_IMPL") in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache" +) def test_env_use_chunks_archive(archivers, request, monkeypatch): archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index 50c02aab2..461097ac5 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -540,8 +540,10 @@ def test_create_pattern_intermediate_folders_first(archivers, request): assert out_list.index("d x/b") < out_list.index("- x/b/foo_b") -@pytest.mark.skipif(os.environ.get("BORG_CACHE_IMPL") in ("newcache", "local"), reason="only works with AdHocCache") -def test_create_no_cache_sync_adhoc(archivers, request): # TODO: add test for NewCache +@pytest.mark.skipif( + os.environ.get("BORG_CACHE_IMPL") in ("adhocwithfiles", "local"), reason="only works with AdHocCache" +) +def test_create_no_cache_sync_adhoc(archivers, request): # TODO: add test for AdHocWithFilesCache archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) cmd(archiver, "rcreate", RK_ENCRYPTION) diff --git a/src/borg/testsuite/archiver/debug_cmds.py b/src/borg/testsuite/archiver/debug_cmds.py index 0dd4ea139..3923871a5 100644 --- a/src/borg/testsuite/archiver/debug_cmds.py +++ b/src/borg/testsuite/archiver/debug_cmds.py @@ -169,7 +169,7 @@ def test_debug_refcount_obj(archivers, request): archive_id = create_json["archive"]["id"] output = cmd(archiver, "debug", "refcount-obj", archive_id).strip() # LocalCache does precise refcounting, so we'll get 1 reference for the archive. - # AdHocCache or NewCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount. + # AdHocCache or AdHocWithFilesCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount. assert ( output == f"object {archive_id} has 1 referrers [info from chunks cache]." or output == f"object {archive_id} has 4294966271 referrers [info from chunks cache]." 
diff --git a/src/borg/testsuite/conftest.py b/src/borg/testsuite/conftest.py
index d17f1dc7a..4708d9170 100644
--- a/src/borg/testsuite/conftest.py
+++ b/src/borg/testsuite/conftest.py
@@ -127,7 +127,7 @@ def archiver(tmp_path, set_env_variables):
     archiver.patterns_file_path = os.fspath(tmp_path / "patterns")
     os.environ["BORG_KEYS_DIR"] = archiver.keys_path
     os.environ["BORG_CACHE_DIR"] = archiver.cache_path
-    # os.environ["BORG_CACHE_IMPL"] = "newcache"
+    # os.environ["BORG_CACHE_IMPL"] = "adhocwithfiles"
     os.mkdir(archiver.input_path)
     os.chmod(archiver.input_path, 0o777)  # avoid troubles with fakeroot / FUSE
     os.mkdir(archiver.output_path)

From 619a06a5bac5d1a6a3600cf44ff234a5b7287357 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Thu, 18 Jul 2024 22:51:17 +0200
Subject: [PATCH 26/26] BORG_CACHE_IMPL defaults to "adhocwithfiles" now

Also: support a "cli" env var value that determines the implementation
from cli options rather than from the env var (similar to the behavior
before BORG_CACHE_IMPL was added).
---
 docs/usage/general/environment.rst.inc    |  6 ++++--
 src/borg/cache.py                         |  8 ++++++--
 src/borg/testsuite/archiver/checks.py     | 10 +++-------
 src/borg/testsuite/archiver/create_cmd.py |  5 ++---
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/docs/usage/general/environment.rst.inc b/docs/usage/general/environment.rst.inc
index cd32e09b9..b386dc465 100644
--- a/docs/usage/general/environment.rst.inc
+++ b/docs/usage/general/environment.rst.inc
@@ -89,11 +89,13 @@ General:
       - ``local``: uses a persistent chunks cache and keeps it in a perfect state (precise
         refcounts and sizes), requiring a potentially resource expensive cache sync in multi-client scenarios.
-        Also has a persistent files cache. Default implementation.
+        Also has a persistent files cache.
       - ``adhoc``: builds a non-persistent chunks cache by querying the repo. Chunks cache contents
         are somewhat sloppy for already existing chunks, concerning their refcount ("infinite") and
         size (0). No files cache (slow, will chunk all input files). DEPRECATED.
-      - ``adhocwithfiles``: Like ``adhoc``, but with a persistent files cache.
+      - ``adhocwithfiles``: Like ``adhoc``, but with a persistent files cache. Default implementation.
+      - ``cli``: Determine the cache implementation from cli options. Without special options, this will
+        usually end up with the ``local`` implementation.
 BORG_SELFTEST
     This can be used to influence borg's builtin self-tests. The default is to execute the tests
     at the beginning of each borg command invocation.
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 4836314d4..88fe32902 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -299,6 +299,10 @@ class CacheConfig:
             raise Exception("%s does not look like a Borg cache." % config_path) from None


+def get_cache_impl():
+    return os.environ.get("BORG_CACHE_IMPL", "adhocwithfiles")
+
+
 class Cache:
     """Client Side cache"""
@@ -382,8 +386,8 @@ class Cache:
         def adhoc():
             return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)

-        impl = os.environ.get("BORG_CACHE_IMPL", None)
-        if impl is not None:
+        impl = get_cache_impl()
+        if impl != "cli":
             methods = dict(local=local, adhocwithfiles=adhocwithfiles, adhoc=adhoc)
             try:
                 method = methods[impl]
diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py
index c095762e6..a9324fbdf 100644
--- a/src/borg/testsuite/archiver/checks.py
+++ b/src/borg/testsuite/archiver/checks.py
@@ -4,7 +4,7 @@ from unittest.mock import patch

 import pytest

-from ...cache import Cache, LocalCache
+from ...cache import Cache, LocalCache, get_cache_impl
 from ...constants import *  # NOQA
 from ...helpers import Location, get_security_dir, bin_to_hex
 from ...helpers import EXIT_ERROR
@@ -204,9 +204,7 @@ def test_unknown_feature_on_create(archivers, request):
     cmd_raises_unknown_feature(archiver, ["create", "test", "input"])


-@pytest.mark.skipif(
-    os.environ.get("BORG_CACHE_IMPL") in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache"
-)
+@pytest.mark.skipif(get_cache_impl() in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache")
 def test_unknown_feature_on_cache_sync(archivers, request):
     # LocalCache.sync checks repo compat
     archiver = request.getfixturevalue(archivers)
@@ -326,9 +324,7 @@ def test_check_cache(archivers, request):
     check_cache(archiver)


-@pytest.mark.skipif(
-    os.environ.get("BORG_CACHE_IMPL") in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache"
-)
+@pytest.mark.skipif(get_cache_impl() in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache")
 def test_env_use_chunks_archive(archivers, request, monkeypatch):
     archiver = request.getfixturevalue(archivers)
     create_test_files(archiver.input_path)
diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py
index 461097ac5..4d7626d2b 100644
--- a/src/borg/testsuite/archiver/create_cmd.py
+++ b/src/borg/testsuite/archiver/create_cmd.py
@@ -12,6 +12,7 @@ import time
 import pytest

 from ... import platform
+from ...cache import get_cache_impl
 from ...constants import *  # NOQA
 from ...manifest import Manifest
 from ...platform import is_cygwin, is_win32, is_darwin
@@ -540,9 +541,7 @@ def test_create_pattern_intermediate_folders_first(archivers, request):
     assert out_list.index("d x/b") < out_list.index("- x/b/foo_b")


-@pytest.mark.skipif(
-    os.environ.get("BORG_CACHE_IMPL") in ("adhocwithfiles", "local"), reason="only works with AdHocCache"
-)
+@pytest.mark.skipif(get_cache_impl() in ("adhocwithfiles", "local"), reason="only works with AdHocCache")
 def test_create_no_cache_sync_adhoc(archivers, request):  # TODO: add test for AdHocWithFilesCache
     archiver = request.getfixturevalue(archivers)
     create_test_files(archiver.input_path)
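
Taken together, patch 26 makes the cache implementation selectable purely via
the environment, with "cli" as an escape hatch back to option-based selection.
A condensed, self-contained sketch of the dispatch, assuming the factory
arguments here are placeholder lambdas (in borg they are closures defined
inside Cache.__new__, not parameters):

    import os

    def get_cache_impl():
        # mirrors the helper added above: the new default is "adhocwithfiles"
        return os.environ.get("BORG_CACHE_IMPL", "adhocwithfiles")

    def select_cache(local, adhocwithfiles, adhoc, from_cli_options):
        impl = get_cache_impl()
        if impl != "cli":
            methods = dict(local=local, adhocwithfiles=adhocwithfiles, adhoc=adhoc)
            try:
                return methods[impl]()
            except KeyError:
                # error type/message are illustrative, not borg's actual ones
                raise ValueError(f"unsupported BORG_CACHE_IMPL value: {impl}") from None
        # "cli": ignore the env var and decide from command line options,
        # which usually ends up with the "local" implementation.
        return from_cli_options()

    # e.g. with BORG_CACHE_IMPL unset, the adhocwithfiles factory is chosen:
    cache = select_cache(
        local=lambda: "LocalCache",
        adhocwithfiles=lambda: "AdHocWithFilesCache",
        adhoc=lambda: "AdHocCache",
        from_cli_options=lambda: "LocalCache",
    )
    assert cache == "AdHocWithFilesCache"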