implement TarfileObjectProcessors similar to FilesystemObjectProcessors

This commit is contained in:
Thomas Waldmann 2021-06-10 01:41:11 +02:00
parent 5304693c55
commit fb2efd88fe
2 changed files with 71 additions and 32 deletions

View file

@ -1392,6 +1392,60 @@ class FilesystemObjectProcessors:
return status
class TarfileObjectProcessors:
    """Turn members of a tar archive into archive items.

    Each ``process_*`` method builds an Item from a tar member info object
    (attributes as found on ``tarfile.TarInfo``), fills in the type-specific
    fields and passes the item to ``add_item`` once the member was processed
    without an exception.
    """

    def __init__(self, *, cache, key,
                 add_item, process_file_chunks,
                 chunker_params, show_progress,
                 log_json, iec, file_status_printer=None):
        """Store the collaborators used while processing tar members.

        :param cache: passed through to ``process_file_chunks``
        :param key: stored as ``self.key``; its ``chunk_seed`` seeds the chunker
        :param add_item: callable invoked as ``add_item(item, stats=...)`` for each processed member
        :param process_file_chunks: callable that consumes the chunk iterator of a regular file
        :param chunker_params: positional parameters forwarded to ``get_chunker``
        :param show_progress: passed through to ``process_file_chunks``
        :param log_json: forwarded to ``Statistics`` as ``output_json``
        :param iec: forwarded to ``Statistics``
        :param file_status_printer: optional callable ``(status, path)``; defaults to a no-op
        """
        self.cache = cache
        self.key = key
        self.add_item = add_item
        self.process_file_chunks = process_file_chunks
        self.show_progress = show_progress
        self.print_file_status = file_status_printer or (lambda *args: None)

        self.stats = Statistics(output_json=log_json, iec=iec)  # threading: done by cache (including progress)
        self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False)

    @staticmethod
    def _mtime_ns(mtime):
        """Convert a tar member mtime (seconds, int or float) to integer nanoseconds.

        tarfile may report mtime as a float (e.g. when it comes from a pax header);
        converting here keeps the stored timestamp an int for both input types.
        Integer input stays exact because the multiplication is done on ints.
        """
        return int(mtime * 1000**3)

    @staticmethod
    def _file_mode(mode, type):
        """Combine permission bits with the file type bits; ``type=None`` adds no type bits."""
        return mode if type is None else mode | type

    @contextmanager
    def create_helper(self, tarinfo, status=None, type=None):
        """Context manager yielding ``(item, status)`` for one tar member.

        The item is created from the generic tarinfo attributes (path, mode,
        uid/gid, user/group, mtime). The caller completes it inside the
        ``with`` block; the item is added only if that block did not raise.

        :param tarinfo: tar member info object
        :param status: status value handed back unchanged to the ``with`` block
        :param type: file type bits OR-ed into the mode (name kept for caller compatibility);
                     None means "no type bits" instead of failing on ``mode | None``
        """
        item = Item(path=make_path_safe(tarinfo.name), mode=self._file_mode(tarinfo.mode, type),
                    uid=tarinfo.uid, gid=tarinfo.gid, user=tarinfo.uname, group=tarinfo.gname,
                    mtime=self._mtime_ns(tarinfo.mtime))
        yield item, status
        # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
        self.add_item(item, stats=self.stats)

    def process_dir(self, *, tarinfo, status, type):
        """Add an item for a directory member; returns the given status."""
        with self.create_helper(tarinfo, status, type) as (item, status):
            return status

    def process_fifo(self, *, tarinfo, status, type):
        """Add an item for a fifo member; returns the given status."""
        with self.create_helper(tarinfo, status, type) as (item, status):  # fifo
            return status

    def process_dev(self, *, tarinfo, status, type):
        """Add an item for a char/block device member, recording its device number as ``rdev``."""
        with self.create_helper(tarinfo, status, type) as (item, status):  # char/block device
            item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
            return status

    def process_link(self, *, tarinfo, status, type):
        """Add an item for a link member, recording ``tarinfo.linkname`` as the item's ``source``."""
        with self.create_helper(tarinfo, status, type) as (item, status):
            item.source = tarinfo.linkname
            return status

    def process_file(self, *, tarinfo, status, type, tar):
        """Add an item for a regular file member, chunking its content read from ``tar``.

        The status is printed before the content is processed, therefore None
        is returned so the caller does not print it a second time.
        """
        with self.create_helper(tarinfo, status, type) as (item, status):
            self.print_file_status(status, tarinfo.name)
            status = None  # we already printed the status
            fd = tar.extractfile(tarinfo)
            self.process_file_chunks(item, self.cache, self.stats, self.show_progress,
                                     backup_io_iter(self.chunker.chunkify(fd)))
            item.get_size(memorize=True)
            self.stats.nfiles += 1
            return status
def valid_msgpacked_dict(d, keys_serialized):
"""check if the data <d> looks like a msgpacked dict"""
d_len = len(d)

View file

@ -39,7 +39,7 @@ try:
from .algorithms.checksums import crc32
from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special
from .archive import BackupError, BackupOSError, backup_io, OsOpen, stat_update_check
from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor
from .archive import FilesystemObjectProcessors, TarfileObjectProcessors, MetadataCollector, ChunksProcessor
from .archive import has_link
from .cache import Cache, assert_secure, SecurityManager
from .constants import * # NOQA
@ -1748,11 +1748,6 @@ class Archiver:
return self.exit_code
def _import_tar(self, args, repository, manifest, key, cache, tar):
def tarinfo_to_item(tarinfo, type=0):
return Item(path=make_path_safe(tarinfo.name), mode=tarinfo.mode | type,
uid=tarinfo.uid, gid=tarinfo.gid, user=tarinfo.uname, group=tarinfo.gname,
mtime=tarinfo.mtime * 1000**3)
t0 = datetime.utcnow()
t0_monotonic = time.monotonic()
@ -1761,52 +1756,42 @@ class Archiver:
progress=args.progress,
chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic,
log_json=args.log_json)
cp = ChunksProcessor(cache=cache, key=key,
add_item=archive.add_item, write_checkpoint=archive.write_checkpoint,
checkpoint_interval=args.checkpoint_interval, rechunkify=False)
tfo = TarfileObjectProcessors(cache=cache, key=key,
process_file_chunks=cp.process_file_chunks, add_item=archive.add_item,
chunker_params=args.chunker_params, show_progress=args.progress,
log_json=args.log_json, iec=args.iec,
file_status_printer=self.print_file_status)
while True:
tarinfo = tar.next()
status = '?'
if not tarinfo:
break
if tarinfo.isreg():
status = 'A'
fd = tar.extractfile(tarinfo)
item = tarinfo_to_item(tarinfo, stat.S_IFREG)
fd = fd # avoid "unused fd" warning
item.chunks = [] # TODO, see do_create
item.get_size(memorize=True)
status = tfo.process_file(tarinfo=tarinfo, status='A', type=stat.S_IFREG, tar=tar)
archive.stats.nfiles += 1
elif tarinfo.isdir():
status = 'd'
item = tarinfo_to_item(tarinfo, stat.S_IFDIR)
status = tfo.process_dir(tarinfo=tarinfo, status='d', type=stat.S_IFDIR)
elif tarinfo.issym():
status = 's'
item = tarinfo_to_item(tarinfo, stat.S_IFLNK)
item.source = tarinfo.linkname
status = tfo.process_link(tarinfo=tarinfo, status='s', type=stat.S_IFLNK)
elif tarinfo.islnk():
# tar uses the same hardlink model as borg (rather vice versa); the first instance of a hardlink
# is stored as a regular file, later instances are special entries referencing back to the
# first instance.
status = 'h'
item = tarinfo_to_item(tarinfo, stat.S_IFREG)
item.source = tarinfo.linkname
status = tfo.process_link(tarinfo=tarinfo, status='h', type=stat.S_IFREG)
elif tarinfo.isblk():
status = 'b'
item = tarinfo_to_item(tarinfo, stat.S_IFBLK)
item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
status = tfo.process_dev(tarinfo=tarinfo, status='b', type=stat.S_IFBLK)
elif tarinfo.ischr():
status = 'c'
item = tarinfo_to_item(tarinfo, stat.S_IFCHR)
item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
status = tfo.process_dev(tarinfo=tarinfo, status='c', type=stat.S_IFCHR)
elif tarinfo.isfifo():
status = 'f'
item = tarinfo_to_item(tarinfo, stat.S_IFIFO)
status = tfo.process_fifo(tarinfo=tarinfo, status='f', type=stat.S_IFIFO)
else:
# TODO: GNUTYPE_SPARSE?
status = 'E'
self.print_warning('%s: Unsupported tar type %s', tarinfo.name, tarinfo.type)
self.print_file_status('E', tarinfo.name)
continue
self.print_file_status(status, tarinfo.name)
archive.add_item(item)
self._it_save_archive(args, archive)