chunks cache: keep per-archive chunk indexes in a chunks.archive.d directory

What this patch to borg/cache.py does:

* Where the cache's 'config', 'chunks' and 'files' are first written, a
  'chunks.archive.d' directory is created instead of an empty
  'chunks.archive' file.
* The copies of 'config', 'chunks' and 'files' into and out of the
  transaction directory no longer include 'chunks.archive'.
* Cache.sync() drops the tar based helpers. It now keeps one ChunkIndex file
  per archive in 'chunks.archive.d', named by the hex archive id:
  files whose id is not in the manifest are unlinked, files for manifest
  archives that are missing are built from the repository (written to a
  '.tmp' name first, then renamed), and all files are merged into a new
  master chunks index that is assigned to self.chunks.
  A leftover 'chunks.archive' file is deleted on the way.

Review changes folded into the added lines of the last hunk:

* fetch_and_build_idx: a failed chunk_idx.write() still removes the '.tmp'
  file but is now re-raised. Before, the error was swallowed and only
  surfaced later as the bare "assert known_ids == archive_ids" (or not at
  all when asserts are disabled).
* legacy_support: the two bare "except:" clauses are narrowed to
  "except OSError:" so that KeyboardInterrupt / SystemExit are not eaten.
* The new-side line count of the last hunk header was adjusted for the
  three added lines (71 -> 74).

NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch. Blank lines were checked against the
hunk header counts; indentation of context lines follows the Python block
structure and should be confirmed against the target file (or applied with
"git apply --ignore-whitespace"). The "index" line is kept from the original
patch and does not describe the adjusted post-image.

diff --git a/borg/cache.py b/borg/cache.py
index 207fb58a6..65e64af5b 100644
--- a/borg/cache.py
+++ b/borg/cache.py
@@ -96,8 +96,7 @@ class Cache:
         with open(os.path.join(self.path, 'config'), 'w') as fd:
             config.write(fd)
         ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
-        with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd:
-            pass  # empty file
+        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
         with open(os.path.join(self.path, 'files'), 'wb') as fd:
             pass  # empty file
 
@@ -153,7 +152,6 @@ class Cache:
         os.mkdir(txn_dir)
         shutil.copy(os.path.join(self.path, 'config'), txn_dir)
         shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
-        shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir)
         shutil.copy(os.path.join(self.path, 'files'), txn_dir)
         os.rename(os.path.join(self.path, 'txn.tmp'),
                   os.path.join(self.path, 'txn.active'))
@@ -195,7 +193,6 @@ class Cache:
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, 'config'), self.path)
             shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
-            shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path)
             shutil.copy(os.path.join(txn_dir, 'files'), self.path)
             os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
         if os.path.exists(os.path.join(self.path, 'txn.tmp')):
@@ -206,53 +203,14 @@ class Cache:
     def sync(self):
         """Re-synchronize chunks cache with repository.
 
-        If present, uses a compressed tar archive of known backup archive
-        indices, so it only needs to fetch infos from repo and build a chunk
-        index once per backup archive.
-        If out of sync, the tar gets rebuilt from known + fetched chunk infos,
-        so it has complete and current information about all backup archives.
-        Finally, it builds the master chunks index by merging all indices from
-        the tar.
+        Maintains a directory with known backup archive indexes, so it only
+        needs to fetch infos from repo and build a chunk index once per backup
+        archive.
+        If out of sync, missing archive indexes get added, outdated indexes
+        get removed and a new master chunks index is built by merging all
+        archive indexes.
         """
-        in_archive_path = os.path.join(self.path, 'chunks.archive')
-        out_archive_path = os.path.join(self.path, 'chunks.archive.tmp')
-
-        def open_in_archive():
-            try:
-                tf = tarfile.open(in_archive_path, 'r')
-            except OSError as e:
-                if e.errno != errno.ENOENT:
-                    raise
-                # file not found
-                tf = None
-            except tarfile.ReadError:
-                # empty file?
-                tf = None
-            return tf
-
-        def open_out_archive():
-            for compression in ('gz', ):
-                # 'xz' needs py 3.3 and is expensive on the cpu
-                # 'bz2' also works on 3.2 and is expensive on the cpu
-                # 'gz' also works on 3.2 and is less expensive on the cpu
-                try:
-                    tf = tarfile.open(out_archive_path, 'w:'+compression, format=tarfile.PAX_FORMAT)
-                    break
-                except tarfile.CompressionError:
-                    continue
-            else:  # shouldn't happen
-                tf = None
-            return tf
-
-        def close_archive(tf):
-            if tf:
-                tf.close()
-
-        def delete_in_archive():
-            os.unlink(in_archive_path)
-
-        def rename_out_archive():
-            os.rename(out_archive_path, in_archive_path)
+        archive_path = os.path.join(self.path, 'chunks.archive.d')
 
         def add(chunk_idx, id, size, csize, incr=1):
             try:
@@ -261,16 +219,21 @@ class Cache:
             except KeyError:
                 chunk_idx[id] = incr, size, csize
 
-        def transfer_known_idx(archive_id, tf_in, tf_out):
-            archive_id_hex = hexlify(archive_id).decode('ascii')
-            tarinfo = tf_in.getmember(archive_id_hex)
-            archive_name = tarinfo.pax_headers['archive_name']
-            print('Already known archive:', archive_name)
-            f_in = tf_in.extractfile(archive_id_hex)
-            tf_out.addfile(tarinfo, f_in)
-            return archive_name
+        def mkpath(id, suffix=''):
+            path = os.path.join(archive_path, id + suffix)
+            return path.encode('utf-8')
 
-        def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out):
+        def list_archives():
+            fns = os.listdir(archive_path)
+            # only return filenames that are 64 hex digits (256bit)
+            return [fn for fn in fns if len(fn) == 64]
+
+        def cleanup_outdated(ids):
+            for id in ids:
+                id_hex = hexlify(id).decode('ascii')
+                os.unlink(mkpath(id_hex))
+
+        def fetch_and_build_idx(archive_id, repository, key):
             chunk_idx = ChunkIndex()
             cdata = repository.get(archive_id)
             data = key.decrypt(archive_id, cdata)
@@ -293,55 +256,74 @@ class Cache:
                         for chunk_id, size, csize in item[b'chunks']:
                             add(chunk_idx, chunk_id, size, csize)
             archive_id_hex = hexlify(archive_id).decode('ascii')
-            file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
-            chunk_idx.write(file_tmp)
-            tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex)
-            tarinfo.pax_headers['archive_name'] = archive[b'name']
-            with open(file_tmp, 'rb') as f:
-                tf_out.addfile(tarinfo, f)
-            os.unlink(file_tmp)
+            fn = mkpath(archive_id_hex)
+            fn_tmp = mkpath(archive_id_hex, suffix='.tmp')
+            try:
+                chunk_idx.write(fn_tmp)
+            except Exception:
+                # drop the partial file, but do not hide the failure: the
+                # master index is merged from these per-archive files.
+                os.unlink(fn_tmp)
+                raise
+            else:
+                os.rename(fn_tmp, fn)
 
-        def create_master_idx(chunk_idx, tf_in, tmp_dir):
+        def create_master_idx(chunk_idx):
+            # deallocates old hashindex, creates empty hashindex:
             chunk_idx.clear()
-            for tarinfo in tf_in:
-                archive_id_hex = tarinfo.name
-                archive_name = tarinfo.pax_headers['archive_name']
-                print("- extracting archive %s ..." % archive_name)
-                tf_in.extract(archive_id_hex, tmp_dir)
-                chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
-                print("- reading archive ...")
-                archive_chunk_idx = ChunkIndex.read(chunk_idx_path)
-                print("- merging archive ...")
-                chunk_idx.merge(archive_chunk_idx)
-                os.unlink(chunk_idx_path)
+            archives = list_archives()
+            if archives:
+                chunk_idx = None
+                for fn in archives:
+                    archive_id_hex = fn
+                    archive_id = unhexlify(archive_id_hex)
+                    for name, info in self.manifest.archives.items():
+                        if info[b'id'] == archive_id:
+                            archive_name = name
+                            break
+                    archive_chunk_idx_path = mkpath(archive_id_hex)
+                    print("- reading archive %s ..." % archive_name)
+                    archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
+                    print("- merging archive ...")
+                    if chunk_idx is None:
+                        # we just use the first archive's idx as starting point,
+                        # to avoid growing the hash table from 0 size and also
+                        # to save 1 merge call.
+                        chunk_idx = archive_chunk_idx
+                    else:
+                        chunk_idx.merge(archive_chunk_idx)
+            return chunk_idx
+
+        def legacy_support():
+            try:
+                # get rid of the compressed tar file, if present
+                os.unlink(os.path.join(self.path, 'chunks.archive'))
+            except OSError:
+                pass
+            try:
+                # create the directory for the archive index files we use now
+                os.mkdir(archive_path)
+            except OSError:
+                pass
+
 
         self.begin_txn()
         print('Synchronizing chunks cache...')
-        # XXX we have to do stuff on disk due to lacking ChunkIndex api
-        with tempfile.TemporaryDirectory(prefix='borg-tmp') as tmp_dir:
-            repository = cache_if_remote(self.repository)
-            out_archive = open_out_archive()
-            in_archive = open_in_archive()
-            if in_archive:
-                known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames())
-            else:
-                known_ids = set()
-            archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
-            print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % (
-                len(known_ids), len(archive_ids), len(archive_ids - known_ids), ))
-            for archive_id in archive_ids & known_ids:
-                transfer_known_idx(archive_id, in_archive, out_archive)
-            close_archive(in_archive)
-            delete_in_archive()  # free disk space
-            for archive_id in archive_ids - known_ids:
-                fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive)
-            close_archive(out_archive)
-            rename_out_archive()
-            print('Merging collection into master chunks cache...')
-            in_archive = open_in_archive()
-            create_master_idx(self.chunks, in_archive, tmp_dir)
-            close_archive(in_archive)
-            print('Done.')
+        repository = cache_if_remote(self.repository)
+        legacy_support()
+        known_ids = set(unhexlify(hexid) for hexid in list_archives())
+        archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
+        print('Rebuilding archive collection. Repo: %d Known: %d Outdated: %d Unknown: %d' % (
+            len(archive_ids), len(known_ids),
+            len(known_ids - archive_ids), len(archive_ids - known_ids), ))
+        cleanup_outdated(known_ids - archive_ids)
+        for archive_id in archive_ids - known_ids:
+            fetch_and_build_idx(archive_id, repository, self.key)
+        known_ids = set(unhexlify(hexid) for hexid in list_archives())
+        assert known_ids == archive_ids
+        print('Merging collection into master chunks cache...')
+        self.chunks = create_master_idx(self.chunks)
+        print('Done.')
 
     def add_chunk(self, id, data, stats):
         if not self.txn_active: