From 694fa93b761a11f5a3f9b5a9e8227eabea2cb9e1 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 23 Dec 2024 22:30:12 +0100 Subject: [PATCH] iter_items: decouple item iteration and content data chunks preloading It needs to be possible to iterate over all items in an archive, do some output (e.g. if an item is included / excluded) and then only preload content data chunks for the included items. --- src/borg/archive.py | 82 +++++++++++++++++++------------- src/borg/archiver/extract_cmd.py | 3 +- src/borg/archiver/tar_cmds.py | 3 +- src/borg/cache.py | 2 +- 4 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 5ddf9790d..01d1617d1 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -262,42 +262,55 @@ class DownloadPipeline: def __init__(self, repository, repo_objs): self.repository = repository self.repo_objs = repo_objs + self.hlids_preloaded = None - def unpack_many(self, ids, *, filter=None, preload=False): + def unpack_many(self, ids, *, filter=None): """ Return iterator of items. - *ids* is a chunk ID list of an item stream. *filter* is a callable - to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item. - - Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved, - otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak. + *ids* is a chunk ID list of an item content data stream. + *filter* is an optional callable to decide whether an item will be yielded, default: yield all items. """ - hlids_preloaded = set() + self.hlids_preloaded = set() unpacker = msgpack.Unpacker(use_list=False) for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM): unpacker.feed(data) for _item in unpacker: item = Item(internal_dict=_item) - if "chunks" in item: - item.chunks = [ChunkListEntry(*e) for e in item.chunks] - if "chunks_healthy" in item: - item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy] - if filter and not filter(item): - continue - if preload and "chunks" in item: - hlid = item.get("hlid", None) - if hlid is None: - preload_chunks = True - elif hlid in hlids_preloaded: - preload_chunks = False - else: - # not having the hardlink's chunks already preloaded for other hardlink to same inode - preload_chunks = True - hlids_preloaded.add(hlid) - if preload_chunks: - self.repository.preload([c.id for c in item.chunks]) - yield item + if filter is None or filter(item): + if "chunks" in item: + item.chunks = [ChunkListEntry(*e) for e in item.chunks] + if "chunks_healthy" in item: + item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy] + yield item + + def preload_item_chunks(self, item, optimize_hardlinks=False): + """ + Preloads the content data chunks of an item (if any). + optimize_hardlinks can be set to True if item chunks only need to be preloaded for + 1st hardlink, but not for any further hardlink to same inode / with same hlid. + Returns True if chunks were preloaded. + + Warning: if data chunks are preloaded then all data chunks have to be retrieved, + otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak. + """ + preload_chunks = False + if "chunks" in item: + if optimize_hardlinks: + hlid = item.get("hlid", None) + if hlid is None: + preload_chunks = True + elif hlid in self.hlids_preloaded: + preload_chunks = False + else: + # not having the hardlink's chunks already preloaded for other hardlink to same inode + preload_chunks = True + self.hlids_preloaded.add(hlid) + else: + preload_chunks = True + if preload_chunks: + self.repository.preload([c.id for c in item.chunks]) + return preload_chunks def fetch_many(self, ids, is_preloaded=False, ro_type=None): assert ro_type is not None @@ -605,12 +618,17 @@ Duration: {0.duration} def item_filter(self, item, filter=None): return filter(item) if filter else True - def iter_items(self, filter=None, preload=False): - # note: when calling this with preload=True, later fetch_many() must be called with - # is_preloaded=True or the RemoteRepository code will leak memory! - yield from self.pipeline.unpack_many( - self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter) - ) + def iter_items(self, filter=None): + yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter)) + + def preload_item_chunks(self, item, optimize_hardlinks=False): + """ + Preloads item content data chunks from the repository. + + Warning: if data chunks are preloaded then all data chunks have to be retrieved, + otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak. + """ + return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks) def add_item(self, item, show_progress=True, stats=None): if show_progress and self.show_progress: diff --git a/src/borg/archiver/extract_cmd.py b/src/borg/archiver/extract_cmd.py index af22fc568..d0fafc1d5 100644 --- a/src/borg/archiver/extract_cmd.py +++ b/src/borg/archiver/extract_cmd.py @@ -56,7 +56,8 @@ class ExtractMixIn: else: pi = None - for item in archive.iter_items(filter, preload=True): + for item in archive.iter_items(filter): + archive.preload_item_chunks(item, optimize_hardlinks=True) orig_path = item.path if strip_components: item.path = os.sep.join(orig_path.split(os.sep)[strip_components:]) diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index c4798c2e2..d95b992ef 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -226,7 +226,8 @@ class TarMixIn: ph["BORG.item.meta"] = meta_text return ph - for item in archive.iter_items(filter, preload=True): + for item in archive.iter_items(filter): + archive.preload_item_chunks(item, optimize_hardlinks=True) orig_path = item.path if strip_components: item.path = os.sep.join(orig_path.split(os.sep)[strip_components:]) diff --git a/src/borg/cache.py b/src/borg/cache.py index eef00f1a0..d880179bc 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -455,7 +455,7 @@ class FilesCacheMixin: ) files_cache_logger.debug("FILES-CACHE-BUILD: starting...") archive = Archive(self.manifest, prev_archive.id) - for item in archive.iter_items(preload=False): + for item in archive.iter_items(): # only put regular files' infos into the files cache: if stat.S_ISREG(item.mode): path_hash = self.key.id_hash(safe_encode(item.path))