hashindex: Use Python I/O (#2496)

- Preparation for #1688 / #1101
- Support hash indices >2 GB
- Better error reporting
This commit is contained in:
enkore 2017-05-09 21:30:14 +02:00 committed by GitHub
parent c805adc267
commit 6cd7d415ca
3 changed files with 181 additions and 105 deletions

View file

@ -1,3 +1,5 @@
#include <Python.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
@ -54,6 +56,8 @@ typedef struct {
int lower_limit;
int upper_limit;
int min_empty;
/* buckets may be backed by a Python buffer. If buckets_buffer.buf is NULL then this is not used. */
Py_buffer buckets_buffer;
} HashIndex;
/* prime (or w/ big prime factors) hash table sizes
@ -102,8 +106,8 @@ static int hash_sizes[] = {
#define EPRINTF(msg, ...) fprintf(stderr, "hashindex: " msg "(%s)\n", ##__VA_ARGS__, strerror(errno))
#define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno))
static HashIndex *hashindex_read(const char *path);
static int hashindex_write(HashIndex *index, const char *path);
static HashIndex *hashindex_read(PyObject *file_py);
static void hashindex_write(HashIndex *index, PyObject *file_py);
static HashIndex *hashindex_init(int capacity, int key_size, int value_size);
static const void *hashindex_get(HashIndex *index, const void *key);
static int hashindex_set(HashIndex *index, const void *key, const void *value);
@ -113,6 +117,16 @@ static void *hashindex_next_key(HashIndex *index, const void *key);
/* Private API */
static void hashindex_free(HashIndex *index);
/*
 * Release the storage behind index->buckets.
 *
 * The bucket array has one of two owners:
 *   - a Python buffer (index->buckets_buffer.buf != NULL), released with
 *     PyBuffer_Release(); or
 *   - the C heap (index->buckets_buffer.buf == NULL), released with free().
 *
 * After the call both index->buckets_buffer.buf and index->buckets are NULL.
 * PyBuffer_Release() is not documented to reset view->buf, so it is cleared
 * here explicitly. Without that, an index whose Python-backed buckets were
 * replaced by a malloc()ed array (as hashindex_resize() does right after
 * calling this function) would still look Python-backed, and the malloc()ed
 * array would never be passed to free().
 * Clearing index->buckets also makes an accidental second call harmless,
 * because free(NULL) is a no-op.
 */
static void
hashindex_free_buckets(HashIndex *index)
{
    if(index->buckets_buffer.buf) {
        PyBuffer_Release(&index->buckets_buffer);
        /* Mark the index as no longer backed by a Python buffer. */
        index->buckets_buffer.buf = NULL;
    } else {
        free(index->buckets);
    }
    /* The old pointer is dangling in both branches; do not keep it around. */
    index->buckets = NULL;
}
static int
hashindex_index(HashIndex *index, const void *key)
{
@ -171,7 +185,7 @@ hashindex_resize(HashIndex *index, int capacity)
return 0;
}
}
free(index->buckets);
hashindex_free_buckets(index);
index->buckets = new->buckets;
index->num_buckets = new->num_buckets;
index->num_empty = index->num_buckets - index->num_entries;
@ -248,99 +262,146 @@ count_empty(HashIndex *index)
}
/* Public API */
static HashIndex *
hashindex_read(const char *path)
hashindex_read(PyObject *file_py)
{
FILE *fd;
off_t length, buckets_length, bytes_read;
HashHeader header;
Py_ssize_t length, buckets_length, bytes_read;
Py_buffer header_buffer;
PyObject *header_bytes, *length_object, *bucket_bytes;
HashHeader *header;
HashIndex *index = NULL;
if((fd = fopen(path, "rb")) == NULL) {
EPRINTF_PATH(path, "fopen for reading failed");
return NULL;
header_bytes = PyObject_CallMethod(file_py, "read", "n", (Py_ssize_t)sizeof(HashHeader));
if(!header_bytes) {
assert(PyErr_Occurred());
goto fail;
}
bytes_read = PyBytes_Size(header_bytes);
if(PyErr_Occurred()) {
/* TypeError, not a bytes() object */
goto fail_decref_header;
}
bytes_read = fread(&header, 1, sizeof(HashHeader), fd);
if(bytes_read != sizeof(HashHeader)) {
if(ferror(fd)) {
EPRINTF_PATH(path, "fread header failed (expected %ju, got %ju)",
(uintmax_t) sizeof(HashHeader), (uintmax_t) bytes_read);
}
else {
EPRINTF_MSG_PATH(path, "fread header failed (expected %ju, got %ju)",
(uintmax_t) sizeof(HashHeader), (uintmax_t) bytes_read);
}
goto fail;
/* Truncated file */
/* Note: %zd is the format for Py_ssize_t, %zu is for size_t */
PyErr_Format(PyExc_ValueError, "Could not read header (expected %zu, but read %zd bytes)",
sizeof(HashHeader), bytes_read);
goto fail_decref_header;
}
if(fseek(fd, 0, SEEK_END) < 0) {
EPRINTF_PATH(path, "fseek failed");
goto fail;
/* Find length of file */
length_object = PyObject_CallMethod(file_py, "seek", "ni", (Py_ssize_t)0, SEEK_END);
if(PyErr_Occurred()) {
goto fail_decref_header;
}
if((length = ftell(fd)) < 0) {
EPRINTF_PATH(path, "ftell failed");
goto fail;
length = PyNumber_AsSsize_t(length_object, PyExc_OverflowError);
Py_DECREF(length_object);
if(PyErr_Occurred()) {
/* This shouldn't generally happen; but can if seek() returns something that's not a number */
goto fail_decref_header;
}
if(fseek(fd, sizeof(HashHeader), SEEK_SET) < 0) {
EPRINTF_PATH(path, "fseek failed");
goto fail;
}
if(memcmp(header.magic, MAGIC, MAGIC_LEN)) {
EPRINTF_MSG_PATH(path, "Unknown MAGIC in header");
goto fail;
}
buckets_length = (off_t)_le32toh(header.num_buckets) * (header.key_size + header.value_size);
if((size_t) length != sizeof(HashHeader) + buckets_length) {
EPRINTF_MSG_PATH(path, "Incorrect file length (expected %ju, got %ju)",
(uintmax_t) sizeof(HashHeader) + buckets_length, (uintmax_t) length);
goto fail;
Py_XDECREF(PyObject_CallMethod(file_py, "seek", "ni", (Py_ssize_t)sizeof(HashHeader), SEEK_SET));
if(PyErr_Occurred()) {
goto fail_decref_header;
}
/* Set up the in-memory header */
if(!(index = malloc(sizeof(HashIndex)))) {
EPRINTF_PATH(path, "malloc header failed");
goto fail;
PyErr_NoMemory();
goto fail_decref_header;
}
if(!(index->buckets = malloc(buckets_length))) {
EPRINTF_PATH(path, "malloc buckets failed");
free(index);
index = NULL;
goto fail;
PyObject_GetBuffer(header_bytes, &header_buffer, PyBUF_SIMPLE);
if(PyErr_Occurred()) {
goto fail_free_index;
}
bytes_read = fread(index->buckets, 1, buckets_length, fd);
if(bytes_read != buckets_length) {
if(ferror(fd)) {
EPRINTF_PATH(path, "fread buckets failed (expected %ju, got %ju)",
(uintmax_t) buckets_length, (uintmax_t) bytes_read);
}
else {
EPRINTF_MSG_PATH(path, "fread buckets failed (expected %ju, got %ju)",
(uintmax_t) buckets_length, (uintmax_t) bytes_read);
}
free(index->buckets);
free(index);
index = NULL;
goto fail;
header = (HashHeader*) header_buffer.buf;
if(memcmp(header->magic, MAGIC, MAGIC_LEN)) {
PyErr_Format(PyExc_ValueError, "Unknown MAGIC in header");
goto fail_release_header_buffer;
}
index->num_entries = _le32toh(header.num_entries);
index->num_buckets = _le32toh(header.num_buckets);
index->key_size = header.key_size;
index->value_size = header.value_size;
buckets_length = (Py_ssize_t)_le32toh(header->num_buckets) * (header->key_size + header->value_size);
if((Py_ssize_t)length != (Py_ssize_t)sizeof(HashHeader) + buckets_length) {
PyErr_Format(PyExc_ValueError, "Incorrect file length (expected %zd, got %zd)",
sizeof(HashHeader) + buckets_length, length);
goto fail_release_header_buffer;
}
index->num_entries = _le32toh(header->num_entries);
index->num_buckets = _le32toh(header->num_buckets);
index->key_size = header->key_size;
index->value_size = header->value_size;
index->bucket_size = index->key_size + index->value_size;
index->lower_limit = get_lower_limit(index->num_buckets);
index->upper_limit = get_upper_limit(index->num_buckets);
/*
* For indices read from disk we don't malloc() the buckets ourselves,
* we have them backed by a Python bytes() object instead, and go through
* Python I/O.
*
* Note: Issuing read(buckets_length) is okay here, because buffered readers
* will issue multiple underlying reads if necessary. This supports indices
* >2 GB on Linux. We also compare lengths later.
*/
bucket_bytes = PyObject_CallMethod(file_py, "read", "n", buckets_length);
if(!bucket_bytes) {
assert(PyErr_Occurred());
goto fail_release_header_buffer;
}
bytes_read = PyBytes_Size(bucket_bytes);
if(PyErr_Occurred()) {
/* TypeError, not a bytes() object */
goto fail_decref_buckets;
}
if(bytes_read != buckets_length) {
PyErr_Format(PyExc_ValueError, "Could not read buckets (expected %zd, got %zd)", buckets_length, bytes_read);
goto fail_decref_buckets;
}
PyObject_GetBuffer(bucket_bytes, &index->buckets_buffer, PyBUF_SIMPLE);
if(PyErr_Occurred()) {
goto fail_decref_buckets;
}
index->buckets = index->buckets_buffer.buf;
index->min_empty = get_min_empty(index->num_buckets);
index->num_empty = count_empty(index);
if(index->num_empty < index->min_empty) {
/* too many tombstones here / not enough empty buckets, do a same-size rebuild */
if(!hashindex_resize(index, index->num_buckets)) {
free(index->buckets);
free(index);
index = NULL;
goto fail;
PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
goto fail_free_buckets;
}
}
fail:
if(fclose(fd) < 0) {
EPRINTF_PATH(path, "fclose failed");
/*
 * Clean up intermediate objects. Note that index is only freed if an error has occurred.
 * Also note that the buffer in index->buckets_buffer holds a reference to bucket_bytes.
 */
fail_free_buckets:
if(PyErr_Occurred()) {
hashindex_free_buckets(index);
}
fail_decref_buckets:
Py_DECREF(bucket_bytes);
fail_release_header_buffer:
PyBuffer_Release(&header_buffer);
fail_free_index:
if(PyErr_Occurred()) {
free(index);
index = NULL;
}
fail_decref_header:
Py_DECREF(header_bytes);
fail:
return index;
}
@ -369,6 +430,7 @@ hashindex_init(int capacity, int key_size, int value_size)
index->lower_limit = get_lower_limit(index->num_buckets);
index->upper_limit = get_upper_limit(index->num_buckets);
index->min_empty = get_min_empty(index->num_buckets);
index->buckets_buffer.buf = NULL;
for(i = 0; i < capacity; i++) {
BUCKET_MARK_EMPTY(index, i);
}
@ -378,15 +440,17 @@ hashindex_init(int capacity, int key_size, int value_size)
static void
hashindex_free(HashIndex *index)
{
free(index->buckets);
hashindex_free_buckets(index);
free(index);
}
static int
hashindex_write(HashIndex *index, const char *path)
static void
hashindex_write(HashIndex *index, PyObject *file_py)
{
off_t buckets_length = (off_t)index->num_buckets * index->bucket_size;
FILE *fd;
PyObject *length_object, *buckets_view;
Py_ssize_t length;
Py_ssize_t buckets_length = (Py_ssize_t)index->num_buckets * index->bucket_size;
HashHeader header = {
.magic = MAGIC,
.num_entries = _htole32(index->num_entries),
@ -394,24 +458,41 @@ hashindex_write(HashIndex *index, const char *path)
.key_size = index->key_size,
.value_size = index->value_size
};
int ret = 1;
if((fd = fopen(path, "wb")) == NULL) {
EPRINTF_PATH(path, "fopen for writing failed");
return 0;
length_object = PyObject_CallMethod(file_py, "write", "y#", &header, (int)sizeof(HashHeader));
if(PyErr_Occurred()) {
return;
}
if(fwrite(&header, 1, sizeof(header), fd) != sizeof(header)) {
EPRINTF_PATH(path, "fwrite header failed");
ret = 0;
length = PyNumber_AsSsize_t(length_object, PyExc_OverflowError);
Py_DECREF(length_object);
if(PyErr_Occurred()) {
return;
}
if(fwrite(index->buckets, 1, buckets_length, fd) != (size_t) buckets_length) {
EPRINTF_PATH(path, "fwrite buckets failed");
ret = 0;
if(length != sizeof(HashHeader)) {
PyErr_SetString(PyExc_ValueError, "Failed to write header");
return;
}
if(fclose(fd) < 0) {
EPRINTF_PATH(path, "fclose failed");
/* Note: construct the memoryview explicitly; Py_BuildValue can convert (pointer, length) into a Python object, but it copies the data to do so */
buckets_view = PyMemoryView_FromMemory((char*)index->buckets, buckets_length, PyBUF_READ);
if(!buckets_view) {
assert(PyErr_Occurred());
return;
}
length_object = PyObject_CallMethod(file_py, "write", "O", buckets_view);
Py_DECREF(buckets_view);
if(PyErr_Occurred()) {
return;
}
length = PyNumber_AsSsize_t(length_object, PyExc_OverflowError);
Py_DECREF(length_object);
if(PyErr_Occurred()) {
return;
}
if(length != buckets_length) {
PyErr_SetString(PyExc_ValueError, "Failed to write buckets");
return;
}
return ret;
}
static const void *

View file

@ -15,12 +15,12 @@ cdef extern from "_hashindex.c":
ctypedef struct HashIndex:
pass
HashIndex *hashindex_read(char *path)
HashIndex *hashindex_read(object file_py) except *
HashIndex *hashindex_init(int capacity, int key_size, int value_size)
void hashindex_free(HashIndex *index)
int hashindex_len(HashIndex *index)
int hashindex_size(HashIndex *index)
int hashindex_write(HashIndex *index, char *path)
void hashindex_write(HashIndex *index, object file_py) except *
void *hashindex_get(HashIndex *index, void *key)
void *hashindex_next_key(HashIndex *index, void *key)
int hashindex_delete(HashIndex *index, void *key)
@ -67,13 +67,9 @@ cdef class IndexBase:
def __cinit__(self, capacity=0, path=None, key_size=32):
self.key_size = key_size
if path:
path = os.fsencode(path)
self.index = hashindex_read(path)
if not self.index:
if errno:
PyErr_SetFromErrnoWithFilename(OSError, path)
return
raise RuntimeError('hashindex_read failed')
with open(path, 'rb') as fd:
self.index = hashindex_read(fd)
assert self.index, 'hashindex_read() returned NULL with no exception set'
else:
self.index = hashindex_init(capacity, self.key_size, self.value_size)
if not self.index:
@ -88,9 +84,8 @@ cdef class IndexBase:
return cls(path=path)
def write(self, path):
path = os.fsencode(path)
if not hashindex_write(self.index, path):
raise Exception('hashindex_write failed')
with open(path, 'wb') as fd:
hashindex_write(self.index, fd)
def clear(self):
hashindex_free(self.index)

View file

@ -324,9 +324,8 @@ class Repository:
index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
try:
return NSIndex.read(index_path)
except RuntimeError as error:
assert str(error) == 'hashindex_read failed' # everything else means we're in *deep* trouble
logger.warning('Repository index missing or corrupted, trying to recover')
except (ValueError, OSError) as exc:
logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
os.unlink(index_path)
if not auto_recover:
raise
@ -357,7 +356,8 @@ class Repository:
if not self.index or transaction_id is None:
try:
self.index = self.open_index(transaction_id, False)
except RuntimeError:
except (ValueError, OSError) as exc:
logger.warning('Checking repository transaction due to previous error: %s', exc)
self.check_transaction()
self.index = self.open_index(transaction_id, False)
if transaction_id is None: