diff --git a/dedupestore/_speedups.c b/dedupestore/_speedups.c index 62ca2da58..7b83e4566 100644 --- a/dedupestore/_speedups.c +++ b/dedupestore/_speedups.c @@ -1,5 +1,5 @@ -#include -#include +#include +#include static unsigned long int checksum(const unsigned char *data, int len, unsigned long int sum) @@ -28,15 +28,23 @@ roll_checksum(unsigned long int sum, unsigned char remove, unsigned char add, in typedef struct { PyObject_HEAD - int chunk_size, i, full_sum, done, buf_size, data_len; - PyObject *chunks, *fd, *extra; - unsigned long sum; + int chunk_size, window_size, i, last, eof, done, buf_size, data_len, initial; + PyObject *chunks, *fd; + unsigned long int sum; unsigned char *data, add, remove; } ChunkifyIter; static PyObject* ChunkifyIter_iter(PyObject *self) { + ChunkifyIter *c = (ChunkifyIter *)self; + c->data_len = 0; + c->done = 0; + c->eof = 0; + c->i = 0; + c->sum = 0; + c->last = -1; + c->initial = c->window_size; Py_INCREF(self); return self; } @@ -46,7 +54,6 @@ ChunkifyIter_dealloc(PyObject *self) { ChunkifyIter *c = (ChunkifyIter *)self; Py_DECREF(c->fd); - Py_XDECREF(c->chunks); free(c->data); self->ob_type->tp_free(self); } @@ -55,28 +62,25 @@ static PyObject* ChunkifyIter_iternext(PyObject *self) { ChunkifyIter *c = (ChunkifyIter *)self; - PyObject *pysum; - int o = 0; if(c->done) { PyErr_SetNone(PyExc_StopIteration); return NULL; } - if(c->extra) - { - c->done = 1; - Py_INCREF(c->extra); - return c->extra; - } for(;;) { - if(c->i > c->buf_size - c->chunk_size) + if(c->i == c->buf_size) { - memmove(c->data, c->data + c->i - o, c->data_len - c->i + o); - c->data_len -= c->i - o; - c->i = o; + int diff = c->last + 1 - c->window_size; + memmove(c->data, c->data + diff, c->buf_size - diff); + c->i -= diff; + c->last -= diff; + c->data_len -= diff; + assert(c->i >= 0); + assert(c->last >= -1); + assert(c->data_len >= 0); } - if(c->data_len - c->i < c->chunk_size) + if(c->i == c->data_len) { PyObject *data = PyObject_CallMethod(c->fd, "read", "i", c->buf_size - c->data_len); int n = PyString_Size(data); @@ -86,59 +90,42 @@ ChunkifyIter_iternext(PyObject *self) } if(c->i == c->data_len) { + if(c->last < c->i - 1) { + c->done = 1; + return PyString_FromStringAndSize((char *)(c->data + c->last + 1), + c->data_len - c->last - 1); + } PyErr_SetNone(PyExc_StopIteration); return NULL; } - if(c->data_len - c->i < c->chunk_size) /* EOF ? */ + if(c->initial) { - if(o == 1) - { - c->done = 1; - return PyString_FromStringAndSize((char *)(c->data + c->i - 1), c->data_len - c->i + 1); - } - else if(o > 1) - { - c->extra = PyString_FromStringAndSize((char *)(c->data + c->i - 1), c->chunk_size); - return PyString_FromStringAndSize((char *)(c->data + c->i - o), o - 1); - } - else - { - c->done = 1; - return PyString_FromStringAndSize((char *)(c->data + c->i), c->data_len - c->i); - } - } - if(o == c->chunk_size) - { - return PyString_FromStringAndSize((char *)(c->data + c->i - c->chunk_size), c->chunk_size); - } - if(c->full_sum || c->i + c->chunk_size > c->data_len) - { - c->full_sum = 0; - c->sum = checksum(c->data + c->i, c->chunk_size, 0); + c->initial--; + c->sum = checksum(c->data + c->i, 1, c->sum); } else { - c->sum = roll_checksum(c->sum, c->remove, c->data[c->i + c->chunk_size - 1], c->chunk_size); + c->sum = roll_checksum(c->sum, + c->data[c->i - c->window_size], + c->data[c->i], + c->window_size); } - c->remove = c->data[c->i]; - pysum = PyInt_FromLong(c->sum); - if(PySequence_Contains(c->chunks, pysum) == 1) - { - Py_DECREF(pysum); - c->full_sum = 1; - if(o > 0) - { - return PyString_FromStringAndSize((char *)(c->data + c->i - o), o); - } - else - { - c->i += c->chunk_size; - return PyString_FromStringAndSize((char *)(c->data + c->i - c->chunk_size), c->chunk_size); - } - } - Py_DECREF(pysum); - o++; c->i++; + if(c->i == c->buf_size && c->last == c->window_size - 1) + { + int old_last = c->last; + c->last = c->i - 1; + printf("Max chunk size reached %d\n", c->last - old_last); + return PyString_FromStringAndSize((char *)(c->data + old_last + 1), + c->last - old_last); + } + else if((c->sum % c->chunk_size) == 0) + { + int old_last = c->last; + c->last = c->i - 1; + return PyString_FromStringAndSize((char *)(c->data + old_last + 1), + c->last - old_last); + } } PyErr_SetNone(PyExc_StopIteration); return NULL; @@ -180,11 +167,11 @@ static PyTypeObject ChunkifyIterType = { static PyObject * chunkify(PyObject *self, PyObject *args) { - PyObject *fd, *chunks; - long int chunk_size; + PyObject *fd; + long int chunk_size, window_size; ChunkifyIter *c; - if (!PyArg_ParseTuple(args, "OiO", &fd, &chunk_size, &chunks)) + if (!PyArg_ParseTuple(args, "Oii", &fd, &chunk_size, &window_size)) { return NULL; } @@ -193,18 +180,12 @@ chunkify(PyObject *self, PyObject *args) return NULL; } PyObject_Init((PyObject *)c, &ChunkifyIterType); - c->buf_size = chunk_size * 10; + c->buf_size = 10 * 1024 * 1024; c->data = malloc(c->buf_size); - c->data_len = 0; - c->i = 0; - c->full_sum = 1; - c->done = 0; - c->extra = NULL; c->fd = fd; c->chunk_size = chunk_size; - c->chunks = chunks; + c->window_size = window_size; Py_INCREF(fd); - Py_INCREF(chunks); return (PyObject *)c; } diff --git a/dedupestore/archiver.py b/dedupestore/archiver.py index 4c3284282..fc558bcfb 100644 --- a/dedupestore/archiver.py +++ b/dedupestore/archiver.py @@ -9,7 +9,7 @@ from cache import Cache, NS_ARCHIVES, NS_CHUNKS #from sqlitestore import SqliteStore from bandstore import BandStore -CHUNK_SIZE = 256 * 1024 +CHUNK_SIZE = 55001 class Archive(object): @@ -22,12 +22,12 @@ class Archive(object): if name: self.open(name) - def add_chunk(self, id, sum, csize, osize): + def add_chunk(self, id, csize, osize): try: return self.chunk_idx[id] except KeyError: idx = len(self.chunks) - self.chunks.append((id, sum, csize, osize)) + self.chunks.append((id, csize, osize)) self.chunk_idx[id] = idx return idx @@ -36,7 +36,7 @@ class Archive(object): self.items = archive['items'] self.name = archive['name'] self.chunks = archive['chunks'] - for i, (id, sum, csize, osize) in enumerate(archive['chunks']): + for i, (id, csize, osize) in enumerate(archive['chunks']): self.chunk_idx[i] = id def save(self, name): @@ -57,7 +57,7 @@ class Archive(object): chunk_count.setdefault(id, 0) chunk_count[id] += 1 for id, c in chunk_count.items(): - count, sum, csize, osize = cache.chunkmap[id] + count, csize, osize = cache.chunkmap[id] total_csize += csize if c == count: total_usize += csize @@ -134,7 +134,7 @@ class Archive(object): path = path.lstrip('/\\:') chunks = [] size = 0 - for chunk in chunkify(fd, CHUNK_SIZE, cache.summap): + for chunk in chunkify(fd, CHUNK_SIZE, 30): size += len(chunk) chunks.append(self.add_chunk(*cache.add_chunk(chunk))) return {'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size} diff --git a/dedupestore/cache.py b/dedupestore/cache.py index 07339a41f..a0c23d692 100644 --- a/dedupestore/cache.py +++ b/dedupestore/cache.py @@ -4,8 +4,6 @@ import os import sys import zlib -from chunkifier import checksum - NS_ARCHIVES = 'ARCHIVES' NS_CHUNKS = 'CHUNKS' @@ -16,10 +14,12 @@ class Cache(object): def __init__(self, store): self.store = store - self.path = os.path.join(os.path.expanduser('~'), '.dedupestore', 'cache', + self.path = os.path.join(os.path.expanduser('~'), '.dedupestore', 'cache', '%s.cache' % self.store.uuid) self.tid = -1 self.open() + self.total = 0 + self.max = 0 if self.tid != self.store.tid: self.init() @@ -32,7 +32,6 @@ class Cache(object): print >> sys.stderr, 'Cache UUID mismatch' return self.chunkmap = data['chunkmap'] - self.summap = data['summap'] self.archives = data['archives'] self.tid = data['tid'] print 'done' @@ -56,14 +55,14 @@ class Cache(object): if self.seen_chunk(id): self.chunk_incref(id) else: - self.init_chunk(id, sum, csize, osize) + self.init_chunk(id, csize, osize) print 'done' def save(self): assert self.store.state == self.store.OPEN print 'saving cache' data = {'uuid': self.store.uuid, - 'chunkmap': self.chunkmap, 'summap': self.summap, + 'chunkmap': self.chunkmap, 'tid': self.store.tid, 'archives': self.archives} print 'Saving cache as:', self.path cachedir = os.path.dirname(self.path) @@ -74,41 +73,40 @@ class Cache(object): print 'done' def add_chunk(self, data): - sum = checksum(data) osize = len(data) data = zlib.compress(data) id = hashlib.sha1(data).digest() + self.total += 1 + if osize == 55001* 4: + self.max += 1 + print 'rate = %.2f' % (100.*self.max/self.total) if self.seen_chunk(id): + print 'yay %d bytes' % osize return self.chunk_incref(id) csize = len(data) self.store.put(NS_CHUNKS, id, data) - return self.init_chunk(id, sum, csize, osize) + return self.init_chunk(id, csize, osize) - def init_chunk(self, id, sum, csize, osize): - self.chunkmap[id] = (1, sum, csize, osize) - self.summap[sum] = self.summap.get(sum, 0) + 1 - return id, sum, csize, osize + def init_chunk(self, id, csize, osize): + self.chunkmap[id] = (1, csize, osize) + return id, csize, osize def seen_chunk(self, id): - count, sum, csize, osize = self.chunkmap.get(id, (0, 0, 0, 0)) + count, csize, osize = self.chunkmap.get(id, (0, 0, 0)) return count def chunk_incref(self, id): - count, sum, csize, osize = self.chunkmap[id] - self.chunkmap[id] = (count + 1, sum, csize, osize) - self.summap[sum] += 1 - return id, sum, csize, osize + count, csize, osize = self.chunkmap[id] + self.chunkmap[id] = (count + 1, csize, osize) + return id, csize, osize def chunk_decref(self, id): - count, sum, csize, osize = self.chunkmap[id] - sumcount = self.summap[sum] - if sumcount == 1: - del self.summap[sum] - else: - self.summap[sum] = sumcount - 1 + count, csize, osize = self.chunkmap[id] if count == 1: del self.chunkmap[id] print 'deleting chunk: ', id.encode('hex') self.store.delete(NS_CHUNKS, id) else: - self.chunkmap[id] = (count - 1, sum, csize, osize) + self.chunkmap[id] = (count - 1, csize, osize) + + diff --git a/dedupestore/chunkifier.py b/dedupestore/chunkifier.py index a9d6f63b6..d2b7ff6ed 100644 --- a/dedupestore/chunkifier.py +++ b/dedupestore/chunkifier.py @@ -30,103 +30,79 @@ def roll_checksum(sum, remove, add, len): class ChunkifyIter(object): - def __init__(self, fd, chunk_size, chunks): + def __init__(self, fd, chunk_size, window_size): self.fd = fd self.chunk_size = chunk_size - self.chunks = chunks + self.window_size = window_size + self.buf_size = self.chunk_size * 10 def __iter__(self): self.data = '' - self.i = 0 - self.full_sum = True - self.extra = None self.done = False - self.buf_size = self.chunk_size * 10 + self.i = 0 + self.sum = 0 + self.last = -1 + self.initial = self.window_size return self def next(self): - o = 0 if self.done: raise StopIteration - if self.extra: - self.done = True - return self.extra while True: - if self.i > self.buf_size - self.chunk_size: - self.data = self.data[self.i - o:] - self.i = o - if len(self.data) - self.i < self.chunk_size: + if self.i == self.buf_size: + diff = self.last + 1 - self.window_size + if diff < 0: + import ipdb + ipdb.set_trace() + self.data = self.data[diff:] + self.last -= diff + self.i -= diff + if self.i == len(self.data): self.data += self.fd.read(self.buf_size - len(self.data)) - if len(self.data) == self.i: + if self.i == len(self.data): + if self.last < self.i - 1: + self.done = True + return self.data[self.last + 1:] raise StopIteration - if len(self.data) - self.i < self.chunk_size: # EOF? - if o == 1: - self.done = True - return self.data[self.i - 1:] - elif o > 1: - self.extra = self.data[-self.chunk_size:] - return self.data[-self.chunk_size - o + 1:-self.chunk_size] - else: - self.done = True - return self.data[self.i:] - elif o == self.chunk_size: - return self.data[self.i-self.chunk_size:self.i] - if self.full_sum or len(self.data) - self.i < self.chunk_size: - self.sum = checksum(self.data[self.i:self.i + self.chunk_size]) - self.full_sum = False - self.remove = self.data[self.i] + if self.initial: + self.initial -= 1 + self.sum = checksum(self.data[self.i], self.sum) else: - self.sum = roll_checksum(self.sum, self.remove, self.data[self.i + self.chunk_size - 1], - self.chunk_size) - self.remove = self.data[self.i] - if self.sum in self.chunks: - if o > 0: - chunk = self.data[self.i - o:self.i] - else: - chunk = self.data[self.i:self.i + self.chunk_size] - self.i += self.chunk_size - self.full_sum = True - return chunk - else: - self.i += 1 - o += 1 + self.sum = roll_checksum(self.sum, + self.data[self.i - self.window_size], + self.data[self.i], + self.window_size) + self.i += 1 + if self.i == self.buf_size and self.last == self.window_size - 1: + old_last = self.last + self.last = self.i - 1 + return self.data[old_last + 1:self.last + 1] + elif self.sum % self.chunk_size == 0: + old_last = self.last + self.last = self.i - 1 + return self.data[old_last + 1:self.last + 1] def chunkify(fd, chunk_size, chunks): """ - >>> list(chunkify(StringIO.StringIO('A'), 4, {})) + >>> list(chunkify(StringIO.StringIO(''), 5, 3)) + [] + >>> list(chunkify(StringIO.StringIO('A'), 5, 3)) ['A'] - >>> list(chunkify(StringIO.StringIO('AB'), 4, {})) + >>> list(chunkify(StringIO.StringIO('AB'), 5, 3)) ['AB'] - >>> list(chunkify(StringIO.StringIO('ABC'), 4, {})) - ['ABC'] - >>> list(chunkify(StringIO.StringIO('ABCD'), 4, {})) - ['ABCD'] - >>> list(chunkify(StringIO.StringIO('ABCDE'), 4, {})) - ['A', 'BCDE'] - >>> list(chunkify(StringIO.StringIO('ABCDEF'), 4, {})) - ['AB', 'CDEF'] - >>> list(chunkify(StringIO.StringIO('ABCDEFG'), 4, {})) - ['ABC', 'DEFG'] - >>> list(chunkify(StringIO.StringIO('ABCDEFGH'), 4, {})) - ['ABCD', 'EFGH'] - >>> list(chunkify(StringIO.StringIO('ABCDEFGHI'), 4, {})) - ['ABCD', 'E', 'FGHI'] - - >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, {})) - ['ABCD', 'EFGH', 'IJ', 'KLMN'] - - >>> chunks = {44564754: True} # 'BCDE' - >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, chunks)) - ['A', 'BCDE', 'FGHI', 'J', 'KLMN'] - - >>> chunks = {44564754: True, 48496938: True} # 'BCDE', 'HIJK' - >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, chunks)) - ['A', 'BCDE', 'FG', 'HIJK', 'LMN'] - - >>> chunks = {43909390: True, 50463030: True} # 'ABCD', 'KLMN' - >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, chunks)) - ['ABCD', 'EFGH', 'IJ', 'KLMN'] + >>> list(chunkify(StringIO.StringIO('1B'), 5, 3)) + ['1', 'B'] + >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMNOPQ'), 5, 3)) + ['ABCD', 'EFGHI', 'JKLMN', 'OPQ'] + >>> list(chunkify(StringIO.StringIO('1ABCDEFGHIJKLMNOPQ'), 5, 3)) + ['1', 'ABCD', 'EFGHI', 'JKLMN', 'OPQ'] + >>> list(chunkify(StringIO.StringIO('12ABCDEFGHIJKLMNOPQ'), 5, 3)) + ['1', '2A', 'BCD', 'EFGHI', 'JKLMN', 'OPQ'] + >>> list(chunkify(StringIO.StringIO('12ABCDEFGHIJKLMNOPQRSTUVWXYZ'), 5, 3)) + ['1', '2A', 'BCD', 'EFGHI', 'JKLMN', 'OPQRS', 'TUVWX', 'YZ'] + >>> list(chunkify(StringIO.StringIO('12ABCDEFGHIJKLMNOPQRSTUVWXYZ'), 5, 3)) + ['1', '2A', 'BCD', 'EFGHI', 'JKLMN', 'OPQRS', 'TUVWX', 'YZ'] """ return ChunkifyIter(fd, chunk_size, chunks) @@ -142,4 +118,5 @@ except ImportError: if __name__ == '__main__': import doctest + import StringIO doctest.testmod()