From c3e3460ca6e23ef8f9041150591c51b883fcbacd Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 5 Apr 2013 13:07:13 +0200 Subject: [PATCH] Spin up curator run in the tests. After this commit, we'll need to add validations that it does the desired work, which we presently know that it doesn't. Given the changes I made with a plethora of renamings, I want to commit this now before it gets even larger. --- coding/protocol_buffer.go | 13 +++-- storage/metric/.gitignore | 1 + storage/metric/curator.go | 63 ++++++++++++---------- storage/metric/curator_test.go | 79 ++++++++++++++++------------ storage/metric/frontier.go | 4 +- storage/metric/leveldb.go | 32 +++++------ storage/metric/tiered.go | 2 +- storage/raw/index/leveldb/leveldb.go | 2 +- storage/raw/leveldb/test/fixtures.go | 16 +----- 9 files changed, 112 insertions(+), 100 deletions(-) create mode 100644 storage/metric/.gitignore diff --git a/coding/protocol_buffer.go b/coding/protocol_buffer.go index 273395207e..19c8bef150 100644 --- a/coding/protocol_buffer.go +++ b/coding/protocol_buffer.go @@ -15,13 +15,14 @@ package coding import ( "code.google.com/p/goprotobuf/proto" + "fmt" ) -type ProtocolBufferEncoder struct { +type ProtocolBuffer struct { message proto.Message } -func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { +func (p ProtocolBuffer) Encode() (raw []byte, err error) { raw, err = proto.Marshal(p.message) // XXX: Adjust legacy users of this to not check for error. @@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { return } -func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { - return &ProtocolBufferEncoder{ +func (p ProtocolBuffer) String() string { + return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message) +} + +func NewProtocolBuffer(message proto.Message) *ProtocolBuffer { + return &ProtocolBuffer{ message: message, } } diff --git a/storage/metric/.gitignore b/storage/metric/.gitignore new file mode 100644 index 0000000000..3460f0346d --- /dev/null +++ b/storage/metric/.gitignore @@ -0,0 +1 @@ +command-line-arguments.test diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 62918dc430..592f4ae876 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -37,8 +37,9 @@ type curator struct { // watermarks is the on-disk store that is scanned for high watermarks for // given metrics. watermarks raw.Persistence - // cutOff represents the most recent time up to which values will be curated. - cutOff time.Time + // recencyThreshold represents the most recent time up to which values will be + // curated. + recencyThreshold time.Duration // groupingQuantity represents the number of samples below which encountered // samples will be dismembered and reaggregated into larger groups. groupingQuantity uint32 @@ -48,9 +49,9 @@ type curator struct { } // newCurator builds a new curator for the given LevelDB databases. -func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { +func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { return curator{ - cutOff: cutOff, + recencyThreshold: recencyThreshold, stop: make(chan bool), samples: samples, curationState: curationState, @@ -60,19 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample } // run facilitates the curation lifecycle. -func (c curator) run() (err error) { - var ( - decoder watermarkDecoder - filter = watermarkFilter{ - stop: c.stop, - curationState: c.curationState, - } - operator = watermarkOperator{ - olderThan: c.cutOff, - groupSize: c.groupingQuantity, - curationState: c.curationState, - } - ) +func (c curator) run(instant time.Time) (err error) { + decoder := watermarkDecoder{} + filter := watermarkFilter{ + stop: c.stop, + curationState: c.curationState, + groupSize: c.groupingQuantity, + recencyThreshold: c.recencyThreshold, + } + operator := watermarkOperator{ + olderThan: instant.Add(-1 * c.recencyThreshold), + groupSize: c.groupingQuantity, + curationState: c.curationState, + } _, err = c.watermarks.ForEach(decoder, filter, operator) @@ -126,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro // watermarkFilter determines whether to include or exclude candidate // values from the curation process by virtue of how old the high watermark is. type watermarkFilter struct { - // curationState is the table of CurationKey to CurationValues that remark on + // curationState is the table of CurationKey to CurationValues that rema // far along the curation process has gone for a given metric fingerprint. curationState raw.Persistence // stop, when non-empty, instructs the filter to stop operation. stop chan bool + // groupSize refers to the target groupSize from the curator. + groupSize uint32 + // recencyThreshold refers to the target recencyThreshold from the curator. + recencyThreshold time.Duration } func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { - var ( - fingerprint = key.(model.Fingerprint) - watermark = value.(model.Watermark) - curationKey = fingerprint.ToDTO() - rawCurationValue []byte - err error - curationValue = &dto.CurationValue{} - ) + fingerprint := key.(model.Fingerprint) + watermark := value.(model.Watermark) + curationKey := &dto.CurationKey{ + Fingerprint: fingerprint.ToDTO(), + MinimumGroupSize: proto.Uint32(w.groupSize), + OlderThan: proto.Int64(int64(w.recencyThreshold)), + } + curationValue := &dto.CurationValue{} - rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { panic(err) } @@ -229,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er MinimumGroupSize: proto.Uint32(w.groupSize), } - curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey)) + curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey)) return } @@ -247,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod } ) - rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { return } diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 7757ae4f53..c950ffb42f 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -27,10 +27,10 @@ import ( type ( curationState struct { - fingerprint string - groupSize int - olderThan time.Duration - lastCurated time.Time + fingerprint string + groupSize int + recencyThreshold time.Duration + lastCurated time.Time } watermarkState struct { @@ -48,21 +48,23 @@ type ( values []sample } - context struct { - curationStates fixture.Pairs - watermarkStates fixture.Pairs - sampleGroups fixture.Pairs + in struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs + recencyThreshold time.Duration + groupSize uint32 } ) func (c curationState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + key = coding.NewProtocolBuffer(&dto.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), - OlderThan: proto.Int64(int64(c.olderThan)), + OlderThan: proto.Int64(int64(c.recencyThreshold)), }) - value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + value = coding.NewProtocolBuffer(&dto.CurationValue{ LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), }) @@ -70,13 +72,13 @@ func (c curationState) Get() (key, value coding.Encoder) { } func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) return } func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + key = coding.NewProtocolBuffer(&dto.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), Timestamp: indexable.EncodeTime(s.values[0].time), LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), @@ -92,7 +94,7 @@ func (s sampleGroup) Get() (key, value coding.Encoder) { }) } - value = coding.NewProtocolBufferEncoder(series) + value = coding.NewProtocolBuffer(series) return } @@ -100,22 +102,31 @@ func (s sampleGroup) Get() (key, value coding.Encoder) { func TestCurator(t *testing.T) { var ( scenarios = []struct { - context context + in in }{ { - context: context{ + in: in{ + recencyThreshold: 1 * time.Hour, + groupSize: 5, curationStates: fixture.Pairs{ curationState{ - fingerprint: "0001-A-1-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 30 * time.Minute), + fingerprint: "0001-A-1-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), }, curationState{ - fingerprint: "0002-A-2-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 90 * time.Minute), + fingerprint: "0002-A-2-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + // This rule should effectively be ignored. + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 2, + recencyThreshold: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), }, }, watermarkStates: fixture.Pairs{ @@ -124,7 +135,7 @@ func TestCurator(t *testing.T) { lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, watermarkState{ - fingerprint: "0002-A-1-Z", + fingerprint: "0002-A-2-Z", lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, }, @@ -479,26 +490,26 @@ func TestCurator(t *testing.T) { ) for _, scenario := range scenarios { - curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) defer curatorDirectory.Close() - watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) defer watermarkDirectory.Close() - sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorState, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer curatorState.Close() + defer curatorStates.Close() - watermarkState, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer watermarkState.Close() + defer watermarkStates.Close() samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) if err != nil { @@ -506,5 +517,7 @@ func TestCurator(t *testing.T) { } defer samples.Close() + c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates) + c.run(testInstant) } } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 9c4183902d..ecc5c93215 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) Timestamp: upperSeek, } - raw, err := coding.NewProtocolBufferEncoder(key).Encode() + raw, err := coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } @@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) key.Timestamp = lowerSeek - raw, err = coding.NewProtocolBufferEncoder(key).Encode() + raw, err = coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 654ae978b9..d1d59f98d1 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -339,7 +339,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelNameToFingerprints.Commit(batch) @@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelSetToFingerprints.Commit(batch) @@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(fingerprint.ToDTO()) + value := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, value) } @@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // WART: We should probably encode simple fingerprints. for _, metric := range absentMetrics { - key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger value = &dto.MetricHighWatermark{} raw []byte newestSampleTimestamp = samples[len(samples)-1].Timestamp - keyEncoded = coding.NewProtocolBufferEncoder(key) + keyEncoded = coding.NewProtocolBuffer(key) ) key.Signature = proto.String(fingerprint.ToRowKey()) @@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } } value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value)) + batch.Put(keyEncoded, coding.NewProtocolBuffer(value)) mutationCount++ } @@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } } @@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.metricMembershipIndex.Has(dtoKey) return @@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) return @@ -782,7 +782,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) return @@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)) + f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO)) if err != nil { return fps, err } @@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() - raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) + raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) if err != nil { return } @@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() - raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) + raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) if err != nil { return } @@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T Timestamp: indexable.EncodeTime(t), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } @@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index c8dc40d1e3..daad234a84 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier } // Try seeking to target key. - rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode() iterator.Seek(rawKey) foundKey, err := extractSampleKey(iterator) diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index e00152d3da..a877a6d87a 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -21,7 +21,7 @@ import ( ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{}) + existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 0499b8daca..2cb8d35d9c 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -15,7 +15,6 @@ package test import ( "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility/test" ) @@ -64,13 +63,7 @@ type ( func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { t = test.NewTemporaryDirectory(n, p.tester) - - var ( - persistence raw.Persistence - err error - ) - - persistence, err = leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) + persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) if err != nil { defer t.Close() p.tester.Fatal(err) @@ -83,12 +76,7 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory }() for f.HasNext() { - var ( - key coding.Encoder - value coding.Encoder - ) - - key, value = f.Next() + key, value := f.Next() err = persistence.Put(key, value) if err != nil {