mirror of
https://github.com/borgbackup/borg.git
synced 2026-05-28 04:03:21 -04:00
Improve LoggedIO write performance, make commit mechanism more solid
- Instead of very small (5 MB-ish) segment files, use larger ones - Request asynchronous write-out or write-through (TODO) where it is supported, to achieve a continuously high throughput for writes - Instead of depending on ordered writes (write data, commit tag, sync) for consistency, do a double-sync commit as more serious RDBMS also do i.e. write data, sync, write commit tag, sync Since commits are very expensive in Borg at the moment this makes no difference performance-wise. New platform APIs: SyncFile, sync_dir [x] Naive implementation (equivalent to what Borg did before) [x] Linux implementation [ ] Windows implementation [-] OSX implementation (F_FULLSYNC)
This commit is contained in:
parent
3bc22061f7
commit
c52861e0ca
9 changed files with 181 additions and 42 deletions
|
|
@ -12,8 +12,14 @@ UMASK_DEFAULT = 0o077
|
|||
CACHE_TAG_NAME = 'CACHEDIR.TAG'
|
||||
CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55'
|
||||
|
||||
DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
|
||||
DEFAULT_SEGMENTS_PER_DIR = 10000
|
||||
# A large, but not unreasonably large segment size. Always less than 2 GiB (for legacy file systems). We choose
|
||||
# 500 MiB which means that no indirection from the inode is needed for typical Linux file systems.
|
||||
# Note that this is a soft-limit and can be exceeded (worst case) by a full maximum chunk size and some metadata
|
||||
# bytes. That's why it's 500 MiB instead of 512 MiB.
|
||||
DEFAULT_MAX_SEGMENT_SIZE = 500 * 1024 * 1024
|
||||
|
||||
# A few hundred files per directory to go easy on filesystems which don't like too many files per dir (NTFS)
|
||||
DEFAULT_SEGMENTS_PER_DIR = 500
|
||||
|
||||
CHUNK_MIN_EXP = 19 # 2**19 == 512kiB
|
||||
CHUNK_MAX_EXP = 23 # 2**23 == 8MiB
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ def check_extension_modules():
|
|||
raise ExtensionModuleError
|
||||
if crypto.API_VERSION != 3:
|
||||
raise ExtensionModuleError
|
||||
if platform.API_VERSION != 2:
|
||||
if platform.API_VERSION != 3:
|
||||
raise ExtensionModuleError
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,16 +1,10 @@
|
|||
import sys
|
||||
|
||||
from .platform_base import acl_get, acl_set, SyncFile, sync_dir, API_VERSION
|
||||
|
||||
if sys.platform.startswith('linux'): # pragma: linux only
|
||||
from .platform_linux import acl_get, acl_set, API_VERSION
|
||||
from .platform_linux import acl_get, acl_set, SyncFile, API_VERSION
|
||||
elif sys.platform.startswith('freebsd'): # pragma: freebsd only
|
||||
from .platform_freebsd import acl_get, acl_set, API_VERSION
|
||||
elif sys.platform == 'darwin': # pragma: darwin only
|
||||
from .platform_darwin import acl_get, acl_set, API_VERSION
|
||||
else: # pragma: unknown platform only
|
||||
API_VERSION = 2
|
||||
|
||||
def acl_get(path, item, st, numeric_owner=False):
|
||||
pass
|
||||
|
||||
def acl_set(path, item, numeric_owner=False):
|
||||
pass
|
||||
|
|
|
|||
78
borg/platform_base.py
Normal file
78
borg/platform_base.py
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
import os
|
||||
|
||||
API_VERSION = 3
|
||||
|
||||
fdatasync = getattr(os, 'fdatasync', os.fsync)
|
||||
|
||||
|
||||
def acl_get(path, item, st, numeric_owner=False):
    """
    Save the ACL entries of `path`.

    Platform-independent fallback: this implementation has no body and therefore
    stores nothing and returns None. Platform modules provide the real thing.

    If `numeric_owner` is True, only the uid/gid is kept; the user/group field
    is not preserved.
    """
|
||||
|
||||
|
||||
def acl_set(path, item, numeric_owner=False):
    """
    Restore the ACL entries of `path`.

    Platform-independent fallback: this implementation has no body and therefore
    restores nothing and returns None. Platform modules provide the real thing.

    If `numeric_owner` is True, the stored uid/gid is used instead of the
    user/group names.
    """
|
||||
|
||||
|
||||
def sync_dir(path):
    """
    Ask the operating system to write the directory `path` itself to stable storage.

    The directory is opened read-only, fsync()ed and always closed again, even
    when fsync() fails.

    Some file systems (mostly network file systems) do not support fsync() on
    a directory and report EINVAL. That single error is ignored, because it
    only means "not supported here". Every other OSError (e.g. EIO, or the
    directory not existing) is propagated to the caller.
    """
    import errno  # local import so this function does not depend on a module-level import

    fd = os.open(path, os.O_RDONLY)
    try:
        os.fsync(fd)
    except OSError as os_error:
        # EINVAL: directory sync is not supported; anything else is a real failure.
        if os_error.errno != errno.EINVAL:
            raise
    finally:
        os.close(fd)
|
||||
|
||||
|
||||
class SyncFile:
    """
    A file class that is supposed to enable write ordering (one way or another) and data durability after close().

    The degree to which either is possible varies with operating system, file system and hardware.

    This fallback implements a naive and slow way of doing this. On some operating systems it can't actually
    guarantee any of the above, since fsync() doesn't guarantee it. Furthermore it may not be possible at all
    to satisfy the above guarantees on some hardware or operating systems. In these cases we hope that the thorough
    checksumming implemented catches any corrupted data due to misordered, delayed or partial writes.

    Note that POSIX doesn't specify *anything* about power failures (or similar failures). A system that
    routinely loses files or corrupts files on power loss is POSIX compliant.

    TODO: Use F_FULLSYNC on OSX.
    TODO: A Windows implementation should use CreateFile with FILE_FLAG_WRITE_THROUGH.
    """

    def __init__(self, path):
        """Open `path` for binary writing (mode 'wb') and remember its file descriptor number."""
        self.fd = open(path, 'wb')
        # Note: this is the integer descriptor, not a method.
        self.fileno = self.fd.fileno()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always sync and close, whether or not the with-body raised.
        self.close()

    def write(self, data):
        """Write `data` to the file and return the number of bytes written."""
        return self.fd.write(data)

    def sync(self):
        """
        Synchronize file contents. Everything written prior to sync() must become durable before anything written
        after sync().
        """
        # Push Python's userspace buffer to the OS before asking the OS to sync.
        self.fd.flush()
        fdatasync(self.fileno)
        if hasattr(os, 'posix_fadvise'):
            # Tell the OS it does not need to keep what we just wrote in its cache.
            os.posix_fadvise(self.fileno, 0, 0, os.POSIX_FADV_DONTNEED)

    def close(self):
        """
        sync() and close, then sync the containing directory.

        The file object is closed even if sync() raises; in that case the
        exception propagates and the directory is not synced.
        """
        try:
            self.sync()
        finally:
            self.fd.close()
        # dirname() is empty for a bare file name; that means the current directory.
        sync_dir(os.path.dirname(self.fd.name) or os.curdir)
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
from .helpers import user2uid, group2gid, safe_decode, safe_encode
|
||||
|
||||
API_VERSION = 2
|
||||
API_VERSION = 3
|
||||
|
||||
cdef extern from "sys/acl.h":
|
||||
ctypedef struct _acl_t:
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
from .helpers import posix_acl_use_stored_uid_gid, safe_encode, safe_decode
|
||||
|
||||
API_VERSION = 2
|
||||
API_VERSION = 3
|
||||
|
||||
cdef extern from "errno.h":
|
||||
int errno
|
||||
|
|
|
|||
|
|
@ -1,13 +1,17 @@
|
|||
import os
|
||||
import re
|
||||
import resource
|
||||
from stat import S_ISLNK
|
||||
from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode
|
||||
from .platform_base import SyncFile as BaseSyncFile
|
||||
from libc cimport errno
|
||||
|
||||
API_VERSION = 2
|
||||
API_VERSION = 3
|
||||
|
||||
cdef extern from "sys/types.h":
|
||||
int ACL_TYPE_ACCESS
|
||||
int ACL_TYPE_DEFAULT
|
||||
ctypedef off64_t
|
||||
|
||||
cdef extern from "sys/acl.h":
|
||||
ctypedef struct _acl_t:
|
||||
|
|
@ -23,6 +27,12 @@ cdef extern from "sys/acl.h":
|
|||
cdef extern from "acl/libacl.h":
|
||||
int acl_extended_file(const char *path)
|
||||
|
||||
cdef extern from "fcntl.h":
|
||||
int sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags)
|
||||
unsigned int SYNC_FILE_RANGE_WRITE
|
||||
unsigned int SYNC_FILE_RANGE_WAIT_BEFORE
|
||||
unsigned int SYNC_FILE_RANGE_WAIT_AFTER
|
||||
|
||||
|
||||
_comment_re = re.compile(' *#.*', re.M)
|
||||
|
||||
|
|
@ -77,10 +87,6 @@ cdef acl_numeric_ids(acl):
|
|||
|
||||
|
||||
def acl_get(path, item, st, numeric_owner=False):
|
||||
"""Saves ACL Entries
|
||||
|
||||
If `numeric_owner` is True the user/group field is not preserved only uid/gid
|
||||
"""
|
||||
cdef acl_t default_acl = NULL
|
||||
cdef acl_t access_acl = NULL
|
||||
cdef char *default_text = NULL
|
||||
|
|
@ -112,11 +118,6 @@ def acl_get(path, item, st, numeric_owner=False):
|
|||
|
||||
|
||||
def acl_set(path, item, numeric_owner=False):
|
||||
"""Restore ACL Entries
|
||||
|
||||
If `numeric_owner` is True the stored uid/gid is used instead
|
||||
of the user/group names
|
||||
"""
|
||||
cdef acl_t access_acl = NULL
|
||||
cdef acl_t default_acl = NULL
|
||||
|
||||
|
|
@ -141,3 +142,45 @@ def acl_set(path, item, numeric_owner=False):
|
|||
acl_set_file(p, ACL_TYPE_DEFAULT, default_acl)
|
||||
finally:
|
||||
acl_free(default_acl)
|
||||
|
||||
cdef _sync_file_range(fd, offset, length, flags):
    # Call sync_file_range(2) on the byte range [offset, offset + length) of fd
    # with the given SYNC_FILE_RANGE_* flags, then advise the kernel that the
    # range is no longer needed in the page cache.
    #
    # offset and length must both be page-aligned.
    # Raises OSError carrying the C errno if sync_file_range() fails.
    assert offset & PAGE_MASK == 0, "offset %d not page-aligned" % offset
    assert length & PAGE_MASK == 0, "length %d not page-aligned" % length
    if sync_file_range(fd, offset, length, flags) != 0:
        # `errno` is the cimported libc.errno module; the error value is
        # errno.errno. Read it once, before any other call can overwrite it.
        err = errno.errno
        raise OSError(err, os.strerror(err))
    os.posix_fadvise(fd, offset, length, os.POSIX_FADV_DONTNEED)
|
||||
|
||||
cdef unsigned PAGE_MASK = resource.getpagesize() - 1
|
||||
|
||||
|
||||
class SyncFile(BaseSyncFile):
    """
    Implemented using sync_file_range for asynchronous write-out and fdatasync for actual durability.

    "write-out" means that dirty pages (= data that was written) are submitted to an I/O queue and will be sent to
    disk in the immediate future.
    """

    def __init__(self, path):
        super().__init__(path)
        # Total number of bytes written so far.
        self.offset = 0
        # Write-out granularity: 16 MiB, rounded down to a page boundary.
        self.write_window = (16 * 1024 ** 2) & ~PAGE_MASK
        # Page-aligned end of the range whose write-out was requested last.
        self.last_sync = 0
        # Start of the previously requested range, or None before the first window.
        self.pending_sync = None

    def write(self, data):
        self.offset += self.fd.write(data)
        aligned_end = self.offset & ~PAGE_MASK
        if aligned_end < self.last_sync + self.write_window:
            # Not a full window of new data yet.
            return
        self.fd.flush()
        # Start asynchronous write-out of the window that was just completed.
        _sync_file_range(self.fileno, self.last_sync, aligned_end - self.last_sync, SYNC_FILE_RANGE_WRITE)
        previous_start = self.pending_sync
        if previous_start is not None:
            # Wait for the window before it, whose write-out was requested one round earlier.
            _sync_file_range(self.fileno, previous_start, self.last_sync - previous_start,
                             SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WAIT_AFTER)
        self.pending_sync = self.last_sync
        self.last_sync = aligned_end

    def sync(self):
        # Flush Python's buffer, make the data durable, then drop it from the page cache.
        self.fd.flush()
        os.fdatasync(self.fileno)
        os.posix_fadvise(self.fileno, 0, 0, os.POSIX_FADV_DONTNEED)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, Progre
|
|||
from .hashindex import NSIndex
|
||||
from .locking import UpgradableLock, LockError, LockErrorT
|
||||
from .lrucache import LRUCache
|
||||
from .platform import SyncFile, sync_dir
|
||||
|
||||
MAX_OBJECT_SIZE = 20 * 1024 * 1024
|
||||
MAGIC = b'BORG_SEG'
|
||||
|
|
@ -32,7 +33,7 @@ class Repository:
|
|||
On disk layout:
|
||||
dir/README
|
||||
dir/config
|
||||
dir/data/<X / SEGMENTS_PER_DIR>/<X>
|
||||
dir/data/<X // SEGMENTS_PER_DIR>/<X>
|
||||
dir/index.X
|
||||
dir/hints.X
|
||||
"""
|
||||
|
|
@ -507,7 +508,7 @@ class LoggedIO:
|
|||
def __init__(self, path, limit, segments_per_dir, capacity=90):
|
||||
self.path = path
|
||||
self.fds = LRUCache(capacity,
|
||||
dispose=lambda fd: fd.close())
|
||||
dispose=self.close_fd)
|
||||
self.segment = 0
|
||||
self.limit = limit
|
||||
self.segments_per_dir = segments_per_dir
|
||||
|
|
@ -519,6 +520,11 @@ class LoggedIO:
|
|||
self.fds.clear()
|
||||
self.fds = None # Just to make sure we're disabled
|
||||
|
||||
def close_fd(self, fd):
    """
    Close the segment file object `fd`.

    Where os.posix_fadvise exists, the OS is first advised (POSIX_FADV_DONTNEED)
    that the whole file is no longer needed in its cache. The file is closed
    even if that advice call raises; the exception is still propagated.
    """
    try:
        if hasattr(os, 'posix_fadvise'):  # only on UNIX
            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
    finally:
        # Never leak the descriptor just because the cache hint failed.
        fd.close()
|
||||
|
||||
def segment_iterator(self, reverse=False):
|
||||
data_path = os.path.join(self.path, 'data')
|
||||
dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
|
||||
|
|
@ -535,7 +541,7 @@ class LoggedIO:
|
|||
return None
|
||||
|
||||
def get_segments_transaction_id(self):
|
||||
"""Verify that the transaction id is consistent with the index transaction id
|
||||
"""Return the last committed segment.
|
||||
"""
|
||||
for segment, filename in self.segment_iterator(reverse=True):
|
||||
if self.is_committed_segment(filename):
|
||||
|
|
@ -578,7 +584,8 @@ class LoggedIO:
|
|||
dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
|
||||
if not os.path.exists(dirname):
|
||||
os.mkdir(dirname)
|
||||
self._write_fd = open(self.segment_filename(self.segment), 'ab')
|
||||
sync_dir(os.path.join(self.path, 'data'))
|
||||
self._write_fd = SyncFile(self.segment_filename(self.segment))
|
||||
self._write_fd.write(MAGIC)
|
||||
self.offset = MAGIC_LEN
|
||||
return self._write_fd
|
||||
|
|
@ -591,6 +598,13 @@ class LoggedIO:
|
|||
self.fds[segment] = fd
|
||||
return fd
|
||||
|
||||
def close_segment(self):
    """
    Finish the segment currently open for writing, if there is one.

    Advances the segment counter, resets the write offset to 0, closes the
    write handle and clears it. Does nothing when no write handle is set.
    """
    segment_file = self._write_fd
    if not segment_file:
        return
    self.segment += 1
    self.offset = 0
    segment_file.close()
    self._write_fd = None
|
||||
|
||||
def delete_segment(self, segment):
|
||||
if segment in self.fds:
|
||||
del self.fds[segment]
|
||||
|
|
@ -641,7 +655,7 @@ class LoggedIO:
|
|||
|
||||
def read(self, segment, offset, id):
|
||||
if segment == self.segment and self._write_fd:
|
||||
self._write_fd.flush()
|
||||
self._write_fd.sync()
|
||||
fd = self.get_fd(segment)
|
||||
fd.seek(offset)
|
||||
header = fd.read(self.put_header_fmt.size)
|
||||
|
|
@ -703,20 +717,8 @@ class LoggedIO:
|
|||
|
||||
def write_commit(self):
    """
    Append a commit tag to the current segment and close the segment.

    The write handle is synced before the tag is written, so the tag is only
    written after a sync of everything that precedes it. The tag consists of
    a CRC32 of the header followed by the header itself.
    """
    segment_fd = self.get_write_fd(no_new=True)
    # First sync: the data written so far, before the commit tag follows.
    segment_fd.sync()
    tag_header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
    checksum = self.crc_fmt.pack(crc32(tag_header) & 0xffffffff)
    segment_fd.write(checksum + tag_header)
    # Closing the segment closes the write handle.
    self.close_segment()
|
||||
|
||||
def close_segment(self):
|
||||
if self._write_fd:
|
||||
self.segment += 1
|
||||
self.offset = 0
|
||||
self._write_fd.flush()
|
||||
os.fsync(self._write_fd.fileno())
|
||||
if hasattr(os, 'posix_fadvise'): # only on UNIX
|
||||
# tell the OS that it does not need to cache what we just wrote,
|
||||
# avoids spoiling the cache for the OS and other processes.
|
||||
os.posix_fadvise(self._write_fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
|
||||
self._write_fd.close()
|
||||
self._write_fd = None
|
||||
|
|
|
|||
|
|
@ -170,6 +170,22 @@ Network:
|
|||
|
||||
In case you are interested in more details, please read the internals documentation.
|
||||
|
||||
File systems
|
||||
~~~~~~~~~~~~
|
||||
|
||||
We strongly recommend against using Borg (or any other database-like
|
||||
software) on non-journaling file systems like FAT, since it is not
|
||||
possible to assume any consistency in case of power failures (or a
|
||||
sudden disconnect of an external drive or similar failures).
|
||||
|
||||
While Borg uses a data store that is resilient against these failures
|
||||
when used on journaling file systems, it is not possible to guarantee
|
||||
this with some hardware -- independent of the software used. We are not
|
||||
aware of any list of affected hardware.
|
||||
|
||||
If you are suspicious whether your Borg repository is still consistent
|
||||
and readable after one of the failures mentioned above occurred, run
|
||||
``borg check --verify-data`` to make sure it is consistent.
|
||||
|
||||
Units
|
||||
~~~~~
|
||||
|
|
|
|||
Loading…
Reference in a new issue