iter_items: decouple item iteration and content data chunks preloading

It needs to be possible to iterate over all items in an archive,
do some output (e.g. if an item is included / excluded) and then
only preload content data chunks for the included items.
This commit is contained in:
Thomas Waldmann 2024-12-23 22:30:12 +01:00
parent 2108616e4c
commit 694fa93b76
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
4 changed files with 55 additions and 35 deletions

View file

@ -262,42 +262,55 @@ class DownloadPipeline:
def __init__(self, repository, repo_objs):
# Keep the given repository and repo_objs on the instance for the other methods to use.
self.repository = repository
self.repo_objs = repo_objs
# Hardlink ids (hlid) seen by preload_item_chunks(optimize_hardlinks=True).
# Stays None here; unpack_many() assigns a fresh set once its iteration starts.
self.hlids_preloaded = None
def unpack_many(self, ids, *, filter=None, preload=False):
def unpack_many(self, ids, *, filter=None):
# NOTE(review): this span looks like a rendered diff hunk in which removed and added lines
# are shown together (no +/- markers, indentation stripped) - see the two signatures above
# and the two interleaved docstring texts below. Confirm against the real file which lines
# are current before editing.
"""
Return iterator of items.
*ids* is a chunk ID list of an item stream. *filter* is a callable
to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.
Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
*ids* is a chunk ID list of an item content data stream.
*filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
"""
# Two trackers of already-seen hardlink ids: a local set and an instance attribute.
# Presumably the local one belongs to the variant with *preload* and the attribute to the
# variant without it (preload_item_chunks reads self.hlids_preloaded) - TODO confirm.
hlids_preloaded = set()
self.hlids_preloaded = set()
unpacker = msgpack.Unpacker(use_list=False)
# Feed each piece of fetched data to the unpacker, then iterate over the objects it yields
# and wrap each one in an Item.
for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
unpacker.feed(data)
for _item in unpacker:
item = Item(internal_dict=_item)
# Variant A (presumably the removed lines): convert the chunk lists to ChunkListEntry
# objects for every item, then skip items rejected by *filter*.
if "chunks" in item:
item.chunks = [ChunkListEntry(*e) for e in item.chunks]
if "chunks_healthy" in item:
item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
if filter and not filter(item):
continue
# Variant A, continued: inline preloading, done at most once per hlid; items without
# an hlid are always preloaded.
if preload and "chunks" in item:
hlid = item.get("hlid", None)
if hlid is None:
preload_chunks = True
elif hlid in hlids_preloaded:
preload_chunks = False
else:
# not having the hardlink's chunks already preloaded for other hardlink to same inode
preload_chunks = True
hlids_preloaded.add(hlid)
if preload_chunks:
self.repository.preload([c.id for c in item.chunks])
yield item
# Variant B (presumably the added lines): apply *filter* first and convert the chunk
# lists only for items that are actually yielded; no preloading happens here.
if filter is None or filter(item):
if "chunks" in item:
item.chunks = [ChunkListEntry(*e) for e in item.chunks]
if "chunks_healthy" in item:
item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
yield item
def preload_item_chunks(self, item, optimize_hardlinks=False):
    """
    Preload the content data chunks of *item* (if it has any).

    *optimize_hardlinks* can be set to True if item chunks only need to be preloaded for
    the 1st hardlink, but not for any further hardlink to the same inode / with the same hlid.
    Items without an hlid are always preloaded.

    Returns True if chunks were preloaded (self.repository.preload was called), else False.

    Warning: if data chunks are preloaded then all data chunks have to be retrieved,
    otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
    """
    if "chunks" not in item:
        # nothing to preload for items without content data chunks
        return False
    if optimize_hardlinks:
        hlid = item.get("hlid", None)
        if hlid is not None:
            if self.hlids_preloaded is None:
                # __init__ leaves this as None and unpack_many() only assigns the set once
                # its generator body starts running; create it here so that an earlier call
                # does not fail with "argument of type 'NoneType' is not iterable".
                self.hlids_preloaded = set()
            if hlid in self.hlids_preloaded:
                # chunks were already preloaded for another hardlink to the same inode
                return False
            self.hlids_preloaded.add(hlid)
    self.repository.preload([c.id for c in item.chunks])
    return True
def fetch_many(self, ids, is_preloaded=False, ro_type=None):
assert ro_type is not None
@ -605,12 +618,17 @@ Duration: {0.duration}
def item_filter(self, item, filter=None):
    """
    Decide whether *item* passes *filter*.

    Without a (truthy) *filter* every item passes and True is returned,
    otherwise the result of calling filter(item) is returned unchanged.
    """
    if not filter:
        return True
    return filter(item)
def iter_items(self, filter=None, preload=False):
# NOTE(review): presumably the removed variant of iter_items from a rendered diff - a second
# definition without the *preload* parameter follows directly after it; confirm which one
# the real file contains.
# Yields everything self.pipeline.unpack_many() produces for self.metadata.items, passing
# *preload* through and routing every item through self.item_filter(item, filter).
# note: when calling this with preload=True, later fetch_many() must be called with
# is_preloaded=True or the RemoteRepository code will leak memory!
yield from self.pipeline.unpack_many(
self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter)
)
def iter_items(self, filter=None):
    """
    Yield the items that self.pipeline.unpack_many() produces for self.metadata.items.

    Every item is routed through self.item_filter(item, filter) to decide whether
    it gets yielded.
    """

    def accept(item):
        return self.item_filter(item, filter)

    yield from self.pipeline.unpack_many(self.metadata.items, filter=accept)
def preload_item_chunks(self, item, optimize_hardlinks=False):
    """
    Preload the content data chunks of *item* from the repository.

    Delegates to self.pipeline.preload_item_chunks(), handing *optimize_hardlinks*
    through, and returns whatever that call returns.

    Warning: if data chunks are preloaded then all data chunks have to be retrieved,
    otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
    """
    pipeline = self.pipeline
    was_preloaded = pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)
    return was_preloaded
def add_item(self, item, show_progress=True, stats=None):
if show_progress and self.show_progress:

View file

@ -56,7 +56,8 @@ class ExtractMixIn:
else:
pi = None
for item in archive.iter_items(filter, preload=True):
for item in archive.iter_items(filter):
archive.preload_item_chunks(item, optimize_hardlinks=True)
orig_path = item.path
if strip_components:
item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])

View file

@ -226,7 +226,8 @@ class TarMixIn:
ph["BORG.item.meta"] = meta_text
return ph
for item in archive.iter_items(filter, preload=True):
for item in archive.iter_items(filter):
archive.preload_item_chunks(item, optimize_hardlinks=True)
orig_path = item.path
if strip_components:
item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])

View file

@ -455,7 +455,7 @@ class FilesCacheMixin:
)
files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
archive = Archive(self.manifest, prev_archive.id)
for item in archive.iter_items(preload=False):
for item in archive.iter_items():
# only put regular files' infos into the files cache:
if stat.S_ISREG(item.mode):
path_hash = self.key.id_hash(safe_encode(item.path))