From b148a366fe34db6d506ee6597285303ff1e9ad13 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Wed, 1 Nov 2017 03:02:25 +0100 Subject: [PATCH] fuse: Separate creation of filesystem from implementation of llfuse funcs (#3042) fuse: Separate creation of filesystem from implementation of llfuse funcs --- src/borg/fuse.py | 325 ++++++++++++++++++++++++----------------------- 1 file changed, 166 insertions(+), 159 deletions(-) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 129e3b20b..401520b2a 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -202,27 +202,21 @@ class ItemCache: self.write_offset = write_offset -class FuseOperations(llfuse.Operations): - """Export archive as a FUSE filesystem +class FuseBackend(object): + """Virtual filesystem based on archive(s) to provide information to fuse """ - # mount options - allow_damaged_files = False - versions = False - def __init__(self, key, repository, manifest, args, decrypted_repository): - super().__init__() + def __init__(self, key, manifest, repository, args, decrypted_repository): self.repository_uncached = repository - self.decrypted_repository = decrypted_repository - self.args = args - self.manifest = manifest + self._args = args + self._manifest = manifest self.key = key - # Maps inode numbers to Item instances. This is used for synthetic inodes, - # i.e. file-system objects that are made up by FuseOperations and are not contained - # in the archives. For example archive directories or intermediate directories + # Maps inode numbers to Item instances. This is used for synthetic inodes, i.e. file-system objects that are + # made up and are not contained in the archives. For example archive directories or intermediate directories # not contained in archives. - self.items = {} - # _inode_count is the current count of synthetic inodes, i.e. those in self.items - self._inode_count = 0 + self._items = {} + # _inode_count is the current count of synthetic inodes, i.e. those in self._items + self.inode_count = 0 # Maps inode numbers to the inode number of the parent self.parent = {} # Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers, @@ -231,34 +225,179 @@ class FuseOperations(llfuse.Operations): self.default_uid = os.getuid() self.default_gid = os.getgid() self.default_dir = Item(mode=0o40755, mtime=int(time.time() * 1e9), uid=self.default_uid, gid=self.default_gid) + # Archives to be loaded when first accessed, mapped by their placeholder inode self.pending_archives = {} self.cache = ItemCache(decrypted_repository) - data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1)) - logger.debug('mount data cache capacity: %d chunks', data_cache_capacity) - self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None) + self.allow_damaged_files = False + self.versions = False def _create_filesystem(self): self._create_dir(parent=1) # first call, create root dir (inode == 1) - if self.args.location.archive: - self.process_archive(self.args.location.archive) + if self._args.location.archive: + self._process_archive(self._args.location.archive) else: self.versions_index = FuseVersionsIndex() - for archive in self.manifest.archives.list_considering(self.args): + for archive in self._manifest.archives.list_considering(self._args): if self.versions: # process archives immediately - self.process_archive(archive.name) + self._process_archive(archive.name) else: # lazily load archives, create archive placeholder inode archive_inode = self._create_dir(parent=1, mtime=int(archive.ts.timestamp() * 1e9)) self.contents[1][os.fsencode(archive.name)] = archive_inode self.pending_archives[archive_inode] = archive.name + def get_item(self, inode): + try: + return self._items[inode] + except KeyError: + return self.cache.get(inode) + + def check_pending_archive(self, inode): + # Check if this is an archive we need to load + archive_name = self.pending_archives.pop(inode, None) + if archive_name is not None: + self._process_archive(archive_name, [os.fsencode(archive_name)]) + + def _allocate_inode(self): + self.inode_count += 1 + return self.inode_count + + def _create_dir(self, parent, mtime=None): + """Create directory + """ + ino = self._allocate_inode() + if mtime is not None: + self._items[ino] = Item(**self.default_dir.as_dict()) + self._items[ino].mtime = mtime + else: + self._items[ino] = self.default_dir + self.parent[ino] = parent + return ino + + def find_inode(self, path, prefix=[]): + segments = prefix + path.split(b'/') + inode = 1 + for segment in segments: + inode = self.contents[inode][segment] + return inode + + def _process_archive(self, archive_name, prefix=[]): + """Build FUSE inode hierarchy from archive metadata + """ + self.file_versions = {} # for versions mode: original path -> version + t0 = time.perf_counter() + archive = Archive(self.repository_uncached, self.key, self._manifest, archive_name, + consider_part_files=self._args.consider_part_files) + for item_inode, item in self.cache.iter_archive_items(archive.metadata.items): + path = os.fsencode(item.path) + is_dir = stat.S_ISDIR(item.mode) + if is_dir: + try: + # This can happen if an archive was created with a command line like + # $ borg create ... dir1/file dir1 + # In this case the code below will have created a default_dir inode for dir1 already. + inode = self.find_inode(path, prefix) + except KeyError: + pass + else: + self._items[inode] = item + continue + segments = prefix + path.split(b'/') + parent = 1 + for segment in segments[:-1]: + parent = self._process_inner(segment, parent) + self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode) + duration = time.perf_counter() - t0 + logger.debug('fuse: _process_archive completed in %.1f s for archive %s', duration, archive.name) + + def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode): + def file_version(item, path): + if 'chunks' in item: + file_id = blake2b_128(path) + current_version, previous_id = self.versions_index.get(file_id, (0, None)) + + chunk_ids = [chunk_id for chunk_id, _, _ in item.chunks] + contents_id = blake2b_128(b''.join(chunk_ids)) + + if contents_id != previous_id: + current_version += 1 + self.versions_index[file_id] = current_version, contents_id + + return current_version + + def make_versioned_name(name, version, add_dir=False): + if add_dir: + # add intermediate directory with same name as filename + path_fname = name.rsplit(b'/', 1) + name += b'/' + path_fname[-1] + # keep original extension at end to avoid confusing tools + name, ext = os.path.splitext(name) + version_enc = os.fsencode('.%05d' % version) + return name + version_enc + ext + + if self.versions and not is_dir: + parent = self._process_inner(name, parent) + path = os.fsencode(item.path) + version = file_version(item, path) + if version is not None: + # regular file, with contents - maybe a hardlink master + name = make_versioned_name(name, version) + self.file_versions[path] = version + + path = item.path + del item.path # save some space + if 'source' in item and hardlinkable(item.mode): + # a hardlink, no contents, is the hardlink master + source = os.fsencode(item.source) + if self.versions: + # adjust source name with version + version = self.file_versions[source] + source = make_versioned_name(source, version, add_dir=True) + name = make_versioned_name(name, version) + try: + inode = self.find_inode(source, prefix) + except KeyError: + logger.warning('Skipping broken hard link: %s -> %s', path, item.source) + return + item = self.cache.get(inode) + item.nlink = item.get('nlink', 1) + 1 + self._items[inode] = item + else: + inode = item_inode + self.parent[inode] = parent + if name: + self.contents[parent][name] = inode + + def _process_inner(self, name, parent_inode): + dir = self.contents[parent_inode] + if name in dir: + inode = dir[name] + else: + inode = self._create_dir(parent_inode) + if name: + dir[name] = inode + return inode + + +class FuseOperations(llfuse.Operations, FuseBackend): + """Export archive as a FUSE filesystem + """ + + def __init__(self, key, repository, manifest, args, decrypted_repository): + llfuse.Operations.__init__(self) + FuseBackend.__init__(self, key, manifest, repository, args, decrypted_repository) + self.decrypted_repository = decrypted_repository + data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1)) + logger.debug('mount data cache capacity: %d chunks', data_cache_capacity) + self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None) + def sig_info_handler(self, sig_no, stack): logger.debug('fuse: %d synth inodes, %d edges (%s)', - self._inode_count, len(self.parent), + self.inode_count, len(self.parent), # getsizeof is the size of the dict itself; key and value are two small-ish integers, # which are shared due to code structure (this has been verified). - format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count))) + format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self.inode_count))) logger.debug('fuse: %d pending archives', len(self.pending_archives)) logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s', self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items, @@ -306,119 +445,6 @@ class FuseOperations(llfuse.Operations): finally: llfuse.close(umount) - def _create_dir(self, parent, mtime=None): - """Create directory - """ - ino = self.allocate_inode() - if mtime is not None: - self.items[ino] = Item(**self.default_dir.as_dict()) - self.items[ino].mtime = mtime - else: - self.items[ino] = self.default_dir - self.parent[ino] = parent - return ino - - def process_archive(self, archive_name, prefix=[]): - """Build FUSE inode hierarchy from archive metadata - """ - self.file_versions = {} # for versions mode: original path -> version - t0 = time.perf_counter() - archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name, - consider_part_files=self.args.consider_part_files) - for item_inode, item in self.cache.iter_archive_items(archive.metadata.items): - path = os.fsencode(item.path) - is_dir = stat.S_ISDIR(item.mode) - if is_dir: - try: - # This can happen if an archive was created with a command line like - # $ borg create ... dir1/file dir1 - # In this case the code below will have created a default_dir inode for dir1 already. - inode = self._find_inode(path, prefix) - except KeyError: - pass - else: - self.items[inode] = item - continue - segments = prefix + path.split(b'/') - parent = 1 - for segment in segments[:-1]: - parent = self.process_inner(segment, parent) - self.process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode) - duration = time.perf_counter() - t0 - logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name) - - def process_leaf(self, name, item, parent, prefix, is_dir, item_inode): - def file_version(item, path): - if 'chunks' in item: - file_id = blake2b_128(path) - current_version, previous_id = self.versions_index.get(file_id, (0, None)) - - chunk_ids = [chunk_id for chunk_id, _, _ in item.chunks] - contents_id = blake2b_128(b''.join(chunk_ids)) - - if contents_id != previous_id: - current_version += 1 - self.versions_index[file_id] = current_version, contents_id - - return current_version - - def make_versioned_name(name, version, add_dir=False): - if add_dir: - # add intermediate directory with same name as filename - path_fname = name.rsplit(b'/', 1) - name += b'/' + path_fname[-1] - # keep original extension at end to avoid confusing tools - name, ext = os.path.splitext(name) - version_enc = os.fsencode('.%05d' % version) - return name + version_enc + ext - - if self.versions and not is_dir: - parent = self.process_inner(name, parent) - path = os.fsencode(item.path) - version = file_version(item, path) - if version is not None: - # regular file, with contents - maybe a hardlink master - name = make_versioned_name(name, version) - self.file_versions[path] = version - - path = item.path - del item.path # save some space - if 'source' in item and hardlinkable(item.mode): - # a hardlink, no contents, is the hardlink master - source = os.fsencode(item.source) - if self.versions: - # adjust source name with version - version = self.file_versions[source] - source = make_versioned_name(source, version, add_dir=True) - name = make_versioned_name(name, version) - try: - inode = self._find_inode(source, prefix) - except KeyError: - logger.warning('Skipping broken hard link: %s -> %s', path, item.source) - return - item = self.cache.get(inode) - item.nlink = item.get('nlink', 1) + 1 - self.items[inode] = item - else: - inode = item_inode - self.parent[inode] = parent - if name: - self.contents[parent][name] = inode - - def process_inner(self, name, parent_inode): - dir = self.contents[parent_inode] - if name in dir: - inode = dir[name] - else: - inode = self._create_dir(parent_inode) - if name: - dir[name] = inode - return inode - - def allocate_inode(self): - self._inode_count += 1 - return self._inode_count - def statfs(self, ctx=None): stat_ = llfuse.StatvfsData() stat_.f_bsize = 512 @@ -431,19 +457,6 @@ class FuseOperations(llfuse.Operations): stat_.f_favail = 0 return stat_ - def get_item(self, inode): - try: - return self.items[inode] - except KeyError: - return self.cache.get(inode) - - def _find_inode(self, path, prefix=[]): - segments = prefix + path.split(b'/') - inode = 1 - for segment in segments: - inode = self.contents[inode][segment] - return inode - def getattr(self, inode, ctx=None): item = self.get_item(inode) entry = llfuse.EntryAttributes() @@ -482,14 +495,8 @@ class FuseOperations(llfuse.Operations): except KeyError: raise llfuse.FUSEError(llfuse.ENOATTR) from None - def _load_pending_archive(self, inode): - # Check if this is an archive we need to load - archive_name = self.pending_archives.pop(inode, None) - if archive_name: - self.process_archive(archive_name, [os.fsencode(archive_name)]) - def lookup(self, parent_inode, name, ctx=None): - self._load_pending_archive(parent_inode) + self.check_pending_archive(parent_inode) if name == b'.': inode = parent_inode elif name == b'..': @@ -513,7 +520,7 @@ class FuseOperations(llfuse.Operations): return inode def opendir(self, inode, ctx=None): - self._load_pending_archive(inode) + self.check_pending_archive(inode) return inode def read(self, fh, offset, size):