From 71bbde208f8303e4feab3f4272b2ad35cb064fd3 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Sat, 6 Jun 2026 00:09:08 +0530 Subject: [PATCH 1/2] repository: add PackWriter and two-phase chunk index update, refs #8572 PackWriter buffers (chunk_id, cdata) pairs and flushes as pack files via borgstore. At N=1 pack_id == chunk_id; UNKNOWN_INT32 (0xFFFFFFFF) placeholders in the index are replaced by real pack location fields after flush() via update_pack_info(). Update test_chunkindex_add to expect UNKNOWN_INT32 sentinels from add(). --- src/borg/archive.py | 10 +++- src/borg/cache.py | 6 ++- src/borg/constants.py | 4 ++ src/borg/hashindex.pyi | 1 + src/borg/hashindex.pyx | 11 +++- src/borg/repository.py | 77 ++++++++++++++++++++++++++-- src/borg/testsuite/hashindex_test.py | 13 +++-- 7 files changed, 111 insertions(+), 11 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index ee156c412..c2c929443 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2014,10 +2014,16 @@ class ArchiveChecker: assert cdata is not None pack_id = id_ # only correct for N=1 self.chunks[id_] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=size, pack_id=pack_id, obj_offset=0, obj_size=0 + flags=ChunkIndex.F_USED, + size=size, + pack_id=pack_id, + obj_offset=UNKNOWN_INT32, + obj_size=UNKNOWN_INT32, ) if self.repair: - self.repository.put(id_, cdata) + pack_results = self.repository.put(id_, cdata) + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) def verify_file_chunks(archive_name, item): """Verifies that all file chunks are present. 
Missing file chunks will be logged.""" diff --git a/src/borg/cache.py b/src/borg/cache.py index 7ef37c89f..014177bca 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -730,9 +730,13 @@ class ChunksMixin: cdata = self.repo_objs.format( id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type ) - self.repository.put(id, cdata, wait=wait) + pack_results = self.repository.put(id, cdata, wait=wait) self.last_refresh_dt = now # .put also refreshed the lock self.chunks.add(id, size) + # pack_results is non-empty when the PackWriter flushed (always at max_count=1). + # Update the index with real pack location so obj_size is not UNKNOWN_INT32. + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) stats.update(size, not exists) return ChunkListEntry(id, size) diff --git a/src/borg/constants.py b/src/borg/constants.py index b62af7bf9..28e9010c8 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -51,6 +51,10 @@ ROBJ_DONTCARE = "*" # used to parse without type assertion (= accept any type) # the header, and the total size was set to precisely 20 MiB for borg < 1.3). MAX_DATA_SIZE = 20971479 +# Placeholder for pack location fields (obj_offset, obj_size) when the value is not yet known. +# Grep for UNKNOWN_INT32 to find every site that still needs updating. +UNKNOWN_INT32 = 0xFFFFFFFF + # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index a978c587a..359db0c0e 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -21,6 +21,7 @@ class ChunkIndex: M_USER: int M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... + def update_pack_info(self, key: bytes, pack_id: bytes, obj_offset: int, obj_size: int) -> None: ... def iteritems(self, *, only_new: bool = ...) -> Iterator: ... 
def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index e4eb43274..261ddc262 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -5,6 +5,8 @@ import struct from borghash import HashTableNT +from .constants import UNKNOWN_INT32 + cdef _NoDefault = object() @@ -80,7 +82,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping): flags = v.flags | self.F_USED assert v.size == 0 or v.size == size pack_id = key # N=1: chunk_id == pack_id - self[key] = ChunkIndexEntry(flags=flags, size=size, pack_id=pack_id, obj_offset=0, obj_size=0) + self[key] = ChunkIndexEntry( + flags=flags, size=size, pack_id=pack_id, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) def __getitem__(self, key): """Specialized __getitem__ that hides system flags.""" @@ -105,6 +109,11 @@ class ChunkIndex(HTProxyMixin, MutableMapping): user_flags = value.flags & self.M_USER self.ht[key] = value._replace(flags=system_flags | user_flags) + def update_pack_info(self, key, pack_id, obj_offset, obj_size): + """Update the on-disk location fields for an existing entry without changing flags or size.""" + existing = self[key] + self[key] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + def clear_new(self): """Clears the F_NEW flag of all items.""" for key, value in self.ht.items(): diff --git a/src/borg/repository.py b/src/borg/repository.py index 879e5afde..2fe83e80f 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -99,6 +99,71 @@ def build_rest_backend(location): return REST(base_url="http://stdio-backend", command=rest_serve_command(location)) +class PackWriter: + """Buffers chunks into a pack file and writes to the store when full. + + Collects (chunk_id, cdata) pairs in a list and flushes once max_count is + reached. On flush it returns the location info for every chunk so the + caller can update the ChunkIndex with real values. 
+ + At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, + so pack_id == chunk_id and the naming scheme is unchanged from before. + Raising max_count later (N>1 phase) enables real packing without touching + this class's interface. + """ + + def __init__(self, store, *, max_count=1): + self.store = store + self.max_count = max_count + self._pieces = [] # list of (chunk_id, cdata) + + def add(self, chunk_id, cdata): + """Buffer a chunk. Returns flush results if the pack is now full, else [].""" + self._pieces.append((chunk_id, cdata)) + if len(self._pieces) >= self.max_count: + return self.flush() + return [] + + def flush(self): + """Write the current pack to the store. + + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — + one entry per chunk that was written. Returns [] if there was nothing + to flush. + """ + if not self._pieces: + return [] + + # Build the pack bytes once by joining all pieces (avoids O(n^2) copies + # that incremental string concatenation would cause in Python). + pack_data = b"".join(cdata for _, cdata in self._pieces) + + # Determine pack_id. + # N=1: the pack contains exactly one chunk, so we keep pack_id == chunk_id + # (backward-compatible file naming: packs/{chunk_id_hex}). + # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the + # file is content-addressed and borgstore can verify/cache it. + if len(self._pieces) == 1: + pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id + else: + pack_id = sha256(pack_data).digest() # N>1: content-addressed + + # Record (chunk_id, pack_id, obj_offset, obj_size) for every piece. 
+ results = [] + offset = 0 + for chunk_id, cdata in self._pieces: + obj_size = len(cdata) + results.append((chunk_id, pack_id, offset, obj_size)) + offset += obj_size + + key = "packs/" + bin_to_hex(pack_id) + try: + self.store.store(key, pack_data) + finally: + self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk + return results + + class Repository: """borgstore-based key/value store.""" @@ -219,6 +284,7 @@ class Repository: self.do_lock = lock self.lock_wait = lock_wait self.exclusive = exclusive + self._pack_writer = None def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -340,6 +406,7 @@ class Repository: # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() + self._pack_writer = PackWriter(self.store, max_count=1) self.opened = True def close(self): @@ -564,15 +631,17 @@ class Repository: Note: when doing calls with wait=False this gets async and caller must deal with async results / exceptions later. + + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for + every chunk written to disk this call. At max_count=1 this is always + one entry. Callers should pass these to ChunkIndex.update_pack_info() + so the index holds real location values rather than UNKNOWN_INT32 placeholders. 
""" self._lock_refresh() data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - - pack_id = id # N=1: pack_id == chunk_id - key = "packs/" + bin_to_hex(pack_id) - self.store.store(key, data) + return self._pack_writer.add(id, data) def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index 8c9ec5be9..6ba88d53a 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -3,6 +3,7 @@ import struct import pytest +from ..constants import UNKNOWN_INT32 from ..hashindex import ChunkIndex, ChunkIndexEntry @@ -20,11 +21,17 @@ def test_chunkindex_add(): chunks = ChunkIndex() x = H2(1) chunks.add(x, 0) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) chunks.add(x, 2) # updating size (we do not have a size yet) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) chunks.add(x, 2) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size) From 0395cc1bef558138304e1a414bcf1cd7b3d7fdbc Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Mon, 8 Jun 2026 03:41:33 +0530 Subject: [PATCH 2/2] hashindex: add UNKNOWN_BYTES32, batch update_pack_info, return None from PackWriter, refs #8572 Fix 
PackWriter.flush() to use max_count == 1 (not len == 1) for the pack_id hack, so final partial packs under max_count > 1 correctly use SHA256. Add covering test. Move sha256 import to module level in repository_test. Add Repository.flush() and call it from AdHocWithFilesCache.close() so the last buffered chunks get real pack location values; Repository.close() now asserts that the PackWriter has no unflushed chunks. --- src/borg/archive.py | 6 +-- src/borg/cache.py | 10 ++-- src/borg/constants.py | 3 ++ src/borg/hashindex.pyi | 2 +- src/borg/hashindex.pyx | 16 ++++--- src/borg/repository.py | 23 +++++++-- src/borg/testsuite/hashindex_test.py | 29 ++++++++++-- src/borg/testsuite/repository_test.py | 67 ++++++++++++++++++++++++++- 8 files changed, 130 insertions(+), 26 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index c2c929443..7f628adb9 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2012,18 +2012,16 @@ class ArchiveChecker: # either we already have this chunk in repo and chunks index or we add it now if id_ not in self.chunks: assert cdata is not None - pack_id = id_ # only correct for N=1 self.chunks[id_] = ChunkIndexEntry( flags=ChunkIndex.F_USED, size=size, - pack_id=pack_id, + pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32, ) if self.repair: pack_results = self.repository.put(id_, cdata) - for chunk_id, pack_id, obj_offset, obj_size in pack_results: - self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) + self.chunks.update_pack_info(pack_results) def verify_file_chunks(archive_name, item): """Verifies that all file chunks are present. Missing file chunks will be logged.""" diff --git a/src/borg/cache.py b/src/borg/cache.py index 014177bca..b52fe31cb 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -733,10 +733,7 @@ class ChunksMixin: pack_results = self.repository.put(id, cdata, wait=wait) self.last_refresh_dt = now # .put also refreshed the lock self.chunks.add(id, size) - # pack_results is non-empty when the PackWriter flushed (always at max_count=1). - # Update the index with real pack location so obj_size is not UNKNOWN_INT32. 
- for chunk_id, pack_id, obj_offset, obj_size in pack_results: - self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) + self.chunks.update_pack_info(pack_results) stats.update(size, not exists) return ChunkListEntry(id, size) @@ -834,6 +831,11 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def close(self): self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") + # Flush any chunks still buffered in the pack writer and update the index + # so the last batch gets real pack location values instead of UNKNOWN_*. + if self._chunks is not None: + pack_results = self.repository.flush() + self._chunks.update_pack_info(pack_results) if self._files is not None: pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) diff --git a/src/borg/constants.py b/src/borg/constants.py index 28e9010c8..262ac4d27 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -55,6 +55,9 @@ MAX_DATA_SIZE = 20971479 # Grep for UNKNOWN_INT32 to find every site that still needs updating. UNKNOWN_INT32 = 0xFFFFFFFF +# Placeholder for pack_id (32-byte field) when the value is not yet known. +UNKNOWN_BYTES32 = b"\xff" * 32 + # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index 359db0c0e..a947a3d54 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -21,7 +21,7 @@ class ChunkIndex: M_USER: int M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... - def update_pack_info(self, key: bytes, pack_id: bytes, obj_offset: int, obj_size: int) -> None: ... + def update_pack_info(self, pack_results: list | None) -> None: ... def iteritems(self, *, only_new: bool = ...) -> Iterator: ... def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... 
diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 261ddc262..cc280528e 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -5,7 +5,7 @@ import struct from borghash import HashTableNT -from .constants import UNKNOWN_INT32 +from .constants import UNKNOWN_INT32, UNKNOWN_BYTES32 @@ -81,9 +81,8 @@ class ChunkIndex(HTProxyMixin, MutableMapping): else: flags = v.flags | self.F_USED assert v.size == 0 or v.size == size - pack_id = key # N=1: chunk_id == pack_id self[key] = ChunkIndexEntry( - flags=flags, size=size, pack_id=pack_id, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) def __getitem__(self, key): @@ -109,10 +108,13 @@ class ChunkIndex(HTProxyMixin, MutableMapping): user_flags = value.flags & self.M_USER self.ht[key] = value._replace(flags=system_flags | user_flags) - def update_pack_info(self, key, pack_id, obj_offset, obj_size): - """Update the on-disk location fields for an existing entry without changing flags or size.""" - existing = self[key] - self[key] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + def update_pack_info(self, pack_results): + """Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples.""" + if not pack_results: + return + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + existing = self[chunk_id] + self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) def clear_new(self): """Clears the F_NEW flag of all items.""" diff --git a/src/borg/repository.py b/src/borg/repository.py index 2fe83e80f..63001124d 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -118,21 +118,21 @@ class PackWriter: self._pieces = [] # list of (chunk_id, cdata) def add(self, chunk_id, cdata): - """Buffer a chunk. Returns flush results if the pack is now full, else [].""" + """Buffer a chunk. 
Returns flush results if the pack is now full, else None.""" self._pieces.append((chunk_id, cdata)) if len(self._pieces) >= self.max_count: return self.flush() - return [] + return None def flush(self): """Write the current pack to the store. Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — - one entry per chunk that was written. Returns [] if there was nothing + one entry per chunk that was written. Returns None if there was nothing to flush. """ if not self._pieces: - return [] + return None # Build the pack bytes once by joining all pieces (avoids O(n^2) copies # that incremental string concatenation would cause in Python). @@ -143,7 +143,7 @@ class PackWriter: # (backward-compatible file naming: packs/{chunk_id_hex}). # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the # file is content-addressed and borgstore can verify/cache it. - if len(self._pieces) == 1: + if self.max_count == 1: pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id else: pack_id = sha256(pack_data).digest() # N>1: content-addressed @@ -409,7 +409,20 @@ class Repository: self._pack_writer = PackWriter(self.store, max_count=1) self.opened = True + def flush(self): + """Flush any buffered pack writer chunks. Returns pack_results (or None). + + Callers that maintain a ChunkIndex must call this and pass the result to + chunks.update_pack_info() before closing, so index entries for the last + batch of chunks get real pack location values instead of UNKNOWN_*. 
+ """ + if self._pack_writer is not None: + return self._pack_writer.flush() + return None + def close(self): + if self._pack_writer is not None: + assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()" if self.lock: self.lock.release() self.lock = None diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index 6ba88d53a..b503a7c61 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -3,7 +3,7 @@ import struct import pytest -from ..constants import UNKNOWN_INT32 +from ..constants import UNKNOWN_INT32, UNKNOWN_BYTES32 from ..hashindex import ChunkIndex, ChunkIndexEntry @@ -22,20 +22,41 @@ def test_chunkindex_add(): x = H2(1) chunks.add(x, 0) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) # updating size (we do not have a size yet) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size) +def test_chunkindex_update_pack_info(): + chunks = ChunkIndex() + x1, x2 = H2(1), H2(2) + chunks.add(x1, 10) + chunks.add(x2, 20) + assert chunks[x1].obj_offset == UNKNOWN_INT32 + assert chunks[x2].obj_offset == UNKNOWN_INT32 + + pack_id = H2(3) + # Both chunks land in the same pack (N>1 scenario): batch 
update in one call. + chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)]) + # Location fields updated; flags and size must be unchanged. + assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50) + assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60) + + # None and empty list are both no-ops. + chunks.update_pack_info(None) + chunks.update_pack_info([]) + assert chunks[x1].obj_offset == 0 + + def test_keyerror(): chunks = ChunkIndex() x = H2(1) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 2da91e5c5..d4ea660c8 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -1,10 +1,11 @@ import os import sys +from hashlib import sha256 import pytest from ..constants import ROBJ_FILE_STREAM from ..helpers import IntegrityError, Location -from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command +from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from ..crypto.key import PlaintextKey from .hashindex_test import H @@ -200,3 +201,67 @@ class TestCacheIfRemote: def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs): with pytest.raises(ValueError): cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data) + + +class MockStore: + def __init__(self): + self.stored = {} + + def store(self, key, data): + self.stored[key] = data + + +def test_pack_writer_returns_none_when_not_full(): + pw = PackWriter(MockStore(), max_count=2) + assert pw.add(b"a" * 32, b"data") is None + + +def test_pack_writer_flush_returns_none_when_empty(): + pw = PackWriter(MockStore(), max_count=1) + assert pw.flush() is None + + +def test_pack_writer_n1_flush(): + store = MockStore() 
+ chunk_id = b"c" * 32 + cdata = b"payload" + pw = PackWriter(store, max_count=1) + results = pw.add(chunk_id, cdata) + assert results is not None + assert len(results) == 1 + stored_id, pack_id, obj_offset, obj_size = results[0] + assert stored_id == chunk_id + assert pack_id == chunk_id # N=1: pack_id == chunk_id + assert obj_offset == 0 + assert obj_size == len(cdata) + + +def test_pack_writer_n2_flush(): + store = MockStore() + id1, id2 = b"a" * 32, b"b" * 32 + data1, data2 = b"first", b"second" + pw = PackWriter(store, max_count=2) + assert pw.add(id1, data1) is None + results = pw.add(id2, data2) + assert results is not None + assert len(results) == 2 + pack_data = data1 + data2 + expected_pack_id = sha256(pack_data).digest() + assert results[0] == (id1, expected_pack_id, 0, len(data1)) + assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) + + +def test_pack_writer_final_partial_pack_uses_sha256(): + # When max_count > 1, a final flush with only 1 piece must still use SHA256, + # not the N=1 pack_id == chunk_id hack. + store = MockStore() + chunk_id = b"d" * 32 + cdata = b"solo" + pw = PackWriter(store, max_count=3) + assert pw.add(chunk_id, cdata) is None + results = pw.flush() + assert results is not None + assert len(results) == 1 + _, pack_id, _, _ = results[0] + assert pack_id == sha256(cdata).digest() + assert pack_id != chunk_id