in-file checkpoints, fixes #1198, fixes #1093

also: unify code for stdin and on-disk file processing
Thomas Waldmann 2016-06-26 18:07:01 +02:00
parent 75d91c4bd1
commit dd5f957e6f
3 changed files with 28 additions and 16 deletions
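The diff below adds the checkpointing inside a new chunk_file() method: whenever checkpoint_interval elapses while a single large file is being chunked, a partial item is added under the original path plus a '.checkpoint_N' suffix and a checkpoint archive is written, so the chunks already stored stay referenced even if the backup is interrupted. As a rough illustration of the naming only (the path is hypothetical, not taken from the diff):

    # illustration only: the partial item names chunk_file() produces for one large file
    path = 'data/huge.img'                       # hypothetical input path
    partial_names = [path + '.checkpoint_%d' % n for n in (1, 2, 3)]
    # ['data/huge.img.checkpoint_1', 'data/huge.img.checkpoint_2', 'data/huge.img.checkpoint_3']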

View file

@@ -331,13 +331,10 @@ Number of files: {0.stats.nfiles}'''.format(
         for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
             yield item

-    def add_item(self, item):
-        if self.show_progress:
+    def add_item(self, item, show_progress=True):
+        if show_progress and self.show_progress:
             self.stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)
-        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
-            self.write_checkpoint()
-            self.last_checkpoint = time.time()

     def write_checkpoint(self):
         self.save(self.checkpoint_name)
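The new show_progress parameter lets a caller add an item without emitting a progress update; chunk_file() (added in the next hunk) uses it for checkpoint items, since it already drives progress reporting itself. A hedged usage sketch, with archive, item and checkpoint_item standing in for the objects involved:

    archive.add_item(item)                                   # default: progress shown if enabled
    archive.add_item(checkpoint_item, show_progress=False)   # checkpoint item: stay quiet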
@@ -712,6 +709,26 @@ Number of files: {0.stats.nfiles}'''.format(
         self.add_item(item)
         return 's'  # symlink

+    def chunk_file(self, item, cache, stats, fd, fh=-1, **chunk_kw):
+        checkpoint_number = 1
+        item.chunks = []
+        for data in backup_io_iter(self.chunker.chunkify(fd, fh)):
+            item.chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats))
+            if self.show_progress:
+                self.stats.show_progress(item=item, dt=0.2)
+            if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
+                checkpoint_item = Item(internal_dict=item.as_dict())
+                checkpoint_item.path += '.checkpoint_%d' % checkpoint_number
+                checkpoint_item.checkpoint = checkpoint_number
+                checkpoint_number += 1
+                self.add_item(checkpoint_item, show_progress=False)
+                self.write_checkpoint()
+                self.last_checkpoint = time.time()
+                # we have saved the checkpoint file, but we will reference the same
+                # chunks also from the next checkpoint or the "real" file we save later
+                for chunk in checkpoint_item.chunks:
+                    cache.chunk_incref(chunk.id, stats)
+
     def process_stdin(self, path, cache):
         uid, gid = 0, 0
         t = int(time.time()) * 1000000000
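The final chunk_incref() loop is the subtle part: the chunks stored so far are now referenced by the checkpoint item, but the same chunks will be referenced again by the next checkpoint item or by the final file item, so their reference counts are bumped up front. A toy sketch of that bookkeeping, using a plain dict rather than borg's chunks cache:

    # toy refcount model, NOT borg's cache implementation
    refcount = {}

    def add_chunk(chunk_id):
        # storing a chunk for the item currently being built counts as one reference
        refcount[chunk_id] = refcount.get(chunk_id, 0) + 1

    def chunk_incref(chunk_id):
        # reserve an extra reference for the item that will be saved next
        refcount[chunk_id] += 1

    add_chunk('c1')      # referenced by the checkpoint item        -> refcount 1
    chunk_incref('c1')   # reserved for the next / final item       -> refcount 2
    # deleting the checkpoint item later decrements once; 'c1' survives for the final item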
@@ -723,9 +740,7 @@ Number of files: {0.stats.nfiles}'''.format(
             mtime=t, atime=t, ctime=t,
         )
         fd = sys.stdin.buffer  # binary
-        item.chunks = []
-        for data in backup_io_iter(self.chunker.chunkify(fd)):
-            item.chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
+        self.chunk_file(item, cache, self.stats, fd)
         self.stats.nfiles += 1
         self.add_item(item)
         return 'i'  # stdin
@@ -781,13 +796,7 @@ Number of files: {0.stats.nfiles}'''.format(
             with backup_io():
                 fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
-                item.chunks = []
-                for data in backup_io_iter(self.chunker.chunkify(fd, fh)):
-                    item.chunks.append(cache.add_chunk(self.key.id_hash(data),
-                                                       Chunk(data, compress=compress),
-                                                       self.stats))
-                    if self.show_progress:
-                        self.stats.show_progress(item=item, dt=0.2)
+                self.chunk_file(item, cache, self.stats, fd, fh, compress=compress)
             if not is_special_file:
                 # we must not memorize special files, because the contents of e.g. a
                 # block or char device will change without its mtime/size/inode changing.
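Both callers now share chunk_file(); the only per-caller difference travels through **chunk_kw: process_file() passes compress=compress into each Chunk, process_stdin() passes nothing. A minimal stand-alone sketch of that pass-through (this is a demo function, not borg's Chunk/add_chunk code):

    # minimal sketch of the **chunk_kw pass-through that unifies both callers
    def store_chunk(data, **chunk_kw):
        # process_file() arrives here with chunk_kw == {'compress': compress}
        # process_stdin() arrives here with chunk_kw == {}
        return {'data': data, **chunk_kw}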

View file

@@ -1,7 +1,8 @@
 # this set must be kept complete, otherwise the RobustUnpacker might malfunction:
 ITEM_KEYS = frozenset(['path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hardlink_master',
                        'mode', 'user', 'group', 'uid', 'gid', 'mtime', 'atime', 'ctime',
-                       'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended', ])
+                       'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended',
+                       'checkpoint'])

 # this is the set of keys that are always present in items:
 REQUIRED_ITEM_KEYS = frozenset(['path', 'mtime', ])
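'checkpoint' has to be listed here because, per the comment above, code that validates item keys against ITEM_KEYS (such as the RobustUnpacker used during repair) would otherwise treat checkpoint items as invalid. A simplified sketch of that kind of validation, not borg's actual checker code:

    def looks_like_item(item_dict):
        # an item dict must contain the required keys and nothing outside ITEM_KEYS
        keys = set(item_dict)
        return REQUIRED_ITEM_KEYS <= keys and keys <= ITEM_KEYS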

View file

@@ -155,6 +155,8 @@ class Item(PropDict):
     deleted = PropDict._make_property('deleted', bool)
     nlink = PropDict._make_property('nlink', int)

+    checkpoint = PropDict._make_property('checkpoint', int)
+

 class EncryptedKey(PropDict):
     """