diff --git a/src/borg/_hashindex.c b/src/borg/_hashindex.c index 51290c5a1..289457fe5 100644 --- a/src/borg/_hashindex.c +++ b/src/borg/_hashindex.c @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -54,6 +56,8 @@ typedef struct { int lower_limit; int upper_limit; int min_empty; + /* buckets may be backed by a Python buffer. If buckets_buffer.buf is NULL then this is not used. */ + Py_buffer buckets_buffer; } HashIndex; /* prime (or w/ big prime factors) hash table sizes @@ -102,8 +106,8 @@ static int hash_sizes[] = { #define EPRINTF(msg, ...) fprintf(stderr, "hashindex: " msg "(%s)\n", ##__VA_ARGS__, strerror(errno)) #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno)) -static HashIndex *hashindex_read(const char *path); -static int hashindex_write(HashIndex *index, const char *path); +static HashIndex *hashindex_read(PyObject *file_py); +static void hashindex_write(HashIndex *index, PyObject *file_py); static HashIndex *hashindex_init(int capacity, int key_size, int value_size); static const void *hashindex_get(HashIndex *index, const void *key); static int hashindex_set(HashIndex *index, const void *key, const void *value); @@ -113,6 +117,16 @@ static void *hashindex_next_key(HashIndex *index, const void *key); /* Private API */ static void hashindex_free(HashIndex *index); +static void +hashindex_free_buckets(HashIndex *index) +{ + if(index->buckets_buffer.buf) { + PyBuffer_Release(&index->buckets_buffer); + } else { + free(index->buckets); + } +} + static int hashindex_index(HashIndex *index, const void *key) { @@ -171,7 +185,7 @@ hashindex_resize(HashIndex *index, int capacity) return 0; } } - free(index->buckets); + hashindex_free_buckets(index); index->buckets = new->buckets; index->num_buckets = new->num_buckets; index->num_empty = index->num_buckets - index->num_entries; @@ -248,99 +262,146 @@ count_empty(HashIndex *index) } /* Public API */ + static HashIndex * -hashindex_read(const char *path) +hashindex_read(PyObject *file_py) { - FILE *fd; - off_t length, buckets_length, bytes_read; - HashHeader header; + Py_ssize_t length, buckets_length, bytes_read; + Py_buffer header_buffer; + PyObject *header_bytes, *length_object, *bucket_bytes; + HashHeader *header; HashIndex *index = NULL; - if((fd = fopen(path, "rb")) == NULL) { - EPRINTF_PATH(path, "fopen for reading failed"); - return NULL; + header_bytes = PyObject_CallMethod(file_py, "read", "n", (Py_ssize_t)sizeof(HashHeader)); + if(!header_bytes) { + assert(PyErr_Occurred()); + goto fail; + } + + bytes_read = PyBytes_Size(header_bytes); + if(PyErr_Occurred()) { + /* TypeError, not a bytes() object */ + goto fail_decref_header; } - bytes_read = fread(&header, 1, sizeof(HashHeader), fd); if(bytes_read != sizeof(HashHeader)) { - if(ferror(fd)) { - EPRINTF_PATH(path, "fread header failed (expected %ju, got %ju)", - (uintmax_t) sizeof(HashHeader), (uintmax_t) bytes_read); - } - else { - EPRINTF_MSG_PATH(path, "fread header failed (expected %ju, got %ju)", - (uintmax_t) sizeof(HashHeader), (uintmax_t) bytes_read); - } - goto fail; + /* Truncated file */ + /* Note: %zd is the format for Py_ssize_t, %zu is for size_t */ + PyErr_Format(PyExc_ValueError, "Could not read header (expected %zu, but read %zd bytes)", + sizeof(HashHeader), bytes_read); + goto fail_decref_header; } - if(fseek(fd, 0, SEEK_END) < 0) { - EPRINTF_PATH(path, "fseek failed"); - goto fail; + + /* Find length of file */ + length_object = PyObject_CallMethod(file_py, "seek", "ni", (Py_ssize_t)0, SEEK_END); + if(PyErr_Occurred()) { + goto fail_decref_header; } - if((length = ftell(fd)) < 0) { - EPRINTF_PATH(path, "ftell failed"); - goto fail; + length = PyNumber_AsSsize_t(length_object, PyExc_OverflowError); + Py_DECREF(length_object); + if(PyErr_Occurred()) { + /* This shouldn't generally happen; but can if seek() returns something that's not a number */ + goto fail_decref_header; } - if(fseek(fd, sizeof(HashHeader), SEEK_SET) < 0) { - EPRINTF_PATH(path, "fseek failed"); - goto fail; - } - if(memcmp(header.magic, MAGIC, MAGIC_LEN)) { - EPRINTF_MSG_PATH(path, "Unknown MAGIC in header"); - goto fail; - } - buckets_length = (off_t)_le32toh(header.num_buckets) * (header.key_size + header.value_size); - if((size_t) length != sizeof(HashHeader) + buckets_length) { - EPRINTF_MSG_PATH(path, "Incorrect file length (expected %ju, got %ju)", - (uintmax_t) sizeof(HashHeader) + buckets_length, (uintmax_t) length); - goto fail; + + Py_XDECREF(PyObject_CallMethod(file_py, "seek", "ni", (Py_ssize_t)sizeof(HashHeader), SEEK_SET)); + if(PyErr_Occurred()) { + goto fail_decref_header; } + + /* Set up the in-memory header */ if(!(index = malloc(sizeof(HashIndex)))) { - EPRINTF_PATH(path, "malloc header failed"); - goto fail; + PyErr_NoMemory(); + goto fail_decref_header; } - if(!(index->buckets = malloc(buckets_length))) { - EPRINTF_PATH(path, "malloc buckets failed"); - free(index); - index = NULL; - goto fail; + + PyObject_GetBuffer(header_bytes, &header_buffer, PyBUF_SIMPLE); + if(PyErr_Occurred()) { + goto fail_free_index; } - bytes_read = fread(index->buckets, 1, buckets_length, fd); - if(bytes_read != buckets_length) { - if(ferror(fd)) { - EPRINTF_PATH(path, "fread buckets failed (expected %ju, got %ju)", - (uintmax_t) buckets_length, (uintmax_t) bytes_read); - } - else { - EPRINTF_MSG_PATH(path, "fread buckets failed (expected %ju, got %ju)", - (uintmax_t) buckets_length, (uintmax_t) bytes_read); - } - free(index->buckets); - free(index); - index = NULL; - goto fail; + + header = (HashHeader*) header_buffer.buf; + if(memcmp(header->magic, MAGIC, MAGIC_LEN)) { + PyErr_Format(PyExc_ValueError, "Unknown MAGIC in header"); + goto fail_release_header_buffer; } - index->num_entries = _le32toh(header.num_entries); - index->num_buckets = _le32toh(header.num_buckets); - index->key_size = header.key_size; - index->value_size = header.value_size; + + buckets_length = (Py_ssize_t)_le32toh(header->num_buckets) * (header->key_size + header->value_size); + if((Py_ssize_t)length != (Py_ssize_t)sizeof(HashHeader) + buckets_length) { + PyErr_Format(PyExc_ValueError, "Incorrect file length (expected %zd, got %zd)", + sizeof(HashHeader) + buckets_length, length); + goto fail_release_header_buffer; + } + + index->num_entries = _le32toh(header->num_entries); + index->num_buckets = _le32toh(header->num_buckets); + index->key_size = header->key_size; + index->value_size = header->value_size; index->bucket_size = index->key_size + index->value_size; index->lower_limit = get_lower_limit(index->num_buckets); index->upper_limit = get_upper_limit(index->num_buckets); + + /* + * For indices read from disk we don't malloc() the buckets ourselves, + * we have them backed by a Python bytes() object instead, and go through + * Python I/O. + * + * Note: Issuing read(buckets_length) is okay here, because buffered readers + * will issue multiple underlying reads if necessary. This supports indices + * >2 GB on Linux. We also compare lengths later. + */ + bucket_bytes = PyObject_CallMethod(file_py, "read", "n", buckets_length); + if(!bucket_bytes) { + assert(PyErr_Occurred()); + goto fail_release_header_buffer; + } + bytes_read = PyBytes_Size(bucket_bytes); + if(PyErr_Occurred()) { + /* TypeError, not a bytes() object */ + goto fail_decref_buckets; + } + if(bytes_read != buckets_length) { + PyErr_Format(PyExc_ValueError, "Could not read buckets (expected %zd, got %zd)", buckets_length, bytes_read); + goto fail_decref_buckets; + } + + PyObject_GetBuffer(bucket_bytes, &index->buckets_buffer, PyBUF_SIMPLE); + if(PyErr_Occurred()) { + goto fail_decref_buckets; + } + index->buckets = index->buckets_buffer.buf; + index->min_empty = get_min_empty(index->num_buckets); index->num_empty = count_empty(index); + if(index->num_empty < index->min_empty) { /* too many tombstones here / not enough empty buckets, do a same-size rebuild */ if(!hashindex_resize(index, index->num_buckets)) { - free(index->buckets); - free(index); - index = NULL; - goto fail; + PyErr_Format(PyExc_ValueError, "Failed to rebuild table"); + goto fail_free_buckets; } } -fail: - if(fclose(fd) < 0) { - EPRINTF_PATH(path, "fclose failed"); + + /* + * Clean intermediary objects up. Note that index is only freed if an error has occurred. + * Also note that the buffer in index->buckets_buffer holds a reference to buckets_bytes. + */ + +fail_free_buckets: + if(PyErr_Occurred()) { + hashindex_free_buckets(index); } +fail_decref_buckets: + Py_DECREF(bucket_bytes); +fail_release_header_buffer: + PyBuffer_Release(&header_buffer); +fail_free_index: + if(PyErr_Occurred()) { + free(index); + index = NULL; + } +fail_decref_header: + Py_DECREF(header_bytes); +fail: return index; } @@ -369,6 +430,7 @@ hashindex_init(int capacity, int key_size, int value_size) index->lower_limit = get_lower_limit(index->num_buckets); index->upper_limit = get_upper_limit(index->num_buckets); index->min_empty = get_min_empty(index->num_buckets); + index->buckets_buffer.buf = NULL; for(i = 0; i < capacity; i++) { BUCKET_MARK_EMPTY(index, i); } @@ -378,15 +440,17 @@ hashindex_init(int capacity, int key_size, int value_size) static void hashindex_free(HashIndex *index) { - free(index->buckets); + hashindex_free_buckets(index); free(index); } -static int -hashindex_write(HashIndex *index, const char *path) + +static void +hashindex_write(HashIndex *index, PyObject *file_py) { - off_t buckets_length = (off_t)index->num_buckets * index->bucket_size; - FILE *fd; + PyObject *length_object, *buckets_view; + Py_ssize_t length; + Py_ssize_t buckets_length = (Py_ssize_t)index->num_buckets * index->bucket_size; HashHeader header = { .magic = MAGIC, .num_entries = _htole32(index->num_entries), @@ -394,24 +458,41 @@ hashindex_write(HashIndex *index, const char *path) .key_size = index->key_size, .value_size = index->value_size }; - int ret = 1; - if((fd = fopen(path, "wb")) == NULL) { - EPRINTF_PATH(path, "fopen for writing failed"); - return 0; + length_object = PyObject_CallMethod(file_py, "write", "y#", &header, (int)sizeof(HashHeader)); + if(PyErr_Occurred()) { + return; } - if(fwrite(&header, 1, sizeof(header), fd) != sizeof(header)) { - EPRINTF_PATH(path, "fwrite header failed"); - ret = 0; + length = PyNumber_AsSsize_t(length_object, PyExc_OverflowError); + Py_DECREF(length_object); + if(PyErr_Occurred()) { + return; } - if(fwrite(index->buckets, 1, buckets_length, fd) != (size_t) buckets_length) { - EPRINTF_PATH(path, "fwrite buckets failed"); - ret = 0; + if(length != sizeof(HashHeader)) { + PyErr_SetString(PyExc_ValueError, "Failed to write header"); + return; } - if(fclose(fd) < 0) { - EPRINTF_PATH(path, "fclose failed"); + + /* Note: explicitly construct view; BuildValue can convert (pointer, length) to Python objects, but copies them for doing so */ + buckets_view = PyMemoryView_FromMemory((char*)index->buckets, buckets_length, PyBUF_READ); + if(!buckets_view) { + assert(PyErr_Occurred()); + return; + } + length_object = PyObject_CallMethod(file_py, "write", "O", buckets_view); + Py_DECREF(buckets_view); + if(PyErr_Occurred()) { + return; + } + length = PyNumber_AsSsize_t(length_object, PyExc_OverflowError); + Py_DECREF(length_object); + if(PyErr_Occurred()) { + return; + } + if(length != buckets_length) { + PyErr_SetString(PyExc_ValueError, "Failed to write buckets"); + return; } - return ret; } static const void * diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index d696ec3d8..fba8c7a38 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -15,12 +15,12 @@ cdef extern from "_hashindex.c": ctypedef struct HashIndex: pass - HashIndex *hashindex_read(char *path) + HashIndex *hashindex_read(object file_py) except * HashIndex *hashindex_init(int capacity, int key_size, int value_size) void hashindex_free(HashIndex *index) int hashindex_len(HashIndex *index) int hashindex_size(HashIndex *index) - int hashindex_write(HashIndex *index, char *path) + void hashindex_write(HashIndex *index, object file_py) except * void *hashindex_get(HashIndex *index, void *key) void *hashindex_next_key(HashIndex *index, void *key) int hashindex_delete(HashIndex *index, void *key) @@ -67,13 +67,9 @@ cdef class IndexBase: def __cinit__(self, capacity=0, path=None, key_size=32): self.key_size = key_size if path: - path = os.fsencode(path) - self.index = hashindex_read(path) - if not self.index: - if errno: - PyErr_SetFromErrnoWithFilename(OSError, path) - return - raise RuntimeError('hashindex_read failed') + with open(path, 'rb') as fd: + self.index = hashindex_read(fd) + assert self.index, 'hashindex_read() returned NULL with no exception set' else: self.index = hashindex_init(capacity, self.key_size, self.value_size) if not self.index: @@ -88,9 +84,8 @@ cdef class IndexBase: return cls(path=path) def write(self, path): - path = os.fsencode(path) - if not hashindex_write(self.index, path): - raise Exception('hashindex_write failed') + with open(path, 'wb') as fd: + hashindex_write(self.index, fd) def clear(self): hashindex_free(self.index) diff --git a/src/borg/repository.py b/src/borg/repository.py index d7a7f449c..b253d3f63 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -324,9 +324,8 @@ class Repository: index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8') try: return NSIndex.read(index_path) - except RuntimeError as error: - assert str(error) == 'hashindex_read failed' # everything else means we're in *deep* trouble - logger.warning('Repository index missing or corrupted, trying to recover') + except (ValueError, OSError) as exc: + logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc) os.unlink(index_path) if not auto_recover: raise @@ -357,7 +356,8 @@ class Repository: if not self.index or transaction_id is None: try: self.index = self.open_index(transaction_id, False) - except RuntimeError: + except (ValueError, OSError) as exc: + logger.warning('Checking repository transaction due to previous error: %s', exc) self.check_transaction() self.index = self.open_index(transaction_id, False) if transaction_id is None: