from configparser import RawConfigParser from .remote import cache_if_remote import errno import msgpack import os import stat import sys import threading from binascii import hexlify import shutil import tarfile import tempfile from .key import PlaintextKey from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, int_to_bigint, \ bigint_to_int from .locking import UpgradableLock from .hashindex import ChunkIndex class Cache: """Client Side cache """ class RepositoryReplay(Error): """Cache is newer than repository, refusing to continue""" class CacheInitAbortedError(Error): """Cache initialization aborted""" class RepositoryAccessAborted(Error): """Repository access aborted""" class EncryptionMethodMismatch(Error): """Repository encryption method changed since last acccess, refusing to continue """ class ChunkSizeNotReady(Exception): """computation of some chunk size is not yet finished""" def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True): self.lock = None self.timestamp = None self.thread_lock = threading.Lock() self.txn_active = False self.repository = repository self.key = key self.manifest = manifest self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii')) self.do_files = do_files # Warn user before sending data to a never seen before unencrypted repository if not os.path.exists(self.path): if warn_if_unencrypted and isinstance(key, PlaintextKey): if not self._confirm('Warning: Attempting to access a previously unknown unencrypted repository', 'BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'): raise self.CacheInitAbortedError() self.create() self.open() # Warn user before sending data to a relocated repository if self.previous_location and self.previous_location != repository._location.canonical_path(): msg = 'Warning: The repository at location {} was previously located at {}'.format(repository._location.canonical_path(), self.previous_location) if not self._confirm(msg, 'BORG_RELOCATED_REPO_ACCESS_IS_OK'): raise self.RepositoryAccessAborted() if sync and self.manifest.id != self.manifest_id: # If repository is older than the cache something fishy is going on if self.timestamp and self.timestamp > manifest.timestamp: raise self.RepositoryReplay() # Make sure an encrypted repository has not been swapped for an unencrypted repository if self.key_type is not None and self.key_type != str(key.TYPE): raise self.EncryptionMethodMismatch() self.sync() self.commit() def __del__(self): self.close() def _confirm(self, message, env_var_override=None): print(message, file=sys.stderr) if env_var_override and os.environ.get(env_var_override): print("Yes (From {})".format(env_var_override)) return True if not sys.stdin.isatty(): return False try: answer = input('Do you want to continue? [yN] ') except EOFError: return False return answer and answer in 'Yy' def create(self): """Create a new empty cache at `self.path` """ os.makedirs(self.path) with open(os.path.join(self.path, 'README'), 'w') as fd: fd.write('This is a Borg cache') config = RawConfigParser() config.add_section('cache') config.set('cache', 'version', '1') config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii')) config.set('cache', 'manifest', '') with open(os.path.join(self.path, 'config'), 'w') as fd: config.write(fd) ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8')) with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd: pass # empty file with open(os.path.join(self.path, 'files'), 'wb') as fd: pass # empty file def destroy(self): """destroy the cache at `self.path` """ self.close() os.remove(os.path.join(self.path, 'config')) # kill config first shutil.rmtree(self.path) def _do_open(self): self.config = RawConfigParser() self.config.read(os.path.join(self.path, 'config')) if self.config.getint('cache', 'version') != 1: raise Exception('%s Does not look like a Borg cache') self.id = self.config.get('cache', 'repository') self.manifest_id = unhexlify(self.config.get('cache', 'manifest')) self.timestamp = self.config.get('cache', 'timestamp', fallback=None) self.key_type = self.config.get('cache', 'key_type', fallback=None) self.previous_location = self.config.get('cache', 'previous_location', fallback=None) self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8')) self.files = None def open(self): if not os.path.isdir(self.path): raise Exception('%s Does not look like a Borg cache' % self.path) self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire() self.rollback() def close(self): if self.lock: self.lock.release() self.lock = None def _read_files(self): self.files = {} self._newest_mtime = 0 with open(os.path.join(self.path, 'files'), 'rb') as fd: u = msgpack.Unpacker(use_list=True) while True: data = fd.read(64 * 1024) if not data: break u.feed(data) for path_hash, item in u: item[0] += 1 # in the end, this takes about 240 Bytes per file self.files[path_hash] = msgpack.packb(item) def begin_txn(self): # Initialize transaction snapshot txn_dir = os.path.join(self.path, 'txn.tmp') os.mkdir(txn_dir) shutil.copy(os.path.join(self.path, 'config'), txn_dir) shutil.copy(os.path.join(self.path, 'chunks'), txn_dir) shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir) shutil.copy(os.path.join(self.path, 'files'), txn_dir) os.rename(os.path.join(self.path, 'txn.tmp'), os.path.join(self.path, 'txn.active')) self.txn_active = True def commit(self): """Commit transaction """ if not self.txn_active: return if self.files is not None: with open(os.path.join(self.path, 'files'), 'wb') as fd: for path_hash, item in self.files.items(): # Discard cached files with the newest mtime to avoid # issues with filesystem snapshots and mtime precision item = msgpack.unpackb(item) if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime: msgpack.pack((path_hash, item), fd) self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii')) self.config.set('cache', 'timestamp', self.manifest.timestamp) self.config.set('cache', 'key_type', str(self.key.TYPE)) self.config.set('cache', 'previous_location', self.repository._location.canonical_path()) with open(os.path.join(self.path, 'config'), 'w') as fd: self.config.write(fd) self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8')) os.rename(os.path.join(self.path, 'txn.active'), os.path.join(self.path, 'txn.tmp')) shutil.rmtree(os.path.join(self.path, 'txn.tmp')) self.txn_active = False def rollback(self): """Roll back partial and aborted transactions """ # Remove partial transaction if os.path.exists(os.path.join(self.path, 'txn.tmp')): shutil.rmtree(os.path.join(self.path, 'txn.tmp')) # Roll back active transaction txn_dir = os.path.join(self.path, 'txn.active') if os.path.exists(txn_dir): shutil.copy(os.path.join(txn_dir, 'config'), self.path) shutil.copy(os.path.join(txn_dir, 'chunks'), self.path) shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path) shutil.copy(os.path.join(txn_dir, 'files'), self.path) os.rename(txn_dir, os.path.join(self.path, 'txn.tmp')) if os.path.exists(os.path.join(self.path, 'txn.tmp')): shutil.rmtree(os.path.join(self.path, 'txn.tmp')) self.txn_active = False self._do_open() def sync(self): """Re-synchronize chunks cache with repository. If present, uses a compressed tar archive of known backup archive indices, so it only needs to fetch infos from repo and build a chunk index once per backup archive. If out of sync, the tar gets rebuilt from known + fetched chunk infos, so it has complete and current information about all backup archives. Finally, it builds the master chunks index by merging all indices from the tar. Note: compression (esp. xz) is very effective in keeping the tar relatively small compared to the files it contains. """ in_archive_path = os.path.join(self.path, 'chunks.archive') out_archive_path = os.path.join(self.path, 'chunks.archive.tmp') def open_in_archive(): try: tf = tarfile.open(in_archive_path, 'r') except OSError as e: if e.errno != errno.ENOENT: raise # file not found tf = None except tarfile.ReadError: # empty file? tf = None return tf def open_out_archive(): for compression in ('xz', 'bz2', 'gz'): # xz needs py 3.3, bz2 and gz also work on 3.2 try: tf = tarfile.open(out_archive_path, 'w:'+compression, format=tarfile.PAX_FORMAT) break except tarfile.CompressionError: continue else: # shouldn't happen tf = None return tf def close_archive(tf): if tf: tf.close() def delete_in_archive(): os.unlink(in_archive_path) def rename_out_archive(): os.rename(out_archive_path, in_archive_path) def add(chunk_idx, id, size, csize, incr=1): try: count, size, csize = chunk_idx[id] chunk_idx[id] = count + incr, size, csize except KeyError: chunk_idx[id] = incr, size, csize def transfer_known_idx(archive_id, tf_in, tf_out): archive_id_hex = hexlify(archive_id).decode('ascii') tarinfo = tf_in.getmember(archive_id_hex) archive_name = tarinfo.pax_headers['archive_name'] print('Already known archive:', archive_name) f_in = tf_in.extractfile(archive_id_hex) tf_out.addfile(tarinfo, f_in) return archive_name def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out): chunk_idx = ChunkIndex() cdata = repository.get(archive_id) data = key.decrypt(archive_id, cdata) add(chunk_idx, archive_id, len(data), len(cdata)) archive = msgpack.unpackb(data) if archive[b'version'] != 1: raise Exception('Unknown archive metadata version') decode_dict(archive, (b'name',)) print('Analyzing new archive:', archive[b'name']) unpacker = msgpack.Unpacker() for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])): data = key.decrypt(item_id, chunk) add(chunk_idx, item_id, len(data), len(chunk)) unpacker.feed(data) for item in unpacker: if not isinstance(item, dict): print('Error: Did not get expected metadata dict - archive corrupted!') continue if b'chunks' in item: for chunk_id, size, csize in item[b'chunks']: add(chunk_idx, chunk_id, size, csize) archive_id_hex = hexlify(archive_id).decode('ascii') file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8') chunk_idx.write(file_tmp) tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex) tarinfo.pax_headers['archive_name'] = archive[b'name'] with open(file_tmp, 'rb') as f: tf_out.addfile(tarinfo, f) os.unlink(file_tmp) def create_master_idx(chunk_idx, tf_in, tmp_dir): chunk_idx.clear() for tarinfo in tf_in: archive_id_hex = tarinfo.name archive_name = tarinfo.pax_headers['archive_name'] print("- extracting archive %s ..." % archive_name) tf_in.extract(archive_id_hex, tmp_dir) chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8') print("- reading archive ...") archive_chunk_idx = ChunkIndex.read(chunk_idx_path) print("- merging archive ...") chunk_idx.merge(archive_chunk_idx) os.unlink(chunk_idx_path) self.begin_txn() print('Synchronizing chunks cache...') # XXX we have to do stuff on disk due to lacking ChunkIndex api with tempfile.TemporaryDirectory(prefix='borg-tmp') as tmp_dir: repository = cache_if_remote(self.repository) out_archive = open_out_archive() in_archive = open_in_archive() if in_archive: known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames()) else: known_ids = set() archive_ids = set(info[b'id'] for info in self.manifest.archives.values()) print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % ( len(known_ids), len(archive_ids), len(archive_ids - known_ids), )) for archive_id in archive_ids & known_ids: transfer_known_idx(archive_id, in_archive, out_archive) close_archive(in_archive) delete_in_archive() # free disk space for archive_id in archive_ids - known_ids: fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive) close_archive(out_archive) rename_out_archive() print('Merging collection into master chunks cache...') in_archive = open_in_archive() create_master_idx(self.chunks, in_archive, tmp_dir) close_archive(in_archive) print('Done.') def add_chunk(self, id, data, stats): if not self.txn_active: self.begin_txn() size = len(data) if self.seen_chunk(id, size): return self.chunk_incref(id, stats) data = self.key.encrypt(data) csize = len(data) self.repository.put(id, data, wait=False) self.chunks[id] = (1, size, csize) stats.update(size, csize, True) return id, size, csize def chunk_modify(self, id, count=None, delta=None, size=None, csize=None): """modify a self.chunks entry, return the new value. must be thread safe. """ with self.thread_lock: _count, _size, _csize = self.chunks[id] modified = False if size is not None and size != _size: assert _size == 0 _size = size modified = True if csize is not None and csize != _csize: assert _csize == 0 _csize = csize modified = True if count is not None and count != _count: assert _count == 0 _count = count modified = True if delta is not None and delta != 0: _count += delta assert _count >= 0 modified = True if modified: self.chunks[id] = _count, _size, _csize return _count, _size, _csize def add_chunk_nostats(self, cchunk, id, size, csize): # do not update stats here, see postprocess if not self.txn_active: self.begin_txn() new_chunk = cchunk is not None if new_chunk: # note: count = 1 already set in seen_or_announce_chunk _, size, csize = self.chunk_modify(id, size=size, csize=csize) self.repository.put(id, cchunk, wait=False) else: # note: csize might be still 0 (not yet computed) here _, size, csize = self.chunk_modify(id, delta=1, size=size) return size, csize, new_chunk def postprocess_results(self, size_infos, results, stats): # we need to do some post processing: # - chunks that are duplicate may have csize not yet set correctly due # to the multi threaded processing. all (x, 0) sizes must be still # set using the correct size from the other duplicate chunk (not x, 0). # - we need to reconstruct the correct order of the chunks. # - we need to fix the stats now we have the correct csize chunks = [] for _, id, new_chunk in sorted(results): try: size, csize = size_infos[id] except KeyError: raise self.ChunkSizeNotReady chunks.append((id, size, csize, new_chunk)) # do another pass after we have made sure we have all size info results = [] for id, size, csize, new_chunk in chunks: stats.update(size, csize, new_chunk) results.append((id, size, csize)) return results def seen_chunk(self, id, size=None): refcount, stored_size, _ = self.chunks.get(id, (0, None, None)) if size is not None and stored_size is not None and size != stored_size: # we already have a chunk with that id, but different size. # this is either a hash collision (unlikely) or corruption or a bug. raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % ( id, stored_size, size)) return refcount def seen_or_announce_chunk(self, id, size): """return True if we have seen the chunk already (thus, we already have it or will have it soon). in case we don't have seen it, announce its (future) availability, return False. must be thread safe. """ with self.thread_lock: try: # did we see this id already (and is count > 0)? count, _size, _csize = self.chunks[id] if size != _size: raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % ( id, _size, size)) return count > 0 except KeyError: # announce that we will put this chunk soon, # so that deduplication knows we already have it. self.chunks[id] = 1, size, 0 return False def chunk_incref(self, id, stats): if not self.txn_active: self.begin_txn() count, size, csize = self.chunks[id] self.chunks[id] = (count + 1, size, csize) stats.update(size, csize, False) return id, size, csize def chunk_decref(self, id, stats): if not self.txn_active: self.begin_txn() count, size, csize = self.chunks[id] if count == 1: del self.chunks[id] self.repository.delete(id, wait=False) stats.update(-size, -csize, True) else: self.chunks[id] = (count - 1, size, csize) stats.update(-size, -csize, False) def file_known_and_unchanged(self, path_hash, st): if not (self.do_files and stat.S_ISREG(st.st_mode)): return None if self.files is None: self._read_files() entry = self.files.get(path_hash) if not entry: return None entry = msgpack.unpackb(entry) if entry[2] == st.st_size and bigint_to_int(entry[3]) == st_mtime_ns(st) and entry[1] == st.st_ino: # reset entry age entry[0] = 0 self.files[path_hash] = msgpack.packb(entry) return entry[4] else: return None def memorize_file(self, path_hash, st, ids): if not (self.do_files and stat.S_ISREG(st.st_mode)): return # Entry: Age, inode, size, mtime, chunk ids mtime_ns = st_mtime_ns(st) self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids)) self._newest_mtime = max(self._newest_mtime, mtime_ns)