Merge pull request #8592 from ThomasWaldmann/preload

item content chunks preloading related changes
This commit is contained in:
TW 2024-12-24 16:02:00 +01:00 committed by GitHub
commit 26b2ffc8a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 72 additions and 47 deletions

View file

@ -262,42 +262,55 @@ class DownloadPipeline:
def __init__(self, repository, repo_objs):
self.repository = repository
self.repo_objs = repo_objs
self.hlids_preloaded = None
def unpack_many(self, ids, *, filter=None, preload=False):
def unpack_many(self, ids, *, filter=None):
"""
Return iterator of items.
*ids* is a chunk ID list of an item stream. *filter* is a callable
to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.
Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
*ids* is a chunk ID list of an item content data stream.
*filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
"""
hlids_preloaded = set()
self.hlids_preloaded = set()
unpacker = msgpack.Unpacker(use_list=False)
for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
unpacker.feed(data)
for _item in unpacker:
item = Item(internal_dict=_item)
if "chunks" in item:
item.chunks = [ChunkListEntry(*e) for e in item.chunks]
if "chunks_healthy" in item:
item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
if filter and not filter(item):
continue
if preload and "chunks" in item:
hlid = item.get("hlid", None)
if hlid is None:
preload_chunks = True
elif hlid in hlids_preloaded:
preload_chunks = False
else:
# not having the hardlink's chunks already preloaded for other hardlink to same inode
preload_chunks = True
hlids_preloaded.add(hlid)
if preload_chunks:
self.repository.preload([c.id for c in item.chunks])
yield item
if filter is None or filter(item):
if "chunks" in item:
item.chunks = [ChunkListEntry(*e) for e in item.chunks]
if "chunks_healthy" in item:
item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
yield item
def preload_item_chunks(self, item, optimize_hardlinks=False):
"""
Preloads the content data chunks of an item (if any).
optimize_hardlinks can be set to True if item chunks only need to be preloaded for
1st hardlink, but not for any further hardlink to same inode / with same hlid.
Returns True if chunks were preloaded.
Warning: if data chunks are preloaded then all data chunks have to be retrieved,
otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
"""
preload_chunks = False
if "chunks" in item:
if optimize_hardlinks:
hlid = item.get("hlid", None)
if hlid is None:
preload_chunks = True
elif hlid in self.hlids_preloaded:
preload_chunks = False
else:
# not having the hardlink's chunks already preloaded for other hardlink to same inode
preload_chunks = True
self.hlids_preloaded.add(hlid)
else:
preload_chunks = True
if preload_chunks:
self.repository.preload([c.id for c in item.chunks])
return preload_chunks
def fetch_many(self, ids, is_preloaded=False, ro_type=None):
assert ro_type is not None
@ -605,12 +618,17 @@ Duration: {0.duration}
def item_filter(self, item, filter=None):
return filter(item) if filter else True
def iter_items(self, filter=None, preload=False):
# note: when calling this with preload=True, later fetch_many() must be called with
# is_preloaded=True or the RemoteRepository code will leak memory!
yield from self.pipeline.unpack_many(
self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter)
)
def iter_items(self, filter=None):
yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter))
def preload_item_chunks(self, item, optimize_hardlinks=False):
"""
Preloads item content data chunks from the repository.
Warning: if data chunks are preloaded then all data chunks have to be retrieved,
otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
"""
return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)
def add_item(self, item, show_progress=True, stats=None):
if show_progress and self.show_progress:

View file

@ -56,7 +56,8 @@ class ExtractMixIn:
else:
pi = None
for item in archive.iter_items(filter, preload=True):
for item in archive.iter_items(filter):
archive.preload_item_chunks(item, optimize_hardlinks=True)
orig_path = item.path
if strip_components:
item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])

View file

@ -226,7 +226,8 @@ class TarMixIn:
ph["BORG.item.meta"] = meta_text
return ph
for item in archive.iter_items(filter, preload=True):
for item in archive.iter_items(filter):
archive.preload_item_chunks(item, optimize_hardlinks=True)
orig_path = item.path
if strip_components:
item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])

View file

@ -455,7 +455,7 @@ class FilesCacheMixin:
)
files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
archive = Archive(self.manifest, prev_archive.id)
for item in archive.iter_items(preload=False):
for item in archive.iter_items():
# only put regular files' infos into the files cache:
if stat.S_ISREG(item.mode):
path_hash = self.key.id_hash(safe_encode(item.path))

View file

@ -780,6 +780,8 @@ class RemoteRepository:
if not calls and cmd != "async_responses":
return
assert not is_preloaded or cmd == "get", "is_preloaded is only supported for 'get'"
def send_buffer():
if self.to_send:
try:
@ -846,6 +848,9 @@ class RemoteRepository:
maximum_to_send = 0 if wait else self.upload_buffer_size_limit
send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise.
while wait or calls:
logger.debug(
f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}"
)
if self.shutdown_time and time.monotonic() > self.shutdown_time:
# we are shutting this RemoteRepository down already, make sure we do not waste
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
@ -946,18 +951,18 @@ class RemoteRepository:
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
if is_preloaded:
assert cmd == "get", "is_preload is only supported for 'get'"
if calls[0]["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
else:
args = calls.pop(0)
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(args["id"]))
else:
self.msgid += 1
waiting_for.append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
args = calls[0]
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
# we have a get command and have already sent a request for this chunkid when
# doing preloading, so we know the msgid of the response we are waiting for:
waiting_for.append(pop_preload_msgid(args["id"]))
del calls[0]
elif not is_preloaded:
# make and send a request (already done if we are using preloading)
self.msgid += 1
waiting_for.append(self.msgid)
del calls[0]
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
args = {"id": chunk_id}