repository: remove chunk preloading

Preloading was used only by extract and export-tar, and the modern
borgstore-based Repository.preload() was already a no-op. Remove the
preload calls from both commands and the now-dead supporting code:

- DownloadPipeline.preload_item_chunks / Archive.preload_item_chunks
- hlids_preloaded tracking
- is_preloaded parameter from DownloadPipeline.fetch_many and
  Repository.get_many
- the no-op Repository.preload()

Also: remove preload support from borg.legacy

With no remaining callers, drop the legacy-side preload machinery:

LegacyRemoteRepository:
- preload_ids / chunkid_to_msgids state and the pop_preload_msgid helper
- is_preloaded parameter and handling in call_many() (get requests now
  always go through the normal send path; MAX_INFLIGHT pipelining of
  regular calls is unchanged)
- is_preloaded from get_many() and the preload() method

LegacyRepository:
- is_preloaded from get_many() and the no-op preload() stub

Both legacy repo classes now match the modern Repository interface.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Thomas Waldmann 2026-06-09 21:22:40 +02:00
parent 2600428f81
commit c914035d08
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
6 changed files with 17 additions and 104 deletions

View file

@ -271,7 +271,6 @@ class DownloadPipeline:
def __init__(self, repository, repo_objs):
self.repository = repository
self.repo_objs = repo_objs
self.hlids_preloaded = None
def unpack_many(self, ids, *, filter=None):
"""
@ -280,7 +279,6 @@ class DownloadPipeline:
*ids* is a chunk ID list of an item content data stream.
*filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
"""
self.hlids_preloaded = set()
unpacker = msgpack.Unpacker(use_list=False)
for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM, replacement_chunk=False):
if data is None:
@ -295,35 +293,7 @@ class DownloadPipeline:
item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
yield item
def preload_item_chunks(self, item, optimize_hardlinks=False):
"""
Preloads the content data chunks of an item (if any).
optimize_hardlinks can be set to True if item chunks only need to be preloaded for
1st hard link, but not for any further hard link to same inode / with same hlid.
Returns True if chunks were preloaded.
Warning: if data chunks are preloaded then all data chunks have to be retrieved,
otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
"""
preload_chunks = False
if "chunks" in item:
if optimize_hardlinks:
hlid = item.get("hlid", None)
if hlid is None:
preload_chunks = True
elif hlid in self.hlids_preloaded:
preload_chunks = False
else:
# not having the hard link's chunks already preloaded for other hard link to same inode
preload_chunks = True
self.hlids_preloaded.add(hlid)
else:
preload_chunks = True
if preload_chunks:
self.repository.preload([c.id for c in item.chunks])
return preload_chunks
def fetch_many(self, chunks, is_preloaded=False, ro_type=None, replacement_chunk=True):
def fetch_many(self, chunks, ro_type=None, replacement_chunk=True):
assert ro_type is not None
ids = []
sizes = []
@ -336,9 +306,7 @@ class DownloadPipeline:
sizes = [None] * len(ids)
else:
raise TypeError(f"unsupported or mixed element types: {chunks}")
for id, size, cdata in zip(
ids, sizes, self.repository.get_many(ids, is_preloaded=is_preloaded, raise_missing=False)
):
for id, size, cdata in zip(ids, sizes, self.repository.get_many(ids, raise_missing=False)):
if cdata is None:
if replacement_chunk and size is not None:
logger.error(f"repository object {bin_to_hex(id)} missing, returning {size} zero bytes.")
@ -663,15 +631,6 @@ Duration: {0.duration}
def iter_items(self, filter=None):
yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter))
def preload_item_chunks(self, item, optimize_hardlinks=False):
"""
Preloads item content data chunks from the repository.
Warning: if data chunks are preloaded then all data chunks have to be retrieved,
otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
"""
return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)
def add_item(self, item, show_progress=True, stats=None):
if show_progress and self.show_progress:
if stats is None:
@ -811,12 +770,10 @@ Duration: {0.duration}
if dry_run or stdout:
with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set:
if not hardlink_set:
# it does not really set hard links due to dry_run, but we need to behave same
# as non-dry_run concerning fetching preloaded chunks from the pipeline or
# it would get stuck.
# it does not really set hard links due to dry_run, but behave the same as non-dry_run.
if "chunks" in item:
item_chunks_size = 0
for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
for data in self.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM):
if pi:
pi.show(increase=len(data), info=[remove_surrogates(item.path)])
if stdout:
@ -872,7 +829,7 @@ Duration: {0.duration}
fd = open(path, "wb")
with fd:
trailing_hole = False
for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
for data in self.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM):
if pi:
pi.show(increase=len(data), info=[remove_surrogates(item.path)])
with backup_io("write"):

View file

@ -72,8 +72,6 @@ class ExtractMixIn:
logging.getLogger("borg.output.list").info(f"{log_prefix} {remove_surrogates(item.path)}")
if is_matched:
archive.preload_item_chunks(item, optimize_hardlinks=True)
if not dry_run:
while dirs and not item.path.startswith(dirs[-1].path):
dir_item = dirs.pop(-1)

View file

@ -112,7 +112,7 @@ class TarMixIn:
"""
Return a file-like object that reads from the chunks of *item*.
"""
chunk_iterator = archive.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM)
chunk_iterator = archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM)
if pi:
info = [remove_surrogates(item.path)]
return ChunkIteratorFileWrapper(
@ -231,7 +231,6 @@ class TarMixIn:
return ph
for item in archive.iter_items(filter):
archive.preload_item_chunks(item, optimize_hardlinks=True)
orig_path = item.path
if strip_components:
item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])

View file

@ -241,14 +241,12 @@ class LegacyRemoteRepository:
def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, args=None):
self.location = self._location = location
self.preload_ids = []
self.msgid = 0
self.rx_bytes = 0
self.tx_bytes = 0
self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
self.stdin_fd = self.stdout_fd = self.stderr_fd = None
self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet)
self.chunkid_to_msgids = {}
self.ignore_responses = set()
self.responses = {}
self.async_responses = {}
@ -431,7 +429,7 @@ class LegacyRemoteRepository:
for resp in self.call_many(cmd, [args], **kw):
return resp
def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
def call_many(self, cmd, calls, wait=True, async_wait=True):
if not calls and cmd != "async_responses":
return
@ -448,12 +446,6 @@ class LegacyRemoteRepository:
if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
raise
def pop_preload_msgid(chunkid):
msgid = self.chunkid_to_msgids[chunkid].pop(0)
if not self.chunkid_to_msgids[chunkid]:
del self.chunkid_to_msgids[chunkid]
return msgid
def handle_error(unpacked):
if "exception_class" not in unpacked:
return
@ -533,7 +525,7 @@ class LegacyRemoteRepository:
else:
handle_error(unpacked)
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
if self.to_send or (calls and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
w_fds = []
@ -594,39 +586,15 @@ class LegacyRemoteRepository:
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
and (calls or self.preload_ids)
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
if is_preloaded:
assert cmd == "get", "is_preload is only supported for 'get'"
if calls[0]["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
else:
args = calls.pop(0)
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(args["id"]))
else:
self.msgid += 1
waiting_for.append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
args = {"id": chunk_id}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
while (len(self.to_send) <= maximum_to_send) and calls and len(waiting_for) < MAX_INFLIGHT:
args = calls.pop(0)
self.msgid += 1
waiting_for.append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
send_buffer()
finally:
self.ignore_responses |= set(waiting_for) # we lose order here
if is_preloaded:
for call in calls:
chunkid = call["id"]
if chunkid in self.chunkid_to_msgids:
self.ignore_responses.add(pop_preload_msgid(chunkid))
@api(since=parse_version("1.0.0"), v1_legacy={"since": parse_version("2.0.0b21"), "previously": True})
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_legacy=False):
@ -668,9 +636,9 @@ class LegacyRemoteRepository:
for resp in self.get_many([id], read_data=read_data, raise_missing=raise_missing):
return resp
def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True):
def get_many(self, ids, read_data=True, raise_missing=True):
# note: legacy remote protocol does not support raise_missing parameter, so we ignore it here
yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids], is_preloaded=is_preloaded)
yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids])
@api(since=parse_version("1.0.0"))
def put(self, id, data, wait=True):
@ -713,9 +681,6 @@ class LegacyRemoteRepository:
for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait):
return resp
def preload(self, ids):
self.preload_ids += ids
@api(since=parse_version("2.0.0b8"))
def get_manifest(self):
"""actual remoting is done via self.call in the @api decorator"""

View file

@ -1148,7 +1148,7 @@ class LegacyRepository:
else:
return None
def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True):
def get_many(self, ids, read_data=True, raise_missing=True):
for id_ in ids:
yield self.get(id_, read_data=read_data, raise_missing=raise_missing)
@ -1213,9 +1213,6 @@ class LegacyRepository:
to arrive. With wait=False, it will only return already received responses.
"""
def preload(self, ids):
"""Preload objects (only applies to remote repositories)"""
def get_manifest(self):
try:
return self.get(Manifest.MANIFEST_ID)

View file

@ -635,7 +635,7 @@ class Repository:
else:
return None
def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True):
def get_many(self, ids, read_data=True, raise_missing=True):
for id_ in ids:
yield self.get(id_, read_data=read_data, raise_missing=raise_missing)
@ -681,9 +681,6 @@ class Repository:
to arrive. With wait=False, it will only return already received responses.
"""
def preload(self, ids):
"""Preload objects (only applies to remote repositories)"""
def break_lock(self):
Lock(self.store).break_lock()