From de9a88b964e0baad49f2eee9185e68c86403664c Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Tue, 22 Apr 2014 14:09:25 +0200 Subject: [PATCH] Ensure temporal order in streams. BenchmarkAppendSample.* before this change: BenchmarkAppendSample1 1000000 1142 ns/op --- BENCH: BenchmarkAppendSample1 memory_test.go:81: 1 cycles with 9992.000000 bytes per cycle, totalling 9992 memory_test.go:81: 100 cycles with 250.399994 bytes per cycle, totalling 25040 memory_test.go:81: 10000 cycles with 239.428802 bytes per cycle, totalling 2394288 memory_test.go:81: 1000000 cycles with 255.504684 bytes per cycle, totalling 255504688 BenchmarkAppendSample10 500000 3823 ns/op --- BENCH: BenchmarkAppendSample10 memory_test.go:81: 1 cycles with 15536.000000 bytes per cycle, totalling 15536 memory_test.go:81: 100 cycles with 662.239990 bytes per cycle, totalling 66224 memory_test.go:81: 10000 cycles with 601.937622 bytes per cycle, totalling 6019376 memory_test.go:81: 500000 cycles with 598.582764 bytes per cycle, totalling 299291408 BenchmarkAppendSample100 50000 41111 ns/op --- BENCH: BenchmarkAppendSample100 memory_test.go:81: 1 cycles with 79824.000000 bytes per cycle, totalling 79824 memory_test.go:81: 100 cycles with 4924.479980 bytes per cycle, totalling 492448 memory_test.go:81: 10000 cycles with 4278.019043 bytes per cycle, totalling 42780192 memory_test.go:81: 50000 cycles with 4275.242676 bytes per cycle, totalling 213762144 BenchmarkAppendSample1000 5000 533933 ns/op --- BENCH: BenchmarkAppendSample1000 memory_test.go:81: 1 cycles with 840224.000000 bytes per cycle, totalling 840224 memory_test.go:81: 100 cycles with 62789.281250 bytes per cycle, totalling 6278928 memory_test.go:81: 5000 cycles with 55208.601562 bytes per cycle, totalling 276043008 ok github.com/prometheus/prometheus/storage/metric/tiered 27.828s BenchmarkAppendSample.* after this change: BenchmarkAppendSample1 1000000 1109 ns/op --- BENCH: BenchmarkAppendSample1 memory_test.go:131: 1 cycles with 9992.000000 bytes per cycle, totalling 9992 memory_test.go:131: 100 cycles with 250.399994 bytes per cycle, totalling 25040 memory_test.go:131: 10000 cycles with 239.220795 bytes per cycle, totalling 2392208 memory_test.go:131: 1000000 cycles with 255.492630 bytes per cycle, totalling 255492624 BenchmarkAppendSample10 500000 3663 ns/op --- BENCH: BenchmarkAppendSample10 memory_test.go:131: 1 cycles with 15536.000000 bytes per cycle, totalling 15536 memory_test.go:131: 100 cycles with 662.239990 bytes per cycle, totalling 66224 memory_test.go:131: 10000 cycles with 601.889587 bytes per cycle, totalling 6018896 memory_test.go:131: 500000 cycles with 598.550903 bytes per cycle, totalling 299275472 BenchmarkAppendSample100 50000 40694 ns/op --- BENCH: BenchmarkAppendSample100 memory_test.go:131: 1 cycles with 78976.000000 bytes per cycle, totalling 78976 memory_test.go:131: 100 cycles with 4928.319824 bytes per cycle, totalling 492832 memory_test.go:131: 10000 cycles with 4277.961426 bytes per cycle, totalling 42779616 memory_test.go:131: 50000 cycles with 4275.054199 bytes per cycle, totalling 213752720 BenchmarkAppendSample1000 5000 530744 ns/op --- BENCH: BenchmarkAppendSample1000 memory_test.go:131: 1 cycles with 842192.000000 bytes per cycle, totalling 842192 memory_test.go:131: 100 cycles with 62765.441406 bytes per cycle, totalling 6276544 memory_test.go:131: 5000 cycles with 55209.812500 bytes per cycle, totalling 276049056 ok github.com/prometheus/prometheus/storage/metric/tiered 27.468s Change-Id: Idaa339cd83539b5e4391614541a2c3a04002d66d --- storage/metric/tiered/memory.go | 20 ++++++++++- storage/metric/tiered/memory_test.go | 50 ++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/storage/metric/tiered/memory.go b/storage/metric/tiered/memory.go index 76b2ff1ada..2015a35147 100644 --- a/storage/metric/tiered/memory.go +++ b/storage/metric/tiered/memory.go @@ -17,6 +17,8 @@ import ( "sort" "sync" + "github.com/golang/glog" + clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/storage/metric" @@ -54,10 +56,26 @@ func (s *arrayStream) metric() clientmodel.Metric { return s.m } +// add implements the stream interface. This implementation requires both +// s.values and the passed in v to be sorted already. Values in v that have a +// timestamp older than the most recent value in s.values are skipped. func (s *arrayStream) add(v metric.Values) { s.Lock() defer s.Unlock() - + // Skip over values that are older than the most recent value in s. + if len(s.values) > 0 { + i := 0 + mostRecentTimestamp := s.values[len(s.values)-1].Timestamp + for ; i < len(v) && mostRecentTimestamp > v[i].Timestamp; i++ { + } + if i > 0 { + glog.Warningf( + "Skipped out-of-order values while adding to %#v: %#v", + s.m, v[:i], + ) + v = v[i:] + } + } s.values = append(s.values, v...) } diff --git a/storage/metric/tiered/memory_test.go b/storage/metric/tiered/memory_test.go index 19a157880e..1d45cbca43 100644 --- a/storage/metric/tiered/memory_test.go +++ b/storage/metric/tiered/memory_test.go @@ -15,6 +15,7 @@ package tiered import ( "fmt" + "reflect" "runtime" "sync" "testing" @@ -49,6 +50,55 @@ func BenchmarkStreamAdd(b *testing.B) { b.Logf("%d cycles with %f bytes per cycle, totalling %d", b.N, float32(post.TotalAlloc-pre.TotalAlloc)/float32(b.N), post.TotalAlloc-pre.TotalAlloc) } +func TestStreamAdd(t *testing.T) { + s := newArrayStream(clientmodel.Metric{}) + // Add empty to empty. + v := metric.Values{} + expected := metric.Values{} + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something to empty. + v = metric.Values{ + metric.SamplePair{Timestamp: 1, Value: -1}, + } + expected = append(expected, v...) + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something to something. + v = metric.Values{ + metric.SamplePair{Timestamp: 2, Value: -2}, + metric.SamplePair{Timestamp: 5, Value: -5}, + } + expected = append(expected, v...) + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something outdated to something. + v = metric.Values{ + metric.SamplePair{Timestamp: 3, Value: -3}, + metric.SamplePair{Timestamp: 4, Value: -4}, + } + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something partially outdated to something. + v = metric.Values{ + metric.SamplePair{Timestamp: 3, Value: -3}, + metric.SamplePair{Timestamp: 6, Value: -6}, + } + expected = append(expected, metric.SamplePair{Timestamp: 6, Value: -6}) + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } +} + func benchmarkAppendSamples(b *testing.B, labels int) { b.StopTimer() s := NewMemorySeriesStorage(MemorySeriesOptions{})