Split chunk preloading out of item iteration.

DownloadPipeline.unpack_many() and Archive.iter_items() lose their *preload* parameter; callers
that want preloading (extract, export-tar) now call preload_item_chunks() for each yielded item.
RemoteRepository.call_many() checks the is_preloaded/cmd combination once, up front.

Review changes relative to the patch as submitted:
- hlids_preloaded starts as an empty set instead of None, so preload_item_chunks(optimize_hardlinks=True)
  cannot raise TypeError when it runs before unpack_many() has initialised the set.
- the per-iteration debug message in call_many() passes lazy %-style logging arguments instead of an f-string.

NOTE(review): leading whitespace was reconstructed; verify context lines against the target tree before applying.

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 5ddf9790d..01d1617d1 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -262,42 +262,55 @@ class DownloadPipeline:
     def __init__(self, repository, repo_objs):
         self.repository = repository
         self.repo_objs = repo_objs
+        self.hlids_preloaded = set()
 
-    def unpack_many(self, ids, *, filter=None, preload=False):
+    def unpack_many(self, ids, *, filter=None):
         """
         Return iterator of items.
 
-        *ids* is a chunk ID list of an item stream. *filter* is a callable
-        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.
-
-        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
-        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        *ids* is a chunk ID list of an item content data stream.
+        *filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
         """
-        hlids_preloaded = set()
+        self.hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
         for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
             unpacker.feed(data)
             for _item in unpacker:
                 item = Item(internal_dict=_item)
-                if "chunks" in item:
-                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
-                if "chunks_healthy" in item:
-                    item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
-                if filter and not filter(item):
-                    continue
-                if preload and "chunks" in item:
-                    hlid = item.get("hlid", None)
-                    if hlid is None:
-                        preload_chunks = True
-                    elif hlid in hlids_preloaded:
-                        preload_chunks = False
-                    else:
-                        # not having the hardlink's chunks already preloaded for other hardlink to same inode
-                        preload_chunks = True
-                        hlids_preloaded.add(hlid)
-                    if preload_chunks:
-                        self.repository.preload([c.id for c in item.chunks])
-                yield item
+                if filter is None or filter(item):
+                    if "chunks" in item:
+                        item.chunks = [ChunkListEntry(*e) for e in item.chunks]
+                    if "chunks_healthy" in item:
+                        item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
+                    yield item
+
+    def preload_item_chunks(self, item, optimize_hardlinks=False):
+        """
+        Preloads the content data chunks of an item (if any).
+        optimize_hardlinks can be set to True if item chunks only need to be preloaded for
+        1st hardlink, but not for any further hardlink to same inode / with same hlid.
+        Returns True if chunks were preloaded.
+
+        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
+        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        """
+        preload_chunks = False
+        if "chunks" in item:
+            if optimize_hardlinks:
+                hlid = item.get("hlid", None)
+                if hlid is None:
+                    preload_chunks = True
+                elif hlid in self.hlids_preloaded:
+                    preload_chunks = False
+                else:
+                    # not having the hardlink's chunks already preloaded for other hardlink to same inode
+                    preload_chunks = True
+                    self.hlids_preloaded.add(hlid)
+            else:
+                preload_chunks = True
+            if preload_chunks:
+                self.repository.preload([c.id for c in item.chunks])
+        return preload_chunks
 
     def fetch_many(self, ids, is_preloaded=False, ro_type=None):
         assert ro_type is not None
@@ -605,12 +618,17 @@ Duration: {0.duration}
     def item_filter(self, item, filter=None):
         return filter(item) if filter else True
 
-    def iter_items(self, filter=None, preload=False):
-        # note: when calling this with preload=True, later fetch_many() must be called with
-        # is_preloaded=True or the RemoteRepository code will leak memory!
-        yield from self.pipeline.unpack_many(
-            self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter)
-        )
+    def iter_items(self, filter=None):
+        yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter))
+
+    def preload_item_chunks(self, item, optimize_hardlinks=False):
+        """
+        Preloads item content data chunks from the repository.
+
+        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
+        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        """
+        return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)
 
     def add_item(self, item, show_progress=True, stats=None):
         if show_progress and self.show_progress:
diff --git a/src/borg/archiver/extract_cmd.py b/src/borg/archiver/extract_cmd.py
index af22fc568..d0fafc1d5 100644
--- a/src/borg/archiver/extract_cmd.py
+++ b/src/borg/archiver/extract_cmd.py
@@ -56,7 +56,8 @@ class ExtractMixIn:
         else:
             pi = None
 
-        for item in archive.iter_items(filter, preload=True):
+        for item in archive.iter_items(filter):
+            archive.preload_item_chunks(item, optimize_hardlinks=True)
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py
index c4798c2e2..d95b992ef 100644
--- a/src/borg/archiver/tar_cmds.py
+++ b/src/borg/archiver/tar_cmds.py
@@ -226,7 +226,8 @@ class TarMixIn:
                 ph["BORG.item.meta"] = meta_text
             return ph
 
-        for item in archive.iter_items(filter, preload=True):
+        for item in archive.iter_items(filter):
+            archive.preload_item_chunks(item, optimize_hardlinks=True)
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
diff --git a/src/borg/cache.py b/src/borg/cache.py
index eef00f1a0..d880179bc 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -455,7 +455,7 @@ class FilesCacheMixin:
         )
         files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
         archive = Archive(self.manifest, prev_archive.id)
-        for item in archive.iter_items(preload=False):
+        for item in archive.iter_items():
             # only put regular files' infos into the files cache:
             if stat.S_ISREG(item.mode):
                 path_hash = self.key.id_hash(safe_encode(item.path))
diff --git a/src/borg/remote.py b/src/borg/remote.py
index fcf6a116f..03fd633f7 100644
--- a/src/borg/remote.py
+++ b/src/borg/remote.py
@@ -780,6 +780,8 @@ class RemoteRepository:
         if not calls and cmd != "async_responses":
             return
 
+        assert not is_preloaded or cmd == "get", "is_preloaded is only supported for 'get'"
+
         def send_buffer():
             if self.to_send:
                 try:
@@ -846,6 +848,9 @@ class RemoteRepository:
         maximum_to_send = 0 if wait else self.upload_buffer_size_limit
         send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
         while wait or calls:
+            logger.debug(
+                "call_many: calls: %d waiting_for: %d responses: %d", len(calls), len(waiting_for), len(self.responses)
+            )
             if self.shutdown_time and time.monotonic() > self.shutdown_time:
                 # we are shutting this RemoteRepository down already, make sure we do not waste
                 # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
@@ -946,18 +951,18 @@ class RemoteRepository:
                     and len(waiting_for) < MAX_INFLIGHT
                 ):
                     if calls:
-                        if is_preloaded:
-                            assert cmd == "get", "is_preload is only supported for 'get'"
-                            if calls[0]["id"] in self.chunkid_to_msgids:
-                                waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
-                        else:
-                            args = calls.pop(0)
-                            if cmd == "get" and args["id"] in self.chunkid_to_msgids:
-                                waiting_for.append(pop_preload_msgid(args["id"]))
-                            else:
-                                self.msgid += 1
-                                waiting_for.append(self.msgid)
-                                self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
+                        args = calls[0]
+                        if cmd == "get" and args["id"] in self.chunkid_to_msgids:
+                            # we have a get command and have already sent a request for this chunkid when
+                            # doing preloading, so we know the msgid of the response we are waiting for:
+                            waiting_for.append(pop_preload_msgid(args["id"]))
+                            del calls[0]
+                        elif not is_preloaded:
+                            # make and send a request (already done if we are using preloading)
+                            self.msgid += 1
+                            waiting_for.append(self.msgid)
+                            del calls[0]
+                            self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
                     if not self.to_send and self.preload_ids:
                         chunk_id = self.preload_ids.pop(0)
                         args = {"id": chunk_id}