diff --git a/storage/metric/tiered/memory.go b/storage/metric/tiered/memory.go index 76b2ff1ada..2015a35147 100644 --- a/storage/metric/tiered/memory.go +++ b/storage/metric/tiered/memory.go @@ -17,6 +17,8 @@ import ( "sort" "sync" + "github.com/golang/glog" + clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/storage/metric" @@ -54,10 +56,26 @@ func (s *arrayStream) metric() clientmodel.Metric { return s.m } +// add implements the stream interface. This implementation requires both +// s.values and the passed in v to be sorted already. Values in v that have a +// timestamp older than the most recent value in s.values are skipped. func (s *arrayStream) add(v metric.Values) { s.Lock() defer s.Unlock() - + // Skip over values that are older than the most recent value in s. + if len(s.values) > 0 { + i := 0 + mostRecentTimestamp := s.values[len(s.values)-1].Timestamp + for ; i < len(v) && mostRecentTimestamp > v[i].Timestamp; i++ { + } + if i > 0 { + glog.Warningf( + "Skipped out-of-order values while adding to %#v: %#v", + s.m, v[:i], + ) + v = v[i:] + } + } s.values = append(s.values, v...) } diff --git a/storage/metric/tiered/memory_test.go b/storage/metric/tiered/memory_test.go index 19a157880e..1d45cbca43 100644 --- a/storage/metric/tiered/memory_test.go +++ b/storage/metric/tiered/memory_test.go @@ -15,6 +15,7 @@ package tiered import ( "fmt" + "reflect" "runtime" "sync" "testing" @@ -49,6 +50,55 @@ func BenchmarkStreamAdd(b *testing.B) { b.Logf("%d cycles with %f bytes per cycle, totalling %d", b.N, float32(post.TotalAlloc-pre.TotalAlloc)/float32(b.N), post.TotalAlloc-pre.TotalAlloc) } +func TestStreamAdd(t *testing.T) { + s := newArrayStream(clientmodel.Metric{}) + // Add empty to empty. + v := metric.Values{} + expected := metric.Values{} + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something to empty. + v = metric.Values{ + metric.SamplePair{Timestamp: 1, Value: -1}, + } + expected = append(expected, v...) + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something to something. + v = metric.Values{ + metric.SamplePair{Timestamp: 2, Value: -2}, + metric.SamplePair{Timestamp: 5, Value: -5}, + } + expected = append(expected, v...) + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something outdated to something. + v = metric.Values{ + metric.SamplePair{Timestamp: 3, Value: -3}, + metric.SamplePair{Timestamp: 4, Value: -4}, + } + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } + // Add something partially outdated to something. + v = metric.Values{ + metric.SamplePair{Timestamp: 3, Value: -3}, + metric.SamplePair{Timestamp: 6, Value: -6}, + } + expected = append(expected, metric.SamplePair{Timestamp: 6, Value: -6}) + s.add(v) + if got := s.values; !reflect.DeepEqual(expected, got) { + t.Fatalf("Expected values %#v in stream, got %#v.", expected, got) + } +} + func benchmarkAppendSamples(b *testing.B, labels int) { b.StopTimer() s := NewMemorySeriesStorage(MemorySeriesOptions{})