From fb2efd88fe31e0c9bb9f4ec28902110a9ec736cc Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 10 Jun 2021 01:41:11 +0200 Subject: [PATCH] implement TarfileObjectProcessors similar to FilesystemObjectProcessors --- src/borg/archive.py | 54 ++++++++++++++++++++++++++++++++++++++++++++ src/borg/archiver.py | 49 ++++++++++++++-------------------------- 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 50c90721d..8fc456bb0 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1392,6 +1392,60 @@ class FilesystemObjectProcessors: return status +class TarfileObjectProcessors: + def __init__(self, *, cache, key, + add_item, process_file_chunks, + chunker_params, show_progress, + log_json, iec, file_status_printer=None): + self.cache = cache + self.key = key + self.add_item = add_item + self.process_file_chunks = process_file_chunks + self.show_progress = show_progress + self.print_file_status = file_status_printer or (lambda *args: None) + + self.stats = Statistics(output_json=log_json, iec=iec) # threading: done by cache (including progress) + self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False) + + @contextmanager + def create_helper(self, tarinfo, status=None, type=None): + item = Item(path=make_path_safe(tarinfo.name), mode=tarinfo.mode | type, + uid=tarinfo.uid, gid=tarinfo.gid, user=tarinfo.uname, group=tarinfo.gname, + mtime=tarinfo.mtime * 1000**3) + yield item, status + # if we get here, "with"-block worked ok without error/exception, the item was processed ok... + self.add_item(item, stats=self.stats) + + def process_dir(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): + return status + + def process_fifo(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): # fifo + return status + + def process_dev(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): # char/block device + item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor) + return status + + def process_link(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): + item.source = tarinfo.linkname + return status + + def process_file(self, *, tarinfo, status, type, tar): + with self.create_helper(tarinfo, status, type) as (item, status): + self.print_file_status(status, tarinfo.name) + status = None # we already printed the status + fd = tar.extractfile(tarinfo) + self.process_file_chunks(item, self.cache, self.stats, self.show_progress, + backup_io_iter(self.chunker.chunkify(fd))) + item.get_size(memorize=True) + self.stats.nfiles += 1 + return status + + def valid_msgpacked_dict(d, keys_serialized): """check if the data looks like a msgpacked dict""" d_len = len(d) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index d9e64a312..57b9c66bd 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -39,7 +39,7 @@ try: from .algorithms.checksums import crc32 from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special from .archive import BackupError, BackupOSError, backup_io, OsOpen, stat_update_check - from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor + from .archive import FilesystemObjectProcessors, TarfileObjectProcessors, MetadataCollector, ChunksProcessor from .archive import has_link from .cache import Cache, assert_secure, SecurityManager from .constants import * # NOQA @@ -1748,11 +1748,6 @@ class Archiver: return self.exit_code def _import_tar(self, args, repository, manifest, key, cache, tar): - def tarinfo_to_item(tarinfo, type=0): - return Item(path=make_path_safe(tarinfo.name), mode=tarinfo.mode | type, - uid=tarinfo.uid, gid=tarinfo.gid, user=tarinfo.uname, group=tarinfo.gname, - mtime=tarinfo.mtime * 1000**3) - t0 = datetime.utcnow() t0_monotonic = time.monotonic() @@ -1761,52 +1756,42 @@ class Archiver: progress=args.progress, chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic, log_json=args.log_json) + cp = ChunksProcessor(cache=cache, key=key, + add_item=archive.add_item, write_checkpoint=archive.write_checkpoint, + checkpoint_interval=args.checkpoint_interval, rechunkify=False) + tfo = TarfileObjectProcessors(cache=cache, key=key, + process_file_chunks=cp.process_file_chunks, add_item=archive.add_item, + chunker_params=args.chunker_params, show_progress=args.progress, + log_json=args.log_json, iec=args.iec, + file_status_printer=self.print_file_status) while True: tarinfo = tar.next() - status = '?' if not tarinfo: break if tarinfo.isreg(): - status = 'A' - fd = tar.extractfile(tarinfo) - item = tarinfo_to_item(tarinfo, stat.S_IFREG) - fd = fd # avoid "unused fd" warning - item.chunks = [] # TODO, see do_create - item.get_size(memorize=True) + status = tfo.process_file(tarinfo=tarinfo, status='A', type=stat.S_IFREG, tar=tar) archive.stats.nfiles += 1 elif tarinfo.isdir(): - status = 'd' - item = tarinfo_to_item(tarinfo, stat.S_IFDIR) + status = tfo.process_dir(tarinfo=tarinfo, status='d', type=stat.S_IFDIR) elif tarinfo.issym(): - status = 's' - item = tarinfo_to_item(tarinfo, stat.S_IFLNK) - item.source = tarinfo.linkname + status = tfo.process_link(tarinfo=tarinfo, status='s', type=stat.S_IFLNK) elif tarinfo.islnk(): # tar uses the same hardlink model as borg (rather vice versa); the first instance of a hardlink # is stored as a regular file, later instances are special entries referencing back to the # first instance. - status = 'h' - item = tarinfo_to_item(tarinfo, stat.S_IFREG) - item.source = tarinfo.linkname + status = tfo.process_link(tarinfo=tarinfo, status='h', type=stat.S_IFREG) elif tarinfo.isblk(): - status = 'b' - item = tarinfo_to_item(tarinfo, stat.S_IFBLK) - item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor) + status = tfo.process_dev(tarinfo=tarinfo, status='b', type=stat.S_IFBLK) elif tarinfo.ischr(): - status = 'c' - item = tarinfo_to_item(tarinfo, stat.S_IFCHR) - item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor) + status = tfo.process_dev(tarinfo=tarinfo, status='c', type=stat.S_IFCHR) elif tarinfo.isfifo(): - status = 'f' - item = tarinfo_to_item(tarinfo, stat.S_IFIFO) + status = tfo.process_fifo(tarinfo=tarinfo, status='f', type=stat.S_IFIFO) else: # TODO: GNUTYPE_SPARSE? + status = 'E' self.print_warning('%s: Unsupported tar type %s', tarinfo.name, tarinfo.type) - self.print_file_status('E', tarinfo.name) - continue self.print_file_status(status, tarinfo.name) - archive.add_item(item) self._it_save_archive(args, archive)