From 7e85711df00ebc377d8810962a63ce6980ce1332 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 21 Aug 2014 22:06:11 +0200 Subject: [PATCH] Beginnings of a tiered index implementation. This reintroduces a LevelDB-based metrics index. Change-Id: I4111540301c52255a07b2f570761707a32f72c05 --- .build/Makefile | 6 +- storage/local/index/batch.go | 21 ++ storage/local/index/codec.go | 200 +++++++++++ storage/local/index/codec_test.go | 112 ++++++ storage/local/index/index.go | 561 ++++++++++++++++++++++++++++++ storage/local/index/index_test.go | 252 ++++++++++++++ storage/local/index/interface.go | 43 +++ storage/local/index/leveldb.go | 80 +++++ storage/local/interface.go | 21 +- storage/local/persistence.go | 428 +++++------------------ storage/local/persistence_test.go | 177 +--------- storage/local/storage.go | 36 +- utility/test/directory.go | 14 + 13 files changed, 1389 insertions(+), 562 deletions(-) create mode 100644 storage/local/index/batch.go create mode 100644 storage/local/index/codec.go create mode 100644 storage/local/index/codec_test.go create mode 100644 storage/local/index/index.go create mode 100644 storage/local/index/index_test.go create mode 100644 storage/local/index/interface.go create mode 100644 storage/local/index/leveldb.go diff --git a/.build/Makefile b/.build/Makefile index 943bb87b29..54482b7fd6 100644 --- a/.build/Makefile +++ b/.build/Makefile @@ -45,7 +45,7 @@ cc-implementation-Linux-stamp: [ -x "$$(which cc)" ] || $(APT_GET_INSTALL) build-essential touch $@ -dependencies-stamp: cache-stamp cc-stamp leveldb-stamp snappy-stamp godns-stamp +dependencies-stamp: cache-stamp cc-stamp leveldb-stamp snappy-stamp godns-stamp goleveldb-stamp touch $@ goprotobuf-protoc-gen-go-stamp: protoc-stamp goprotobuf-stamp @@ -60,6 +60,10 @@ godns-stamp: $(GO_GET) github.com/miekg/dns $(THIRD_PARTY_BUILD_OUTPUT) touch $@ +goleveldb-stamp: + $(GO_GET) github.com/syndtr/goleveldb/leveldb $(THIRD_PARTY_BUILD_OUTPUT) + touch $@ + leveldb-stamp: cache-stamp 
cache/leveldb-$(LEVELDB_VERSION).tar.gz cc-stamp rsync-stamp snappy-stamp tar xzvf cache/leveldb-$(LEVELDB_VERSION).tar.gz -C dirty $(THIRD_PARTY_BUILD_OUTPUT) cd dirty/leveldb-$(LEVELDB_VERSION) && CFLAGS="$(CFLAGS) -lsnappy" CXXFLAGS="$(CXXFLAGS) -lsnappy $(LDFLAGS)" LDFLAGS="-lsnappy $(LDFLAGS)" bash -x ./build_detect_platform build_config.mk ./ diff --git a/storage/local/index/batch.go b/storage/local/index/batch.go new file mode 100644 index 0000000000..57d7761dce --- /dev/null +++ b/storage/local/index/batch.go @@ -0,0 +1,21 @@ +package index + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +type batch struct { + batch *leveldb.Batch +} + +func (b *batch) Put(key, value encodable) { + b.batch.Put(key.encode(), value.encode()) +} + +func (b *batch) Delete(k encodable) { + b.batch.Delete(k.encode()) +} + +func (b *batch) Reset() { + b.batch.Reset() +} diff --git a/storage/local/index/codec.go b/storage/local/index/codec.go new file mode 100644 index 0000000000..b7eb1ff207 --- /dev/null +++ b/storage/local/index/codec.go @@ -0,0 +1,200 @@ +package index + +import ( + "bytes" + "encoding/binary" + "io" + "sync" + + clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/storage/metric" +) + +type codable interface { + encodable + decodable +} + +type encodable interface { + encode() []byte +} + +type decodable interface { + decode([]byte) +} + +// TODO: yeah, this ain't ideal. A lot of locking and possibly even contention. 
+var tmpBufMtx sync.Mutex
+var tmpBuf = make([]byte, binary.MaxVarintLen64)
+
+// setTmpBufLen sets the length of the shared scratch buffer tmpBuf to l,
+// reslicing the existing backing array when its capacity suffices and
+// allocating a new one otherwise. Every caller in this file invokes it while
+// holding tmpBufMtx.
+func setTmpBufLen(l int) {
+	if cap(tmpBuf) >= l {
+		tmpBuf = tmpBuf[:l]
+	} else {
+		tmpBuf = make([]byte, l)
+	}
+}
+
+// encodeVarint appends i to b as a signed varint. It panics if writing to b
+// fails.
+func encodeVarint(b *bytes.Buffer, i int) {
+	tmpBufMtx.Lock()
+	defer tmpBufMtx.Unlock()
+
+	// decodeString and codableFingerprints.encode leave tmpBuf resliced to
+	// whatever length they last needed (possibly zero). binary.PutVarint panics
+	// if the buffer is too small, so restore the full varint width first.
+	setTmpBufLen(binary.MaxVarintLen64)
+	bytesWritten := binary.PutVarint(tmpBuf, int64(i))
+	if _, err := b.Write(tmpBuf[:bytesWritten]); err != nil {
+		panic(err)
+	}
+}
+
+// encodeString appends s to b as a varint length prefix followed by the raw
+// bytes of s. It panics if writing to b fails.
+func encodeString(b *bytes.Buffer, s string) {
+	encodeVarint(b, len(s))
+	if _, err := b.WriteString(s); err != nil {
+		panic(err)
+	}
+}
+
+// decodeString reads a varint length prefix from b, then that many bytes, and
+// returns them as a string. It panics if either read fails.
+func decodeString(b *bytes.Reader) string {
+	length, err := binary.ReadVarint(b)
+	if err != nil {
+		panic(err)
+	}
+
+	tmpBufMtx.Lock()
+	defer tmpBufMtx.Unlock()
+
+	setTmpBufLen(int(length))
+	if _, err := io.ReadFull(b, tmpBuf); err != nil {
+		panic(err)
+	}
+	// The string conversion copies, so the result does not alias tmpBuf.
+	return string(tmpBuf)
+}
+
+type codableMetric clientmodel.Metric
+
+// encode serializes the metric as a varint pair count followed by each label
+// name and label value as length-prefixed strings. Pairs are written in map
+// iteration order.
+func (m codableMetric) encode() []byte {
+	buf := &bytes.Buffer{}
+	encodeVarint(buf, len(m))
+	for l, v := range m {
+		encodeString(buf, string(l))
+		encodeString(buf, string(v))
+	}
+	return buf.Bytes()
+}
+
+// decode reads the label pairs encoded in buf and stores them in m. The
+// receiver is written to in place, so it must be a non-nil map. decode panics
+// on malformed input.
+func (m codableMetric) decode(buf []byte) {
+	r := bytes.NewReader(buf)
+	numLabelPairs, err := binary.ReadVarint(r)
+	if err != nil {
+		panic(err)
+	}
+	for ; numLabelPairs > 0; numLabelPairs-- {
+		ln := decodeString(r)
+		lv := decodeString(r)
+		m[clientmodel.LabelName(ln)] = clientmodel.LabelValue(lv)
+	}
+}
+
+type codableFingerprint clientmodel.Fingerprint
+
+// encode returns the fingerprint as 8 big-endian bytes.
+func (fp codableFingerprint) encode() []byte {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, uint64(fp))
+	return b
+}
+
+// decode sets fp from the first 8 bytes of buf, read as a big-endian uint64.
+func (fp *codableFingerprint) decode(buf []byte) {
+	*fp = codableFingerprint(binary.BigEndian.Uint64(buf))
+}
+
+type codableFingerprints clientmodel.Fingerprints
+
+// encode serializes the fingerprints as a varint count followed by each
+// fingerprint as 8 big-endian bytes.
+func (fps codableFingerprints) encode() []byte {
+	buf := bytes.NewBuffer(make([]byte, 0, binary.MaxVarintLen64+len(fps)*8))
+	// encodeVarint takes and releases tmpBufMtx itself, so it must be called
+	// before the lock is acquired below.
+	encodeVarint(buf, len(fps))
+
+	tmpBufMtx.Lock()
+	defer tmpBufMtx.Unlock()
+
+	setTmpBufLen(8)
+	for _, fp := range fps {
+		binary.BigEndian.PutUint64(tmpBuf, uint64(fp))
+		if _, err := buf.Write(tmpBuf[:8]); err != nil {
+			panic(err)
+		}
+	}
+	return buf.Bytes()
+}
+
+// decode replaces *fps with the fingerprints encoded in buf. It panics on
+// malformed or truncated input.
+func (fps *codableFingerprints) decode(buf []byte) {
+	r := bytes.NewReader(buf)
+	numFPs, err := binary.ReadVarint(r)
+	if err != nil {
+		panic(err)
+	}
+	*fps = make(codableFingerprints, numFPs)
+
+	// The fixed-width fingerprints start right after the varint count.
+	offset := len(buf) - r.Len()
+	for i := range *fps {
+		(*fps)[i] = clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:]))
+	}
+}
+
+type codableLabelPair metric.LabelPair
+
+// encode serializes the pair as its name and value, each a length-prefixed
+// string.
+func (lp codableLabelPair) encode() []byte {
+	buf := &bytes.Buffer{}
+	encodeString(buf, string(lp.Name))
+	encodeString(buf, string(lp.Value))
+	return buf.Bytes()
+}
+
+// decode sets lp's name and value from the two length-prefixed strings in buf.
+func (lp *codableLabelPair) decode(buf []byte) {
+	r := bytes.NewReader(buf)
+	lp.Name = clientmodel.LabelName(decodeString(r))
+	lp.Value = clientmodel.LabelValue(decodeString(r))
+}
+
+type codableLabelName clientmodel.LabelName
+
+// encode serializes the label name as a length-prefixed string.
+func (l codableLabelName) encode() []byte {
+	buf := &bytes.Buffer{}
+	encodeString(buf, string(l))
+	return buf.Bytes()
+}
+
+// decode sets *l from the length-prefixed string in buf.
+func (l *codableLabelName) decode(buf []byte) {
+	r := bytes.NewReader(buf)
+	*l = codableLabelName(decodeString(r))
+}
+
+type codableLabelValues clientmodel.LabelValues
+
+// encode serializes the values as a varint count followed by each value as a
+// length-prefixed string.
+func (vs codableLabelValues) encode() []byte {
+	buf := &bytes.Buffer{}
+	encodeVarint(buf, len(vs))
+	for _, v := range vs {
+		encodeString(buf, string(v))
+	}
+	return buf.Bytes()
+}
+
+// decode replaces *vs with the label values encoded in buf. It panics on
+// malformed input.
+func (vs *codableLabelValues) decode(buf []byte) {
+	r := bytes.NewReader(buf)
+	numValues, err := binary.ReadVarint(r)
+	if err != nil {
+		panic(err)
+	}
+	*vs = make(codableLabelValues, numValues)
+
+	for i := range *vs {
+		(*vs)[i] = clientmodel.LabelValue(decodeString(r))
+	}
+}
+
+type codableMembership struct{}
+
+// encode returns an empty value; membership is conveyed by the key alone.
+func (m codableMembership) encode() []byte {
+	return []byte{}
+}
+
+// decode ignores buf; a membership entry carries no payload.
+func (m codableMembership) decode(buf []byte) {}
diff --git a/storage/local/index/codec_test.go
b/storage/local/index/codec_test.go
new file mode 100644
index 0000000000..d6c9c2f21f
--- /dev/null
+++ b/storage/local/index/codec_test.go
@@ -0,0 +1,112 @@
+package index
+
+import (
+	"testing"
+
+	clientmodel "github.com/prometheus/client_golang/model"
+)
+
+// newCodableFingerprint returns a pointer to a codableFingerprint holding fp.
+func newCodableFingerprint(fp int64) *codableFingerprint {
+	v := codableFingerprint(fp)
+	return &v
+}
+
+// newCodableLabelName returns a pointer to a codableLabelName holding ln.
+func newCodableLabelName(ln string) *codableLabelName {
+	v := codableLabelName(ln)
+	return &v
+}
+
+// TestCodec encodes each input, decodes the bytes into a fresh value of the
+// same type and checks that the round trip preserved the contents.
+func TestCodec(t *testing.T) {
+	scenarios := []struct {
+		in    codable
+		out   codable
+		equal func(in, out codable) bool
+	}{
+		{
+			in: codableMetric{
+				"label_1": "value_2",
+				"label_2": "value_2",
+				"label_3": "value_3",
+			},
+			out: codableMetric{},
+			equal: func(in, out codable) bool {
+				want := clientmodel.Metric(in.(codableMetric))
+				got := clientmodel.Metric(out.(codableMetric))
+				return want.Equal(got)
+			},
+		}, {
+			in:  newCodableFingerprint(12345),
+			out: newCodableFingerprint(0),
+			equal: func(in, out codable) bool {
+				want := *in.(*codableFingerprint)
+				got := *out.(*codableFingerprint)
+				return want == got
+			},
+		}, {
+			in:  &codableFingerprints{1, 2, 56, 1234},
+			out: &codableFingerprints{},
+			equal: func(in, out codable) bool {
+				want := *in.(*codableFingerprints)
+				got := *out.(*codableFingerprints)
+				if len(want) != len(got) {
+					return false
+				}
+				for i := range want {
+					if want[i] != got[i] {
+						return false
+					}
+				}
+				return true
+			},
+		}, {
+			in: &codableLabelPair{
+				Name:  "label_name",
+				Value: "label_value",
+			},
+			out: &codableLabelPair{},
+			equal: func(in, out codable) bool {
+				return *in.(*codableLabelPair) == *out.(*codableLabelPair)
+			},
+		}, {
+			in:  newCodableLabelName("label_name"),
+			out: newCodableLabelName(""),
+			equal: func(in, out codable) bool {
+				return *in.(*codableLabelName) == *out.(*codableLabelName)
+			},
+		}, {
+			in:  &codableLabelValues{"value_1", "value_2", "value_3"},
+			out: &codableLabelValues{},
+			equal: func(in, out codable) bool {
+				want := *in.(*codableLabelValues)
+				got := *out.(*codableLabelValues)
+				if len(want) != len(got) {
+					return false
+				}
+				for i := range want {
+					if want[i] != got[i] {
+						return false
+					}
+				}
+				return true
+			},
+		}, {
+			in:  &codableMembership{},
+			out: &codableMembership{},
+			equal: func(in, out codable) bool {
+				// The membership value carries no information; this case only
+				// checks that encoding and decoding run at all.
+				return true
+			},
+		},
+	}
+
+	for i, s := range scenarios {
+		encoded := s.in.encode()
+		s.out.decode(encoded)
+		if s.equal(s.in, s.out) {
+			continue
+		}
+		t.Fatalf("%d. Got: %v; want %v; encoded bytes are: %v", i, s.out, s.in, encoded)
+	}
+}
diff --git a/storage/local/index/index.go b/storage/local/index/index.go
new file mode 100644
index 0000000000..0948680679
--- /dev/null
+++ b/storage/local/index/index.go
@@ -0,0 +1,561 @@
+package index
+
+import (
+	"io"
+	"sync"
+
+	clientmodel "github.com/prometheus/client_golang/model"
+
+	"github.com/prometheus/prometheus/storage/metric"
+	"github.com/prometheus/prometheus/utility"
+)
+
+// FingerprintMetricMapping is an in-memory map of fingerprints to metrics.
+type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
+
+// FingerprintMetricIndex models a database mapping fingerprints to metrics.
+type FingerprintMetricIndex struct {
+	KeyValueStore
+}
+
+// IndexBatch stores every fingerprint-to-metric pair of mapping in a single
+// batch and commits it.
+func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
+	pending := i.NewBatch()
+
+	for fp, m := range mapping {
+		pending.Put(codableFingerprint(fp), codableMetric(m))
+	}
+
+	return i.Commit(pending)
+}
+
+// UnindexBatch deletes every fingerprint key of mapping in a single batch and
+// commits it. The metric values of mapping are not consulted.
+func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) error {
+	pending := i.NewBatch()
+
+	for fp := range mapping {
+		pending.Delete(codableFingerprint(fp))
+	}
+
+	return i.Commit(pending)
+}
+
+// Lookup looks up a metric by fingerprint.
+func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
+	// Allocate the map up front so the value handed to Get can be filled in
+	// place (codableMetric is a map type).
+	m = clientmodel.Metric{}
+	ok, err = i.Get(codableFingerprint(fp), codableMetric(m))
+	// Check the error before ok: a failed read must not be reported as a plain
+	// "not found" with a nil error.
+	if err != nil {
+		return nil, false, err
+	}
+	if !ok {
+		return nil, false, nil
+	}
+
+	return m, true, nil
+}
+
+// NewFingerprintMetricIndex returns a FingerprintMetricIndex
+// object ready to use.
+func NewFingerprintMetricIndex(db KeyValueStore) *FingerprintMetricIndex {
+	return &FingerprintMetricIndex{
+		KeyValueStore: db,
+	}
+}
+
+// LabelNameLabelValuesMapping is an in-memory map of label names to
+// label values.
+type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues
+
+// LabelNameLabelValuesIndex models a database mapping label names to
+// label values.
+type LabelNameLabelValuesIndex struct {
+	KeyValueStore
+}
+
+// IndexBatch indexes a batch of mappings from label names to label values. A
+// label name mapped to an empty list of values is deleted from the index
+// instead of being stored.
+func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error {
+	batch := i.NewBatch()
+
+	for name, values := range b {
+		if len(values) == 0 {
+			batch.Delete(codableLabelName(name))
+		} else {
+			batch.Put(codableLabelName(name), codableLabelValues(values))
+		}
+	}
+
+	return i.Commit(batch)
+}
+
+// Lookup looks up all label values for a given label name. ok is false when
+// the label name is not present in the index.
+func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) {
+	ok, err = i.Get(codableLabelName(l), (*codableLabelValues)(&values))
+	if err != nil {
+		return nil, false, err
+	}
+	if !ok {
+		return nil, false, nil
+	}
+
+	return values, true, nil
+}
+
+// NewLabelNameLabelValuesIndex returns a LabelNameLabelValuesIndex
+// ready to use.
+func NewLabelNameLabelValuesIndex(db KeyValueStore) *LabelNameLabelValuesIndex {
+	return &LabelNameLabelValuesIndex{
+		KeyValueStore: db,
+	}
+}
+
+// LabelPairFingerprintsMapping is an in-memory map of label pairs to
+// fingerprints.
+type LabelPairFingerprintsMapping map[metric.LabelPair]clientmodel.Fingerprints
+
+// LabelPairFingerprintIndex models a database mapping label pairs to
+// fingerprints.
+type LabelPairFingerprintIndex struct {
+	KeyValueStore
+}
+
+// IndexBatch indexes a batch of mappings from label pairs to fingerprints. A
+// label pair mapped to an empty list of fingerprints is deleted from the index
+// instead of being stored.
+func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) error {
+	batch := i.NewBatch()
+
+	for pair, fps := range m {
+		if len(fps) == 0 {
+			batch.Delete(codableLabelPair(pair))
+		} else {
+			batch.Put(codableLabelPair(pair), codableFingerprints(fps))
+		}
+	}
+
+	return i.Commit(batch)
+}
+
+// Lookup looks up all fingerprints for a given label pair. ok is false when
+// the label pair is not present in the index.
+func (i *LabelPairFingerprintIndex) Lookup(p *metric.LabelPair) (fps clientmodel.Fingerprints, ok bool, err error) {
+	ok, err = i.Get((*codableLabelPair)(p), (*codableFingerprints)(&fps))
+	// Check the error before ok: a failed read must not be reported as a plain
+	// "not found" with a nil error.
+	if err != nil {
+		return nil, false, err
+	}
+	if !ok {
+		return nil, false, nil
+	}
+
+	return fps, true, nil
+}
+
+// NewLabelPairFingerprintIndex returns a LabelPairFingerprintIndex
+// object ready to use.
+func NewLabelPairFingerprintIndex(db KeyValueStore) *LabelPairFingerprintIndex {
+	return &LabelPairFingerprintIndex{
+		KeyValueStore: db,
+	}
+}
+
+// FingerprintMembershipIndex models a database tracking the existence
+// of metrics by their fingerprints.
+type FingerprintMembershipIndex struct {
+	KeyValueStore
+}
+
+// IndexBatch indexes a batch of fingerprints. Only the keys of b are used;
+// each is stored with an empty membership value.
+func (i *FingerprintMembershipIndex) IndexBatch(b FingerprintMetricMapping) error {
+	batch := i.NewBatch()
+
+	for fp := range b {
+		batch.Put(codableFingerprint(fp), codableMembership{})
+	}
+
+	return i.Commit(batch)
+}
+
+// UnindexBatch unindexes a batch of fingerprints.
+func (i *FingerprintMembershipIndex) UnindexBatch(b FingerprintMetricMapping) error {
+	pending := i.NewBatch()
+
+	// Only the fingerprint keys matter; the mapped metrics are ignored.
+	for fp := range b {
+		pending.Delete(codableFingerprint(fp))
+	}
+
+	return i.Commit(pending)
+}
+
+// Has reports whether the given fingerprint is present in the index.
+func (i *FingerprintMembershipIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) {
+	ok, err = i.KeyValueStore.Has(codableFingerprint(fp))
+	return ok, err
+}
+
+// NewFingerprintMembershipIndex returns a FingerprintMembershipIndex object
+// ready to use.
+func NewFingerprintMembershipIndex(db KeyValueStore) *FingerprintMembershipIndex {
+	return &FingerprintMembershipIndex{KeyValueStore: db}
+}
+
+// SynchronizedIndexer provides naive locking for any MetricIndexer.
+//
+// TODO(julius): Currently unused, is it needed?
+type SynchronizedIndexer struct {
+	mu sync.Mutex
+	i  MetricIndexer
+}
+
+// IndexMetrics acquires the lock and forwards b to the wrapped
+// MetricIndexer's IndexMetrics.
+func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	return i.i.IndexMetrics(b)
+}
+
+type flusher interface {
+	Flush() error
+}
+
+// Flush calls Flush of the wrapped MetricIndexer while holding the lock. It is
+// a no-op if the wrapped MetricIndexer has no Flush method.
+func (i *SynchronizedIndexer) Flush() error {
+	f, ok := i.i.(flusher)
+	if !ok {
+		return nil
+	}
+
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	return f.Flush()
+}
+
+// Close calls Close of the wrapped MetricIndexer while holding the lock. It is
+// a no-op if the wrapped MetricIndexer has no Close method.
+func (i *SynchronizedIndexer) Close() error {
+	c, ok := i.i.(io.Closer)
+	if !ok {
+		return nil
+	}
+
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	return c.Close()
+}
+
+// NewSynchronizedIndexer returns a SynchronizedIndexer wrapping the given
+// MetricIndexer.
+func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer {
+	return &SynchronizedIndexer{
+		i: i,
+	}
+}
+
+// BufferedIndexer provides unsynchronized index buffering.
+type BufferedIndexer struct {
+	i     MetricIndexer
+	limit int
+	buf   []FingerprintMetricMapping
+}
+
+// IndexMetrics buffers the given FingerprintMetricMapping. Up to limit batches
+// are held back; the batch that exceeds the limit triggers a Flush of
+// everything buffered, itself included.
+func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
+	// Always take ownership of b before deciding whether to flush. Flushing
+	// without appending would silently drop this batch.
+	i.buf = append(i.buf, b)
+	if len(i.buf) <= i.limit {
+		return nil
+	}
+	return i.Flush()
+}
+
+// Flush writes all pending entries to the index. The buffered batches are
+// merged into one mapping; when the same fingerprint occurs in several
+// batches, the most recently buffered metric is kept.
+func (i *BufferedIndexer) Flush() error {
+	if len(i.buf) == 0 {
+		return nil
+	}
+
+	union := FingerprintMetricMapping{}
+	for _, b := range i.buf {
+		for fp, m := range b {
+			union[fp] = m
+		}
+	}
+
+	// The buffer is reset before the write, so a failed write is not retried.
+	i.buf = make([]FingerprintMetricMapping, 0, i.limit)
+	return i.i.IndexMetrics(union)
+}
+
+// Close flushes pending entries and then closes the wrapped MetricIndexer if
+// it implements io.Closer. If the flush fails, its error is returned and the
+// wrapped indexer is not closed.
+func (i *BufferedIndexer) Close() error {
+	if err := i.Flush(); err != nil {
+		return err
+	}
+
+	if closer, ok := i.i.(io.Closer); ok {
+		return closer.Close()
+	}
+
+	return nil
+}
+
+// NewBufferedIndexer returns a BufferedIndexer ready to use. limit is the
+// number of batches held back before a flush is triggered.
+func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer {
+	return &BufferedIndexer{
+		i:     i,
+		limit: limit,
+		buf:   make([]FingerprintMetricMapping, 0, limit),
+	}
+}
+
+// TotalIndexer is a MetricIndexer that indexes all standard facets of a metric
+// that a user or the Prometheus subsystem would want to query against:
+//
+// ->
+//