diff --git a/borg/cache.py b/borg/cache.py index 65e64af5b..13f80f325 100644 --- a/borg/cache.py +++ b/borg/cache.py @@ -212,6 +212,23 @@ class Cache: """ archive_path = os.path.join(self.path, 'chunks.archive.d') + def mkpath(id, suffix=''): + id_hex = hexlify(id).decode('ascii') + path = os.path.join(archive_path, id_hex + suffix) + return path.encode('utf-8') + + def cached_archives(): + fns = os.listdir(archive_path) + # filenames with 64 hex digits == 256bit + return set(unhexlify(fn) for fn in fns if len(fn) == 64) + + def repo_archives(): + return set(info[b'id'] for info in self.manifest.archives.values()) + + def cleanup_outdated(ids): + for id in ids: + os.unlink(mkpath(id)) + def add(chunk_idx, id, size, csize, incr=1): try: count, size, csize = chunk_idx[id] @@ -219,20 +236,6 @@ class Cache: except KeyError: chunk_idx[id] = incr, size, csize - def mkpath(id, suffix=''): - path = os.path.join(archive_path, id + suffix) - return path.encode('utf-8') - - def list_archives(): - fns = os.listdir(archive_path) - # only return filenames that are 64 hex digits (256bit) - return [fn for fn in fns if len(fn) == 64] - - def cleanup_outdated(ids): - for id in ids: - id_hex = hexlify(id).decode('ascii') - os.unlink(mkpath(id_hex)) - def fetch_and_build_idx(archive_id, repository, key): chunk_idx = ChunkIndex() cdata = repository.get(archive_id) @@ -242,7 +245,6 @@ class Cache: if archive[b'version'] != 1: raise Exception('Unknown archive metadata version') decode_dict(archive, (b'name',)) - print('Analyzing new archive:', archive[b'name']) unpacker = msgpack.Unpacker() for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])): data = key.decrypt(item_id, chunk) @@ -255,33 +257,43 @@ class Cache: if b'chunks' in item: for chunk_id, size, csize in item[b'chunks']: add(chunk_idx, chunk_id, size, csize) - archive_id_hex = hexlify(archive_id).decode('ascii') - fn = mkpath(archive_id_hex) - fn_tmp = mkpath(archive_id_hex, suffix='.tmp') + fn = mkpath(archive_id) + fn_tmp = mkpath(archive_id, suffix='.tmp') try: chunk_idx.write(fn_tmp) except Exception: os.unlink(fn_tmp) else: os.rename(fn_tmp, fn) + return chunk_idx + + def lookup_name(archive_id): + for name, info in self.manifest.archives.items(): + if info[b'id'] == archive_id: + return name def create_master_idx(chunk_idx): + print('Synchronizing chunks cache...') + cached_ids = cached_archives() + archive_ids = repo_archives() + print('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % ( + len(archive_ids), len(cached_ids), + len(cached_ids - archive_ids), len(archive_ids - cached_ids), )) # deallocates old hashindex, creates empty hashindex: chunk_idx.clear() - archives = list_archives() - if archives: + cleanup_outdated(cached_ids - archive_ids) + if archive_ids: chunk_idx = None - for fn in archives: - archive_id_hex = fn - archive_id = unhexlify(archive_id_hex) - for name, info in self.manifest.archives.items(): - if info[b'id'] == archive_id: - archive_name = name - break - archive_chunk_idx_path = mkpath(archive_id_hex) - print("- reading archive %s ..." % archive_name) - archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path) - print("- merging archive ...") + for archive_id in archive_ids: + archive_name = lookup_name(archive_id) + if archive_id in cached_ids: + archive_chunk_idx_path = mkpath(archive_id) + print("Reading cached archive chunk index for %s ..." % archive_name) + archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path) + else: + print('Fetching and building archive index for %s ...' % archive_name) + archive_chunk_idx = fetch_and_build_idx(archive_id, repository, self.key) + print("Merging into master chunks index ...") if chunk_idx is None: # we just use the first archive's idx as starting point, # to avoid growing the hash table from 0 size and also @@ -289,38 +301,28 @@ class Cache: chunk_idx = archive_chunk_idx else: chunk_idx.merge(archive_chunk_idx) + print('Done.') return chunk_idx - def legacy_support(): + def legacy_cleanup(): + """bring old cache dirs into the desired state (cleanup and adapt)""" try: - # get rid of the compressed tar file, if present os.unlink(os.path.join(self.path, 'chunks.archive')) except: pass try: - # create the directory for the archive index files we use now + os.unlink(os.path.join(self.path, 'chunks.archive.tmp')) + except: + pass + try: os.mkdir(archive_path) except: pass - self.begin_txn() - print('Synchronizing chunks cache...') repository = cache_if_remote(self.repository) - legacy_support() - known_ids = set(unhexlify(hexid) for hexid in list_archives()) - archive_ids = set(info[b'id'] for info in self.manifest.archives.values()) - print('Rebuilding archive collection. Repo: %d Known: %d Outdated: %d Unknown: %d' % ( - len(archive_ids), len(known_ids), - len(known_ids - archive_ids), len(archive_ids - known_ids), )) - cleanup_outdated(known_ids - archive_ids) - for archive_id in archive_ids - known_ids: - fetch_and_build_idx(archive_id, repository, self.key) - known_ids = set(unhexlify(hexid) for hexid in list_archives()) - assert known_ids == archive_ids - print('Merging collection into master chunks cache...') + legacy_cleanup() self.chunks = create_master_idx(self.chunks) - print('Done.') def add_chunk(self, id, data, stats): if not self.txn_active: