From 4298bab2b053a9934182134c6b5f37eb05c215fe Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 2 May 2013 12:37:24 +0200 Subject: [PATCH] Publicize Curator and Processors. This commit publicizes the curation and processor frameworks for purposes of making them available in the main processor loop. --- storage/metric/curator.go | 19 ++++++------------- storage/metric/processor.go | 16 ++++++++-------- storage/metric/processor_test.go | 29 +++++++++++++++++------------ 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index ac6c15febc..78b3cdcd75 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -47,7 +47,7 @@ type watermarkFilter struct { ignoreYoungerThan time.Duration // processor is the post-processor that performs whatever action is desired on // the data that is deemed valid to be worked on. - processor processor + processor Processor // stop functions as the global stop channel for all future operations. stop chan bool // stopAt is used to determine the elegibility of series for compaction. @@ -59,7 +59,7 @@ type watermarkFilter struct { // curator is responsible for effectuating a given curation policy across the // stored samples on-disk. This is useful to compact sparse sample values into // single sample entities to reduce keyspace load on the datastore. -type curator struct { +type Curator struct { // stop functions as a channel that when empty allows the curator to operate. // The moment a value is ingested inside of it, the curator goes into drain // mode. @@ -86,7 +86,7 @@ type watermarkOperator struct { ignoreYoungerThan time.Duration // processor is responsible for executing a given stategy on the // to-be-operated-on series. - processor processor + processor Processor // sampleIterator is a snapshotted iterator for the time series. sampleIterator leveldb.Iterator // samples @@ -95,20 +95,13 @@ type watermarkOperator struct { stopAt time.Time } -// newCurator builds a new curator for the given LevelDB databases. -func newCurator() curator { - return curator{ - stop: make(chan bool), - } -} - // run facilitates the curation lifecycle. // // recencyThreshold represents the most recent time up to which values will be // curated. // curationState is the on-disk store where the curation remarks are made for // how much progress has been made. -func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { +func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { defer func(t time.Time) { duration := float64(time.Since(t)) @@ -173,7 +166,7 @@ func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, process // drain instructs the curator to stop at the next convenient moment as to not // introduce data inconsistencies. -func (c curator) drain() { +func (c Curator) Drain() { if len(c.stop) == 0 { c.stop <- true } @@ -211,7 +204,7 @@ func (w watermarkFilter) shouldStop() bool { return len(w.stop) != 0 } -func getCurationRemark(states raw.Persistence, processor processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) { +func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) { rawSignature, err := processor.Signature() if err != nil { return diff --git a/storage/metric/processor.go b/storage/metric/processor.go index 8cc1bc8a9e..dc7881ec08 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -26,7 +26,7 @@ import ( // processor models a post-processing agent that performs work given a sample // corpus. -type processor interface { +type Processor interface { // Name emits the name of this processor's signature encoder. It must be // fully-qualified in the sense that it could be used via a Protocol Buffer // registry to extract the descriptor to reassemble this message. @@ -44,9 +44,9 @@ type processor interface { Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) } -// compactionProcessor combines sparse values in the database together such +// CompactionProcessor combines sparse values in the database together such // that at least MinimumGroupSize-sized chunks are grouped together. -type compactionProcessor struct { +type CompactionProcessor struct { // MaximumMutationPoolBatch represents approximately the largest pending // batch of mutation operations for the database before pausing to // commit before resumption. @@ -56,16 +56,16 @@ type compactionProcessor struct { // MinimumGroupSize represents the smallest allowed sample chunk size in the // database. MinimumGroupSize int - // signature is the byte representation of the compactionProcessor's settings, + // signature is the byte representation of the CompactionProcessor's settings, // used for purely memoization purposes across an instance. signature []byte } -func (p compactionProcessor) Name() string { +func (p CompactionProcessor) Name() string { return "io.prometheus.CompactionProcessorDefinition" } -func (p *compactionProcessor) Signature() (out []byte, err error) { +func (p *CompactionProcessor) Signature() (out []byte, err error) { if len(p.signature) == 0 { out, err = proto.Marshal(&dto.CompactionProcessorDefinition{ MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)), @@ -79,11 +79,11 @@ func (p *compactionProcessor) Signature() (out []byte, err error) { return } -func (p compactionProcessor) String() string { +func (p CompactionProcessor) String() string { return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize) } -func (p compactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { +func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { var pendingBatch raw.Batch = nil defer func() { diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 6502ff5f0d..26691fa8e8 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -29,7 +29,7 @@ type curationState struct { fingerprint string ignoreYoungerThan time.Duration lastCurated time.Time - processor processor + processor Processor } type watermarkState struct { @@ -48,7 +48,7 @@ type in struct { sampleGroups fixture.Pairs ignoreYoungerThan time.Duration groupSize uint32 - processor processor + processor Processor } type out struct { @@ -101,7 +101,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { }{ { in: in{ - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, }, @@ -112,7 +112,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0001-A-1-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 30 * time.Minute), - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, }, @@ -121,7 +121,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0002-A-2-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 90 * time.Minute), - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, }, @@ -129,7 +129,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { // This rule should effectively be ignored. curationState{ fingerprint: "0002-A-2-Z", - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 2, MaximumMutationPoolBatch: 15, }, @@ -531,7 +531,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0001-A-1-Z", ignoreYoungerThan: time.Hour, lastCurated: testInstant.Add(-1 * 30 * time.Minute), - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, }, @@ -540,7 +540,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0002-A-2-Z", ignoreYoungerThan: 30 * time.Minute, lastCurated: testInstant.Add(-1 * 90 * time.Minute), - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 2, MaximumMutationPoolBatch: 15, }, @@ -549,7 +549,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0002-A-2-Z", ignoreYoungerThan: time.Hour, lastCurated: testInstant.Add(-1 * 60 * time.Minute), - processor: &compactionProcessor{ + processor: &CompactionProcessor{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, }, @@ -848,8 +848,14 @@ func TestCuratorCompactionProcessor(t *testing.T) { updates := make(chan CurationState, 100) defer close(updates) - c := newCurator() - err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) + stop := make(chan bool) + defer close(stop) + + c := Curator{ + stop: stop, + } + + err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) if err != nil { t.Fatal(err) } @@ -883,7 +889,6 @@ func TestCuratorCompactionProcessor(t *testing.T) { curationKey := model.NewCurationKeyFromDTO(curationKeyDto) actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto) - signature, err := expected.processor.Signature() if err != nil { t.Fatal(err)