mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-09 08:51:54 -04:00
Merge pull request #9692 from mr-raj12/pack-files-step3-pack-id
repoobj, repository: add chunk_id to header, introduce packs/ namespace
This commit is contained in:
commit
b8034fcf40
5 changed files with 60 additions and 54 deletions
|
|
@ -127,9 +127,9 @@ def with_repository(
|
|||
)
|
||||
|
||||
with repository:
|
||||
if repository.version not in (3,):
|
||||
if repository.version not in (4,):
|
||||
raise Error(
|
||||
f"This borg version only accepts version 3 repos for -r/--repo, "
|
||||
f"This borg version only accepts version 4 repos for -r/--repo, "
|
||||
f"but not version {repository.version}. "
|
||||
f"You can use 'borg transfer' to copy archives from old to new repos."
|
||||
)
|
||||
|
|
@ -194,10 +194,10 @@ def with_other_repository(manifest=False, cache=False, compatibility=None):
|
|||
)
|
||||
|
||||
with repository:
|
||||
acceptable_versions = (1,) if v1_legacy else (3,)
|
||||
acceptable_versions = (1,) if v1_legacy else (4,)
|
||||
if repository.version not in acceptable_versions:
|
||||
raise Error(
|
||||
f"This borg version only accepts version {' or '.join(acceptable_versions)} "
|
||||
f"This borg version only accepts version {' or '.join(str(v) for v in acceptable_versions)} "
|
||||
f"repos for --other-repo."
|
||||
)
|
||||
kwargs["other_repository"] = repository
|
||||
|
|
|
|||
|
|
@ -13,11 +13,14 @@ AUTHENTICATED_NO_KEY = "authenticated_no_key" in workarounds
|
|||
OBJ_MAGIC = b"BORG_OBJ"
|
||||
OBJ_VERSION = 0x01
|
||||
|
||||
# Fixed header size per blob: OBJ_MAGIC(8) + version(1) + chunk_id(32) + meta_size(4) + data_size(4)
|
||||
REPOOBJ_HEADER_SIZE = 49
|
||||
|
||||
|
||||
class RepoObj:
|
||||
# Object header: magic (8b), format version (1b), meta size (4b), data size (4b).
|
||||
obj_header = Struct("<8sBII")
|
||||
ObjHeader = namedtuple("ObjHeader", "magic version meta_size data_size")
|
||||
# Object header: magic (8b), format version (1b), chunk_id (32b), meta size (4b), data size (4b).
|
||||
obj_header = Struct("<8sB32sII")
|
||||
ObjHeader = namedtuple("ObjHeader", "magic version chunk_id meta_size data_size")
|
||||
|
||||
@classmethod
|
||||
def extract_crypted_data(cls, data: bytes) -> bytes:
|
||||
|
|
@ -72,7 +75,7 @@ class RepoObj:
|
|||
data_encrypted = self.key.encrypt(id, data_compressed)
|
||||
meta_packed = msgpack.packb(meta)
|
||||
meta_encrypted = self.key.encrypt(id, meta_packed)
|
||||
hdr = self.ObjHeader(OBJ_MAGIC, OBJ_VERSION, len(meta_encrypted), len(data_encrypted))
|
||||
hdr = self.ObjHeader(OBJ_MAGIC, OBJ_VERSION, id, len(meta_encrypted), len(data_encrypted))
|
||||
hdr_packed = self.obj_header.pack(*hdr)
|
||||
return hdr_packed + meta_encrypted + data_encrypted
|
||||
|
||||
|
|
|
|||
|
|
@ -115,15 +115,13 @@ class Repository:
|
|||
location = Location(url)
|
||||
self._location = location
|
||||
self.url = url
|
||||
# lots of stuff in data: use 2 levels by default (data/00/00/ .. data/ff/ff/ dirs)!
|
||||
data_levels = int(os.environ.get("BORG_STORE_DATA_LEVELS", "2"))
|
||||
ns_config = {
|
||||
"archives/": {"levels": [0]},
|
||||
"cache/": {"levels": [0]},
|
||||
"config/": {"levels": [0]},
|
||||
"data/": {"levels": [data_levels]},
|
||||
"keys/": {"levels": [0]},
|
||||
"locks/": {"levels": [0]},
|
||||
"packs/": {"levels": [1]},
|
||||
}
|
||||
# Get permissions from parameter or environment variable
|
||||
permissions = permissions if permissions is not None else os.environ.get("BORG_REPO_PERMISSIONS", "all")
|
||||
|
|
@ -136,9 +134,9 @@ class Repository:
|
|||
"archives": "lrw",
|
||||
"cache": "lrwWD", # WD for chunks.<HASH>, last-key-checked, ...
|
||||
"config": "lrW", # W for manifest
|
||||
"data": "lrw",
|
||||
"keys": "lr",
|
||||
"locks": "lrwD", # borg needs to create/delete a shared lock here
|
||||
"packs": "lrw",
|
||||
}
|
||||
elif permissions == "write-only": # mostly no reading
|
||||
permissions = {
|
||||
|
|
@ -146,9 +144,9 @@ class Repository:
|
|||
"archives": "lw",
|
||||
"cache": "lrwWD", # read allowed, e.g. for chunks.<HASH> cache
|
||||
"config": "lrW", # W for manifest
|
||||
"data": "lw", # no r!
|
||||
"keys": "lr",
|
||||
"locks": "lrwD", # borg needs to create/delete a shared lock here
|
||||
"packs": "lw", # no r!
|
||||
}
|
||||
elif permissions == "read-only": # mostly r/o
|
||||
permissions = {"": "lr", "locks": "lrwD"}
|
||||
|
|
@ -171,7 +169,7 @@ class Repository:
|
|||
self._send_log = send_log_cb or (lambda: None)
|
||||
self.do_create = create
|
||||
self.created = False
|
||||
self.acceptable_repo_versions = (3,)
|
||||
self.acceptable_repo_versions = (4,)
|
||||
self.opened = False
|
||||
self.lock = None
|
||||
self.do_lock = lock
|
||||
|
|
@ -209,10 +207,10 @@ class Repository:
|
|||
self.store.open()
|
||||
try:
|
||||
self.store.store("config/readme", REPOSITORY_README.encode())
|
||||
self.version = 3
|
||||
self.version = 4
|
||||
self.store.store("config/version", str(self.version).encode())
|
||||
self.store.store("config/id", bin_to_hex(os.urandom(32)).encode())
|
||||
# we know repo/data/ still does not have any chunks stored in it,
|
||||
# we know repo/packs/ still does not have any chunks stored in it,
|
||||
# but for some stores, there might be a lot of empty directories and
|
||||
# listing them all might be rather slow, so we better cache an empty
|
||||
# ChunkIndex from here so that the first repo operation does not have
|
||||
|
|
@ -327,22 +325,21 @@ class Repository:
|
|||
def check_object(obj):
|
||||
"""Check if obj looks valid."""
|
||||
hdr_size = RepoObj.obj_header.size
|
||||
obj_size = len(obj)
|
||||
if obj_size >= hdr_size:
|
||||
hdr = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(obj[:hdr_size]))
|
||||
if hdr.magic != OBJ_MAGIC:
|
||||
log_error("invalid object magic.")
|
||||
elif hdr.version != OBJ_VERSION:
|
||||
log_error(f"unsupported object version: {hdr.version}.")
|
||||
else:
|
||||
meta = obj[hdr_size : hdr_size + hdr.meta_size]
|
||||
if hdr.meta_size != len(meta):
|
||||
log_error("metadata size mismatch.")
|
||||
data = obj[hdr_size + hdr.meta_size : hdr_size + hdr.meta_size + hdr.data_size]
|
||||
if hdr.data_size != len(data):
|
||||
log_error("data size mismatch.")
|
||||
else:
|
||||
if len(obj) < hdr_size:
|
||||
log_error("too small.")
|
||||
return
|
||||
hdr = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(obj[:hdr_size]))
|
||||
if hdr.magic != OBJ_MAGIC:
|
||||
log_error("invalid object magic.")
|
||||
elif hdr.version != OBJ_VERSION:
|
||||
log_error(f"unsupported object version: {hdr.version}.")
|
||||
else:
|
||||
meta = obj[hdr_size : hdr_size + hdr.meta_size]
|
||||
if hdr.meta_size != len(meta):
|
||||
log_error("metadata size mismatch.")
|
||||
data = obj[hdr_size + hdr.meta_size : hdr_size + hdr.meta_size + hdr.data_size]
|
||||
if hdr.data_size != len(data):
|
||||
log_error("data size mismatch.")
|
||||
|
||||
# TODO: progress indicator, ...
|
||||
partial = bool(max_duration)
|
||||
|
|
@ -376,11 +373,11 @@ class Repository:
|
|||
# As we don't do garbage collection here, this is not a problem.
|
||||
# We also don't know the plaintext size, so we set it to 0.
|
||||
init_entry = ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0)
|
||||
infos = self.store.list("data")
|
||||
infos = self.store.list("packs")
|
||||
try:
|
||||
for info in infos:
|
||||
self._lock_refresh()
|
||||
key = "data/%s" % info.name
|
||||
key = "packs/%s" % info.name
|
||||
if key <= last_key_checked: # needs sorted keys
|
||||
continue
|
||||
try:
|
||||
|
|
@ -412,8 +409,9 @@ class Repository:
|
|||
# add all existing objects to the index.
|
||||
# borg check: the index may have corrupted objects (we did not delete them)
|
||||
# borg check --repair: the index will only have non-corrupted objects.
|
||||
id = hex_to_bin(info.name)
|
||||
chunks[id] = init_entry
|
||||
pack_id = hex_to_bin(info.name)
|
||||
chunk_id = pack_id # N=1: chunk_id == pack_id
|
||||
chunks[chunk_id] = init_entry
|
||||
now = time.monotonic()
|
||||
if now > t_last_checkpoint + 300: # checkpoint every 5 mins
|
||||
t_last_checkpoint = now
|
||||
|
|
@ -437,7 +435,7 @@ class Repository:
|
|||
self, chunks, incremental=False, clear=True, force_write=True, delete_other=True
|
||||
)
|
||||
except StoreObjectNotFound:
|
||||
# it can be that there is no "data/" at all, then it crashes when iterating infos.
|
||||
# it can be that there is no "packs/" at all, then it crashes when iterating infos.
|
||||
pass
|
||||
logger.info(f"Checked {objs_checked} repository objects, {objs_errors} errors.")
|
||||
if objs_errors == 0:
|
||||
|
|
@ -456,33 +454,35 @@ class Repository:
|
|||
"""
|
||||
collect = True if marker is None else False
|
||||
result = []
|
||||
infos = self.store.list("data") # generator yielding ItemInfos
|
||||
infos = self.store.list("packs") # generator yielding ItemInfos
|
||||
while True:
|
||||
self._lock_refresh()
|
||||
try:
|
||||
info = next(infos)
|
||||
except StoreObjectNotFound:
|
||||
break # can happen e.g. if "data" does not exist, pointless to continue in that case
|
||||
break # can happen e.g. if "packs" does not exist, pointless to continue in that case
|
||||
except StopIteration:
|
||||
break
|
||||
else:
|
||||
id = hex_to_bin(info.name)
|
||||
pack_id = hex_to_bin(info.name)
|
||||
chunk_id = pack_id # N=1: chunk_id == pack_id
|
||||
if collect:
|
||||
result.append((id, info.size))
|
||||
chunk_size = info.size # only correct for N=1
|
||||
result.append((chunk_id, chunk_size))
|
||||
if len(result) == limit:
|
||||
break
|
||||
elif id == marker:
|
||||
elif chunk_id == marker:
|
||||
collect = True
|
||||
# note: do not collect the marker id
|
||||
return result
|
||||
|
||||
def get(self, id, read_data=True, raise_missing=True):
|
||||
self._lock_refresh()
|
||||
pack_id = id # N=1: pack_id == chunk_id
|
||||
id_hex = bin_to_hex(id)
|
||||
key = "data/" + id_hex
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
try:
|
||||
if read_data:
|
||||
# read everything
|
||||
return self.store.load(key)
|
||||
else:
|
||||
# RepoObj layout supports separately encrypted metadata and data.
|
||||
|
|
@ -523,7 +523,8 @@ class Repository:
|
|||
if data_size > MAX_DATA_SIZE:
|
||||
raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
|
||||
|
||||
key = "data/" + bin_to_hex(id)
|
||||
pack_id = id # N=1: pack_id == chunk_id
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
self.store.store(key, data)
|
||||
|
||||
def delete(self, id, wait=True):
|
||||
|
|
@ -533,7 +534,8 @@ class Repository:
|
|||
deal with async results / exceptions later.
|
||||
"""
|
||||
self._lock_refresh()
|
||||
key = "data/" + bin_to_hex(id)
|
||||
pack_id = id # N=1: pack_id == chunk_id
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
try:
|
||||
self.store.delete(key)
|
||||
except StoreObjectNotFound:
|
||||
|
|
|
|||
|
|
@ -225,7 +225,7 @@ def test_corrupted_manifest(archivers, request):
|
|||
archive, repository = open_archive(archiver.repository_path, "archive1")
|
||||
with repository:
|
||||
manifest = repository.get_manifest()
|
||||
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
||||
corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:]
|
||||
repository.put_manifest(corrupted_manifest)
|
||||
cmd(archiver, "check", exit_code=1)
|
||||
output = cmd(archiver, "check", "-v", "--repair", exit_code=0)
|
||||
|
|
@ -273,7 +273,7 @@ def test_manifest_rebuild_corrupted_chunk(archivers, request):
|
|||
archive, repository = open_archive(archiver.repository_path, "archive1")
|
||||
with repository:
|
||||
manifest = repository.get_manifest()
|
||||
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
||||
corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:]
|
||||
repository.put_manifest(corrupted_manifest)
|
||||
chunk = repository.get(archive.id)
|
||||
corrupted_chunk = chunk + b"corrupted!"
|
||||
|
|
@ -312,7 +312,7 @@ def test_spoofed_archive(archivers, request):
|
|||
with repository:
|
||||
# attacker would corrupt or delete the manifest to trigger a rebuild of it:
|
||||
manifest = repository.get_manifest()
|
||||
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
||||
corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:]
|
||||
repository.put_manifest(corrupted_manifest)
|
||||
archive_dict = {
|
||||
"command_line": "",
|
||||
|
|
@ -351,8 +351,9 @@ def test_extra_chunks(archivers, request):
|
|||
check_cmd_setup(archiver)
|
||||
cmd(archiver, "check", exit_code=0)
|
||||
with Repository(archiver.repository_location, exclusive=True) as repository:
|
||||
chunk = fchunk(b"xxxx")
|
||||
repository.put(b"01234567890123456789012345678901", chunk)
|
||||
key = b"01234567890123456789012345678901"
|
||||
chunk = fchunk(b"xxxx", chunk_id=key)
|
||||
repository.put(key, chunk)
|
||||
cmd(archiver, "check", "-v", exit_code=0) # check does not deal with orphans anymore
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -53,9 +53,9 @@ def reopen(repository, exclusive: bool | None = True, create=False):
|
|||
)
|
||||
|
||||
|
||||
def fchunk(data, meta=b""):
|
||||
def fchunk(data, meta=b"", chunk_id=b"\x00" * 32):
|
||||
# Format chunk: create a raw chunk that has a valid RepoObj layout, but does not use encryption or compression.
|
||||
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, len(meta), len(data))
|
||||
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, chunk_id, len(meta), len(data))
|
||||
assert isinstance(data, bytes)
|
||||
chunk = hdr + meta + data
|
||||
return chunk
|
||||
|
|
@ -65,7 +65,7 @@ def pchunk(chunk):
|
|||
# Parse chunk: extract data and metadata from a raw chunk made by fchunk.
|
||||
hdr_size = RepoObj.obj_header.size
|
||||
hdr = chunk[:hdr_size]
|
||||
meta_size, data_size = RepoObj.obj_header.unpack(hdr)[2:4]
|
||||
meta_size, data_size = RepoObj.obj_header.unpack(hdr)[3:5]
|
||||
meta = chunk[hdr_size : hdr_size + meta_size]
|
||||
data = chunk[hdr_size + meta_size : hdr_size + meta_size + data_size]
|
||||
return data, meta
|
||||
|
|
@ -97,7 +97,7 @@ def test_basic_operations(repo_fixtures, request):
|
|||
def test_read_data(repo_fixtures, request):
|
||||
with get_repository_from_fixture(repo_fixtures, request) as repository:
|
||||
meta, data = b"meta", b"data"
|
||||
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, len(meta), len(data))
|
||||
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, H(0), len(meta), len(data))
|
||||
chunk_complete = hdr + meta + data
|
||||
chunk_short = hdr + meta
|
||||
repository.put(H(0), chunk_complete)
|
||||
|
|
|
|||
Loading…
Reference in a new issue