diff --git a/darc/helpers.py b/darc/helpers.py index 38584bf7e..782824b07 100644 --- a/darc/helpers.py +++ b/darc/helpers.py @@ -6,6 +6,22 @@ import os import pwd import re import stat +import struct + + +def read_set(path): + """Read set from disk (as int32s) + """ + with open(path, 'rb') as fd: + data = fd.read() + return set(struct.unpack('<%di' % (len(data) / 4), data)) + + +def write_set(s, path): + """Write set to disk (as int32s) + """ + with open(path, 'wb') as fd: + fd.write(struct.pack('<%di' % len(s), *s)) def encode_long(v): diff --git a/darc/store.py b/darc/store.py index dd71cb286..1308aacfb 100644 --- a/darc/store.py +++ b/darc/store.py @@ -8,7 +8,7 @@ import unittest from zlib import crc32 from .hashindex import NSIndex, BandIndex -from .helpers import IntegrityError +from .helpers import IntegrityError, read_set, write_set from .lrucache import LRUCache @@ -76,7 +76,17 @@ class Store(object): self.rollback() self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir) + def delete_bands(self): + delete_path = os.path.join(self.path, 'indexes', 'delete') + if os.path.exists(delete_path): + for band in read_set(delete_path): + assert self.bands.pop(band) == 0 + self.io.delete_band(band) + os.unlink(delete_path) + def begin_txn(self): + self.io.cleanup() + self.delete_bands() txn_dir = os.path.join(self.path, 'txn.tmp') # Initialize transaction snapshot os.mkdir(txn_dir) @@ -111,8 +121,11 @@ class Store(object): shutil.rmtree(os.path.join(self.path, 'txn.tmp')) self.indexes = {} self.txn_active = False + self.delete_bands() def compact_bands(self): + """Compact sparse bands by copying data into new bands + """ if not self.compact: return self.io.close_band() @@ -126,10 +139,7 @@ class Store(object): self.bands[band] -= 1 self.bands.setdefault(new_band, 0) self.bands[new_band] += 1 - - for band in self.compact: - assert self.bands.pop(band) == 0 - self.io.delete_band(band) + write_set(self.compact, os.path.join(self.path, 'indexes', 'delete')) def rollback(self): """ @@ -206,6 +216,17 @@ class BandIO(object): for band in self.fds.keys(): self.fds.pop(band).close() + def cleanup(self): + """Delete band files left by aborted transactions + """ + band = self.band + while True: + filename = self.band_filename(band) + if not os.path.exists(filename): + break + os.unlink(filename) + band += 1 + def band_filename(self, band): return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))