mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 01:41:57 -04:00
hashindex: add UNKNOWN_BYTES32, batch update_pack_info, return None from PackWriter, refs #8572
Fix PackWriter.flush() to use max_count == 1 (not len == 1) for the pack_id hack, so final partial packs under max_count > 1 correctly use SHA256. Add covering test. Move sha256 import to module level in repository_test.
This commit is contained in:
parent
71bbde208f
commit
0395cc1bef
8 changed files with 130 additions and 26 deletions
|
|
@ -2012,18 +2012,16 @@ class ArchiveChecker:
|
|||
# either we already have this chunk in repo and chunks index or we add it now
|
||||
if id_ not in self.chunks:
|
||||
assert cdata is not None
|
||||
pack_id = id_ # only correct for N=1
|
||||
self.chunks[id_] = ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED,
|
||||
size=size,
|
||||
pack_id=pack_id,
|
||||
pack_id=UNKNOWN_BYTES32,
|
||||
obj_offset=UNKNOWN_INT32,
|
||||
obj_size=UNKNOWN_INT32,
|
||||
)
|
||||
if self.repair:
|
||||
pack_results = self.repository.put(id_, cdata)
|
||||
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
|
||||
self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size)
|
||||
self.chunks.update_pack_info(pack_results)
|
||||
|
||||
def verify_file_chunks(archive_name, item):
|
||||
"""Verifies that all file chunks are present. Missing file chunks will be logged."""
|
||||
|
|
|
|||
|
|
@ -733,10 +733,7 @@ class ChunksMixin:
|
|||
pack_results = self.repository.put(id, cdata, wait=wait)
|
||||
self.last_refresh_dt = now # .put also refreshed the lock
|
||||
self.chunks.add(id, size)
|
||||
# pack_results is non-empty when the PackWriter flushed (always at max_count=1).
|
||||
# Update the index with real pack location so obj_size is not UNKNOWN_INT32.
|
||||
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
|
||||
self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size)
|
||||
self.chunks.update_pack_info(pack_results)
|
||||
stats.update(size, not exists)
|
||||
return ChunkListEntry(id, size)
|
||||
|
||||
|
|
@ -834,6 +831,11 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
|
|||
def close(self):
|
||||
self.security_manager.save(self.manifest, self.key)
|
||||
pi = ProgressIndicatorMessage(msgid="cache.close")
|
||||
# Flush any chunks still buffered in the pack writer and update the index
|
||||
# so the last batch gets real pack location values instead of UNKNOWN_*.
|
||||
if self._chunks is not None:
|
||||
pack_results = self.repository.flush()
|
||||
self._chunks.update_pack_info(pack_results)
|
||||
if self._files is not None:
|
||||
pi.output("Saving files cache")
|
||||
integrity_data = self._write_files_cache(self._files)
|
||||
|
|
|
|||
|
|
@ -55,6 +55,9 @@ MAX_DATA_SIZE = 20971479
|
|||
# Grep for UNKNOWN_INT32 to find every site that still needs updating.
|
||||
UNKNOWN_INT32 = 0xFFFFFFFF
|
||||
|
||||
# Placeholder for pack_id (32-byte field) when the value is not yet known.
|
||||
UNKNOWN_BYTES32 = b"\xff" * 32
|
||||
|
||||
# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header)
|
||||
MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class ChunkIndex:
|
|||
M_USER: int
|
||||
M_SYSTEM: int
|
||||
def add(self, key: bytes, size: int) -> None: ...
|
||||
def update_pack_info(self, key: bytes, pack_id: bytes, obj_offset: int, obj_size: int) -> None: ...
|
||||
def update_pack_info(self, pack_results: list | None) -> None: ...
|
||||
def iteritems(self, *, only_new: bool = ...) -> Iterator: ...
|
||||
def clear_new(self) -> None: ...
|
||||
def __contains__(self, key: bytes) -> bool: ...
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import struct
|
|||
|
||||
from borghash import HashTableNT
|
||||
|
||||
from .constants import UNKNOWN_INT32
|
||||
from .constants import UNKNOWN_INT32, UNKNOWN_BYTES32
|
||||
|
||||
|
||||
|
||||
|
|
@ -81,9 +81,8 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
|
|||
else:
|
||||
flags = v.flags | self.F_USED
|
||||
assert v.size == 0 or v.size == size
|
||||
pack_id = key # N=1: chunk_id == pack_id
|
||||
self[key] = ChunkIndexEntry(
|
||||
flags=flags, size=size, pack_id=pack_id, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
|
||||
def __getitem__(self, key):
|
||||
|
|
@ -109,10 +108,13 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
|
|||
user_flags = value.flags & self.M_USER
|
||||
self.ht[key] = value._replace(flags=system_flags | user_flags)
|
||||
|
||||
def update_pack_info(self, key, pack_id, obj_offset, obj_size):
|
||||
"""Update the on-disk location fields for an existing entry without changing flags or size."""
|
||||
existing = self[key]
|
||||
self[key] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size)
|
||||
def update_pack_info(self, pack_results):
|
||||
"""Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples."""
|
||||
if not pack_results:
|
||||
return
|
||||
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
|
||||
existing = self[chunk_id]
|
||||
self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size)
|
||||
|
||||
def clear_new(self):
|
||||
"""Clears the F_NEW flag of all items."""
|
||||
|
|
|
|||
|
|
@ -118,21 +118,21 @@ class PackWriter:
|
|||
self._pieces = [] # list of (chunk_id, cdata)
|
||||
|
||||
def add(self, chunk_id, cdata):
|
||||
"""Buffer a chunk. Returns flush results if the pack is now full, else []."""
|
||||
"""Buffer a chunk. Returns flush results if the pack is now full, else None."""
|
||||
self._pieces.append((chunk_id, cdata))
|
||||
if len(self._pieces) >= self.max_count:
|
||||
return self.flush()
|
||||
return []
|
||||
return None
|
||||
|
||||
def flush(self):
|
||||
"""Write the current pack to the store.
|
||||
|
||||
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples —
|
||||
one entry per chunk that was written. Returns [] if there was nothing
|
||||
one entry per chunk that was written. Returns None if there was nothing
|
||||
to flush.
|
||||
"""
|
||||
if not self._pieces:
|
||||
return []
|
||||
return None
|
||||
|
||||
# Build the pack bytes once by joining all pieces (avoids O(n^2) copies
|
||||
# that incremental string concatenation would cause in Python).
|
||||
|
|
@ -143,7 +143,7 @@ class PackWriter:
|
|||
# (backward-compatible file naming: packs/{chunk_id_hex}).
|
||||
# N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the
|
||||
# file is content-addressed and borgstore can verify/cache it.
|
||||
if len(self._pieces) == 1:
|
||||
if self.max_count == 1:
|
||||
pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id
|
||||
else:
|
||||
pack_id = sha256(pack_data).digest() # N>1: content-addressed
|
||||
|
|
@ -409,7 +409,20 @@ class Repository:
|
|||
self._pack_writer = PackWriter(self.store, max_count=1)
|
||||
self.opened = True
|
||||
|
||||
def flush(self):
|
||||
"""Flush any buffered pack writer chunks. Returns pack_results (or None).
|
||||
|
||||
Callers that maintain a ChunkIndex must call this and pass the result to
|
||||
chunks.update_pack_info() before closing, so index entries for the last
|
||||
batch of chunks get real pack location values instead of UNKNOWN_*.
|
||||
"""
|
||||
if self._pack_writer is not None:
|
||||
return self._pack_writer.flush()
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
if self._pack_writer is not None:
|
||||
assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()"
|
||||
if self.lock:
|
||||
self.lock.release()
|
||||
self.lock = None
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import struct
|
|||
|
||||
import pytest
|
||||
|
||||
from ..constants import UNKNOWN_INT32
|
||||
from ..constants import UNKNOWN_INT32, UNKNOWN_BYTES32
|
||||
from ..hashindex import ChunkIndex, ChunkIndexEntry
|
||||
|
||||
|
||||
|
|
@ -22,20 +22,41 @@ def test_chunkindex_add():
|
|||
x = H2(1)
|
||||
chunks.add(x, 0)
|
||||
assert chunks[x] == ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
chunks.add(x, 2) # updating size (we do not have a size yet)
|
||||
assert chunks[x] == ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
chunks.add(x, 2)
|
||||
assert chunks[x] == ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
with pytest.raises(AssertionError):
|
||||
chunks.add(x, 3) # inconsistent size (we already have a different size)
|
||||
|
||||
|
||||
def test_chunkindex_update_pack_info():
|
||||
chunks = ChunkIndex()
|
||||
x1, x2 = H2(1), H2(2)
|
||||
chunks.add(x1, 10)
|
||||
chunks.add(x2, 20)
|
||||
assert chunks[x1].obj_offset == UNKNOWN_INT32
|
||||
assert chunks[x2].obj_offset == UNKNOWN_INT32
|
||||
|
||||
pack_id = H2(3)
|
||||
# Both chunks land in the same pack (N>1 scenario): batch update in one call.
|
||||
chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)])
|
||||
# Location fields updated; flags and size must be unchanged.
|
||||
assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50)
|
||||
assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60)
|
||||
|
||||
# None and empty list are both no-ops.
|
||||
chunks.update_pack_info(None)
|
||||
chunks.update_pack_info([])
|
||||
assert chunks[x1].obj_offset == 0
|
||||
|
||||
|
||||
def test_keyerror():
|
||||
chunks = ChunkIndex()
|
||||
x = H2(1)
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
import os
|
||||
import sys
|
||||
from hashlib import sha256
|
||||
|
||||
import pytest
|
||||
from ..constants import ROBJ_FILE_STREAM
|
||||
from ..helpers import IntegrityError, Location
|
||||
from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command
|
||||
from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter
|
||||
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
|
||||
from ..crypto.key import PlaintextKey
|
||||
from .hashindex_test import H
|
||||
|
|
@ -200,3 +201,67 @@ class TestCacheIfRemote:
|
|||
def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs):
|
||||
with pytest.raises(ValueError):
|
||||
cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data)
|
||||
|
||||
|
||||
class MockStore:
|
||||
def __init__(self):
|
||||
self.stored = {}
|
||||
|
||||
def store(self, key, data):
|
||||
self.stored[key] = data
|
||||
|
||||
|
||||
def test_pack_writer_returns_none_when_not_full():
|
||||
pw = PackWriter(MockStore(), max_count=2)
|
||||
assert pw.add(b"a" * 32, b"data") is None
|
||||
|
||||
|
||||
def test_pack_writer_flush_returns_none_when_empty():
|
||||
pw = PackWriter(MockStore(), max_count=1)
|
||||
assert pw.flush() is None
|
||||
|
||||
|
||||
def test_pack_writer_n1_flush():
|
||||
store = MockStore()
|
||||
chunk_id = b"c" * 32
|
||||
cdata = b"payload"
|
||||
pw = PackWriter(store, max_count=1)
|
||||
results = pw.add(chunk_id, cdata)
|
||||
assert results is not None
|
||||
assert len(results) == 1
|
||||
stored_id, pack_id, obj_offset, obj_size = results[0]
|
||||
assert stored_id == chunk_id
|
||||
assert pack_id == chunk_id # N=1: pack_id == chunk_id
|
||||
assert obj_offset == 0
|
||||
assert obj_size == len(cdata)
|
||||
|
||||
|
||||
def test_pack_writer_n2_flush():
|
||||
store = MockStore()
|
||||
id1, id2 = b"a" * 32, b"b" * 32
|
||||
data1, data2 = b"first", b"second"
|
||||
pw = PackWriter(store, max_count=2)
|
||||
assert pw.add(id1, data1) is None
|
||||
results = pw.add(id2, data2)
|
||||
assert results is not None
|
||||
assert len(results) == 2
|
||||
pack_data = data1 + data2
|
||||
expected_pack_id = sha256(pack_data).digest()
|
||||
assert results[0] == (id1, expected_pack_id, 0, len(data1))
|
||||
assert results[1] == (id2, expected_pack_id, len(data1), len(data2))
|
||||
|
||||
|
||||
def test_pack_writer_final_partial_pack_uses_sha256():
|
||||
# When max_count > 1, a final flush with only 1 piece must still use SHA256,
|
||||
# not the N=1 pack_id == chunk_id hack.
|
||||
store = MockStore()
|
||||
chunk_id = b"d" * 32
|
||||
cdata = b"solo"
|
||||
pw = PackWriter(store, max_count=3)
|
||||
assert pw.add(chunk_id, cdata) is None
|
||||
results = pw.flush()
|
||||
assert results is not None
|
||||
assert len(results) == 1
|
||||
_, pack_id, _, _ = results[0]
|
||||
assert pack_id == sha256(cdata).digest()
|
||||
assert pack_id != chunk_id
|
||||
|
|
|
|||
Loading…
Reference in a new issue