diff --git a/src/borg/archive.py b/src/borg/archive.py index 06470eaba..b27f48aa1 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -314,10 +314,8 @@ class Archive: self.create = create if self.create: self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats) - self.chunker = Chunker(self.key.chunk_seed, *chunker_params) if name in manifest.archives: raise self.AlreadyExists(name) - self.last_checkpoint = time.monotonic() i = 0 while True: self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '') @@ -809,6 +807,25 @@ Utilization of max. archive size: {csize_max:.0%} logger.warning('forced deletion succeeded, but the deleted archive was corrupted.') logger.warning('borg check --repair is required to free all space.') + @staticmethod + def _open_rb(path): + try: + # if we have O_NOATIME, this likely will succeed if we are root or owner of file: + return os.open(path, flags_noatime) + except PermissionError: + if flags_noatime == flags_normal: + # we do not have O_NOATIME, no need to try again: + raise + # Was this EPERM due to the O_NOATIME flag? Try again without it: + return os.open(path, flags_normal) + + +class MetadataCollector: + def __init__(self, *, noatime, noctime, numeric_owner): + self.noatime = noatime + self.noctime = noctime + self.numeric_owner = numeric_owner + def stat_simple_attrs(self, st): attrs = dict( mode=st.st_mode, @@ -847,6 +864,82 @@ Utilization of max. archive size: {csize_max:.0%} attrs.update(self.stat_ext_attrs(st, path)) return attrs + +class ChunksProcessor: + # Processes an iterator of chunks for an Item + + def __init__(self, *, key, cache, + add_item, write_checkpoint, + checkpoint_interval): + self.key = key + self.cache = cache + self.add_item = add_item + self.write_checkpoint = write_checkpoint + self.checkpoint_interval = checkpoint_interval + self.last_checkpoint = time.monotonic() + + def write_part_file(self, item, from_chunk, number): + item = Item(internal_dict=item.as_dict()) + length = len(item.chunks) + # the item should only have the *additional* chunks we processed after the last partial item: + item.chunks = item.chunks[from_chunk:] + item.get_size(memorize=True) + item.path += '.borg_part_%d' % number + item.part = number + number += 1 + self.add_item(item, show_progress=False) + self.write_checkpoint() + return length, number + + def process_file_chunks(self, item, cache, stats, chunk_iter, chunk_processor=None): + if not chunk_processor: + def chunk_processor(data): + chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False) + self.cache.repository.async_response(wait=False) + return chunk_entry + + item.chunks = [] + from_chunk = 0 + part_number = 1 + for data in chunk_iter: + item.chunks.append(chunk_processor(data)) + if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval: + from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) + self.last_checkpoint = time.monotonic() + else: + if part_number > 1: + if item.chunks[from_chunk:]: + # if we already have created a part item inside this file, we want to put the final + # chunks (if any) into a part item also (so all parts can be concatenated to get + # the complete file): + from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) + self.last_checkpoint = time.monotonic() + + # if we created part files, we have referenced all chunks from the part files, + # but we also will reference the same chunks also from the final, complete file: + for chunk in item.chunks: + cache.chunk_incref(chunk.id, stats, size=chunk.size) + + +class FilesystemObjectProcessors: + # When ported to threading, then this doesn't need chunker, cache, key any more. + # write_checkpoint should then be in the item buffer, + # and process_file becomes a callback passed to __init__. + + def __init__(self, *, metadata_collector, cache, key, + add_item, process_file_chunks, + chunker_params): + self.metadata_collector = metadata_collector + self.cache = cache + self.key = key + self.add_item = add_item + self.process_file_chunks = process_file_chunks + + self.hard_links = {} + self.stats = Statistics() # threading: done by cache (including progress) + self.cwd = os.getcwd() + self.chunker = Chunker(key.chunk_seed, *chunker_params) + @contextmanager def create_helper(self, path, st, status=None, hardlinkable=True): safe_path = make_path_safe(path) @@ -869,18 +962,18 @@ Utilization of max. archive size: {csize_max:.0%} def process_dir(self, path, st): with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master): - item.update(self.stat_attrs(st, path)) + item.update(self.metadata_collector.stat_attrs(st, path)) return status def process_fifo(self, path, st): with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master): # fifo - item.update(self.stat_attrs(st, path)) + item.update(self.metadata_collector.stat_attrs(st, path)) return status def process_dev(self, path, st, dev_type): with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master): # char/block device item.rdev = st.st_rdev - item.update(self.stat_attrs(st, path)) + item.update(self.metadata_collector.stat_attrs(st, path)) return status def process_symlink(self, path, st): @@ -892,53 +985,9 @@ Utilization of max. archive size: {csize_max:.0%} item.source = source if st.st_nlink > 1: logger.warning('hardlinked symlinks will be archived as non-hardlinked symlinks!') - item.update(self.stat_attrs(st, path)) + item.update(self.metadata_collector.stat_attrs(st, path)) return status - def write_part_file(self, item, from_chunk, number): - item = Item(internal_dict=item.as_dict()) - length = len(item.chunks) - # the item should only have the *additional* chunks we processed after the last partial item: - item.chunks = item.chunks[from_chunk:] - item.get_size(memorize=True) - item.path += '.borg_part_%d' % number - item.part = number - number += 1 - self.add_item(item, show_progress=False) - self.write_checkpoint() - return length, number - - def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None): - if not chunk_processor: - def chunk_processor(data): - chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False) - self.cache.repository.async_response(wait=False) - return chunk_entry - - item.chunks = [] - from_chunk = 0 - part_number = 1 - for data in chunk_iter: - item.chunks.append(chunk_processor(data)) - if self.show_progress: - self.stats.show_progress(item=item, dt=0.2) - if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval: - from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) - self.last_checkpoint = time.monotonic() - else: - if part_number > 1: - if item.chunks[from_chunk:]: - # if we already have created a part item inside this file, we want to put the final - # chunks (if any) into a part item also (so all parts can be concatenated to get - # the complete file): - from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) - self.last_checkpoint = time.monotonic() - - # if we created part files, we have referenced all chunks from the part files, - # but we also will reference the same chunks also from the final, complete file: - for chunk in item.chunks: - cache.chunk_incref(chunk.id, stats, size=chunk.size) - def process_stdin(self, path, cache): uid, gid = 0, 0 t = int(time.time()) * 1000000000 @@ -950,7 +999,7 @@ Utilization of max. archive size: {csize_max:.0%} mtime=t, atime=t, ctime=t, ) fd = sys.stdin.buffer # binary - self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd))) + self.process_file_chunks(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd))) item.get_size(memorize=True) self.stats.nfiles += 1 self.add_item(item) @@ -983,7 +1032,7 @@ Utilization of max. archive size: {csize_max:.0%} else: status = 'A' # regular file, added item.hardlink_master = hardlinked - item.update(self.stat_simple_attrs(st)) + item.update(self.metadata_collector.stat_simple_attrs(st)) # Only chunkify the file if needed if chunks is not None: item.chunks = chunks @@ -991,14 +1040,14 @@ Utilization of max. archive size: {csize_max:.0%} with backup_io('open'): fh = Archive._open_rb(path) with os.fdopen(fh, 'rb') as fd: - self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh))) + self.process_file_chunks(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh))) if not is_special_file: # we must not memorize special files, because the contents of e.g. a # block or char device will change without its mtime/size/inode changing. cache.memorize_file(path_hash, st, [c.id for c in item.chunks]) status = status or 'M' # regular file, modified (if not 'A' already) self.stats.nfiles += 1 - item.update(self.stat_attrs(st, path)) + item.update(self.metadata_collector.stat_attrs(st, path)) item.get_size(memorize=True) if is_special_file: # we processed a special file like a regular file. reflect that in mode, @@ -1006,24 +1055,6 @@ Utilization of max. archive size: {csize_max:.0%} item.mode = stat.S_IFREG | stat.S_IMODE(item.mode) return status - @staticmethod - def list_archives(repository, key, manifest, cache=None): - # expensive! see also Manifest.list_archive_infos. - for name in manifest.archives: - yield Archive(repository, key, manifest, name, cache=cache) - - @staticmethod - def _open_rb(path): - try: - # if we have O_NOATIME, this likely will succeed if we are root or owner of file: - return os.open(path, flags_noatime) - except PermissionError: - if flags_noatime == flags_normal: - # we do not have O_NOATIME, no need to try again: - raise - # Was this EPERM due to the O_NOATIME flag? Try again without it: - return os.open(path, flags_normal) - def valid_msgpacked_dict(d, keys_serialized): """check if the data looks like a msgpacked dict""" @@ -1663,7 +1694,7 @@ class ArchiveRecreater: return item.chunks chunk_iterator = self.iter_chunks(archive, target, list(item.chunks)) chunk_processor = partial(self.chunk_processor, target) - target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor) + target.process_file_chunks(item, self.cache, target.stats, chunk_iterator, chunk_processor) def chunk_processor(self, target, data): chunk_id = self.key.id_hash(data) @@ -1759,6 +1790,11 @@ class ArchiveRecreater: target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params if target.recreate_rechunkify: logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params) + target.process_file_chunks = ChunksProcessor( + cache=self.cache, key=self.key, + add_item=target.add_item, write_checkpoint=target.write_checkpoint, + checkpoint_interval=self.checkpoint_interval).process_file_chunks + target.chunker = Chunker(self.key.chunk_seed, *target.chunker_params) return target def create_target_archive(self, name): diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 7d27e8c64..0f2ebd9a8 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -33,10 +33,10 @@ import msgpack import borg from . import __version__ from . import helpers -from . import shellpattern from .algorithms.checksums import crc32 from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special from .archive import BackupOSError, backup_io +from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor from .cache import Cache, assert_secure from .constants import * # NOQA from .compress import CompressionSpec @@ -448,7 +448,7 @@ class Archiver: matcher = PatternMatcher(fallback=True) matcher.add_inclexcl(args.patterns) - def create_inner(archive, cache): + def create_inner(archive, cache, fso): # Add cache dir to inode_skip list skip_inodes = set() try: @@ -468,7 +468,7 @@ class Archiver: path = 'stdin' if not dry_run: try: - status = archive.process_stdin(path, cache) + status = fso.process_stdin(path, cache) except BackupOSError as e: status = 'E' self.print_warning('%s: %s', path, e) @@ -486,7 +486,7 @@ class Archiver: restrict_dev = st.st_dev else: restrict_dev = None - self._process(archive, cache, matcher, args.exclude_caches, args.exclude_if_present, + self._process(fso, cache, matcher, args.exclude_caches, args.exclude_if_present, args.keep_exclude_tags, skip_inodes, path, restrict_dev, read_special=args.read_special, dry_run=dry_run, st=st) if not dry_run: @@ -523,12 +523,20 @@ class Archiver: progress=args.progress, chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic, log_json=args.log_json) - create_inner(archive, cache) + metadata_collector = MetadataCollector(noatime=args.noatime, noctime=args.noctime, + numeric_owner=args.numeric_owner) + cp = ChunksProcessor(cache=cache, key=key, + add_item=archive.add_item, write_checkpoint=archive.write_checkpoint, + checkpoint_interval=args.checkpoint_interval) + fso = FilesystemObjectProcessors(metadata_collector=metadata_collector, cache=cache, key=key, + process_file_chunks=cp.process_file_chunks, add_item=archive.add_item, + chunker_params=args.chunker_params) + create_inner(archive, cache, fso) else: - create_inner(None, None) + create_inner(None, None, None) return self.exit_code - def _process(self, archive, cache, matcher, exclude_caches, exclude_if_present, + def _process(self, fso, cache, matcher, exclude_caches, exclude_if_present, keep_exclude_tags, skip_inodes, path, restrict_dev, read_special=False, dry_run=False, st=None): """ @@ -566,33 +574,33 @@ class Archiver: return if stat.S_ISREG(st.st_mode): if not dry_run: - status = archive.process_file(path, st, cache, self.ignore_inode) + status = fso.process_file(path, st, cache, self.ignore_inode) elif stat.S_ISDIR(st.st_mode): if recurse: tag_paths = dir_is_tagged(path, exclude_caches, exclude_if_present) if tag_paths: if keep_exclude_tags and not dry_run: - archive.process_dir(path, st) + fso.process_dir(path, st) for tag_path in tag_paths: - self._process(archive, cache, matcher, exclude_caches, exclude_if_present, + self._process(fso, cache, matcher, exclude_caches, exclude_if_present, keep_exclude_tags, skip_inodes, tag_path, restrict_dev, read_special=read_special, dry_run=dry_run) return if not dry_run: if not recurse_excluded_dir: - status = archive.process_dir(path, st) + status = fso.process_dir(path, st) if recurse: with backup_io('scandir'): entries = helpers.scandir_inorder(path) for dirent in entries: normpath = os.path.normpath(dirent.path) - self._process(archive, cache, matcher, exclude_caches, exclude_if_present, + self._process(fso, cache, matcher, exclude_caches, exclude_if_present, keep_exclude_tags, skip_inodes, normpath, restrict_dev, read_special=read_special, dry_run=dry_run) elif stat.S_ISLNK(st.st_mode): if not dry_run: if not read_special: - status = archive.process_symlink(path, st) + status = fso.process_symlink(path, st) else: try: st_target = os.stat(path) @@ -601,27 +609,27 @@ class Archiver: else: special = is_special(st_target.st_mode) if special: - status = archive.process_file(path, st_target, cache) + status = fso.process_file(path, st_target, cache) else: - status = archive.process_symlink(path, st) + status = fso.process_symlink(path, st) elif stat.S_ISFIFO(st.st_mode): if not dry_run: if not read_special: - status = archive.process_fifo(path, st) + status = fso.process_fifo(path, st) else: - status = archive.process_file(path, st, cache) + status = fso.process_file(path, st, cache) elif stat.S_ISCHR(st.st_mode): if not dry_run: if not read_special: - status = archive.process_dev(path, st, 'c') + status = fso.process_dev(path, st, 'c') else: - status = archive.process_file(path, st, cache) + status = fso.process_file(path, st, cache) elif stat.S_ISBLK(st.st_mode): if not dry_run: if not read_special: - status = archive.process_dev(path, st, 'b') + status = fso.process_dev(path, st, 'b') else: - status = archive.process_file(path, st, cache) + status = fso.process_file(path, st, cache) elif stat.S_ISSOCK(st.st_mode): # Ignore unix sockets return