From c52861e0cafafa2988ae98c3e25918068463012d Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 14 May 2016 22:46:41 +0200 Subject: [PATCH] Improve LoggedIO write performance, make commit mechanism more solid - Instead of very small (5 MB-ish) segment files, use larger ones - Request asynchronous write-out or write-through (TODO) where it is supported, to achieve a continuously high throughput for writes - Instead of depending on ordered writes (write data, commit tag, sync) for consistency, do a double-sync commit as more serious RDBMS also do i.e. write data, sync, write commit tag, sync Since commits are very expensive in Borg at the moment this makes no difference performance-wise. New platform APIs: SyncFile, sync_dir [x] Naive implementation (equivalent to what Borg did before) [x] Linux implementation [ ] Windows implementation [-] OSX implementation (F_FULLSYNC) --- borg/constants.py | 10 ++++- borg/helpers.py | 2 +- borg/platform.py | 12 ++---- borg/platform_base.py | 78 +++++++++++++++++++++++++++++++++++++++ borg/platform_darwin.pyx | 2 +- borg/platform_freebsd.pyx | 2 +- borg/platform_linux.pyx | 63 ++++++++++++++++++++++++++----- borg/repository.py | 38 ++++++++++--------- docs/usage.rst | 16 ++++++++ 9 files changed, 181 insertions(+), 42 deletions(-) create mode 100644 borg/platform_base.py diff --git a/borg/constants.py b/borg/constants.py index 95b16c47a..0f6d3ddc7 100644 --- a/borg/constants.py +++ b/borg/constants.py @@ -12,8 +12,14 @@ UMASK_DEFAULT = 0o077 CACHE_TAG_NAME = 'CACHEDIR.TAG' CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55' -DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024 -DEFAULT_SEGMENTS_PER_DIR = 10000 +# A large, but not unreasonably large segment size. Always less than 2 GiB (for legacy file systems). We choose +# 500 MiB which means that no indirection from the inode is needed for typical Linux file systems. 
+# Note that this is a soft-limit and can be exceeded (worst case) by a full maximum chunk size and some metadata +# bytes. That's why it's 500 MiB instead of 512 MiB. +DEFAULT_MAX_SEGMENT_SIZE = 500 * 1024 * 1024 + +# A few hundred files per directory to go easy on filesystems which don't like too many files per dir (NTFS) +DEFAULT_SEGMENTS_PER_DIR = 500 CHUNK_MIN_EXP = 19 # 2**19 == 512kiB CHUNK_MAX_EXP = 23 # 2**23 == 8MiB diff --git a/borg/helpers.py b/borg/helpers.py index 7d8905942..c73553e94 100644 --- a/borg/helpers.py +++ b/borg/helpers.py @@ -82,7 +82,7 @@ def check_extension_modules(): raise ExtensionModuleError if crypto.API_VERSION != 3: raise ExtensionModuleError - if platform.API_VERSION != 2: + if platform.API_VERSION != 3: raise ExtensionModuleError diff --git a/borg/platform.py b/borg/platform.py index 1bc8ee5e4..9c1c4ebed 100644 --- a/borg/platform.py +++ b/borg/platform.py @@ -1,16 +1,10 @@ import sys +from .platform_base import acl_get, acl_set, SyncFile, sync_dir, API_VERSION + if sys.platform.startswith('linux'): # pragma: linux only - from .platform_linux import acl_get, acl_set, API_VERSION + from .platform_linux import acl_get, acl_set, SyncFile, API_VERSION elif sys.platform.startswith('freebsd'): # pragma: freebsd only from .platform_freebsd import acl_get, acl_set, API_VERSION elif sys.platform == 'darwin': # pragma: darwin only from .platform_darwin import acl_get, acl_set, API_VERSION -else: # pragma: unknown platform only - API_VERSION = 2 - - def acl_get(path, item, st, numeric_owner=False): - pass - - def acl_set(path, item, numeric_owner=False): - pass diff --git a/borg/platform_base.py b/borg/platform_base.py new file mode 100644 index 000000000..a897f9f10 --- /dev/null +++ b/borg/platform_base.py @@ -0,0 +1,78 @@ +import os + +API_VERSION = 3 + +fdatasync = getattr(os, 'fdatasync', os.fsync) + + +def acl_get(path, item, st, numeric_owner=False): + """ + Saves ACL Entries + + If `numeric_owner` is True the user/group field is not 
preserved only uid/gid + """ + + +def acl_set(path, item, numeric_owner=False): + """ + Restore ACL Entries + + If `numeric_owner` is True the stored uid/gid is used instead + of the user/group names + """ + + +def sync_dir(path): + fd = os.open(path, os.O_RDONLY) + try: + os.fsync(fd) + finally: + os.close(fd) + + +class SyncFile: + """ + A file class that is supposed to enable write ordering (one way or another) and data durability after close(). + + The degree to which either is possible varies with operating system, file system and hardware. + + This fallback implements a naive and slow way of doing this. On some operating systems it can't actually + guarantee any of the above, since fsync() doesn't guarantee it. Furthermore it may not be possible at all + to satisfy the above guarantees on some hardware or operating systems. In these cases we hope that the thorough + checksumming implemented catches any corrupted data due to misordered, delayed or partial writes. + + Note that POSIX doesn't specify *anything* about power failures (or similar failures). A system that + routinely loses files or corrupts files on power loss is POSIX compliant. + + TODO: Use F_FULLFSYNC on OSX. + TODO: A Windows implementation should use CreateFile with FILE_FLAG_WRITE_THROUGH. + """ + + def __init__(self, path): + self.fd = open(path, 'wb') + self.fileno = self.fd.fileno() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def write(self, data): + self.fd.write(data) + + def sync(self): + """ + Synchronize file contents. Everything written prior to sync() must become durable before anything written + after sync(). 
+ """ + self.fd.flush() + fdatasync(self.fileno) + if hasattr(os, 'posix_fadvise'): + os.posix_fadvise(self.fileno, 0, 0, os.POSIX_FADV_DONTNEED) + + def close(self): + """sync() and close.""" + self.sync() + self.fd.close() + sync_dir(os.path.dirname(self.fd.name)) diff --git a/borg/platform_darwin.pyx b/borg/platform_darwin.pyx index edb41f715..4dc25b83a 100644 --- a/borg/platform_darwin.pyx +++ b/borg/platform_darwin.pyx @@ -1,7 +1,7 @@ import os from .helpers import user2uid, group2gid, safe_decode, safe_encode -API_VERSION = 2 +API_VERSION = 3 cdef extern from "sys/acl.h": ctypedef struct _acl_t: diff --git a/borg/platform_freebsd.pyx b/borg/platform_freebsd.pyx index 27d636263..ae69af68a 100644 --- a/borg/platform_freebsd.pyx +++ b/borg/platform_freebsd.pyx @@ -1,7 +1,7 @@ import os from .helpers import posix_acl_use_stored_uid_gid, safe_encode, safe_decode -API_VERSION = 2 +API_VERSION = 3 cdef extern from "errno.h": int errno diff --git a/borg/platform_linux.pyx b/borg/platform_linux.pyx index f9ed42415..854904f3f 100644 --- a/borg/platform_linux.pyx +++ b/borg/platform_linux.pyx @@ -1,13 +1,17 @@ import os import re +import resource from stat import S_ISLNK from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode +from .platform_base import SyncFile as BaseSyncFile +from libc cimport errno -API_VERSION = 2 +API_VERSION = 3 cdef extern from "sys/types.h": int ACL_TYPE_ACCESS int ACL_TYPE_DEFAULT + ctypedef off64_t cdef extern from "sys/acl.h": ctypedef struct _acl_t: @@ -23,6 +27,12 @@ cdef extern from "sys/acl.h": cdef extern from "acl/libacl.h": int acl_extended_file(const char *path) +cdef extern from "fcntl.h": + int sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags) + unsigned int SYNC_FILE_RANGE_WRITE + unsigned int SYNC_FILE_RANGE_WAIT_BEFORE + unsigned int SYNC_FILE_RANGE_WAIT_AFTER + _comment_re = re.compile(' *#.*', re.M) @@ -77,10 +87,6 @@ cdef acl_numeric_ids(acl): def 
acl_get(path, item, st, numeric_owner=False): - """Saves ACL Entries - - If `numeric_owner` is True the user/group field is not preserved only uid/gid - """ cdef acl_t default_acl = NULL cdef acl_t access_acl = NULL cdef char *default_text = NULL @@ -112,11 +118,6 @@ def acl_get(path, item, st, numeric_owner=False): def acl_set(path, item, numeric_owner=False): - """Restore ACL Entries - - If `numeric_owner` is True the stored uid/gid is used instead - of the user/group names - """ cdef acl_t access_acl = NULL cdef acl_t default_acl = NULL @@ -141,3 +142,45 @@ def acl_set(path, item, numeric_owner=False): acl_set_file(p, ACL_TYPE_DEFAULT, default_acl) finally: acl_free(default_acl) + +cdef _sync_file_range(fd, offset, length, flags): + assert offset & PAGE_MASK == 0, "offset %d not page-aligned" % offset + assert length & PAGE_MASK == 0, "length %d not page-aligned" % length + if sync_file_range(fd, offset, length, flags) != 0: + raise OSError(errno, os.strerror(errno)) + os.posix_fadvise(fd, offset, length, os.POSIX_FADV_DONTNEED) + +cdef unsigned PAGE_MASK = resource.getpagesize() - 1 + + +class SyncFile(BaseSyncFile): + """ + Implemented using sync_file_range for asynchronous write-out and fdatasync for actual durability. + + "write-out" means that dirty pages (= data that was written) are submitted to an I/O queue and will be sent to + disk in the immediate future. 
+ """ + + def __init__(self, path): + super().__init__(path) + self.offset = 0 + self.write_window = (16 * 1024 ** 2) & ~PAGE_MASK + self.last_sync = 0 + self.pending_sync = None + + def write(self, data): + self.offset += self.fd.write(data) + offset = self.offset & ~PAGE_MASK + if offset >= self.last_sync + self.write_window: + self.fd.flush() + _sync_file_range(self.fileno, self.last_sync, offset - self.last_sync, SYNC_FILE_RANGE_WRITE) + if self.pending_sync is not None: + _sync_file_range(self.fileno, self.pending_sync, self.last_sync - self.pending_sync, + SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WAIT_AFTER) + self.pending_sync = self.last_sync + self.last_sync = offset + + def sync(self): + self.fd.flush() + os.fdatasync(self.fileno) + os.posix_fadvise(self.fileno, 0, 0, os.POSIX_FADV_DONTNEED) diff --git a/borg/repository.py b/borg/repository.py index 3f8d5d68d..52937c76f 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -17,6 +17,7 @@ from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, Progre from .hashindex import NSIndex from .locking import UpgradableLock, LockError, LockErrorT from .lrucache import LRUCache +from .platform import SyncFile, sync_dir MAX_OBJECT_SIZE = 20 * 1024 * 1024 MAGIC = b'BORG_SEG' @@ -32,7 +33,7 @@ class Repository: On disk layout: dir/README dir/config - dir/data// + dir/data// dir/index.X dir/hints.X """ @@ -507,7 +508,7 @@ class LoggedIO: def __init__(self, path, limit, segments_per_dir, capacity=90): self.path = path self.fds = LRUCache(capacity, - dispose=lambda fd: fd.close()) + dispose=self.close_fd) self.segment = 0 self.limit = limit self.segments_per_dir = segments_per_dir @@ -519,6 +520,11 @@ class LoggedIO: self.fds.clear() self.fds = None # Just to make sure we're disabled + def close_fd(self, fd): + if hasattr(os, 'posix_fadvise'): # only on UNIX + os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED) + fd.close() + def segment_iterator(self, 
reverse=False): data_path = os.path.join(self.path, 'data') dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse) @@ -535,7 +541,7 @@ class LoggedIO: return None def get_segments_transaction_id(self): - """Verify that the transaction id is consistent with the index transaction id + """Return the last committed segment. """ for segment, filename in self.segment_iterator(reverse=True): if self.is_committed_segment(filename): @@ -578,7 +584,8 @@ class LoggedIO: dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir)) if not os.path.exists(dirname): os.mkdir(dirname) - self._write_fd = open(self.segment_filename(self.segment), 'ab') + sync_dir(os.path.join(self.path, 'data')) + self._write_fd = SyncFile(self.segment_filename(self.segment)) self._write_fd.write(MAGIC) self.offset = MAGIC_LEN return self._write_fd @@ -591,6 +598,13 @@ class LoggedIO: self.fds[segment] = fd return fd + def close_segment(self): + if self._write_fd: + self.segment += 1 + self.offset = 0 + self._write_fd.close() + self._write_fd = None + def delete_segment(self, segment): if segment in self.fds: del self.fds[segment] @@ -641,7 +655,7 @@ class LoggedIO: def read(self, segment, offset, id): if segment == self.segment and self._write_fd: - self._write_fd.flush() + self._write_fd.sync() fd = self.get_fd(segment) fd.seek(offset) header = fd.read(self.put_header_fmt.size) @@ -703,20 +717,8 @@ class LoggedIO: def write_commit(self): fd = self.get_write_fd(no_new=True) + fd.sync() header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT) crc = self.crc_fmt.pack(crc32(header) & 0xffffffff) fd.write(b''.join((crc, header))) self.close_segment() - - def close_segment(self): - if self._write_fd: - self.segment += 1 - self.offset = 0 - self._write_fd.flush() - os.fsync(self._write_fd.fileno()) - if hasattr(os, 'posix_fadvise'): # only on UNIX - # tell the OS that it does not need to cache what we just wrote, - # avoids 
spoiling the cache for the OS and other processes. - os.posix_fadvise(self._write_fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED) - self._write_fd.close() - self._write_fd = None diff --git a/docs/usage.rst b/docs/usage.rst index 600af4fea..7473e8597 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -170,6 +170,22 @@ Network: In case you are interested in more details, please read the internals documentation. +File systems +~~~~~~~~~~~~ + +We strongly recommend against using Borg (or any other database-like +software) on non-journaling file systems like FAT, since it is not +possible to assume any consistency in case of power failures (or a +sudden disconnect of an external drive or similar failures). + +While Borg uses a data store that is resilient against these failures +when used on journaling file systems, it is not possible to guarantee +this with some hardware -- independent of the software used. We don't +know of a list of affected hardware. + +If you are unsure whether your Borg repository is still consistent +and readable after one of the failures mentioned above occurred, run +``borg check --verify-data`` to make sure it is consistent. Units ~~~~~