simplify fetch_many api

it now takes either:
- a list of ChunkListEntry namedtuples
- a list of chunk IDs [bytes]
This commit is contained in:
Thomas Waldmann 2024-11-25 21:55:30 +01:00
parent 6357f2ebc1
commit e48ccc7627
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
4 changed files with 23 additions and 16 deletions

View file

@ -312,10 +312,22 @@ class DownloadPipeline:
self.repository.preload([c.id for c in item.chunks])
return preload_chunks
def fetch_many(self, ids, is_preloaded=False, ro_type=None):
def fetch_many(self, chunks, is_preloaded=False, ro_type=None):
assert ro_type is not None
for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
_, data = self.repo_objs.parse(id_, cdata, ro_type=ro_type)
ids = []
sizes = []
if all(isinstance(chunk, ChunkListEntry) for chunk in chunks):
for chunk in chunks:
ids.append(chunk.id)
sizes.append(chunk.size)
elif all(isinstance(chunk, bytes) for chunk in chunks):
ids = list(chunks)
sizes = [None] * len(ids)
else:
raise TypeError(f"unsupported or mixed element types: {chunks}")
for id, size, cdata in zip(ids, sizes, self.repository.get_many(ids, is_preloaded=is_preloaded)):
_, data = self.repo_objs.parse(id, cdata, ro_type=ro_type)
assert size is None or len(data) == size
yield data
@ -770,9 +782,7 @@ Duration: {0.duration}
# it would get stuck.
if "chunks" in item:
item_chunks_size = 0
for data in self.pipeline.fetch_many(
[c.id for c in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
):
for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
if pi:
pi.show(increase=len(data), info=[remove_surrogates(item.path)])
if stdout:
@ -821,8 +831,7 @@ Duration: {0.duration}
with backup_io("open"):
fd = open(path, "wb")
with fd:
ids = [c.id for c in item.chunks]
for data in self.pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
if pi:
pi.show(increase=len(data), info=[remove_surrogates(item.path)])
with backup_io("write"):
@ -1005,8 +1014,8 @@ Duration: {0.duration}
path,
item1,
item2,
archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
archive1.pipeline.fetch_many(item1.get("chunks", []), ro_type=ROBJ_FILE_STREAM),
archive2.pipeline.fetch_many(item2.get("chunks", []), ro_type=ROBJ_FILE_STREAM),
can_compare_chunk_ids=can_compare_chunk_ids,
)
@ -2193,7 +2202,7 @@ class ArchiveRecreater:
return chunk_entry
def iter_chunks(self, archive, target, chunks):
chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks], ro_type=ROBJ_FILE_STREAM)
chunk_iterator = archive.pipeline.fetch_many(chunks, ro_type=ROBJ_FILE_STREAM)
if target.recreate_rechunkify:
# The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
# (does not load the entire file into memory)

View file

@ -113,9 +113,7 @@ class TarMixIn:
"""
Return a file-like object that reads from the chunks of *item*.
"""
chunk_iterator = archive.pipeline.fetch_many(
[chunk_id for chunk_id, _ in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
)
chunk_iterator = archive.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM)
if pi:
info = [remove_surrogates(item.path)]
return ChunkIteratorFileWrapper(

View file

@ -124,7 +124,7 @@ class ChunkIteratorFileWrapper:
def open_item(archive, item):
"""Return file-like object for archived item (with chunks)."""
chunk_iterator = archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM)
chunk_iterator = archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM)
return ChunkIteratorFileWrapper(chunk_iterator)

View file

@ -911,7 +911,7 @@ class ItemFormatter(BaseFormatter):
hash = self.xxh64()
elif hash_function in self.hash_algorithms:
hash = hashlib.new(hash_function)
for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM):
for data in self.archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM):
hash.update(data)
return hash.hexdigest()