diff --git a/docs/usage/general/environment.rst.inc b/docs/usage/general/environment.rst.inc index 6cabf3032..b386dc465 100644 --- a/docs/usage/general/environment.rst.inc +++ b/docs/usage/general/environment.rst.inc @@ -84,6 +84,18 @@ General: - ``pyfuse3``: only try to load pyfuse3 - ``llfuse``: only try to load llfuse - ``none``: do not try to load an implementation + BORG_CACHE_IMPL + Choose the implementation for the client-side cache. Choose one of: + + - ``local``: uses a persistent chunks cache and keeps it in a perfect state (precise refcounts and + sizes), requiring a potentially resource-expensive cache sync in multi-client scenarios. + Also has a persistent files cache. + - ``adhoc``: builds a non-persistent chunks cache by querying the repo. Chunks cache contents + are somewhat sloppy for already existing chunks, concerning their refcount ("infinite") and + size (0). No files cache (slow, will chunk all input files). DEPRECATED. + - ``adhocwithfiles``: like ``adhoc``, but with a persistent files cache. Default implementation. + - ``cli``: determine the cache implementation from cli options. Without special options, will + usually end up with the ``local`` implementation. BORG_SELFTEST This can be used to influence borg's builtin self-tests. The default is to execute the tests at the beginning of each borg command invocation. diff --git a/src/borg/archive.py b/src/borg/archive.py index 4270fc386..eda4ec4c6 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -643,14 +643,14 @@ Duration: {0.duration} # so we can already remove it here, the next .save() will then commit this cleanup. # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks: del self.manifest.archives[self.checkpoint_name] - self.cache.chunk_decref(self.id, self.stats) + self.cache.chunk_decref(self.id, 1, self.stats) for id in metadata.item_ptrs: - self.cache.chunk_decref(id, self.stats) + self.cache.chunk_decref(id, 1, self.stats) # also get rid of that part item, we do not want to have it in next checkpoint or final archive tail_chunks = self.items_buffer.restore_chunks_state() # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit. for id in tail_chunks: - self.cache.chunk_decref(id, self.stats) + self.cache.chunk_decref(id, 1, self.stats) # TODO can we have real size here?
def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None): name = name or self.name @@ -1024,7 +1024,7 @@ Duration: {0.duration} new_id = self.key.id_hash(data) self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META) self.manifest.archives[self.name] = (new_id, metadata.time) - self.cache.chunk_decref(self.id, self.stats) + self.cache.chunk_decref(self.id, 1, self.stats) self.id = new_id def rename(self, name): @@ -1052,12 +1052,15 @@ Duration: {0.duration} error = True return exception_ignored # must not return None here - def chunk_decref(id, stats): + def chunk_decref(id, size, stats): try: - self.cache.chunk_decref(id, stats, wait=False) + self.cache.chunk_decref(id, size, stats, wait=False) except KeyError: - cid = bin_to_hex(id) - raise ChunksIndexError(cid) + nonlocal error + if forced == 0: + cid = bin_to_hex(id) + raise ChunksIndexError(cid) + error = True else: fetch_async_response(wait=False) @@ -1073,13 +1076,13 @@ Duration: {0.duration} pi.show(i) _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM) unpacker.feed(data) - chunk_decref(items_id, stats) + chunk_decref(items_id, 1, stats) try: for item in unpacker: item = Item(internal_dict=item) if "chunks" in item: for chunk_id, size in item.chunks: - chunk_decref(chunk_id, stats) + chunk_decref(chunk_id, size, stats) except (TypeError, ValueError): # if items metadata spans multiple chunks and one chunk got dropped somehow, # it could be that unpacker yields bad types @@ -1096,12 +1099,12 @@ Duration: {0.duration} # delete the blocks that store all the references that end up being loaded into metadata.items: for id in self.metadata.item_ptrs: - chunk_decref(id, stats) + chunk_decref(id, 1, stats) # in forced delete mode, we try hard to delete at least the manifest entry, # if possible also the archive superblock, even if processing the items raises # some harmless exception. - chunk_decref(self.id, stats) + chunk_decref(self.id, 1, stats) del self.manifest.archives[self.name] while fetch_async_response(wait=True) is not None: # we did async deletes, process outstanding results (== exceptions), @@ -1510,7 +1513,7 @@ class FilesystemObjectProcessors: except BackupOSError: # see comments in process_file's exception handler, same issue here. for chunk in item.get("chunks", []): - cache.chunk_decref(chunk.id, self.stats, wait=False) + cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False) raise else: item.get_size(memorize=True) @@ -1544,7 +1547,7 @@ class FilesystemObjectProcessors: item.chunks = [] for chunk_id, chunk_size in hl_chunks: # process one-by-one, so we will know in item.chunks how far we got - chunk_entry = cache.chunk_incref(chunk_id, self.stats) + chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats) item.chunks.append(chunk_entry) else: # normal case, no "2nd+" hardlink if not is_special_file: @@ -1552,26 +1555,26 @@ class FilesystemObjectProcessors: started_hashing = time.monotonic() path_hash = self.key.id_hash(hashed_path) self.stats.hashing_time += time.monotonic() - started_hashing - known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st) + known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st) else: # in --read-special mode, we may be called for special files. 
# there should be no information in the cache about special files processed in # read-special mode, but we better play safe as this was wrong in the past: hashed_path = path_hash = None - known, ids = False, None - if ids is not None: + known, chunks = False, None + if chunks is not None: # Make sure all ids are available - for id_ in ids: - if not cache.seen_chunk(id_): + for chunk in chunks: + if not cache.seen_chunk(chunk.id): # cache said it is unmodified, but we lost a chunk: process file like modified status = "M" break else: item.chunks = [] - for chunk_id in ids: + for chunk in chunks: # process one-by-one, so we will know in item.chunks how far we got - chunk_entry = cache.chunk_incref(chunk_id, self.stats) - item.chunks.append(chunk_entry) + cache.chunk_incref(chunk.id, chunk.size, self.stats) + item.chunks.append(chunk) status = "U" # regular file, unchanged else: status = "M" if known else "A" # regular file, modified or added @@ -1606,7 +1609,7 @@ class FilesystemObjectProcessors: # block or char device will change without its mtime/size/inode changing. # also, we must not memorize a potentially inconsistent/corrupt file that # changed while we backed it up. - cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks]) + cache.memorize_file(hashed_path, path_hash, st, item.chunks) self.stats.files_stats[status] += 1 # must be done late if not changed_while_backup: status = None # we already called print_file_status @@ -1620,7 +1623,7 @@ class FilesystemObjectProcessors: # but we will not add an item (see add_item in create_helper) and thus # they would be orphaned chunks in case that we commit the transaction. for chunk in item.get("chunks", []): - cache.chunk_decref(chunk.id, self.stats, wait=False) + cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False) # Now that we have cleaned up the chunk references, we can re-raise the exception. # This will skip processing of this file, but might retry or continue with the next one. raise @@ -1731,7 +1734,7 @@ class TarfileObjectProcessors: except BackupOSError: # see comment in FilesystemObjectProcessors.process_file, same issue here. for chunk in item.get("chunks", []): - self.cache.chunk_decref(chunk.id, self.stats, wait=False) + self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False) raise @@ -2328,10 +2331,10 @@ class ArchiveChecker: unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0} orphaned = unused - self.possibly_superseded if orphaned: - logger.error(f"{len(orphaned)} orphaned objects found!") + logger.info(f"{len(orphaned)} orphaned (unused) objects found.") for chunk_id in orphaned: logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.") - self.error_found = True + # To support working with AdHocCache or AdHocWithFilesCache, we do not set self.error_found = True. if self.repair and unused: logger.info( "Deleting %d orphaned and %d superseded objects..." 
% (len(orphaned), len(self.possibly_superseded)) @@ -2444,7 +2447,7 @@ class ArchiveRecreater: def process_chunks(self, archive, target, item): if not target.recreate_rechunkify: for chunk_id, size in item.chunks: - self.cache.chunk_incref(chunk_id, target.stats) + self.cache.chunk_incref(chunk_id, size, target.stats) return item.chunks chunk_iterator = self.iter_chunks(archive, target, list(item.chunks)) chunk_processor = partial(self.chunk_processor, target) @@ -2452,8 +2455,9 @@ class ArchiveRecreater: def chunk_processor(self, target, chunk): chunk_id, data = cached_hash(chunk, self.key.id_hash) + size = len(data) if chunk_id in self.seen_chunks: - return self.cache.chunk_incref(chunk_id, target.stats) + return self.cache.chunk_incref(chunk_id, size, target.stats) chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM) self.cache.repository.async_response(wait=False) self.seen_chunks.add(chunk_entry.id) diff --git a/src/borg/archiver/config_cmd.py b/src/borg/archiver/config_cmd.py index 733a0a8c2..f92baf4f3 100644 --- a/src/borg/archiver/config_cmd.py +++ b/src/borg/archiver/config_cmd.py @@ -5,7 +5,6 @@ from ._common import with_repository from ..cache import Cache, assert_secure from ..constants import * # NOQA from ..helpers import Error, CommandError -from ..helpers import Location from ..helpers import parse_file_size, hex_to_bin from ..manifest import Manifest @@ -52,11 +51,8 @@ class ConfigMixIn: def cache_validate(section, name, value=None, check_value=True): if section not in ["cache"]: raise ValueError("Invalid section") - if name in ["previous_location"]: - if check_value: - Location(value) - else: - raise ValueError("Invalid name") + # currently, we do not support setting anything in the cache via borg config. + raise ValueError("Invalid name") def list_config(config): default_values = { diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index 5b0547ca0..40160f641 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -224,7 +224,9 @@ class CreateMixIn: manifest, progress=args.progress, lock_wait=self.lock_wait, - permit_adhoc_cache=args.no_cache_sync, + no_cache_sync_permitted=args.no_cache_sync, + no_cache_sync_forced=args.no_cache_sync_forced, + prefer_adhoc_cache=args.prefer_adhoc_cache, cache_mode=args.files_cache_mode, iec=args.iec, ) as cache: @@ -801,7 +803,19 @@ class CreateMixIn: "--no-cache-sync", dest="no_cache_sync", action="store_true", - help="experimental: do not synchronize the cache. 
Implies not using the files cache.", + help="experimental: do not synchronize the chunks cache.", + ) + subparser.add_argument( + "--no-cache-sync-forced", + dest="no_cache_sync_forced", + action="store_true", + help="experimental: do not synchronize the chunks cache (forced).", + ) + subparser.add_argument( + "--prefer-adhoc-cache", + dest="prefer_adhoc_cache", + action="store_true", + help="experimental: prefer AdHocCache (w/o files cache) over AdHocWithFilesCache (with files cache).", ) subparser.add_argument( "--stdin-name", diff --git a/src/borg/archiver/rinfo_cmd.py b/src/borg/archiver/rinfo_cmd.py index 221d05fc0..bba038117 100644 --- a/src/borg/archiver/rinfo_cmd.py +++ b/src/borg/archiver/rinfo_cmd.py @@ -59,16 +59,9 @@ class RInfoMixIn: output += f" out of {format_file_size(storage_quota, iec=args.iec)}" output += "\n" - output += ( - textwrap.dedent( - """ - Cache: {cache.path} - Security dir: {security_dir} - """ - ) - .strip() - .format(**info) - ) + if hasattr(info["cache"], "path"): + output += "Cache: {cache.path}\n".format(**info) + output += "Security dir: {security_dir}\n".format(**info) print(output) print(str(cache)) diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py index 1922cf1cf..1ba8ed3c8 100644 --- a/src/borg/archiver/transfer_cmd.py +++ b/src/borg/archiver/transfer_cmd.py @@ -143,7 +143,7 @@ class TransferMixIn: transfer_size += size else: if not dry_run: - chunk_entry = cache.chunk_incref(chunk_id, archive.stats) + chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats) chunks.append(chunk_entry) present_size += size if not dry_run: diff --git a/src/borg/cache.py b/src/borg/cache.py index 5394a2133..88fe32902 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -13,7 +13,6 @@ files_cache_logger = create_logger("borg.debug.files_cache") from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer -from .helpers import Location from .helpers import Error from .helpers import get_cache_dir, get_security_dir from .helpers import bin_to_hex, hex_to_bin, parse_stringified_list @@ -35,8 +34,8 @@ from .platform import SaveFile from .remote import cache_if_remote from .repository import LIST_SCAN_LIMIT -# note: cmtime might me either a ctime or a mtime timestamp -FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids") +# note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry +FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks") class SecurityManager: @@ -100,7 +99,7 @@ class SecurityManager: with SaveFile(self.manifest_ts_file) as fd: fd.write(manifest.timestamp) - def assert_location_matches(self, cache_config=None): + def assert_location_matches(self): # Warn user before sending data to a relocated repository try: with open(self.location_file) as fd: @@ -112,10 +111,6 @@ class SecurityManager: except OSError as exc: logger.warning("Could not read previous location file: %s", exc) previous_location = None - if cache_config and cache_config.previous_location and previous_location != cache_config.previous_location: - # Reconcile cache and security dir; we take the cache location. 
- previous_location = cache_config.previous_location - logger.debug("security: using previous_location of cache: %r", previous_location) repository_location = self.repository._location.canonical_path() if previous_location and previous_location != repository_location: @@ -134,13 +129,11 @@ class SecurityManager: ): raise Cache.RepositoryAccessAborted() # adapt on-disk config immediately if the new location was accepted - logger.debug("security: updating location stored in cache and security dir") + logger.debug("security: updating location stored in security dir") with SaveFile(self.location_file) as fd: fd.write(repository_location) - if cache_config: - cache_config.save() - def assert_no_manifest_replay(self, manifest, key, cache_config=None): + def assert_no_manifest_replay(self, manifest, key): try: with open(self.manifest_ts_file) as fd: timestamp = fd.read() @@ -151,8 +144,6 @@ class SecurityManager: except OSError as exc: logger.warning("Could not read previous location file: %s", exc) timestamp = "" - if cache_config: - timestamp = max(timestamp, cache_config.timestamp or "") logger.debug("security: determined newest manifest timestamp as %s", timestamp) # If repository is older than the cache or security dir something fishy is going on if timestamp and timestamp > manifest.timestamp: @@ -161,32 +152,22 @@ class SecurityManager: else: raise Cache.RepositoryReplay() - def assert_key_type(self, key, cache_config=None): + def assert_key_type(self, key): # Make sure an encrypted repository has not been swapped for an unencrypted repository - if cache_config and cache_config.key_type is not None and cache_config.key_type != str(key.TYPE): - raise Cache.EncryptionMethodMismatch() if self.known() and not self.key_matches(key): raise Cache.EncryptionMethodMismatch() - def assert_secure(self, manifest, key, *, cache_config=None, warn_if_unencrypted=True, lock_wait=None): + def assert_secure(self, manifest, key, *, warn_if_unencrypted=True, lock_wait=None): # warn_if_unencrypted=False is only used for initializing a new repository. # Thus, avoiding asking about a repository that's currently initializing. self.assert_access_unknown(warn_if_unencrypted, manifest, key) - if cache_config: - self._assert_secure(manifest, key, cache_config) - else: - cache_config = CacheConfig(self.repository, lock_wait=lock_wait) - if cache_config.exists(): - with cache_config: - self._assert_secure(manifest, key, cache_config) - else: - self._assert_secure(manifest, key) + self._assert_secure(manifest, key) logger.debug("security: repository checks ok, allowing access") - def _assert_secure(self, manifest, key, cache_config=None): - self.assert_location_matches(cache_config) - self.assert_key_type(key, cache_config) - self.assert_no_manifest_replay(manifest, key, cache_config) + def _assert_secure(self, manifest, key): + self.assert_location_matches() + self.assert_key_type(key) + self.assert_no_manifest_replay(manifest, key) if not self.known(): logger.debug("security: remembering previously unknown repository") self.save(manifest, key) @@ -221,42 +202,10 @@ def assert_secure(repository, manifest, lock_wait): sm.assert_secure(manifest, manifest.key, lock_wait=lock_wait) -def recanonicalize_relative_location(cache_location, repository): - # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741). 
- repo_location = repository._location.canonical_path() - rl = Location(repo_location) - cl = Location(cache_location) - if ( - cl.proto == rl.proto - and cl.user == rl.user - and cl.host == rl.host - and cl.port == rl.port - and cl.path - and rl.path - and cl.path.startswith("/~/") - and rl.path.startswith("/./") - and cl.path[3:] == rl.path[3:] - ): - # everything is same except the expected change in relative path canonicalization, - # update previous_location to avoid warning / user query about changed location: - return repo_location - else: - return cache_location - - def cache_dir(repository, path=None): return path or os.path.join(get_cache_dir(), repository.id_str) -def files_cache_name(): - suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "") - return "files." + suffix if suffix else "files" - - -def discover_files_cache_name(path): - return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0] - - class CacheConfig: def __init__(self, repository, path=None, lock_wait=None): self.repository = repository @@ -299,8 +248,6 @@ class CacheConfig: self._check_upgrade(self.config_path) self.id = self._config.get("cache", "repository") self.manifest_id = hex_to_bin(self._config.get("cache", "manifest")) - self.timestamp = self._config.get("cache", "timestamp", fallback=None) - self.key_type = self._config.get("cache", "key_type", fallback=None) self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback=""))) self.mandatory_features = set( parse_stringified_list(self._config.get("cache", "mandatory_features", fallback="")) @@ -319,17 +266,10 @@ class CacheConfig: except configparser.NoSectionError: logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.") self.integrity = {} - previous_location = self._config.get("cache", "previous_location", fallback=None) - if previous_location: - self.previous_location = recanonicalize_relative_location(previous_location, self.repository) - else: - self.previous_location = None - self._config.set("cache", "previous_location", self.repository._location.canonical_path()) - def save(self, manifest=None, key=None): + def save(self, manifest=None): if manifest: self._config.set("cache", "manifest", manifest.id_str) - self._config.set("cache", "timestamp", manifest.timestamp) self._config.set("cache", "ignored_features", ",".join(self.ignored_features)) self._config.set("cache", "mandatory_features", ",".join(self.mandatory_features)) if not self._config.has_section("integrity"): @@ -337,8 +277,6 @@ class CacheConfig: for file, integrity_data in self.integrity.items(): self._config.set("integrity", file, integrity_data) self._config.set("integrity", "manifest", manifest.id_str) - if key: - self._config.set("cache", "key_type", str(key.TYPE)) with SaveFile(self.config_path) as fd: self._config.write(fd) @@ -361,6 +299,10 @@ class CacheConfig: raise Exception("%s does not look like a Borg cache." 
% config_path) from None +def get_cache_impl(): + return os.environ.get("BORG_CACHE_IMPL", "adhocwithfiles") + + class Cache: """Client Side cache""" @@ -412,7 +354,9 @@ class Cache: warn_if_unencrypted=True, progress=False, lock_wait=None, - permit_adhoc_cache=False, + no_cache_sync_permitted=False, + no_cache_sync_forced=False, + prefer_adhoc_cache=False, cache_mode=FILES_CACHE_MODE_DISABLED, iec=False, ): @@ -428,14 +372,37 @@ class Cache: cache_mode=cache_mode, ) + def adhocwithfiles(): + return AdHocWithFilesCache( + manifest=manifest, + path=path, + warn_if_unencrypted=warn_if_unencrypted, + progress=progress, + iec=iec, + lock_wait=lock_wait, + cache_mode=cache_mode, + ) + def adhoc(): return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec) - if not permit_adhoc_cache: + impl = get_cache_impl() + if impl != "cli": + methods = dict(local=local, adhocwithfiles=adhocwithfiles, adhoc=adhoc) + try: + method = methods[impl] + except KeyError: + raise RuntimeError("Unknown BORG_CACHE_IMPL value: %s" % impl) + return method() + + if no_cache_sync_forced: + return adhoc() if prefer_adhoc_cache else adhocwithfiles() + + if not no_cache_sync_permitted: return local() - # ad-hoc cache may be permitted, but if the local cache is in sync it'd be stupid to invalidate - # it by needlessly using the ad-hoc cache. + # no cache sync may be permitted, but if the local cache is in sync it'd be stupid to invalidate + # it by needlessly using the AdHocCache or the AdHocWithFilesCache. # Check if the local cache exists and is in sync. cache_config = CacheConfig(repository, path, lock_wait) @@ -447,8 +414,12 @@ class Cache: # Local cache is in sync, use it logger.debug("Cache: choosing local cache (in sync)") return local() - logger.debug("Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)") - return adhoc() + if prefer_adhoc_cache: # adhoc cache, without files cache + logger.debug("Cache: choosing AdHocCache (local cache does not exist or is not in sync)") + return adhoc() + else: + logger.debug("Cache: choosing AdHocWithFilesCache (local cache does not exist or is not in sync)") + return adhocwithfiles() class CacheStatsMixin: @@ -470,6 +441,9 @@ Total chunks: {0.total_chunks} def stats(self): from .archive import Archive + if isinstance(self, AdHocCache) and getattr(self, "chunks", None) is None: + self.chunks = self._load_chunks_from_repo() # AdHocCache usually only has .chunks after begin_txn. + # XXX: this should really be moved down to `hashindex.pyx` total_size, unique_size, total_unique_chunks, total_chunks = self.chunks.summarize() # since borg 1.2 we have new archive metadata telling the total size per archive, @@ -489,7 +463,303 @@ Total chunks: {0.total_chunks} return self.Summary(**stats) -class LocalCache(CacheStatsMixin): +class FilesCacheMixin: + """ + Massively accelerate processing of unchanged files by caching their chunks list. + With that, we can avoid having to read and chunk them to get their chunks list. + """ + + FILES_CACHE_NAME = "files" + + def __init__(self, cache_mode): + self.cache_mode = cache_mode + self.files = None + self._newest_cmtime = None + + def files_cache_name(self): + suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "") + return self.FILES_CACHE_NAME + "." 
+ suffix if suffix else self.FILES_CACHE_NAME + + def discover_files_cache_name(self, path): + return [ + fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".") + ][0] + + def _create_empty_files_cache(self, path): + with IntegrityCheckedFile(path=os.path.join(path, self.files_cache_name()), write=True) as fd: + pass # empty file + return fd.integrity_data + + def _read_files_cache(self): + if "d" in self.cache_mode: # d(isabled) + return + + self.files = {} + logger.debug("Reading files cache ...") + files_cache_logger.debug("FILES-CACHE-LOAD: starting...") + msg = None + try: + with IntegrityCheckedFile( + path=os.path.join(self.path, self.files_cache_name()), + write=False, + integrity_data=self.cache_config.integrity.get(self.files_cache_name()), + ) as fd: + u = msgpack.Unpacker(use_list=True) + while True: + data = fd.read(64 * 1024) + if not data: + break + u.feed(data) + try: + for path_hash, item in u: + entry = FileCacheEntry(*item) + # in the end, this takes about 240 Bytes per file + self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1)) + except (TypeError, ValueError) as exc: + msg = "The files cache seems invalid. [%s]" % str(exc) + break + except OSError as exc: + msg = "The files cache can't be read. [%s]" % str(exc) + except FileIntegrityError as fie: + msg = "The files cache is corrupted. [%s]" % str(fie) + if msg is not None: + logger.warning(msg) + logger.warning("Continuing without files cache - expect lower performance.") + self.files = {} + files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files)) + + def _write_files_cache(self): + if self._newest_cmtime is None: + # was never set because no files were modified/added + self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262 + ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20)) + files_cache_logger.debug("FILES-CACHE-SAVE: starting...") + with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd: + entry_count = 0 + for path_hash, item in self.files.items(): + # Only keep files seen in this backup that are older than newest cmtime seen in this backup - + # this is to avoid issues with filesystem snapshots and cmtime granularity. + # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet. + entry = FileCacheEntry(*msgpack.unpackb(item)) + if ( + entry.age == 0 + and timestamp_to_int(entry.cmtime) < self._newest_cmtime + or entry.age > 0 + and entry.age < ttl + ): + msgpack.pack((path_hash, entry), fd) + entry_count += 1 + files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl) + files_cache_logger.debug( + "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime + ) + files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count) + return fd.integrity_data + + def file_known_and_unchanged(self, hashed_path, path_hash, st): + """ + Check if we know the file that has this path_hash (know == it is in our files cache) and + whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode). 
+ + :param hashed_path: the file's path as we gave it to hash(hashed_path) + :param path_hash: hash(hashed_path), to save some memory in the files cache + :param st: the file's stat() result + :return: known, chunks (known is True if we have infos about this file in the cache, + chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None). + """ + if not stat.S_ISREG(st.st_mode): + return False, None + cache_mode = self.cache_mode + if "d" in cache_mode: # d(isabled) + files_cache_logger.debug("UNKNOWN: files cache disabled") + return False, None + # note: r(echunk) does not need the files cache in this method, but the files cache will + # be updated and saved to disk to memorize the files. To preserve previous generations in + # the cache, this means that it also needs to get loaded from disk first. + if "r" in cache_mode: # r(echunk) + files_cache_logger.debug("UNKNOWN: rechunking enforced") + return False, None + entry = self.files.get(path_hash) + if not entry: + files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path) + return False, None + # we know the file! + entry = FileCacheEntry(*msgpack.unpackb(entry)) + if "s" in cache_mode and entry.size != st.st_size: + files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path) + return True, None + if "i" in cache_mode and entry.inode != st.st_ino: + files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path) + return True, None + if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns: + files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path) + return True, None + elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns: + files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path) + return True, None + # we ignored the inode number in the comparison above or it is still same. + # if it is still the same, replacing it in the tuple doesn't change it. + # if we ignored it, a reason for doing that is that files were moved to a new + # disk / new fs (so a one-time change of inode number is expected) and we wanted + # to avoid everything getting chunked again. to be able to re-enable the inode + # number comparison in a future backup run (and avoid chunking everything + # again at that time), we need to update the inode number in the cache with what + # we see in the filesystem. 
+ self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0)) + chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks] # convert to list of namedtuple + return True, chunks + + def memorize_file(self, hashed_path, path_hash, st, chunks): + if not stat.S_ISREG(st.st_mode): + return + cache_mode = self.cache_mode + # note: r(echunk) modes will update the files cache, d(isabled) mode won't + if "d" in cache_mode: + files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled") + return + if "c" in cache_mode: + cmtime_type = "ctime" + cmtime_ns = safe_ns(st.st_ctime_ns) + elif "m" in cache_mode: + cmtime_type = "mtime" + cmtime_ns = safe_ns(st.st_mtime_ns) + else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError + cmtime_type = "ctime" + cmtime_ns = safe_ns(st.st_ctime_ns) + entry = FileCacheEntry( + age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks + ) + self.files[path_hash] = msgpack.packb(entry) + self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns) + files_cache_logger.debug( + "FILES-CACHE-UPDATE: put %r [has %s] <- %r", + entry._replace(chunks="[%d entries]" % len(entry.chunks)), + cmtime_type, + hashed_path, + ) + + +class ChunksMixin: + """ + Chunks index related code for misc. Cache implementations. + """ + + def chunk_incref(self, id, size, stats): + assert isinstance(size, int) and size > 0 + if not self._txn_active: + self.begin_txn() + count, _size = self.chunks.incref(id) + stats.update(size, False) + return ChunkListEntry(id, size) + + def chunk_decref(self, id, size, stats, wait=True): + assert isinstance(size, int) and size > 0 + if not self._txn_active: + self.begin_txn() + count, _size = self.chunks.decref(id) + if count == 0: + del self.chunks[id] + self.repository.delete(id, wait=wait) + stats.update(-size, True) + else: + stats.update(-size, False) + + def seen_chunk(self, id, size=None): + if not self._txn_active: + self.begin_txn() + entry = self.chunks.get(id, ChunkIndexEntry(0, None)) + if entry.refcount and size is not None: + assert isinstance(entry.size, int) + if entry.size: + # LocalCache: has existing size information and uses *size* to make an effort at detecting collisions. + if size != entry.size: + # we already have a chunk with that id, but different size. + # this is either a hash collision (unlikely) or corruption or a bug. + raise Exception( + "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size) + ) + else: + # AdHocWithFilesCache / AdHocCache: + # Here *size* is used to update the chunk's size information, which will be zero for existing chunks. 
+ self.chunks[id] = entry._replace(size=size) + return entry.refcount + + def add_chunk( + self, + id, + meta, + data, + *, + stats, + wait=True, + compress=True, + size=None, + ctype=None, + clevel=None, + ro_type=ROBJ_FILE_STREAM, + ): + assert ro_type is not None + if not self._txn_active: + self.begin_txn() + if size is None: + if compress: + size = len(data) # data is still uncompressed + else: + raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also") + refcount = self.seen_chunk(id, size) + if refcount: + return self.chunk_incref(id, size, stats) + cdata = self.repo_objs.format( + id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type + ) + self.repository.put(id, cdata, wait=wait) + self.chunks.add(id, 1, size) + stats.update(size, not refcount) + return ChunkListEntry(id, size) + + def _load_chunks_from_repo(self): + # Explicitly set the initial usable hash table capacity to avoid performance issues + # due to hash table "resonance". + # Since we're creating an archive, add 10 % from the start. + num_chunks = len(self.repository) + chunks = ChunkIndex(usable=num_chunks * 1.1) + pi = ProgressIndicatorPercent( + total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks" + ) + t0 = perf_counter() + num_requests = 0 + marker = None + while True: + result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) + num_requests += 1 + if not result: + break + pi.show(increase=len(result)) + marker = result[-1] + # All chunks from the repository have a refcount of MAX_VALUE, which is sticky, + # therefore we can't/won't delete them. Chunks we added ourselves in this transaction + # (e.g. checkpoint archives) are tracked correctly. + init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0) + for id_ in result: + chunks[id_] = init_entry + assert len(chunks) == num_chunks + # LocalCache does not contain the manifest, either. + del chunks[self.manifest.MANIFEST_ID] + duration = perf_counter() - t0 or 0.01 + pi.finish() + logger.debug( + "Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s", + num_chunks, + duration, + num_requests, + format_file_size(num_chunks * 34 / duration), + ) + # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes. + # Protocol overhead is neglected in this calculation. + return chunks + + +class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): """ Persistent, local (client-side) cache. """ @@ -512,15 +782,14 @@ class LocalCache(CacheStatsMixin): :param cache_mode: what shall be compared in the file stat infos vs. 
cached stat infos comparison """ CacheStatsMixin.__init__(self, iec=iec) + FilesCacheMixin.__init__(self, cache_mode) assert isinstance(manifest, Manifest) self.manifest = manifest self.repository = manifest.repository self.key = manifest.key self.repo_objs = manifest.repo_objs self.progress = progress - self.cache_mode = cache_mode - self.timestamp = None - self.txn_active = False + self._txn_active = False self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"] self.path = cache_dir(self.repository, path) @@ -539,7 +808,7 @@ class LocalCache(CacheStatsMixin): self.open() try: - self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config) + self.security_manager.assert_secure(manifest, self.key) if not self.check_cache_compatibility(): self.wipe_cache() @@ -567,8 +836,7 @@ class LocalCache(CacheStatsMixin): self.cache_config.create() ChunkIndex().write(os.path.join(self.path, "chunks")) os.makedirs(os.path.join(self.path, "chunks.archive.d")) - with SaveFile(os.path.join(self.path, files_cache_name()), binary=True): - pass # empty file + self._create_empty_files_cache(self.path) def _do_open(self): self.cache_config.load() @@ -578,10 +846,7 @@ class LocalCache(CacheStatsMixin): integrity_data=self.cache_config.integrity.get("chunks"), ) as fd: self.chunks = ChunkIndex.read(fd) - if "d" in self.cache_mode: # d(isabled) - self.files = None - else: - self._read_files() + self._read_files_cache() def open(self): if not os.path.isdir(self.path): @@ -594,42 +859,6 @@ class LocalCache(CacheStatsMixin): self.cache_config.close() self.cache_config = None - def _read_files(self): - self.files = {} - self._newest_cmtime = None - logger.debug("Reading files cache ...") - files_cache_logger.debug("FILES-CACHE-LOAD: starting...") - msg = None - try: - with IntegrityCheckedFile( - path=os.path.join(self.path, files_cache_name()), - write=False, - integrity_data=self.cache_config.integrity.get(files_cache_name()), - ) as fd: - u = msgpack.Unpacker(use_list=True) - while True: - data = fd.read(64 * 1024) - if not data: - break - u.feed(data) - try: - for path_hash, item in u: - entry = FileCacheEntry(*item) - # in the end, this takes about 240 Bytes per file - self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1)) - except (TypeError, ValueError) as exc: - msg = "The files cache seems invalid. [%s]" % str(exc) - break - except OSError as exc: - msg = "The files cache can't be read. [%s]" % str(exc) - except FileIntegrityError as fie: - msg = "The files cache is corrupted. 
[%s]" % str(fie) - if msg is not None: - logger.warning(msg) - logger.warning("Continuing without files cache - expect lower performance.") - self.files = {} - files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files)) - def begin_txn(self): # Initialize transaction snapshot pi = ProgressIndicatorMessage(msgid="cache.begin_transaction") @@ -641,57 +870,32 @@ class LocalCache(CacheStatsMixin): shutil.copy(os.path.join(self.path, "chunks"), txn_dir) pi.output("Initializing cache transaction: Reading files") try: - shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir) + shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir) except FileNotFoundError: - with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True): - pass # empty file + self._create_empty_files_cache(txn_dir) os.replace(txn_dir, os.path.join(self.path, "txn.active")) - self.txn_active = True + self._txn_active = True pi.finish() def commit(self): """Commit transaction""" - if not self.txn_active: + if not self._txn_active: return self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.commit") if self.files is not None: - if self._newest_cmtime is None: - # was never set because no files were modified/added - self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262 - ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20)) pi.output("Saving files cache") - files_cache_logger.debug("FILES-CACHE-SAVE: starting...") - with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd: - entry_count = 0 - for path_hash, item in self.files.items(): - # Only keep files seen in this backup that are older than newest cmtime seen in this backup - - # this is to avoid issues with filesystem snapshots and cmtime granularity. - # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet. 
- entry = FileCacheEntry(*msgpack.unpackb(item)) - if ( - entry.age == 0 - and timestamp_to_int(entry.cmtime) < self._newest_cmtime - or entry.age > 0 - and entry.age < ttl - ): - msgpack.pack((path_hash, entry), fd) - entry_count += 1 - files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl) - files_cache_logger.debug( - "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime - ) - files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count) - self.cache_config.integrity[files_cache_name()] = fd.integrity_data + integrity_data = self._write_files_cache() + self.cache_config.integrity[self.files_cache_name()] = integrity_data pi.output("Saving chunks cache") with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd: self.chunks.write(fd) self.cache_config.integrity["chunks"] = fd.integrity_data pi.output("Saving cache config") - self.cache_config.save(self.manifest, self.key) + self.cache_config.save(self.manifest) os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp")) shutil.rmtree(os.path.join(self.path, "txn.tmp")) - self.txn_active = False + self._txn_active = False pi.finish() def rollback(self): @@ -704,12 +908,12 @@ class LocalCache(CacheStatsMixin): if os.path.exists(txn_dir): shutil.copy(os.path.join(txn_dir, "config"), self.path) shutil.copy(os.path.join(txn_dir, "chunks"), self.path) - shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path) + shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path) txn_tmp = os.path.join(self.path, "txn.tmp") os.replace(txn_dir, txn_tmp) if os.path.exists(txn_tmp): shutil.rmtree(txn_tmp) - self.txn_active = False + self._txn_active = False self._do_open() def sync(self): @@ -935,9 +1139,8 @@ class LocalCache(CacheStatsMixin): with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd: self.chunks.write(fd) self.cache_config.integrity["chunks"] = fd.integrity_data - with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd: - pass # empty file - self.cache_config.integrity[files_cache_name()] = fd.integrity_data + integrity_data = self._create_empty_files_cache(self.path) + self.cache_config.integrity[self.files_cache_name()] = integrity_data self.cache_config.manifest_id = "" self.cache_config._config.set("cache", "manifest", "") if not self.cache_config._config.has_section("integrity"): @@ -962,149 +1165,170 @@ class LocalCache(CacheStatsMixin): self.cache_config.ignored_features.update(repo_features - my_features) self.cache_config.mandatory_features.update(repo_features & my_features) - def add_chunk( + +class AdHocWithFilesCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin): + """ + Like AdHocCache, but with a files cache. 
+ """ + + def __init__( self, - id, - meta, - data, - *, - stats, - wait=True, - compress=True, - size=None, - ctype=None, - clevel=None, - ro_type=ROBJ_FILE_STREAM, + manifest, + path=None, + warn_if_unencrypted=True, + progress=False, + lock_wait=None, + cache_mode=FILES_CACHE_MODE_DISABLED, + iec=False, ): - assert ro_type is not None - if not self.txn_active: - self.begin_txn() - if size is None and compress: - size = len(data) # data is still uncompressed - refcount = self.seen_chunk(id, size) - if refcount: - return self.chunk_incref(id, stats) - if size is None: - raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also") - cdata = self.repo_objs.format( - id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type - ) - self.repository.put(id, cdata, wait=wait) - self.chunks.add(id, 1, size) - stats.update(size, not refcount) - return ChunkListEntry(id, size) - - def seen_chunk(self, id, size=None): - refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None)) - if size is not None and stored_size is not None and size != stored_size: - # we already have a chunk with that id, but different size. - # this is either a hash collision (unlikely) or corruption or a bug. - raise Exception( - "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size) - ) - return refcount - - def chunk_incref(self, id, stats, size=None): - if not self.txn_active: - self.begin_txn() - count, _size = self.chunks.incref(id) - stats.update(_size, False) - return ChunkListEntry(id, _size) - - def chunk_decref(self, id, stats, wait=True): - if not self.txn_active: - self.begin_txn() - count, size = self.chunks.decref(id) - if count == 0: - del self.chunks[id] - self.repository.delete(id, wait=wait) - stats.update(-size, True) - else: - stats.update(-size, False) - - def file_known_and_unchanged(self, hashed_path, path_hash, st): """ - Check if we know the file that has this path_hash (know == it is in our files cache) and - whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode). - - :param hashed_path: the file's path as we gave it to hash(hashed_path) - :param path_hash: hash(hashed_path), to save some memory in the files cache - :param st: the file's stat() result - :return: known, ids (known is True if we have infos about this file in the cache, - ids is the list of chunk ids IF the file has not changed, otherwise None). + :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository + :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever]) + :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison """ - if not stat.S_ISREG(st.st_mode): - return False, None - cache_mode = self.cache_mode - if "d" in cache_mode: # d(isabled) - files_cache_logger.debug("UNKNOWN: files cache disabled") - return False, None - # note: r(echunk) does not need the files cache in this method, but the files cache will - # be updated and saved to disk to memorize the files. To preserve previous generations in - # the cache, this means that it also needs to get loaded from disk first. - if "r" in cache_mode: # r(echunk) - files_cache_logger.debug("UNKNOWN: rechunking enforced") - return False, None - entry = self.files.get(path_hash) - if not entry: - files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path) - return False, None - # we know the file! 
- entry = FileCacheEntry(*msgpack.unpackb(entry)) - if "s" in cache_mode and entry.size != st.st_size: - files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path) - return True, None - if "i" in cache_mode and entry.inode != st.st_ino: - files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path) - return True, None - if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns: - files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path) - return True, None - elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns: - files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path) - return True, None - # we ignored the inode number in the comparison above or it is still same. - # if it is still the same, replacing it in the tuple doesn't change it. - # if we ignored it, a reason for doing that is that files were moved to a new - # disk / new fs (so a one-time change of inode number is expected) and we wanted - # to avoid everything getting chunked again. to be able to re-enable the inode - # number comparison in a future backup run (and avoid chunking everything - # again at that time), we need to update the inode number in the cache with what - # we see in the filesystem. - self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0)) - return True, entry.chunk_ids + CacheStatsMixin.__init__(self, iec=iec) + FilesCacheMixin.__init__(self, cache_mode) + assert isinstance(manifest, Manifest) + self.manifest = manifest + self.repository = manifest.repository + self.key = manifest.key + self.repo_objs = manifest.repo_objs + self.progress = progress + self._txn_active = False - def memorize_file(self, hashed_path, path_hash, st, ids): - if not stat.S_ISREG(st.st_mode): + self.path = cache_dir(self.repository, path) + self.security_manager = SecurityManager(self.repository) + self.cache_config = CacheConfig(self.repository, self.path, lock_wait) + + # Warn user before sending data to a never seen before unencrypted repository + if not os.path.exists(self.path): + self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key) + self.create() + + self.open() + try: + self.security_manager.assert_secure(manifest, self.key) + + if not self.check_cache_compatibility(): + self.wipe_cache() + + self.update_compatibility() + except: # noqa + self.close() + raise + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def create(self): + """Create a new empty cache at `self.path`""" + os.makedirs(self.path) + with open(os.path.join(self.path, "README"), "w") as fd: + fd.write(CACHE_README) + self.cache_config.create() + self._create_empty_files_cache(self.path) + + def _do_open(self): + self.cache_config.load() + self.chunks = self._load_chunks_from_repo() + self._read_files_cache() + + def open(self): + if not os.path.isdir(self.path): + raise Exception("%s Does not look like a Borg cache" % self.path) + self.cache_config.open() + self.rollback() + + def close(self): + if self.cache_config is not None: + self.cache_config.close() + self.cache_config = None + + def begin_txn(self): + # Initialize transaction snapshot + pi = ProgressIndicatorMessage(msgid="cache.begin_transaction") + txn_dir = os.path.join(self.path, "txn.tmp") + os.mkdir(txn_dir) + pi.output("Initializing cache transaction: Reading config") + shutil.copy(os.path.join(self.path, "config"), txn_dir) + 
pi.output("Initializing cache transaction: Reading files") + try: + shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir) + except FileNotFoundError: + self._create_empty_files_cache(txn_dir) + os.replace(txn_dir, os.path.join(self.path, "txn.active")) + pi.finish() + self._txn_active = True + + def commit(self): + if not self._txn_active: return - cache_mode = self.cache_mode - # note: r(echunk) modes will update the files cache, d(isabled) mode won't - if "d" in cache_mode: - files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled") - return - if "c" in cache_mode: - cmtime_type = "ctime" - cmtime_ns = safe_ns(st.st_ctime_ns) - elif "m" in cache_mode: - cmtime_type = "mtime" - cmtime_ns = safe_ns(st.st_mtime_ns) - else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError - cmtime_type = "ctime" - cmtime_ns = safe_ns(st.st_ctime_ns) - entry = FileCacheEntry( - age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids - ) - self.files[path_hash] = msgpack.packb(entry) - self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns) - files_cache_logger.debug( - "FILES-CACHE-UPDATE: put %r [has %s] <- %r", - entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)), - cmtime_type, - hashed_path, - ) + self.security_manager.save(self.manifest, self.key) + pi = ProgressIndicatorMessage(msgid="cache.commit") + if self.files is not None: + pi.output("Saving files cache") + integrity_data = self._write_files_cache() + self.cache_config.integrity[self.files_cache_name()] = integrity_data + pi.output("Saving cache config") + self.cache_config.save(self.manifest) + os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp")) + shutil.rmtree(os.path.join(self.path, "txn.tmp")) + self._txn_active = False + pi.finish() + + def rollback(self): + # Remove partial transaction + if os.path.exists(os.path.join(self.path, "txn.tmp")): + shutil.rmtree(os.path.join(self.path, "txn.tmp")) + # Roll back active transaction + txn_dir = os.path.join(self.path, "txn.active") + if os.path.exists(txn_dir): + shutil.copy(os.path.join(txn_dir, "config"), self.path) + shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path) + txn_tmp = os.path.join(self.path, "txn.tmp") + os.replace(txn_dir, txn_tmp) + if os.path.exists(txn_tmp): + shutil.rmtree(txn_tmp) + self._txn_active = False + self._do_open() + + def check_cache_compatibility(self): + my_features = Manifest.SUPPORTED_REPO_FEATURES + if self.cache_config.ignored_features & my_features: + # The cache might not contain references of chunks that need a feature that is mandatory for some operation + # and which this version supports. To avoid corruption while executing that operation force rebuild. + return False + if not self.cache_config.mandatory_features <= my_features: + # The cache was build with consideration to at least one feature that this version does not understand. + # This client might misinterpret the cache. Thus force a rebuild. 
+ return False + return True + + def wipe_cache(self): + logger.warning("Discarding incompatible cache and forcing a cache rebuild") + self.chunks = ChunkIndex() + self._create_empty_files_cache(self.path) + self.cache_config.manifest_id = "" + self.cache_config._config.set("cache", "manifest", "") + + self.cache_config.ignored_features = set() + self.cache_config.mandatory_features = set() + + def update_compatibility(self): + operation_to_features_map = self.manifest.get_all_mandatory_features() + my_features = Manifest.SUPPORTED_REPO_FEATURES + repo_features = set() + for operation, features in operation_to_features_map.items(): + repo_features.update(features) + + self.cache_config.ignored_features.update(repo_features - my_features) + self.cache_config.mandatory_features.update(repo_features & my_features) -class AdHocCache(CacheStatsMixin): +class AdHocCache(CacheStatsMixin, ChunksMixin): """ Ad-hoc, non-persistent cache. @@ -1132,8 +1356,6 @@ Chunk index: {0.total_unique_chunks:20d} unknown""" self.security_manager = SecurityManager(self.repository) self.security_manager.assert_secure(manifest, self.key, lock_wait=lock_wait) - logger.warning("Note: --no-cache-sync is an experimental feature.") - # Public API def __enter__(self): @@ -1149,59 +1371,9 @@ Chunk index: {0.total_unique_chunks:20d} unknown""" files_cache_logger.debug("UNKNOWN: files cache not implemented") return False, None - def memorize_file(self, hashed_path, path_hash, st, ids): + def memorize_file(self, hashed_path, path_hash, st, chunks): pass - def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM): - assert ro_type is not None - if not self._txn_active: - self.begin_txn() - if size is None and compress: - size = len(data) # data is still uncompressed - if size is None: - raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also") - refcount = self.seen_chunk(id, size) - if refcount: - return self.chunk_incref(id, stats, size=size) - cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type) - self.repository.put(id, cdata, wait=wait) - self.chunks.add(id, 1, size) - stats.update(size, not refcount) - return ChunkListEntry(id, size) - - def seen_chunk(self, id, size=None): - if not self._txn_active: - self.begin_txn() - entry = self.chunks.get(id, ChunkIndexEntry(0, None)) - if entry.refcount and size and not entry.size: - # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions. - # This is of course not possible for the AdHocCache. - # Here *size* is used to update the chunk's size information, which will be zero for existing chunks. - self.chunks[id] = entry._replace(size=size) - return entry.refcount - - def chunk_incref(self, id, stats, size=None): - if not self._txn_active: - self.begin_txn() - count, _size = self.chunks.incref(id) - # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with - # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to. 
- size = _size or size - assert size - stats.update(size, False) - return ChunkListEntry(id, size) - - def chunk_decref(self, id, stats, wait=True): - if not self._txn_active: - self.begin_txn() - count, size = self.chunks.decref(id) - if count == 0: - del self.chunks[id] - self.repository.delete(id, wait=wait) - stats.update(-size, True) - else: - stats.update(-size, False) - def commit(self): if not self._txn_active: return @@ -1214,41 +1386,4 @@ Chunk index: {0.total_unique_chunks:20d} unknown""" def begin_txn(self): self._txn_active = True - # Explicitly set the initial usable hash table capacity to avoid performance issues - # due to hash table "resonance". - # Since we're creating an archive, add 10 % from the start. - num_chunks = len(self.repository) - self.chunks = ChunkIndex(usable=num_chunks * 1.1) - pi = ProgressIndicatorPercent( - total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks" - ) - t0 = perf_counter() - num_requests = 0 - marker = None - while True: - result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) - num_requests += 1 - if not result: - break - pi.show(increase=len(result)) - marker = result[-1] - # All chunks from the repository have a refcount of MAX_VALUE, which is sticky, - # therefore we can't/won't delete them. Chunks we added ourselves in this transaction - # (e.g. checkpoint archives) are tracked correctly. - init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0) - for id_ in result: - self.chunks[id_] = init_entry - assert len(self.chunks) == num_chunks - # LocalCache does not contain the manifest, either. - del self.chunks[self.manifest.MANIFEST_ID] - duration = perf_counter() - t0 or 0.01 - pi.finish() - logger.debug( - "AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s", - num_chunks, - duration, - num_requests, - format_file_size(num_chunks * 34 / duration), - ) - # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes. - # Protocol overhead is neglected in this calculation. + self.chunks = self._load_chunks_from_repo() diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index c82769730..c69889b18 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder): from ..repository import Repository from ..remote import RemoteRepository from ..archive import Archive - from ..cache import LocalCache, AdHocCache + from ..cache import LocalCache, AdHocCache, AdHocWithFilesCache if isinstance(o, Repository) or isinstance(o, RemoteRepository): return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()} if isinstance(o, Archive): return o.info() - if isinstance(o, LocalCache): + if isinstance(o, (LocalCache, AdHocWithFilesCache)): return {"path": o.path, "stats": o.stats()} if isinstance(o, AdHocCache): return {"stats": o.stats()} diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index bb5a7e169..9d7a5db42 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -18,7 +18,7 @@ import pytest from ... 
import xattr, platform from ...archive import Archive from ...archiver import Archiver, PURE_PYTHON_MSGPACK_WARNING -from ...cache import Cache +from ...cache import Cache, LocalCache from ...constants import * # NOQA from ...helpers import Location, umount from ...helpers import EXIT_SUCCESS @@ -356,9 +356,15 @@ def check_cache(archiver): manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, manifest, sync=False) as cache: original_chunks = cache.chunks + # the LocalCache implementation has an on-disk chunks cache, + # but AdHocWithFilesCache and AdHocCache don't have persistent chunks cache. + persistent = isinstance(cache, LocalCache) Cache.destroy(repository) with Cache(repository, manifest) as cache: correct_chunks = cache.chunks + if not persistent: + # there is no point in doing the checks + return assert original_chunks is not correct_chunks seen = set() for id, (refcount, size) in correct_chunks.iteritems(): diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py index cf4c6baf2..16b768073 100644 --- a/src/borg/testsuite/archiver/check_cmd.py +++ b/src/borg/testsuite/archiver/check_cmd.py @@ -338,10 +338,11 @@ def test_extra_chunks(archivers, request): with Repository(archiver.repository_location, exclusive=True) as repository: repository.put(b"01234567890123456789012345678901", b"xxxx") repository.commit(compact=False) - cmd(archiver, "check", exit_code=1) - cmd(archiver, "check", exit_code=1) + output = cmd(archiver, "check", "-v", exit_code=0) # orphans are not considered warnings anymore + assert "1 orphaned (unused) objects found." in output cmd(archiver, "check", "--repair", exit_code=0) - cmd(archiver, "check", exit_code=0) + output = cmd(archiver, "check", "-v", exit_code=0) + assert "orphaned (unused) objects found." not in output cmd(archiver, "extract", "archive1", "--dry-run", exit_code=0) diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index 51b3fe60e..a9324fbdf 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -4,7 +4,7 @@ from unittest.mock import patch import pytest -from ...cache import Cache, LocalCache +from ...cache import Cache, LocalCache, get_cache_impl from ...constants import * # NOQA from ...helpers import Location, get_security_dir, bin_to_hex from ...helpers import EXIT_ERROR @@ -153,32 +153,29 @@ def test_repository_move(archivers, request, monkeypatch): security_dir = get_security_directory(archiver.repository_path) os.replace(archiver.repository_path, archiver.repository_path + "_new") archiver.repository_location += "_new" + # borg should notice that the repository location changed and abort. + if archiver.FORK_DEFAULT: + cmd(archiver, "rinfo", exit_code=EXIT_ERROR) + else: + with pytest.raises(Cache.RepositoryAccessAborted): + cmd(archiver, "rinfo") + # if we explicitly allow relocated repos, it should work fine. monkeypatch.setenv("BORG_RELOCATED_REPO_ACCESS_IS_OK", "yes") cmd(archiver, "rinfo") monkeypatch.delenv("BORG_RELOCATED_REPO_ACCESS_IS_OK") with open(os.path.join(security_dir, "location")) as fd: location = fd.read() assert location == Location(archiver.repository_location).canonical_path() - # Needs no confirmation anymore - cmd(archiver, "rinfo") - shutil.rmtree(archiver.cache_path) + # after new repo location was confirmed once, it needs no further confirmation anymore. 
cmd(archiver, "rinfo") shutil.rmtree(security_dir) + # it also needs no confirmation if we have no knowledge about the previous location. cmd(archiver, "rinfo") + # it will re-create security-related infos in the security dir: for file in ("location", "key-type", "manifest-timestamp"): assert os.path.exists(os.path.join(security_dir, file)) -def test_security_dir_compat(archivers, request): - archiver = request.getfixturevalue(archivers) - cmd(archiver, "rcreate", RK_ENCRYPTION) - with open(os.path.join(get_security_directory(archiver.repository_path), "location"), "w") as fd: - fd.write("something outdated") - # This is fine, because the cache still has the correct information. security_dir and cache can disagree - # if older versions are used to confirm a renamed repository. - cmd(archiver, "rinfo") - - def test_unknown_unencrypted(archivers, request, monkeypatch): archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", "--encryption=none") @@ -207,9 +204,12 @@ def test_unknown_feature_on_create(archivers, request): cmd_raises_unknown_feature(archiver, ["create", "test", "input"]) +@pytest.mark.skipif(get_cache_impl() in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache") def test_unknown_feature_on_cache_sync(archivers, request): + # LocalCache.sync checks repo compat archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", RK_ENCRYPTION) + # delete the cache to trigger a cache sync later in borg create cmd(archiver, "rdelete", "--cache-only") add_unknown_feature(archiver.repository_path, Manifest.Operation.READ) cmd_raises_unknown_feature(archiver, ["create", "test", "input"]) @@ -277,6 +277,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): repository._location = Location(archiver.repository_location) manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, manifest) as cache: + is_localcache = isinstance(cache, LocalCache) cache.begin_txn() cache.cache_config.mandatory_features = {"unknown-feature"} cache.commit() @@ -295,7 +296,8 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): with patch.object(LocalCache, "wipe_cache", wipe_wrapper): cmd(archiver, "create", "test", "input") - assert called + if is_localcache: + assert called with Repository(archiver.repository_path, exclusive=True) as repository: if remote_repo: @@ -315,10 +317,14 @@ def test_check_cache(archivers, request): cache.begin_txn() cache.chunks.incref(list(cache.chunks.iteritems())[0][0]) cache.commit() + persistent = isinstance(cache, LocalCache) + if not persistent: + pytest.skip("check_cache is pointless if we do not have a persistent chunks cache") with pytest.raises(AssertionError): check_cache(archiver) +@pytest.mark.skipif(get_cache_impl() in ("adhocwithfiles", "adhoc"), reason="only works with LocalCache") def test_env_use_chunks_archive(archivers, request, monkeypatch): archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) diff --git a/src/borg/testsuite/archiver/corruption.py b/src/borg/testsuite/archiver/corruption.py index cb8b55417..65804eaca 100644 --- a/src/borg/testsuite/archiver/corruption.py +++ b/src/borg/testsuite/archiver/corruption.py @@ -34,7 +34,7 @@ def test_check_corrupted_repository(archiver): def corrupt_archiver(archiver): create_test_files(archiver.input_path) cmd(archiver, "rcreate", RK_ENCRYPTION) - archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"]["path"] + archiver.cache_path = json.loads(cmd(archiver, "rinfo", 
"--json"))["cache"].get("path") def corrupt(file, amount=1): @@ -48,9 +48,16 @@ def corrupt(file, amount=1): @pytest.mark.allow_cache_wipe def test_cache_chunks(archiver): corrupt_archiver(archiver) + if archiver.cache_path is None: + pytest.skip("no cache path for this kind of Cache implementation") + create_src_archive(archiver, "test") chunks_path = os.path.join(archiver.cache_path, "chunks") + if not os.path.exists(chunks_path): + pytest.skip("no persistent chunks index for this kind of Cache implementation") + chunks_before_corruption = set(ChunkIndex(path=chunks_path).iteritems()) + corrupt(chunks_path) assert not archiver.FORK_DEFAULT # test does not support forking @@ -74,6 +81,9 @@ def test_cache_chunks(archiver): def test_cache_files(archiver): corrupt_archiver(archiver) + if archiver.cache_path is None: + pytest.skip("no cache path for this kind of Cache implementation") + cmd(archiver, "create", "test", "input") corrupt(os.path.join(archiver.cache_path, "files")) out = cmd(archiver, "create", "test1", "input") @@ -83,6 +93,9 @@ def test_cache_files(archiver): def test_chunks_archive(archiver): corrupt_archiver(archiver) + if archiver.cache_path is None: + pytest.skip("no cache path for this kind of Cache implementation") + cmd(archiver, "create", "test1", "input") # Find ID of test1, so we can corrupt it later :) target_id = cmd(archiver, "rlist", "--format={id}{NL}").strip() @@ -93,6 +106,8 @@ def test_chunks_archive(archiver): cmd(archiver, "rinfo", "--json") chunks_archive = os.path.join(archiver.cache_path, "chunks.archive.d") + if not os.path.exists(chunks_archive): + pytest.skip("Only LocalCache has a per-archive chunks index cache.") assert len(os.listdir(chunks_archive)) == 4 # two archives, one chunks cache and one .integrity file each corrupt(os.path.join(chunks_archive, target_id + ".compact")) @@ -114,6 +129,9 @@ def test_chunks_archive(archiver): def test_old_version_interfered(archiver): corrupt_archiver(archiver) + if archiver.cache_path is None: + pytest.skip("no cache path for this kind of Cache implementation") + # Modify the main manifest ID without touching the manifest ID in the integrity section. # This happens if a version without integrity checking modifies the cache. config_path = os.path.join(archiver.cache_path, "config") diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index a064f4635..4d7626d2b 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -12,6 +12,7 @@ import time import pytest from ... 
import platform +from ...cache import get_cache_impl from ...constants import * # NOQA from ...manifest import Manifest from ...platform import is_cygwin, is_win32, is_darwin @@ -540,20 +541,21 @@ def test_create_pattern_intermediate_folders_first(archivers, request): assert out_list.index("d x/b") < out_list.index("- x/b/foo_b") -def test_create_no_cache_sync(archivers, request): +@pytest.mark.skipif(get_cache_impl() in ("adhocwithfiles", "local"), reason="only works with AdHocCache") +def test_create_no_cache_sync_adhoc(archivers, request): # TODO: add test for AdHocWithFilesCache archiver = request.getfixturevalue(archivers) create_test_files(archiver.input_path) cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "rdelete", "--cache-only") create_json = json.loads( - cmd(archiver, "create", "--no-cache-sync", "--json", "--error", "test", "input") - ) # ignore experimental warning + cmd(archiver, "create", "--no-cache-sync", "--prefer-adhoc-cache", "--json", "test", "input") + ) info_json = json.loads(cmd(archiver, "info", "-a", "test", "--json")) create_stats = create_json["cache"]["stats"] info_stats = info_json["cache"]["stats"] assert create_stats == info_stats cmd(archiver, "rdelete", "--cache-only") - cmd(archiver, "create", "--no-cache-sync", "test2", "input") + cmd(archiver, "create", "--no-cache-sync", "--prefer-adhoc-cache", "test2", "input") cmd(archiver, "rinfo") cmd(archiver, "check") diff --git a/src/borg/testsuite/archiver/debug_cmds.py b/src/borg/testsuite/archiver/debug_cmds.py index 4518a3ccf..3923871a5 100644 --- a/src/borg/testsuite/archiver/debug_cmds.py +++ b/src/borg/testsuite/archiver/debug_cmds.py @@ -168,7 +168,12 @@ def test_debug_refcount_obj(archivers, request): create_json = json.loads(cmd(archiver, "create", "--json", "test", "input")) archive_id = create_json["archive"]["id"] output = cmd(archiver, "debug", "refcount-obj", archive_id).strip() - assert output == f"object {archive_id} has 1 referrers [info from chunks cache]." + # LocalCache does precise refcounting, so we'll get 1 reference for the archive. + # AdHocCache or AdHocWithFilesCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount. + assert ( + output == f"object {archive_id} has 1 referrers [info from chunks cache]." + or output == f"object {archive_id} has 4294966271 referrers [info from chunks cache]." + ) # Invalid IDs do not abort or return an error output = cmd(archiver, "debug", "refcount-obj", "124", "xyza").strip() diff --git a/src/borg/testsuite/archiver/delete_cmd.py b/src/borg/testsuite/archiver/delete_cmd.py index 30727cac2..25c35e931 100644 --- a/src/borg/testsuite/archiver/delete_cmd.py +++ b/src/borg/testsuite/archiver/delete_cmd.py @@ -25,9 +25,8 @@ def test_delete(archivers, request): cmd(archiver, "extract", "test.2", "--dry-run") output = cmd(archiver, "delete", "-a", "test.2", "--stats") assert "Original size: -" in output # negative size == deleted data - # Make sure all data except the manifest has been deleted - with Repository(archiver.repository_path) as repository: - assert len(repository) == 1 + output = cmd(archiver, "rlist") + assert output == "" # no archives left! 
def test_delete_multiple(archivers, request): diff --git a/src/borg/testsuite/archiver/list_cmd.py b/src/borg/testsuite/archiver/list_cmd.py index 6d90a41cb..33c9a483b 100644 --- a/src/borg/testsuite/archiver/list_cmd.py +++ b/src/borg/testsuite/archiver/list_cmd.py @@ -40,9 +40,9 @@ def test_list_chunk_counts(archivers, request): fd.write(b"baab" * 2000000) cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "create", "test", "input") - output = cmd(archiver, "list", "test", "--format", "{num_chunks} {unique_chunks} {path}{NL}") - assert "0 0 input/empty_file" in output - assert "2 2 input/two_chunks" in output + output = cmd(archiver, "list", "test", "--format", "{num_chunks} {path}{NL}") + assert "0 input/empty_file" in output + assert "2 input/two_chunks" in output def test_list_size(archivers, request): diff --git a/src/borg/testsuite/archiver/recreate_cmd.py b/src/borg/testsuite/archiver/recreate_cmd.py index 078ec1ed0..fc7f8d1b9 100644 --- a/src/borg/testsuite/archiver/recreate_cmd.py +++ b/src/borg/testsuite/archiver/recreate_cmd.py @@ -153,15 +153,18 @@ def test_recreate_rechunkify(archivers, request): cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "create", "test1", "input", "--chunker-params", "7,9,8,128") cmd(archiver, "create", "test2", "input", "--files-cache=disabled") - chunks_list = cmd(archiver, "list", "test1", "input/large_file", "--format", "{num_chunks} {unique_chunks}") - num_chunks, unique_chunks = map(int, chunks_list.split(" ")) - # test1 and test2 do not deduplicate - assert num_chunks == unique_chunks + num_chunks1 = int(cmd(archiver, "list", "test1", "input/large_file", "--format", "{num_chunks}")) + num_chunks2 = int(cmd(archiver, "list", "test2", "input/large_file", "--format", "{num_chunks}")) + # right now, the file is chunked differently + assert num_chunks1 != num_chunks2 cmd(archiver, "recreate", "--chunker-params", "default") check_cache(archiver) - # test1 and test2 do deduplicate after recreate - assert int(cmd(archiver, "list", "test1", "input/large_file", "--format={size}")) - assert not int(cmd(archiver, "list", "test1", "input/large_file", "--format", "{unique_chunks}")) + num_chunks1 = int(cmd(archiver, "list", "test1", "input/large_file", "--format", "{num_chunks}")) + num_chunks2 = int(cmd(archiver, "list", "test2", "input/large_file", "--format", "{num_chunks}")) + # now the files are chunked in the same way + # TODO: this is a rather weak test, it could be improved by comparing the IDs in the chunk lists, + # to make sure that everything is completely deduplicated now (both files have identical chunks). + assert num_chunks1 == num_chunks2 def test_recreate_fixed_rechunkify(archivers, request): diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py index 571a483d2..60cb870e3 100644 --- a/src/borg/testsuite/cache.py +++ b/src/borg/testsuite/cache.py @@ -189,7 +189,7 @@ class TestAdHocCache: def test_does_not_delete_existing_chunks(self, repository, cache): assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE - cache.chunk_decref(H(1), Statistics()) + cache.chunk_decref(H(1), 1, Statistics()) assert repository.get(H(1)) == b"1234" def test_seen_chunk_add_chunk_size(self, cache): @@ -199,7 +199,7 @@ class TestAdHocCache: """E.g. 
checkpoint archives""" cache.add_chunk(H(5), {}, b"1010", stats=Statistics()) assert cache.seen_chunk(H(5)) == 1 - cache.chunk_decref(H(5), Statistics()) + cache.chunk_decref(H(5), 1, Statistics()) assert not cache.seen_chunk(H(5)) with pytest.raises(Repository.ObjectNotFound): repository.get(H(5)) @@ -220,9 +220,9 @@ class TestAdHocCache: def test_incref_after_add_chunk(self, cache): assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4) - assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4) + assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4) def test_existing_incref_after_add_chunk(self, cache): """This case occurs with part files, see Archive.chunk_file.""" assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4) - assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4) + assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4) diff --git a/src/borg/testsuite/conftest.py b/src/borg/testsuite/conftest.py index aa45da4ea..4708d9170 100644 --- a/src/borg/testsuite/conftest.py +++ b/src/borg/testsuite/conftest.py @@ -127,6 +127,7 @@ def archiver(tmp_path, set_env_variables): archiver.patterns_file_path = os.fspath(tmp_path / "patterns") os.environ["BORG_KEYS_DIR"] = archiver.keys_path os.environ["BORG_CACHE_DIR"] = archiver.cache_path + # os.environ["BORG_CACHE_IMPL"] = "adhocwithfiles" os.mkdir(archiver.input_path) os.chmod(archiver.input_path, 0o777) # avoid troubles with fakeroot / FUSE os.mkdir(archiver.output_path) diff --git a/src/borg/upgrade.py b/src/borg/upgrade.py index a8d17ac41..22a27c18c 100644 --- a/src/borg/upgrade.py +++ b/src/borg/upgrade.py @@ -84,8 +84,8 @@ class UpgraderFrom12To20: chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None)) if chunks is not None: item.chunks = chunks - for chunk_id, _ in chunks: - self.cache.chunk_incref(chunk_id, self.archive.stats) + for chunk_id, chunk_size in chunks: + self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats) if chunks_healthy is not None: item.chunks_healthy = chunks del item.source # not used for hardlinks any more, replaced by hlid
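
Reviewer note (not part of the patch): the tests above consistently gate backend-specific behaviour on get_cache_impl(), which reports the selected cache implementation (and appears to honour the BORG_CACHE_IMPL environment variable hinted at by the commented-out line added to conftest.py). Below is a minimal sketch of how a follow-up test could restrict itself to the LocalCache backend; the test name and body are illustrative only, and cmd, RK_ENCRYPTION and the archivers fixture are assumed to be the usual helpers from borg's archiver test suite, as used in the hunks above.

    import pytest

    from borg.cache import get_cache_impl

    # Skip unless the suite runs with the "local" cache implementation,
    # mirroring the skipif markers added in checks.py and create_cmd.py above.
    @pytest.mark.skipif(
        get_cache_impl() in ("adhoc", "adhocwithfiles"),
        reason="relies on LocalCache's persistent chunks cache",
    )
    def test_example_localcache_only(archivers, request):
        archiver = request.getfixturevalue(archivers)
        cmd(archiver, "rcreate", RK_ENCRYPTION)   # create a fresh test repository
        cmd(archiver, "create", "test", "input")  # archive the input directory
        cmd(archiver, "check")                    # repository must remain consistent

Running the whole suite against a different backend is then a matter of exporting BORG_CACHE_IMPL (e.g. BORG_CACHE_IMPL=adhocwithfiles) before invoking pytest, rather than editing individual tests.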