diff --git a/darc/_hashindex.c b/darc/_hashindex.c
index dc85a8d21..c8e3c800f 100644
--- a/darc/_hashindex.c
+++ b/darc/_hashindex.c
@@ -159,6 +159,18 @@ error:
     return NULL;
 }
 
+/* Drop all entries: mark every bucket deleted, zero the entry count, then resize with 16. */
+void
+hashindex_clear(HashIndex *index)
+{
+    int i;
+    for(i = 0; i < index->num_buckets; i++) {
+        BUCKET_MARK_DELETED(index, i);
+    }
+    index->num_entries = 0;
+    hashindex_resize(index, 16);
+}
+
 void
 hashindex_flush(HashIndex *index)
 {
diff --git a/darc/hashindex.h b/darc/hashindex.h
index 6e725a35b..3ab7d348b 100644
--- a/darc/hashindex.h
+++ b/darc/hashindex.h
@@ -16,6 +16,7 @@ typedef struct {
 
 HashIndex *hashindex_open(const char *path);
 void hashindex_close(HashIndex *index);
+void hashindex_clear(HashIndex *index);
 void hashindex_flush(HashIndex *index);
 HashIndex *hashindex_create(const char *path, int capacity, int key_size, int value_size);
 const void *hashindex_get(HashIndex *index, const void *key);
diff --git a/darc/hashindex.pyx b/darc/hashindex.pyx
index e56565d22..00e4b7aae 100644
--- a/darc/hashindex.pyx
+++ b/darc/hashindex.pyx
@@ -7,6 +7,7 @@ cdef extern from "hashindex.h":
     HashIndex *hashindex_open(char *path)
     HashIndex *hashindex_create(char *path, int capacity, int key_size, int value_size)
     int hashindex_get_size(HashIndex *index)
+    void hashindex_clear(HashIndex *index)
     void hashindex_close(HashIndex *index)
     void hashindex_flush(HashIndex *index)
     void *hashindex_get(HashIndex *index, void *key)
@@ -14,6 +15,7 @@ cdef extern from "hashindex.h":
     void hashindex_delete(HashIndex *index, void *key)
     void hashindex_set(HashIndex *index, void *key, void *value)
 
+_NoDefault = object()
 
 cdef class IndexBase:
     cdef HashIndex *index
@@ -26,6 +28,9 @@ cdef class IndexBase:
     def __dealloc__(self):
         hashindex_close(self.index)
 
+    def clear(self):
+        hashindex_clear(self.index)
+
     def flush(self):
         hashindex_flush(self.index)
 
@@ -39,10 +44,16 @@ cdef class IndexBase:
         except KeyError:
             return default
 
-    def pop(self, key):
-        value = self[key]
-        del self[key]
-        return value
+    def pop(self, key, default=_NoDefault):
+        """Remove key and return its value; if key is missing return default when given, else raise KeyError."""
+        try:
+            value = self[key]
+            del self[key]
+            return value
+        except KeyError:
+            if default is not _NoDefault:
+                return default
+            raise
 
     def __len__(self):
         return hashindex_get_size(self.index)
diff --git a/darc/store.py b/darc/store.py
index 88a03651d..c433ddf24 100644
--- a/darc/store.py
+++ b/darc/store.py
@@ -1,4 +1,5 @@
 from ConfigParser import RawConfigParser
+import errno
 import fcntl
 import os
 import shutil
@@ -45,7 +46,6 @@ class Store(object):
             fd.write('This is a DARC store')
         os.mkdir(os.path.join(path, 'bands'))
         os.mkdir(os.path.join(path, 'indexes'))
-        BandIndex.create(os.path.join(path, 'indexes', 'bands'))
         config = RawConfigParser()
         config.add_section('store')
         config.set('store', 'version', '1')
@@ -75,18 +75,20 @@ class Store(object):
         max_band_size = self.config.getint('store', 'max_band_size')
         bands_per_dir = self.config.getint('store', 'bands_per_dir')
         self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
+        self.io.cleanup()
 
     def delete_bands(self):
         delete_path = os.path.join(self.path, 'indexes', 'delete')
         if os.path.exists(delete_path):
+            bands = self.get_index('bands')
             for band in read_set(delete_path):
-                assert self.bands.pop(band) == 0
-                self.io.delete_band(band)
+                # Pop outside the assert so the entry is still removed under python -O
+                count = bands.pop(band, 0)
+                assert count == 0
+                self.io.delete_band(band, missing_ok=True)
             os.unlink(delete_path)
 
     def begin_txn(self):
-        self.io.cleanup()
-        self.delete_bands()
         txn_dir = os.path.join(self.path, 'txn.tmp')
         # Initialize transaction snapshot
         os.mkdir(txn_dir)
@@ -97,7 +99,6 @@ class Store(object):
                   os.path.join(self.path, 'txn.active'))
         self.compact = set()
         self.txn_active = True
-        self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))
 
     def close(self):
         self.rollback()
@@ -115,13 +116,11 @@ class Store(object):
             self.config.write(fd)
         for i in self.indexes.values():
             i.flush()
-        self.bands.flush()
+        # If we crash before this line, the transaction will be
+        # rolled back by open()
         os.rename(os.path.join(self.path, 'txn.active'),
-                  os.path.join(self.path, 'txn.tmp'))
-        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
-        self.indexes = {}
-        self.txn_active = False
-        self.delete_bands()
+                  os.path.join(self.path, 'txn.commit'))
+        self.rollback()
 
     def compact_bands(self):
         """Compact sparse bands by copying data into new bands
@@ -131,22 +130,25 @@ class Store(object):
         self.io.close_band()
         def lookup(ns, key):
             return key in self.get_index(ns)
+        bands = self.get_index('bands')
         for band in self.compact:
-            if self.bands[band] > 0:
+            if bands[band] > 0:
                 for ns, key, data in self.io.iter_objects(band, lookup):
                     new_band, offset = self.io.write(ns, key, data)
                     self.indexes[ns][key] = new_band, offset
-                    self.bands[band] -= 1
-                    self.bands.setdefault(new_band, 0)
-                    self.bands[new_band] += 1
+                    bands[band] -= 1
+                    bands.setdefault(new_band, 0)
+                    bands[new_band] += 1
         write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
 
     def rollback(self):
         """
         """
-        # Remove partial transaction
-        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
-            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
+        # Commit any half committed transaction
+        if os.path.exists(os.path.join(self.path, 'txn.commit')):
+            self.delete_bands()
+            os.rename(os.path.join(self.path, 'txn.commit'),
+                      os.path.join(self.path, 'txn.tmp'))
         # Roll back active transaction
         txn_dir = os.path.join(self.path, 'txn.active')
         if os.path.exists(txn_dir):
@@ -154,7 +156,10 @@ class Store(object):
             shutil.copytree(os.path.join(txn_dir, 'indexes'),
                             os.path.join(self.path, 'indexes'))
             shutil.copy(os.path.join(txn_dir, 'config'), self.path)
-            shutil.rmtree(txn_dir)
+            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
+        # Remove partially removed transaction
+        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
+            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
         self.indexes = {}
         self.txn_active = False
 
@@ -162,11 +167,16 @@ class Store(object):
         try:
             return self.indexes[ns]
         except KeyError:
-            filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
-            if os.path.exists(filename):
-                self.indexes[ns] = NSIndex(filename)
+            if ns == 'bands':
+                filename = os.path.join(self.path, 'indexes', 'bands')
+                cls = BandIndex
             else:
-                self.indexes[ns] = NSIndex.create(filename)
+                filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
+                cls = NSIndex
+            if os.path.exists(filename):
+                self.indexes[ns] = cls(filename)
+            else:
+                self.indexes[ns] = cls.create(filename)
             return self.indexes[ns]
 
     def get(self, ns, id):
@@ -180,8 +190,9 @@ class Store(object):
         if not self.txn_active:
             self.begin_txn()
         band, offset = self.io.write(ns, id, data)
-        self.bands.setdefault(band, 0)
-        self.bands[band] += 1
+        bands = self.get_index('bands')
+        bands.setdefault(band, 0)
+        bands[band] += 1
         self.get_index(ns)[id] = band, offset
 
     def delete(self, ns, id):
@@ -189,7 +200,7 @@ class Store(object):
             self.begin_txn()
         try:
             band, offset = self.get_index(ns).pop(id)
-            self.bands[band] -= 1
+            self.get_index('bands')[band] -= 1
             self.compact.add(band)
         except KeyError:
             raise self.DoesNotExist
@@ -242,8 +253,13 @@ class BandIO(object):
             self.fds[band] = fd
             return fd
 
-    def delete_band(self, band):
-        os.unlink(self.band_filename(band))
+    def delete_band(self, band, missing_ok=False):
+        """Unlink the band's file; with missing_ok, an ENOENT error is ignored."""
+        try:
+            os.unlink(self.band_filename(band))
+        except OSError, e:
+            if not missing_ok or e.errno != errno.ENOENT:
+                raise
 
     def read(self, band, offset):
         fd = self.get_fd(band)