From c914035d08de8a17cc858425dea4aff8912092b0 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 9 Jun 2026 21:22:40 +0200 Subject: [PATCH] repository: remove chunk preloading Preloading was used only by extract and export-tar, and the modern borgstore-based Repository.preload() was already a no-op. Remove the preload calls from both commands and the now-dead supporting code: - DownloadPipeline.preload_item_chunks / Archive.preload_item_chunks - hlids_preloaded tracking - is_preloaded parameter from DownloadPipeline.fetch_many and Repository.get_many - the no-op Repository.preload() Also: remove preload support from borg.legacy With no remaining callers, drop the legacy-side preload machinery: LegacyRemoteRepository: - preload_ids / chunkid_to_msgids state and the pop_preload_msgid helper - is_preloaded parameter and handling in call_many() (get requests now always go through the normal send path; MAX_INFLIGHT pipelining of regular calls is unchanged) - is_preloaded from get_many() and the preload() method LegacyRepository: - is_preloaded from get_many() and the no-op preload() stub Both legacy repo classes now match the modern Repository interface. Co-Authored-By: Claude Opus 4.8 --- src/borg/archive.py | 53 +++----------------------------- src/borg/archiver/extract_cmd.py | 2 -- src/borg/archiver/tar_cmds.py | 3 +- src/borg/legacy/remote.py | 53 ++++++-------------------------- src/borg/legacy/repository.py | 5 +-- src/borg/repository.py | 5 +-- 6 files changed, 17 insertions(+), 104 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 7f628adb9..6a3414cfd 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -271,7 +271,6 @@ class DownloadPipeline: def __init__(self, repository, repo_objs): self.repository = repository self.repo_objs = repo_objs - self.hlids_preloaded = None def unpack_many(self, ids, *, filter=None): """ @@ -280,7 +279,6 @@ class DownloadPipeline: *ids* is a chunk ID list of an item content data stream. *filter* is an optional callable to decide whether an item will be yielded, default: yield all items. """ - self.hlids_preloaded = set() unpacker = msgpack.Unpacker(use_list=False) for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM, replacement_chunk=False): if data is None: @@ -295,35 +293,7 @@ class DownloadPipeline: item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy] yield item - def preload_item_chunks(self, item, optimize_hardlinks=False): - """ - Preloads the content data chunks of an item (if any). - optimize_hardlinks can be set to True if item chunks only need to be preloaded for - 1st hard link, but not for any further hard link to same inode / with same hlid. - Returns True if chunks were preloaded. - - Warning: if data chunks are preloaded then all data chunks have to be retrieved, - otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak. - """ - preload_chunks = False - if "chunks" in item: - if optimize_hardlinks: - hlid = item.get("hlid", None) - if hlid is None: - preload_chunks = True - elif hlid in self.hlids_preloaded: - preload_chunks = False - else: - # not having the hard link's chunks already preloaded for other hard link to same inode - preload_chunks = True - self.hlids_preloaded.add(hlid) - else: - preload_chunks = True - if preload_chunks: - self.repository.preload([c.id for c in item.chunks]) - return preload_chunks - - def fetch_many(self, chunks, is_preloaded=False, ro_type=None, replacement_chunk=True): + def fetch_many(self, chunks, ro_type=None, replacement_chunk=True): assert ro_type is not None ids = [] sizes = [] @@ -336,9 +306,7 @@ class DownloadPipeline: sizes = [None] * len(ids) else: raise TypeError(f"unsupported or mixed element types: {chunks}") - for id, size, cdata in zip( - ids, sizes, self.repository.get_many(ids, is_preloaded=is_preloaded, raise_missing=False) - ): + for id, size, cdata in zip(ids, sizes, self.repository.get_many(ids, raise_missing=False)): if cdata is None: if replacement_chunk and size is not None: logger.error(f"repository object {bin_to_hex(id)} missing, returning {size} zero bytes.") @@ -663,15 +631,6 @@ Duration: {0.duration} def iter_items(self, filter=None): yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter)) - def preload_item_chunks(self, item, optimize_hardlinks=False): - """ - Preloads item content data chunks from the repository. - - Warning: if data chunks are preloaded then all data chunks have to be retrieved, - otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak. - """ - return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks) - def add_item(self, item, show_progress=True, stats=None): if show_progress and self.show_progress: if stats is None: @@ -811,12 +770,10 @@ Duration: {0.duration} if dry_run or stdout: with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set: if not hardlink_set: - # it does not really set hard links due to dry_run, but we need to behave same - # as non-dry_run concerning fetching preloaded chunks from the pipeline or - # it would get stuck. + # it does not really set hard links due to dry_run, but behave the same as non-dry_run. if "chunks" in item: item_chunks_size = 0 - for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM): + for data in self.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM): if pi: pi.show(increase=len(data), info=[remove_surrogates(item.path)]) if stdout: @@ -872,7 +829,7 @@ Duration: {0.duration} fd = open(path, "wb") with fd: trailing_hole = False - for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM): + for data in self.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM): if pi: pi.show(increase=len(data), info=[remove_surrogates(item.path)]) with backup_io("write"): diff --git a/src/borg/archiver/extract_cmd.py b/src/borg/archiver/extract_cmd.py index eaa6c4049..854d254da 100644 --- a/src/borg/archiver/extract_cmd.py +++ b/src/borg/archiver/extract_cmd.py @@ -72,8 +72,6 @@ class ExtractMixIn: logging.getLogger("borg.output.list").info(f"{log_prefix} {remove_surrogates(item.path)}") if is_matched: - archive.preload_item_chunks(item, optimize_hardlinks=True) - if not dry_run: while dirs and not item.path.startswith(dirs[-1].path): dir_item = dirs.pop(-1) diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index bb3ac41e3..73119e8ee 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -112,7 +112,7 @@ class TarMixIn: """ Return a file-like object that reads from the chunks of *item*. """ - chunk_iterator = archive.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM) + chunk_iterator = archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM) if pi: info = [remove_surrogates(item.path)] return ChunkIteratorFileWrapper( @@ -231,7 +231,6 @@ class TarMixIn: return ph for item in archive.iter_items(filter): - archive.preload_item_chunks(item, optimize_hardlinks=True) orig_path = item.path if strip_components: item.path = os.sep.join(orig_path.split(os.sep)[strip_components:]) diff --git a/src/borg/legacy/remote.py b/src/borg/legacy/remote.py index 091926d63..f2ab2b008 100644 --- a/src/borg/legacy/remote.py +++ b/src/borg/legacy/remote.py @@ -241,14 +241,12 @@ class LegacyRemoteRepository: def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, args=None): self.location = self._location = location - self.preload_ids = [] self.msgid = 0 self.rx_bytes = 0 self.tx_bytes = 0 self.to_send = EfficientCollectionQueue(1024 * 1024, bytes) self.stdin_fd = self.stdout_fd = self.stderr_fd = None self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet) - self.chunkid_to_msgids = {} self.ignore_responses = set() self.responses = {} self.async_responses = {} @@ -431,7 +429,7 @@ class LegacyRemoteRepository: for resp in self.call_many(cmd, [args], **kw): return resp - def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True): + def call_many(self, cmd, calls, wait=True, async_wait=True): if not calls and cmd != "async_responses": return @@ -448,12 +446,6 @@ class LegacyRemoteRepository: if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]: raise - def pop_preload_msgid(chunkid): - msgid = self.chunkid_to_msgids[chunkid].pop(0) - if not self.chunkid_to_msgids[chunkid]: - del self.chunkid_to_msgids[chunkid] - return msgid - def handle_error(unpacked): if "exception_class" not in unpacked: return @@ -533,7 +525,7 @@ class LegacyRemoteRepository: else: handle_error(unpacked) yield unpacked[RESULT] - if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): + if self.to_send or (calls and len(waiting_for) < MAX_INFLIGHT): w_fds = [self.stdin_fd] else: w_fds = [] @@ -594,39 +586,15 @@ class LegacyRemoteRepository: # decode late, avoid partial utf-8 sequences. _logger.warning("stderr: " + line.decode().strip()) if w: - while ( - (len(self.to_send) <= maximum_to_send) - and (calls or self.preload_ids) - and len(waiting_for) < MAX_INFLIGHT - ): - if calls: - if is_preloaded: - assert cmd == "get", "is_preload is only supported for 'get'" - if calls[0]["id"] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(calls.pop(0)["id"])) - else: - args = calls.pop(0) - if cmd == "get" and args["id"] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(args["id"])) - else: - self.msgid += 1 - waiting_for.append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) - if not self.to_send and self.preload_ids: - chunk_id = self.preload_ids.pop(0) - args = {"id": chunk_id} - self.msgid += 1 - self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) + while (len(self.to_send) <= maximum_to_send) and calls and len(waiting_for) < MAX_INFLIGHT: + args = calls.pop(0) + self.msgid += 1 + waiting_for.append(self.msgid) + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) send_buffer() finally: self.ignore_responses |= set(waiting_for) # we lose order here - if is_preloaded: - for call in calls: - chunkid = call["id"] - if chunkid in self.chunkid_to_msgids: - self.ignore_responses.add(pop_preload_msgid(chunkid)) @api(since=parse_version("1.0.0"), v1_legacy={"since": parse_version("2.0.0b21"), "previously": True}) def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_legacy=False): @@ -668,9 +636,9 @@ class LegacyRemoteRepository: for resp in self.get_many([id], read_data=read_data, raise_missing=raise_missing): return resp - def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): + def get_many(self, ids, read_data=True, raise_missing=True): # note: legacy remote protocol does not support raise_missing parameter, so we ignore it here - yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids], is_preloaded=is_preloaded) + yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids]) @api(since=parse_version("1.0.0")) def put(self, id, data, wait=True): @@ -713,9 +681,6 @@ class LegacyRemoteRepository: for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait): return resp - def preload(self, ids): - self.preload_ids += ids - @api(since=parse_version("2.0.0b8")) def get_manifest(self): """actual remoting is done via self.call in the @api decorator""" diff --git a/src/borg/legacy/repository.py b/src/borg/legacy/repository.py index 5e8ca46f3..274f1fe00 100644 --- a/src/borg/legacy/repository.py +++ b/src/borg/legacy/repository.py @@ -1148,7 +1148,7 @@ class LegacyRepository: else: return None - def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): + def get_many(self, ids, read_data=True, raise_missing=True): for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) @@ -1213,9 +1213,6 @@ class LegacyRepository: to arrive. With wait=False, it will only return already received responses. """ - def preload(self, ids): - """Preload objects (only applies to remote repositories)""" - def get_manifest(self): try: return self.get(Manifest.MANIFEST_ID) diff --git a/src/borg/repository.py b/src/borg/repository.py index 63001124d..5f45d29c4 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -635,7 +635,7 @@ class Repository: else: return None - def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): + def get_many(self, ids, read_data=True, raise_missing=True): for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) @@ -681,9 +681,6 @@ class Repository: to arrive. With wait=False, it will only return already received responses. """ - def preload(self, ids): - """Preload objects (only applies to remote repositories)""" - def break_lock(self): Lock(self.store).break_lock()