diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 0e6a6ba07..35b08275e 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -58,13 +58,8 @@ class ArchiveGarbageCollector: return chunks def save_chunk_index(self): - # first clean up: - for id, entry in self.chunks.iteritems(): - # we already deleted the unused chunks, so everything left must be used: - assert entry.flags & ChunkIndex.F_USED - # as we put the wrong size in there, we need to clean up the size: - self.chunks[id] = entry._replace(size=0) - # now self.chunks is an uptodate ChunkIndex, usable for general borg usage! + # write_chunkindex_to_repo now removes all flags and size infos. + # we need this, as we put the wrong size in there. write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True) self.chunks = None # nothing there (cleared!) diff --git a/src/borg/cache.py b/src/borg/cache.py index efd3282d9..76d43a6fd 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -396,9 +396,7 @@ class FilesCacheMixin: assert isinstance(entry, FileCacheEntry) compressed_chunks = [] for id, size in entry.chunks: - cie = self.chunks.get(id) - assert cie is not None - assert cie.flags & ChunkIndex.F_USED + cie = self.chunks[id] # may raise KeyError if chunk id is not in repo if cie.size == 0: # size is not known in the chunks index yet self.chunks[id] = cie._replace(size=size) else: @@ -418,9 +416,7 @@ class FilesCacheMixin: for idx in entry.chunks: assert isinstance(idx, int), f"{idx} is not an int" id = self.chunks.idx_to_k(idx) - cie = self.chunks.get(id) - assert cie is not None - assert cie.flags & ChunkIndex.F_USED + cie = self.chunks[id] assert cie.size > 0 chunks.append((id, cie.size)) entry = entry._replace(chunks=chunks) @@ -485,6 +481,7 @@ class FilesCacheMixin: mtime=int_to_timestamp(mtime_ns), chunks=item.chunks, ) + # note: if the repo is an a valid state, next line should not fail with KeyError: files[path_hash] = self.compress_entry(entry) # deal with special snapshot / timestamp granularity case, see FAQ: for path_hash in self._newest_path_hashes: @@ -529,7 +526,11 @@ class FilesCacheMixin: for path_hash, entry in u: entry = FileCacheEntry(*entry) entry = entry._replace(age=entry.age + 1) - files[path_hash] = self.compress_entry(entry) + try: + files[path_hash] = self.compress_entry(entry) + except KeyError: + # repo is missing a chunk referenced from entry + logger.debug(f"compress_entry failed for {entry}, skipping.") except (TypeError, ValueError) as exc: msg = "The files cache seems invalid. [%s]" % str(exc) break @@ -706,14 +707,23 @@ CHUNKINDEX_HASH_SEED = 2 def write_chunkindex_to_repo_cache( repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None ): - cached_hashes = list_chunkindex_hashes(repository) + # the borghash code has no means to only serialize the F_NEW table entries, + # thus we copy only the new entries to a temporary table: + new_chunks = ChunkIndex() + # for now, we don't want to serialize the flags or the size, just the keys (chunk IDs): + cleaned_value = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=0) + for key, _ in chunks.iteritems(only_new=True): + new_chunks[key] = cleaned_value with io.BytesIO() as f: - chunks.write(f) + new_chunks.write(f) data = f.getvalue() + logger.debug(f"caching {len(new_chunks)} new chunks.") + new_chunks.clear() # free memory of the temporary table if clear: # if we don't need the in-memory chunks index anymore: chunks.clear() # free memory, immediately new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED)) + cached_hashes = list_chunkindex_hashes(repository) if force_write or new_hash not in cached_hashes: # when an updated chunks index is stored into the cache, we also store its hash as part of the name. # when a client is loading the chunks index from a cache, it has to compare its xxh64 @@ -725,12 +735,15 @@ def write_chunkindex_to_repo_cache( cache_name = f"cache/chunks.{new_hash}" logger.debug(f"caching chunks index as {cache_name} in repository...") repository.store_store(cache_name, data) + # we have successfully stored to the repository, so we can clear all F_NEW flags now: + chunks.clear_new() + # delete some not needed cached chunk indexes, but never the one we just wrote: if delete_other: - delete_these = cached_hashes + delete_these = set(cached_hashes) - {new_hash} elif delete_these: - pass + delete_these = set(delete_these) - {new_hash} else: - delete_these = [] + delete_these = set() for hash in delete_these: cache_name = f"cache/chunks.{hash}" try: @@ -783,6 +796,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi write_chunkindex_to_repo_cache( repository, chunks, clear=False, force_write=True, delete_these=hashes ) + else: + chunks.clear_new() return chunks # if we didn't get anything from the cache, compute the ChunkIndex the slow way: logger.debug("querying the chunk IDs list from the repo...") diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index 6b236afa7..994e54b5a 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -13,8 +13,12 @@ CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]] class ChunkIndex: F_NONE: int F_USED: int + F_NEW: int + M_USER: int + M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... - def iteritems(self, marker: bytes = ...) -> Iterator: ... + def iteritems(self, *, only_new: bool = ...) -> Iterator: ... + def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... def __getitem__(self, key: bytes) -> Type[ChunkIndexEntry]: ... def __setitem__(self, key: bytes, value: CIE) -> None: ... diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 00fe68404..beca0540b 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -39,11 +39,16 @@ ChunkIndexEntry = namedtuple('ChunkIndexEntry', 'flags size') class ChunkIndex(HTProxyMixin, MutableMapping): """ - Mapping from key256 to (refcount32, size32) to track chunks in the repository. + Mapping from key256 to (flags32, size32) to track chunks in the repository. """ - # .flags values: 2^0 .. 2^31 + # .flags related values: F_NONE = 0 # all flags cleared - F_USED = 1 # chunk is used/referenced + M_USER = 0x00ffffff # mask for user flags + M_SYSTEM = 0xff000000 # mask for system flags + # user flags: + F_USED = 2 ** 0 # chunk is used/referenced + # system flags (internal use, always 0 to user, not changeable by user): + F_NEW = 2 ** 24 # a new chunk that is not present in repo/cache/chunks.* yet. def __init__(self, capacity=1000, path=None, usable=None): if path: @@ -53,8 +58,15 @@ class ChunkIndex(HTProxyMixin, MutableMapping): capacity = usable * 2 # load factor 0.5 self.ht = HashTableNT(key_size=32, value_format="