diff --git a/darc/_hashindex.c b/darc/_hashindex.c new file mode 100644 index 000000000..08962e8fe --- /dev/null +++ b/darc/_hashindex.c @@ -0,0 +1,243 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hashindex.h" + +typedef struct { + char magic[8]; + int32_t num_entries; + int32_t num_buckets; + int8_t key_size; + int8_t value_size; +} __attribute__((__packed__)) HashHeader; + + +#define MAGIC "DARCHASH" +#define EMPTY ((int32_t)-1) +#define DELETED ((int32_t)-2) +#define BUCKET_ADDR(index, idx) (index->buckets + (idx * index->bucket_size)) + +#define BUCKET_IS_DELETED(index, idx) (*((int32_t *)(BUCKET_ADDR(index, idx) + index->key_size)) == DELETED) +#define BUCKET_IS_EMPTY(index, idx) (*((int32_t *)(BUCKET_ADDR(index, idx) + index->key_size)) == EMPTY) + +#define BUCKET_MATCHES_KEY(index, idx, key) (memcmp(key, BUCKET_ADDR(index, idx), index->key_size) == 0) + +#define BUCKET_MARK_DELETED(index, idx) (*((int32_t *)(BUCKET_ADDR(index, idx) + index->key_size)) = DELETED) + + +/* Private API */ +static int +hashindex_index(HashIndex *index, const void *key) +{ + return *((uint32_t *)key) % index->num_buckets; +} + +static int +hashindex_lookup(HashIndex *index, const void *key) +{ + int didx = -1; + int idx = hashindex_index(index, key); + for(;;) { + while(BUCKET_IS_DELETED(index, idx)) { + if(didx == -1) { + didx = idx; + } + idx = (idx + 1) % index->num_buckets; + } + if(BUCKET_IS_EMPTY(index, idx)) + { + return -1; + } + if(BUCKET_MATCHES_KEY(index, idx, key)) { + if (didx != -1) { + memcpy(BUCKET_ADDR(index, didx), BUCKET_ADDR(index, idx), index->bucket_size); + BUCKET_MARK_DELETED(index, idx); + idx = didx; + } + return idx; + } + idx = (idx + 1) % index->num_buckets; + } +} + +static void +hashindex_resize(HashIndex *index, int capacity) +{ + printf("Resizing => %d\n", capacity); + char *new_path = malloc(strlen(index->path) + 5); + strcpy(new_path, index->path); + strcat(new_path, ".tmp"); + HashIndex *new = hashindex_create(new_path, capacity, index->key_size, index->value_size); + void *key = NULL; + while((key = hashindex_next_key(index, key))) { + hashindex_set(new, key, hashindex_get(index, key)); + } + munmap(index->map_addr, index->map_length); + index->map_addr = new->map_addr; + index->map_length = new->map_length; + index->num_buckets = new->num_buckets; + index->limit = new->limit; + index->buckets = new->buckets; + unlink(index->path); + rename(new_path, index->path); + free(new_path); + free(new->path); + free(new); +} + +/* Public API */ +HashIndex * +hashindex_open(const char *path) +{ + int fd = open(path, O_RDWR); + if(fd < 0) { + printf("Failed to open %s\n", path); + return NULL; + } + off_t length = lseek(fd, 0, SEEK_END); + void *addr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if(addr == MAP_FAILED) { + printf("Failed to mmap %s", path); + } + HashHeader *header = (HashHeader *)addr; + HashIndex *index = malloc(sizeof(HashIndex)); + index->path = malloc(strlen(path) + 1); + strcpy(index->path, path); + index->map_addr = addr; + index->map_length = length; + index->num_entries = header->num_entries; + index->num_buckets = header->num_buckets; + index->key_size = header->key_size; + index->value_size = header->value_size; + index->bucket_size = index->key_size + index->value_size; + index->buckets = (addr + sizeof(HashHeader)); + index->limit = (int)(index->num_buckets * .75); + return index; +} + +HashIndex * +hashindex_create(const char *path, int capacity, int key_size, int value_size) +{ + FILE *fd; + int i; + if(!(fd = fopen(path, "w"))) { + printf("Failed to create %s\n", path); + return NULL; + } + HashHeader header; + memcpy(header.magic, MAGIC, sizeof(MAGIC) - 1); + header.num_entries = 0; + header.num_buckets = capacity; + header.key_size = key_size; + header.value_size = value_size; + int bucket_size = key_size + value_size; + char *bucket = calloc(bucket_size, 1); + if(fwrite(&header, 1, sizeof(header), fd) != sizeof(header)) + goto error; + *((int32_t *)(bucket + key_size)) = EMPTY; + for(i = 0; i < capacity; i++) { + if(fwrite(bucket, 1, bucket_size, fd) != bucket_size) + goto error; + } + free(bucket); + fclose(fd); + return hashindex_open(path); +error: + fclose(fd); + free(bucket); + return NULL; +} + +void +hashindex_flush(HashIndex *index) +{ + *((int32_t *)(index->map_addr + 8)) = index->num_entries; + *((int32_t *)(index->map_addr + 12)) = index->num_buckets; + msync(index->map_addr, index->map_length, MS_SYNC); +} + +void +hashindex_close(HashIndex *index) +{ + hashindex_flush(index); + munmap(index->map_addr, index->map_length); + free(index->path); + free(index); +} + +const void * +hashindex_get(HashIndex *index, const void *key) +{ + int idx = hashindex_lookup(index, key); + if(idx < 0) { + return NULL; + } + return BUCKET_ADDR(index, idx) + index->key_size; +} + +void +hashindex_set(HashIndex *index, const void *key, const void *value) +{ + int idx = hashindex_lookup(index, key); + if(idx < 0) + { + if(index->num_entries > index->limit) { + hashindex_resize(index, index->num_buckets * 2); + } + idx = hashindex_index(index, key); + while(!BUCKET_IS_EMPTY(index, idx) && !BUCKET_IS_DELETED(index, idx)) { + idx = (idx + 1) % index->num_buckets; + } + memcpy(BUCKET_ADDR(index, idx), key, index->key_size); + memcpy(BUCKET_ADDR(index, idx) + index->key_size, value, index->value_size); + index->num_entries += 1; + } + else + { + memcpy(BUCKET_ADDR(index, idx) + index->key_size, value, index->value_size); + } +} + +void +hashindex_delete(HashIndex *index, const void *key) +{ + int idx = hashindex_lookup(index, key); + if (idx < 0) { + return; + } + BUCKET_MARK_DELETED(index, idx); + index->num_entries -= 1; +} + +void * +hashindex_next_key(HashIndex *index, const void *key) +{ + int idx = 0; + if(key) { + idx = 1 + (key - index->buckets) / index->bucket_size; + } + if (idx == index->num_buckets) + return NULL; + while(BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx)) { + idx ++; + if (idx == index->num_buckets) + return NULL; + } + return BUCKET_ADDR(index, idx); +} + +int +hashindex_get_size(HashIndex *index) +{ + return index->num_entries; +} + diff --git a/darc/hashindex.h b/darc/hashindex.h new file mode 100644 index 000000000..6e725a35b --- /dev/null +++ b/darc/hashindex.h @@ -0,0 +1,27 @@ +#ifndef __HASHINDEX_H__ +#define __HASHINDEX_H__ + +typedef struct { + char *path; + void *map_addr; + off_t map_length; + void *buckets; + int num_entries; + int num_buckets; + int key_size; + int value_size; + int bucket_size; + int limit; +} HashIndex; + +HashIndex *hashindex_open(const char *path); +void hashindex_close(HashIndex *index); +void hashindex_flush(HashIndex *index); +HashIndex *hashindex_create(const char *path, int capacity, int key_size, int value_size); +const void *hashindex_get(HashIndex *index, const void *key); +void hashindex_set(HashIndex *index, const void *key, const void *value); +void hashindex_delete(HashIndex *index, const void *key); +void *hashindex_next_key(HashIndex *index, const void *key); +int hashindex_get_size(HashIndex *index); + +#endif diff --git a/darc/hashindex.py b/darc/hashindex.py deleted file mode 100644 index 34fa6e81c..000000000 --- a/darc/hashindex.py +++ /dev/null @@ -1,237 +0,0 @@ -import numpy -import os -import random -import shutil -import struct -import tempfile -import unittest -from UserDict import DictMixin - - -class HashIndexBase(DictMixin): - EMPTY, DELETED = -1, -2 - FREE = (EMPTY, DELETED) - - i_fmt = struct.Struct('= self.limit: - self.resize() - try: - idx = self.lookup(key) - self.buckets[idx][1], self.buckets[idx][2] = value - return - except KeyError: - idx = self.index(key) - while self.buckets[idx][1] not in self.FREE: - idx = (idx + 1) % self.buckets.size - self.buckets[idx][1], self.buckets[idx][2] = value - self.buckets[idx][0] = key - self.num_entries += 1 - - def iteritems(self, limit=0, marker=None): - n = 0 - for idx in xrange(self.buckets.size): - if self.buckets[idx][1] in self.FREE: - continue - key = str(self.buckets[idx][0]) - if marker and key != marker: - continue - elif marker: - marker = None - yield key, (self.buckets[idx][1], self.buckets[idx][2]) - n += 1 - if n == limit: - return - - -class BandIndex(HashIndexBase): - MAGIC = 'BANDINDEX' - idx_type = numpy.dtype('= self.limit: - self.resize() - try: - idx = self.lookup(key) - self.buckets[idx][1] = value - return - except KeyError: - idx = self.index(key) - while self.buckets[idx][1] not in self.FREE: - idx = (idx + 1) % self.buckets.size - self.buckets[idx][1] = value - self.buckets[idx][0] = key - self.num_entries += 1 - - def iteritems(self, limit=0, marker=None): - n = 0 - for idx in xrange(self.buckets.size): - if self.buckets[idx][1] in self.FREE: - continue - key = self.buckets[idx][0] - if marker and key != marker: - continue - elif marker: - marker = None - yield key, self.buckets[idx][1] - n += 1 - if n == limit: - return - - -class HashIndexTestCase(unittest.TestCase): - - def setUp(self): - self.tmppath = tempfile.mkdtemp() - - def tearDown(self): - shutil.rmtree(self.tmppath) - - def test_bandindex(self): - ref = {} - idx = BandIndex.create(os.path.join(self.tmppath, 'idx'), 16) - for x in range(1000): - band = random.randint(0, 100) - ref.setdefault(band, 0) - ref[band] += 1 - idx.setdefault(band, 0) - idx[band] += 1 - idx.flush() - idx2 = BandIndex(os.path.join(self.tmppath, 'idx')) - for key, value in ref.iteritems(): - self.assertEqual(idx2[key], value) - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(HashIndexTestCase) - -if __name__ == '__main__': - unittest.main() - - - - diff --git a/darc/hashindex.pyx b/darc/hashindex.pyx new file mode 100644 index 000000000..392438be6 --- /dev/null +++ b/darc/hashindex.pyx @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- + +cdef extern from "hashindex.h": + ctypedef struct HashIndex: + pass + + HashIndex *hashindex_open(char *path) + HashIndex *hashindex_create(char *path, int capacity, int key_size, int value_size) + int hashindex_get_size(HashIndex *index) + void hashindex_close(HashIndex *index) + void hashindex_flush(HashIndex *index) + void *hashindex_get(HashIndex *index, void *key) + void *hashindex_next_key(HashIndex *index, void *key) + void hashindex_delete(HashIndex *index, void *key) + void hashindex_set(HashIndex *index, void *key, void *value) + + +cdef class IndexBase: + cdef HashIndex *index + + def __cinit__(self, path): + self.index = hashindex_open(path) + if not self.index: + raise Exception('Failed to open %s' % path) + + def __dealloc__(self): + hashindex_close(self.index) + + def flush(self): + hashindex_flush(self.index) + + def setdefault(self, key, value): + if not key in self: + self[key] = value + + def pop(self, key): + value = self[key] + del self[key] + return value + + def __len__(self): + return hashindex_get_size(self.index) + + +cdef class NSIndex(IndexBase): + + @classmethod + def create(cls, path, capacity=16): + index = hashindex_create(path, capacity, 32, 8) + hashindex_close(index) + return cls(path) + + def __getitem__(self, key): + assert len(key) == 32 + data = hashindex_get(self.index, key) + if not data: + raise KeyError + return data[0], data[1] + + def __delitem__(self, key): + assert len(key) == 32 + hashindex_delete(self.index, key) + + def __setitem__(self, key, value): + assert len(key) == 32 + cdef int[2] data + data[0] = value[0] + data[1] = value[1] + hashindex_set(self.index, key, data) + + def __contains__(self, key): + assert len(key) == 32 + data = hashindex_get(self.index, key) + return data != NULL + + def iteritems(self, marker=None, limit=0): + iter = NSKeyIterator() + iter.index = self.index + return iter + + +cdef class NSKeyIterator: + cdef HashIndex *index + cdef char *key + + def __cinit__(self): + self.key = NULL + + def __iter__(self): + return self + + def __next__(self): + self.key = hashindex_next_key(self.index, self.key) + if not self.key: + raise StopIteration + cdef int *value = (self.key + 32) + return self.key[:32], (value[0], value[1]) + + +cdef class BandIndex(IndexBase): + + @classmethod + def create(cls, path, capacity=16): + index = hashindex_create(path, capacity, 4, 4) + hashindex_close(index) + return cls(path) + + def __getitem__(self, key): + cdef int k = key + data = hashindex_get(self.index, &k) + if not data: + raise KeyError + return data[0] + + def __delitem__(self, key): + cdef int k = key + hashindex_delete(self.index, &k) + + def __setitem__(self, key, value): + cdef int k = key + cdef int v = value + hashindex_set(self.index, &k, &v) + + def __contains__(self, key): + cdef int k = key + data = hashindex_get(self.index, &k) + return data != NULL + + def iteritems(self, marker=None, limit=0): + iter = BandKeyIterator() + iter.index = self.index + return iter + + +cdef class BandKeyIterator: + cdef HashIndex *index + cdef int *key + + def __cinit__(self): + self.key = NULL + + def __iter__(self): + return self + + def __next__(self): + self.key = hashindex_next_key(self.index, self.key) + if not self.key: + raise StopIteration + return self.key[0], self.key[1] diff --git a/darc/test.py b/darc/test.py index 11a0ed95c..770482372 100644 --- a/darc/test.py +++ b/darc/test.py @@ -11,7 +11,7 @@ from xattr import xattr, XATTR_NOFOLLOW import getpass getpass.getpass = lambda m: 'abc123' -from . import store, helpers, lrucache, hashindex +from . import store, helpers, lrucache from .archiver import Archiver @@ -124,7 +124,6 @@ def suite(): suite.addTest(store.suite()) suite.addTest(doctest.DocTestSuite(helpers)) suite.addTest(lrucache.suite()) - suite.addTest(hashindex.suite()) return suite if __name__ == '__main__': diff --git a/setup.py b/setup.py index 802ddddd9..e6df3b34e 100644 --- a/setup.py +++ b/setup.py @@ -2,8 +2,9 @@ #!/usr/bin/env python import sys from setuptools import setup, Extension +from Cython.Distutils import build_ext -dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr', 'paramiko', 'numpy'] +dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr', 'paramiko'] if sys.version_info < (2, 7): dependencies.append('argparse') @@ -13,7 +14,10 @@ setup(name='darc', author=u'Jonas Borgström', author_email='jonas@borgstrom.se', packages=['darc'], - ext_modules=[Extension('darc._speedups', ['darc/_speedups.c'])], + cmdclass = {'build_ext': build_ext}, + ext_modules=[ + Extension('darc._speedups', ['darc/_speedups.c']), + Extension('darc.hashindex', ['darc/hashindex.pyx', 'darc/_hashindex.c'])], install_requires=dependencies, entry_points = { 'console_scripts': [