From 22dd925986f46615d4bb8b09830c493ae65ec896 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 30 Aug 2015 03:03:48 +0200 Subject: [PATCH] chunks index archive: remove all tar and compression related stuff and just use separate files in a directory the compression was quite cpu intensive and didn't work that great anyway. now the disk space usage is a bit higher, but it is much faster and less hard on the cpu. disk space needs grow linearly with the amount and size of the archives, this is a problem esp. if one has many and/or big archives (but this problem existed before also because compression was not as effective as I believed). the tar archive always needed a complete rebuild (and thus: decompression and recompression) because deleting outdated archive indexes was not possible in the tar file. now we just have a directory chunks.archive.d and keep archive index files there for all archives we already know. if an archive does not exist any more in the repo, we just delete its index file. if an archive is unknown still, we fetch the infos and build a new index file. when merging, we avoid growing the hash table from zero, but just start with the first archive's index as basis for merging. --- borg/cache.py | 185 ++++++++++++++++++++++---------------------------- 1 file changed, 82 insertions(+), 103 deletions(-) diff --git a/borg/cache.py b/borg/cache.py index 207fb58a6..65e64af5b 100644 --- a/borg/cache.py +++ b/borg/cache.py @@ -96,8 +96,7 @@ class Cache: with open(os.path.join(self.path, 'config'), 'w') as fd: config.write(fd) ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8')) - with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd: - pass # empty file + os.makedirs(os.path.join(self.path, 'chunks.archive.d')) with open(os.path.join(self.path, 'files'), 'wb') as fd: pass # empty file @@ -153,7 +152,6 @@ class Cache: os.mkdir(txn_dir) shutil.copy(os.path.join(self.path, 'config'), txn_dir) shutil.copy(os.path.join(self.path, 'chunks'), txn_dir) - shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir) shutil.copy(os.path.join(self.path, 'files'), txn_dir) os.rename(os.path.join(self.path, 'txn.tmp'), os.path.join(self.path, 'txn.active')) @@ -195,7 +193,6 @@ class Cache: if os.path.exists(txn_dir): shutil.copy(os.path.join(txn_dir, 'config'), self.path) shutil.copy(os.path.join(txn_dir, 'chunks'), self.path) - shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path) shutil.copy(os.path.join(txn_dir, 'files'), self.path) os.rename(txn_dir, os.path.join(self.path, 'txn.tmp')) if os.path.exists(os.path.join(self.path, 'txn.tmp')): @@ -206,53 +203,14 @@ class Cache: def sync(self): """Re-synchronize chunks cache with repository. - If present, uses a compressed tar archive of known backup archive - indices, so it only needs to fetch infos from repo and build a chunk - index once per backup archive. - If out of sync, the tar gets rebuilt from known + fetched chunk infos, - so it has complete and current information about all backup archives. - Finally, it builds the master chunks index by merging all indices from - the tar. + Maintains a directory with known backup archive indexes, so it only + needs to fetch infos from repo and build a chunk index once per backup + archive. + If out of sync, missing archive indexes get added, outdated indexes + get removed and a new master chunks index is built by merging all + archive indexes. """ - in_archive_path = os.path.join(self.path, 'chunks.archive') - out_archive_path = os.path.join(self.path, 'chunks.archive.tmp') - - def open_in_archive(): - try: - tf = tarfile.open(in_archive_path, 'r') - except OSError as e: - if e.errno != errno.ENOENT: - raise - # file not found - tf = None - except tarfile.ReadError: - # empty file? - tf = None - return tf - - def open_out_archive(): - for compression in ('gz', ): - # 'xz' needs py 3.3 and is expensive on the cpu - # 'bz2' also works on 3.2 and is expensive on the cpu - # 'gz' also works on 3.2 and is less expensive on the cpu - try: - tf = tarfile.open(out_archive_path, 'w:'+compression, format=tarfile.PAX_FORMAT) - break - except tarfile.CompressionError: - continue - else: # shouldn't happen - tf = None - return tf - - def close_archive(tf): - if tf: - tf.close() - - def delete_in_archive(): - os.unlink(in_archive_path) - - def rename_out_archive(): - os.rename(out_archive_path, in_archive_path) + archive_path = os.path.join(self.path, 'chunks.archive.d') def add(chunk_idx, id, size, csize, incr=1): try: @@ -261,16 +219,21 @@ class Cache: except KeyError: chunk_idx[id] = incr, size, csize - def transfer_known_idx(archive_id, tf_in, tf_out): - archive_id_hex = hexlify(archive_id).decode('ascii') - tarinfo = tf_in.getmember(archive_id_hex) - archive_name = tarinfo.pax_headers['archive_name'] - print('Already known archive:', archive_name) - f_in = tf_in.extractfile(archive_id_hex) - tf_out.addfile(tarinfo, f_in) - return archive_name + def mkpath(id, suffix=''): + path = os.path.join(archive_path, id + suffix) + return path.encode('utf-8') - def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out): + def list_archives(): + fns = os.listdir(archive_path) + # only return filenames that are 64 hex digits (256bit) + return [fn for fn in fns if len(fn) == 64] + + def cleanup_outdated(ids): + for id in ids: + id_hex = hexlify(id).decode('ascii') + os.unlink(mkpath(id_hex)) + + def fetch_and_build_idx(archive_id, repository, key): chunk_idx = ChunkIndex() cdata = repository.get(archive_id) data = key.decrypt(archive_id, cdata) @@ -293,55 +256,71 @@ class Cache: for chunk_id, size, csize in item[b'chunks']: add(chunk_idx, chunk_id, size, csize) archive_id_hex = hexlify(archive_id).decode('ascii') - file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8') - chunk_idx.write(file_tmp) - tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex) - tarinfo.pax_headers['archive_name'] = archive[b'name'] - with open(file_tmp, 'rb') as f: - tf_out.addfile(tarinfo, f) - os.unlink(file_tmp) + fn = mkpath(archive_id_hex) + fn_tmp = mkpath(archive_id_hex, suffix='.tmp') + try: + chunk_idx.write(fn_tmp) + except Exception: + os.unlink(fn_tmp) + else: + os.rename(fn_tmp, fn) - def create_master_idx(chunk_idx, tf_in, tmp_dir): + def create_master_idx(chunk_idx): + # deallocates old hashindex, creates empty hashindex: chunk_idx.clear() - for tarinfo in tf_in: - archive_id_hex = tarinfo.name - archive_name = tarinfo.pax_headers['archive_name'] - print("- extracting archive %s ..." % archive_name) - tf_in.extract(archive_id_hex, tmp_dir) - chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8') - print("- reading archive ...") - archive_chunk_idx = ChunkIndex.read(chunk_idx_path) - print("- merging archive ...") - chunk_idx.merge(archive_chunk_idx) - os.unlink(chunk_idx_path) + archives = list_archives() + if archives: + chunk_idx = None + for fn in archives: + archive_id_hex = fn + archive_id = unhexlify(archive_id_hex) + for name, info in self.manifest.archives.items(): + if info[b'id'] == archive_id: + archive_name = name + break + archive_chunk_idx_path = mkpath(archive_id_hex) + print("- reading archive %s ..." % archive_name) + archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path) + print("- merging archive ...") + if chunk_idx is None: + # we just use the first archive's idx as starting point, + # to avoid growing the hash table from 0 size and also + # to save 1 merge call. + chunk_idx = archive_chunk_idx + else: + chunk_idx.merge(archive_chunk_idx) + return chunk_idx + + def legacy_support(): + try: + # get rid of the compressed tar file, if present + os.unlink(os.path.join(self.path, 'chunks.archive')) + except: + pass + try: + # create the directory for the archive index files we use now + os.mkdir(archive_path) + except: + pass + self.begin_txn() print('Synchronizing chunks cache...') - # XXX we have to do stuff on disk due to lacking ChunkIndex api - with tempfile.TemporaryDirectory(prefix='borg-tmp') as tmp_dir: - repository = cache_if_remote(self.repository) - out_archive = open_out_archive() - in_archive = open_in_archive() - if in_archive: - known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames()) - else: - known_ids = set() - archive_ids = set(info[b'id'] for info in self.manifest.archives.values()) - print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % ( - len(known_ids), len(archive_ids), len(archive_ids - known_ids), )) - for archive_id in archive_ids & known_ids: - transfer_known_idx(archive_id, in_archive, out_archive) - close_archive(in_archive) - delete_in_archive() # free disk space - for archive_id in archive_ids - known_ids: - fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive) - close_archive(out_archive) - rename_out_archive() - print('Merging collection into master chunks cache...') - in_archive = open_in_archive() - create_master_idx(self.chunks, in_archive, tmp_dir) - close_archive(in_archive) - print('Done.') + repository = cache_if_remote(self.repository) + legacy_support() + known_ids = set(unhexlify(hexid) for hexid in list_archives()) + archive_ids = set(info[b'id'] for info in self.manifest.archives.values()) + print('Rebuilding archive collection. Repo: %d Known: %d Outdated: %d Unknown: %d' % ( + len(archive_ids), len(known_ids), + len(known_ids - archive_ids), len(archive_ids - known_ids), )) + cleanup_outdated(known_ids - archive_ids) + for archive_id in archive_ids - known_ids: + fetch_and_build_idx(archive_id, repository, self.key) + known_ids = set(unhexlify(hexid) for hexid in list_archives()) + assert known_ids == archive_ids + print('Merging collection into master chunks cache...') + self.chunks = create_master_idx(self.chunks) + print('Done.') def add_chunk(self, id, data, stats): if not self.txn_active: