diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index f0e755839c..b58976c911 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -36,10 +36,11 @@ const ( ) type CircularExemplarStorage struct { - lock sync.RWMutex - exemplars []circularBufferEntry - nextIndex int - metrics *ExemplarMetrics + lock sync.RWMutex + exemplars []circularBufferEntry + nextIndex int + metrics *ExemplarMetrics + oooTimeWindowMillis int64 // Map of series labels as a string to index entry, which points to the first // and last exemplar for the series in the exemplars circular buffer. @@ -55,6 +56,7 @@ type indexEntry struct { type circularBufferEntry struct { exemplar exemplar.Exemplar next int + prev int ref *indexEntry } @@ -115,15 +117,19 @@ func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics { // If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in // 1GB of extra memory, accounting for the fact that this is heap allocated space. // If len <= 0, then the exemplar storage is essentially a noop storage but can later be -// resized to store exemplars. -func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error) { +// resized to store exemplars. If oooTimeWindowMillis <= 0, out-of-order exemplars are disabled. +func NewCircularExemplarStorage(length int64, m *ExemplarMetrics, oooTimeWindowMillis int64) (ExemplarStorage, error) { if length < 0 { length = 0 } + if oooTimeWindowMillis < 0 { + oooTimeWindowMillis = 0 + } c := &CircularExemplarStorage{ - exemplars: make([]circularBufferEntry, length), - index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries), - metrics: m, + exemplars: make([]circularBufferEntry, length), + index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries), + metrics: m, + oooTimeWindowMillis: oooTimeWindowMillis, } c.metrics.maxExemplars.Set(float64(length)) @@ -171,6 +177,9 @@ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*label } se.SeriesLabels = idx.seriesLabels + // TODO: Since we maintain a doubly-linked-list, we can also iterate from head to tail + // which might be more performant if the selected interval is skewed to the head. + // Loop through all exemplars in the circular buffer for the current series. for e.exemplar.Ts <= end { if e.exemplar.Ts >= start { @@ -253,16 +262,12 @@ func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar. return storage.ErrDuplicateExemplar } - // Since during the scrape the exemplars are sorted first by timestamp, then value, then labels, - // if any of these conditions are true, we know that the exemplar is either a duplicate - // of a previous one (but not the most recent one as that is checked above) or out of order. - // We now allow exemplars with duplicate timestamps as long as they have different values and/or labels - // since that can happen for different buckets of a native histogram. - // We do not distinguish between duplicates and out of order as iterating through the exemplars - // to check for that would be expensive (versus just comparing with the most recent one) especially - // since this is run under a lock, and not worth it as we just need to return an error so we do not - // append the exemplar. - if e.Ts < newestExemplar.Ts || + // Reject exemplars older than the OOO time window relative to the newest exemplar. + // Exemplars with the same timestamp are ordered by value then label hash to detect + // duplicates without iterating through all stored exemplars, which would be too + // expensive under lock. Exemplars with equal timestamps but different values or + // labels are allowed to support multiple buckets of native histograms. + if (e.Ts < newestExemplar.Ts && e.Ts <= newestExemplar.Ts-ce.oooTimeWindowMillis) || (e.Ts == newestExemplar.Ts && e.Value < newestExemplar.Value) || (e.Ts == newestExemplar.Ts && e.Value == newestExemplar.Value && e.Labels.Hash() < newestExemplar.Labels.Hash()) { if appended { @@ -273,8 +278,19 @@ func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar. return nil } -// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. -// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed. +// SetOutOfOrderTimeWindow sets the out-of-order time window for exemplars in +// milliseconds. Exemplars older than it are not added to the circular exemplar +// buffer. +func (ce *CircularExemplarStorage) SetOutOfOrderTimeWindow(d int64) { + ce.lock.Lock() + defer ce.lock.Unlock() + ce.oooTimeWindowMillis = d +} + +// Resize changes the size of exemplar buffer by allocating a new buffer and +// migrating data to it. Exemplars are kept when possible. Shrinking will discard +// old data (in order of ingestion) as needed. Returns the number of migrated +// exemplars. func (ce *CircularExemplarStorage) Resize(l int64) int { // Accept negative values as just 0 size. if l <= 0 { @@ -284,65 +300,83 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { ce.lock.Lock() defer ce.lock.Unlock() - if l == int64(len(ce.exemplars)) { - return 0 - } - - oldBuffer := ce.exemplars - oldNextIndex := int64(ce.nextIndex) - - ce.exemplars = make([]circularBufferEntry, l) - ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries) - ce.nextIndex = 0 - - // Replay as many entries as needed, starting with oldest first. - count := min(l, int64(len(oldBuffer))) - + oldSize := int64(len(ce.exemplars)) migrated := 0 - - if l > 0 && len(oldBuffer) > 0 { - // Rewind previous next index by count with wrap-around. - // This math is essentially looking at nextIndex, where we would write the next exemplar to, - // and find the index in the old exemplar buffer that we should start migrating exemplars from. - // This way we don't migrate exemplars that would just be overwritten when migrating later exemplars. - startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer)) - - var buf [1024]byte - for i := range count { - idx := (startIndex + i) % int64(len(oldBuffer)) - if oldBuffer[idx].ref != nil { - ce.migrate(&oldBuffer[idx], buf[:]) - migrated++ - } - } + switch { + case l == oldSize: + // NOOP. + return migrated + case l > oldSize: + migrated = ce.grow(l) + case l < oldSize: + migrated = ce.shrink(l) } ce.computeMetrics() ce.metrics.maxExemplars.Set(float64(l)) - return migrated } -// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires -// external lock and does not compute metrics. -func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) { - seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0]) - - idx, ok := ce.index[string(seriesLabels)] - if !ok { - idx = entry.ref - idx.oldest = ce.nextIndex - ce.index[string(seriesLabels)] = idx - } else { - entry.ref = idx - ce.exemplars[idx.newest].next = ce.nextIndex +// grow the circular buffer to have size l by allocating a new slice and copying +// the old data to it. After growing, ce.nextIndex points to the next free entry +// in the buffer. This function must be called with the lock acquired. +func (ce *CircularExemplarStorage) grow(l int64) int { + oldSize := len(ce.exemplars) + newSlice := make([]circularBufferEntry, l) + ranges := []intRange{ + {from: ce.nextIndex, to: oldSize}, + {from: 0, to: ce.nextIndex}, } - idx.newest = ce.nextIndex + ce.nextIndex = copyExemplarRanges(ce.index, newSlice, ce.exemplars, ranges) + ce.exemplars = newSlice + return oldSize +} - entry.next = noExemplar - ce.exemplars[ce.nextIndex] = *entry +// shrink the circular buffer by either trimming from the right or deleting the +// oldest samples to accommodate the new size l. This function must be called +// with the lock acquired. +func (ce *CircularExemplarStorage) shrink(l int64) (migrated int) { + oldSize := len(ce.exemplars) + diff := int(int64(oldSize) - l) + deleteStart := ce.nextIndex + deleteEnd := (deleteStart + diff) % oldSize - ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) + // Remove items from the buffer starting from c.nextIndex. This drops older + // entries first in the order of ingestion. + for i := range diff { + idx := (deleteStart + i) % oldSize + ref := ce.exemplars[idx].ref + if ce.removeExemplar(&ce.exemplars[idx]) { + ce.removeIndex(ref) + } + } + + newSlice := make([]circularBufferEntry, int(l)) + + switch { + case deleteStart == deleteEnd: + // The entire buffer was cleared (shrink to zero). Note that we don't have to + // delete the index since removeExemplar already did. Simply remove all elements + // and reset tracking pointers. + ce.exemplars = newSlice + ce.nextIndex = 0 + return 0 + case deleteStart < deleteEnd: + // We delete an "inner" section of the circular buffer. + migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{ + {from: deleteEnd, to: oldSize}, + {from: 0, to: deleteStart}, + }) + case deleteStart > deleteEnd: + // We keep an "inner" section of the circular buffer. + migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{ + {from: deleteEnd, to: deleteStart}, + }) + } + + ce.nextIndex = migrated % int(l) + ce.exemplars = newSlice + return migrated } func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { @@ -358,7 +392,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp var buf [1024]byte seriesLabels := l.Bytes(buf[:]) - idx, ok := ce.index[string(seriesLabels)] + idx, indexExists := ce.index[string(seriesLabels)] err := ce.validateExemplar(idx, e, true) if err != nil { if errors.Is(err, storage.ErrDuplicateExemplar) { @@ -368,32 +402,77 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp return err } - if !ok { - idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} - ce.index[string(seriesLabels)] = idx - } else { - ce.exemplars[idx.newest].next = ce.nextIndex - } - - if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil { - // There exists an exemplar already on this ce.nextIndex entry, - // drop it, to make place for others. - if prev.next == noExemplar { - // Last item for this series, remove index entry. - var buf [1024]byte - prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) - delete(ce.index, string(prevLabels)) - } else { - prev.ref.oldest = prev.next + // If we insert an out-of-order exemplar, we preemptively find the insertion + // index to check for duplicates. + var insertionIndex int + if indexExists { + outOfOrder := e.Ts >= ce.exemplars[idx.oldest].exemplar.Ts && e.Ts < ce.exemplars[idx.newest].exemplar.Ts + if outOfOrder { + insertionIndex = ce.findInsertionIndex(e, idx) + if ce.exemplars[insertionIndex].exemplar.Ts == e.Ts { + // Assume duplicate exemplar, noop. + // Native histograms will exercise this code path a lot due to + // having multiple exemplars per series so checking the + // value and labels would be too expensive. + return nil + } } } - // Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select) - // since this is the first exemplar stored for this series. - ce.exemplars[ce.nextIndex].next = noExemplar + // If the index didn't exist (new series), create one. + if !indexExists { + idx = &indexEntry{seriesLabels: l} + ce.index[string(seriesLabels)] = idx + } + + // Remove entries if the buffer is full. Note that this doesn't invalidate the + // insertion index since out-of-order exemplars cannot be the oldest exemplar. + if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil { + prevRef := prev.ref + if ce.removeExemplar(prev) { + if prevRef == idx { + // Do not delete the indexEntry we're inserting to. + indexExists = false + } else { + ce.removeIndex(prevRef) + } + } + } + + // We create a new entry in the linked list. ce.exemplars[ce.nextIndex].exemplar = e ce.exemplars[ce.nextIndex].ref = idx - idx.newest = ce.nextIndex + + switch { + case !indexExists: + // Add the first and only exemplar to the list. + idx.oldest = ce.nextIndex + idx.newest = ce.nextIndex + ce.exemplars[ce.nextIndex].prev = noExemplar + ce.exemplars[ce.nextIndex].next = noExemplar + case e.Ts >= ce.exemplars[idx.newest].exemplar.Ts: + // Add the exemplar at the tip (after newest). + ce.exemplars[idx.newest].next = ce.nextIndex + ce.exemplars[ce.nextIndex].prev = idx.newest + ce.exemplars[ce.nextIndex].next = noExemplar + idx.newest = ce.nextIndex + case e.Ts < ce.exemplars[idx.oldest].exemplar.Ts: + // Add the exemplar at the tail (before oldest). + ce.exemplars[idx.oldest].prev = ce.nextIndex + ce.exemplars[ce.nextIndex].prev = noExemplar + ce.exemplars[ce.nextIndex].next = idx.oldest + idx.oldest = ce.nextIndex + default: + // Insert the exemplar into the list by finding the most recent + // in-order exemplar that precedes it, and placing it after. + nextExemplar := ce.exemplars[insertionIndex].next + ce.exemplars[ce.nextIndex].prev = insertionIndex + ce.exemplars[ce.nextIndex].next = nextExemplar + ce.exemplars[insertionIndex].next = ce.nextIndex + if nextExemplar != noExemplar { + ce.exemplars[nextExemplar].prev = ce.nextIndex + } + } ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) @@ -402,6 +481,56 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp return nil } +// removeExemplar removes the given entry from the circular buffer. Returns true +// iff the deleted entry was the last entry (and the index is now empty). +// This function must be called with the lock acquired. +func (ce *CircularExemplarStorage) removeExemplar(entry *circularBufferEntry) bool { + ref := entry.ref + if ref == nil { + return false + } + + if entry.prev != noExemplar { + ce.exemplars[entry.prev].next = entry.next + } else { + ref.oldest = entry.next + } + + if entry.next != noExemplar { + ce.exemplars[entry.next].prev = entry.prev + } else { + ref.newest = entry.prev + } + + // Mark this item as deleted. + entry.ref = nil + + return ref.oldest == noExemplar && ref.newest == noExemplar +} + +// removeIndex removes an indexEntry from the circular exemplar storage. +// This function must be called with the lock acquired. +func (ce *CircularExemplarStorage) removeIndex(ref *indexEntry) { + var buf [1024]byte + entryLabels := ref.seriesLabels.Bytes(buf[:]) + delete(ce.index, string(entryLabels)) +} + +// findInsertionIndex finds the position at which e should be placed in the +// doubly-linked list by traversing the linked list from idx.newest to idx.oldest +// and following back links. Since out-of-order exemplars commonly lie close to +// the newest entry, traversing from newest to oldest is usually faster. +func (ce *CircularExemplarStorage) findInsertionIndex(e exemplar.Exemplar, idx *indexEntry) int { + for i := idx.newest; i != noExemplar; { + current := ce.exemplars[i] + if current.exemplar.Ts <= e.Ts { + return i + } + i = current.prev + } + return idx.oldest +} + func (ce *CircularExemplarStorage) computeMetrics() { ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) @@ -443,3 +572,64 @@ func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.L } return nil } + +type intRange struct { + from, to int +} + +func (e intRange) contains(i int) bool { + return i >= e.from && i < e.to +} + +// copyExemplarRanges copies non-overlapping ranges from src into dest and +// adjusts list pointers in dest and index accordingly. Returns the number of +// copied items. +func copyExemplarRanges( + index map[string]*indexEntry, + dest, src []circularBufferEntry, + ranges []intRange, +) int { + offsets := make([]int, len(ranges)) + n := 0 + for i, rng := range ranges { + offsets[i] = n - rng.from + n += copy(dest[n:], src[rng.from:rng.to]) + } + migratedEntries := n + for di := range n { + e := &dest[di] + if e.ref == nil { + // We potentially copied empty entries. Subtract them now to correctly show the + // number of "migrated" items. + migratedEntries-- + continue + } + for i, rng := range ranges { + if rng.contains(e.prev) { + e.prev += offsets[i] + break + } + } + for i, rng := range ranges { + if rng.contains(e.next) { + e.next += offsets[i] + break + } + } + } + for _, idx := range index { + for i, rng := range ranges { + if rng.contains(idx.oldest) { + idx.oldest += offsets[i] + break + } + } + for i, rng := range ranges { + if rng.contains(idx.newest) { + idx.newest += offsets[i] + break + } + } + } + return migratedEntries +} diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index 103332c886..01ffeb9541 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -18,6 +18,7 @@ import ( "fmt" "math" "reflect" + "sort" "strconv" "strings" "sync" @@ -35,7 +36,7 @@ var eMetrics = NewExemplarMetrics(prometheus.DefaultRegisterer) // Tests the same exemplar cases as AddExemplar, but specifically the ValidateExemplar function so it can be relied on externally. func TestValidateExemplar(t *testing.T) { - exs, err := NewCircularExemplarStorage(2, eMetrics) + exs, err := NewCircularExemplarStorage(2, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -76,54 +77,624 @@ func TestValidateExemplar(t *testing.T) { require.Equal(t, storage.ErrExemplarLabelLength, es.ValidateExemplar(l, e4)) } -func TestAddExemplar(t *testing.T) { - exs, err := NewCircularExemplarStorage(2, eMetrics) - require.NoError(t, err) - es := exs.(*CircularExemplarStorage) +func TestCircularExemplarStorage_AddExemplar(t *testing.T) { + series1 := labels.FromStrings("trace_id", "foo") + series2 := labels.FromStrings("trace_id", "bar") - l := labels.FromStrings("service", "asdf") - e := exemplar.Exemplar{ - Labels: labels.FromStrings("trace_id", "qwerty"), - Value: 0.1, - Ts: 1, + series1Matcher := []*labels.Matcher{{ + Type: labels.MatchEqual, + Name: "trace_id", + Value: series1.Get("trace_id"), + }} + + series2Matcher := []*labels.Matcher{{ + Type: labels.MatchEqual, + Name: "trace_id", + Value: series2.Get("trace_id"), + }} + + testCases := []struct { + name string + size int64 + exemplars []exemplar.Exemplar + wantExemplars []exemplar.Exemplar + matcher []*labels.Matcher + wantError error + }{ + { + name: "insert after newest", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + }, + { + name: "insert before oldest", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 2}, + {Labels: series1, Value: 0.2, Ts: 1}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 1}, + {Labels: series1, Value: 0.1, Ts: 2}, + }, + }, + { + name: "insert in between", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 3}, + {Labels: series1, Value: 0.3, Ts: 2}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.3, Ts: 2}, + {Labels: series1, Value: 0.2, Ts: 3}, + }, + }, + { + name: "insert after newest with overflow", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + }, + { + name: "insert before oldest with overflow", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 0}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.4, Ts: 0}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + }, + { + name: "insert between with overflow", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 3}, + {Labels: series1, Value: 0.3, Ts: 4}, + {Labels: series1, Value: 0.4, Ts: 2}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.4, Ts: 2}, + {Labels: series1, Value: 0.2, Ts: 3}, + {Labels: series1, Value: 0.3, Ts: 4}, + }, + }, + { + name: "insert out of the OOO window", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 200}, + {Labels: series1, Value: 0.2, Ts: 1}, + }, + wantError: storage.ErrOutOfOrderExemplar, + }, + { + name: "insert multiple series", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 3}, + {Labels: series2, Value: 0.3, Ts: 4}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 3}, + }, + }, + { + name: "insert multiple series with overflow", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series2, Value: 0.1, Ts: 1}, + {Labels: series2, Value: 0.2, Ts: 2}, + {Labels: series2, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + matcher: series2Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series2, Value: 0.2, Ts: 2}, + {Labels: series2, Value: 0.3, Ts: 3}, + }, + }, + { + name: "series1 overflows series2 out-of-order", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series2, Value: 0.1, Ts: 3}, + {Labels: series2, Value: 0.2, Ts: 2}, + {Labels: series2, Value: 0.3, Ts: 4}, + {Labels: series1, Value: 0.4, Ts: 4}, + {Labels: series1, Value: 0.5, Ts: 1}, + }, + matcher: series2Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series2, Value: 0.3, Ts: 4}, + }, + }, + { + name: "ignore duplicate exemplars", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 3}, + {Labels: series1, Value: 0.1, Ts: 3}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 3}, + }, + }, + { + name: "ignore duplicate exemplars when buffer is full", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 3}, + {Labels: series1, Value: 0.2, Ts: 4}, + {Labels: series1, Value: 0.3, Ts: 5}, + {Labels: series1, Value: 0.3, Ts: 5}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 3}, + {Labels: series1, Value: 0.2, Ts: 4}, + {Labels: series1, Value: 0.3, Ts: 5}, + }, + }, + { + name: "empty timestamps are valid", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 0}, + {Labels: series1, Value: 0.2, Ts: 0}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 0}, + {Labels: series1, Value: 0.2, Ts: 0}, + }, + }, + { + name: "exemplar label length exceeds maximum", + size: 3, + exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("a", strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength)), Value: 0.1, Ts: 2}, + }, + wantError: storage.ErrExemplarLabelLength, + }, + { + name: "native histograms", + size: 6, + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + }, + { + name: "evict only exemplar for series then re-add", + size: 2, + exemplars: []exemplar.Exemplar{ + // series1 at index 0, series2 at index 1, then series1 evicts its own only exemplar + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series2, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + matcher: series1Matcher, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.3, Ts: 3}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + exs, err := NewCircularExemplarStorage(tc.size, eMetrics, 100) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + // Add exemplars and compare tc.wantErr against the first exemplar failing. + var addError error + for i, ex := range tc.exemplars { + addError = es.AddExemplar(ex.Labels, ex) + if addError != nil { + break + } + if testing.Verbose() { + t.Logf("Buffer[%d]:\n%s", i, debugCircularBuffer(es)) + } + } + if tc.wantError == nil { + require.NoError(t, addError) + } else { + require.ErrorIs(t, addError, tc.wantError) + } + if addError != nil { + return + } + + // Ensure exemplars are returned correctly and in-order. + gotExemplars, err := es.Select(0, 1000, tc.matcher) + require.NoError(t, err) + if len(tc.wantExemplars) == 0 { + require.Empty(t, gotExemplars) + } else { + require.Len(t, gotExemplars, 1) + require.Equal(t, tc.wantExemplars, gotExemplars[0].Exemplars) + } + }) + } +} + +func TestCircularExemplarStorage_Resize(t *testing.T) { + series1 := labels.FromStrings("trace_id", "foo") + series2 := labels.FromStrings("trace_id", "bar") + matcher1 := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "trace_id", "(foo|bar)"), } - require.NoError(t, es.AddExemplar(l, e)) - require.Equal(t, 0, es.index[string(l.Bytes(nil))].newest, "exemplar was not stored correctly") - - e2 := exemplar.Exemplar{ - Labels: labels.FromStrings("trace_id", "zxcvb"), - Value: 0.1, - Ts: 2, + testCases := []struct { + name string + exemplars []exemplar.Exemplar + resize int64 + wantExemplars []exemplar.Exemplar + wantNextIndex int + wantError error + }{ + { + name: "in-order, grow", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + resize: 10, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + wantNextIndex: 2, + }, + { + name: "in-order, shrink", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + resize: 2, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + wantNextIndex: 0, + }, + { + name: "out-of-order, shrink", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.1, Ts: 1}, + }, + resize: 2, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + wantNextIndex: 0, + }, + { + name: "out-of-order, grow", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + resize: 5, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + wantNextIndex: 2, + }, + { + name: "duplicate timestamps", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 1}, + {Labels: series1, Value: 0.3, Ts: 2}, + }, + resize: 3, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 1}, + {Labels: series1, Value: 0.3, Ts: 2}, + }, + }, + { + name: "empty input, grow", + exemplars: []exemplar.Exemplar{}, + resize: 10, + wantExemplars: []exemplar.Exemplar{}, + wantNextIndex: 0, + }, + { + name: "empty input, shrink", + exemplars: []exemplar.Exemplar{}, + resize: 1, + wantExemplars: []exemplar.Exemplar{}, + wantNextIndex: 0, + }, + { + name: "shrink to zero", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + resize: 0, + wantExemplars: []exemplar.Exemplar{}, + wantNextIndex: 0, + }, + { + name: "multiple series, shrink", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series2, Value: 1.1, Ts: 2}, + {Labels: series1, Value: 0.2, Ts: 3}, + {Labels: series2, Value: 1.2, Ts: 4}, + }, + resize: 2, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 3}, + {Labels: series2, Value: 1.2, Ts: 4}, + }, + wantNextIndex: 0, + }, + { + name: "shrink to one", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + }, + resize: 1, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 2}, + }, + wantNextIndex: 0, + }, + { + name: "shrink to two", + exemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + }, + resize: 2, + wantExemplars: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + }, + wantNextIndex: 1, + }, } - require.NoError(t, es.AddExemplar(l, e2)) - require.Equal(t, 1, es.index[string(l.Bytes(nil))].newest, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") - require.True(t, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + exs, err := NewCircularExemplarStorage(3, eMetrics, 100) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) - require.NoError(t, es.AddExemplar(l, e2), "no error is expected attempting to add duplicate exemplar") + for _, ex := range tc.exemplars { + require.NoError(t, es.AddExemplar(ex.Labels, ex)) + } - e3 := e2 - e3.Ts = 3 - require.NoError(t, es.AddExemplar(l, e3), "no error is expected when attempting to add duplicate exemplar, even with different timestamp") + // Resize the circular buffer. + if testing.Verbose() { + t.Logf("Buffer[before-resize]:\n%s", debugCircularBuffer(es)) + } + es.Resize(tc.resize) + if testing.Verbose() { + t.Logf("Buffer[after-resize]:\n%s", debugCircularBuffer(es)) + } - e3.Ts = 1 - e3.Value = 0.3 - require.Equal(t, storage.ErrOutOfOrderExemplar, es.AddExemplar(l, e3)) - - e4 := exemplar.Exemplar{ - Labels: labels.FromStrings("a", strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength)), - Value: 0.1, - Ts: 2, + // Ensure exemplars are returned correctly and in-order. + gotExemplars, err := es.Select(0, 1000, matcher1) + require.NoError(t, err) + flat := make([]exemplar.Exemplar, 0) + for _, group := range gotExemplars { + flat = append(flat, group.Exemplars...) + } + sort.Slice(flat, func(i, j int) bool { + return flat[i].Ts < flat[j].Ts + }) + require.Equal(t, tc.wantExemplars, flat, "exemplar mismatch") + require.Equal(t, tc.wantNextIndex, es.nextIndex, "next index mismatch") + }) + } + + resizeTwiceCases := []struct { + name string + addExemplars1 []exemplar.Exemplar + resize1 int64 + wantExemplars1 []exemplar.Exemplar + resize2 int64 + addExemplars2 []exemplar.Exemplar + wantExemplars2 []exemplar.Exemplar + }{ + { + name: "shrink then grow ordered", + addExemplars1: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + resize1: 2, + wantExemplars1: []exemplar.Exemplar{ + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + resize2: 5, + addExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.5, Ts: 5}, + {Labels: series1, Value: 0.6, Ts: 6}, + {Labels: series1, Value: 0.7, Ts: 7}, + }, + wantExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + {Labels: series1, Value: 0.5, Ts: 5}, + {Labels: series1, Value: 0.6, Ts: 6}, + {Labels: series1, Value: 0.7, Ts: 7}, + }, + }, + { + name: "shrink then grow out-of-order", + addExemplars1: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.4, Ts: 4}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + resize1: 2, + wantExemplars1: []exemplar.Exemplar{ + // We delete in the order of ingestion, not temporally. + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + resize2: 5, + addExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.7, Ts: 7}, + {Labels: series1, Value: 0.6, Ts: 6}, + {Labels: series1, Value: 0.5, Ts: 5}, + }, + wantExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.5, Ts: 5}, + {Labels: series1, Value: 0.6, Ts: 6}, + {Labels: series1, Value: 0.7, Ts: 7}, + }, + }, + { + name: "grow then shrink ordered", + addExemplars1: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + resize1: 5, + wantExemplars1: []exemplar.Exemplar{ + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + resize2: 2, + addExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.5, Ts: 5}, + {Labels: series1, Value: 0.6, Ts: 6}, + {Labels: series1, Value: 0.7, Ts: 7}, + }, + wantExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.6, Ts: 6}, + {Labels: series1, Value: 0.7, Ts: 7}, + }, + }, + { + name: "grow then shrink out-of-order", + addExemplars1: []exemplar.Exemplar{ + {Labels: series1, Value: 0.1, Ts: 1}, + {Labels: series1, Value: 0.4, Ts: 4}, + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + }, + resize1: 5, + wantExemplars1: []exemplar.Exemplar{ + // We delete in the order of ingestion, not temporally. + {Labels: series1, Value: 0.2, Ts: 2}, + {Labels: series1, Value: 0.3, Ts: 3}, + {Labels: series1, Value: 0.4, Ts: 4}, + }, + resize2: 2, + addExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.7, Ts: 7}, + {Labels: series1, Value: 0.5, Ts: 5}, + {Labels: series1, Value: 0.6, Ts: 6}, + }, + wantExemplars2: []exemplar.Exemplar{ + {Labels: series1, Value: 0.5, Ts: 5}, + {Labels: series1, Value: 0.6, Ts: 6}, + }, + }, + } + + for _, tc := range resizeTwiceCases { + t.Run(tc.name, func(t *testing.T) { + exs, err := NewCircularExemplarStorage(3, eMetrics, 100) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + for _, ex := range tc.addExemplars1 { + require.NoError(t, es.AddExemplar(ex.Labels, ex)) + } + es.Resize(tc.resize1) + gotExemplars, err := es.Select(0, 1000, matcher1) + require.NoError(t, err) + require.Len(t, gotExemplars, 1) + require.Equal(t, tc.wantExemplars1, gotExemplars[0].Exemplars) + es.Resize(tc.resize2) + for _, ex := range tc.addExemplars2 { + require.NoError(t, es.AddExemplar(ex.Labels, ex)) + } + if testing.Verbose() { + t.Logf("Buffer[after-resize2]:\n%s", debugCircularBuffer(es)) + } + gotExemplars, err = es.Select(0, 1000, matcher1) + require.NoError(t, err) + require.Len(t, gotExemplars, 1) + require.Equal(t, tc.wantExemplars2, gotExemplars[0].Exemplars) + }) } - require.Equal(t, storage.ErrExemplarLabelLength, es.AddExemplar(l, e4)) } func TestStorageOverflow(t *testing.T) { // Test that circular buffer index and assignment // works properly, adding more exemplars than can // be stored and then querying for them. - exs, err := NewCircularExemplarStorage(5, eMetrics) + exs, err := NewCircularExemplarStorage(5, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -152,7 +723,7 @@ func TestStorageOverflow(t *testing.T) { } func TestSelectExemplar(t *testing.T) { - exs, err := NewCircularExemplarStorage(5, eMetrics) + exs, err := NewCircularExemplarStorage(5, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -179,7 +750,7 @@ func TestSelectExemplar(t *testing.T) { } func TestSelectExemplar_MultiSeries(t *testing.T) { - exs, err := NewCircularExemplarStorage(5, eMetrics) + exs, err := NewCircularExemplarStorage(5, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -223,7 +794,7 @@ func TestSelectExemplar_MultiSeries(t *testing.T) { func TestSelectExemplar_TimeRange(t *testing.T) { var lenEs int64 = 5 - exs, err := NewCircularExemplarStorage(lenEs, eMetrics) + exs, err := NewCircularExemplarStorage(lenEs, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -251,7 +822,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) { // Test to ensure that even though a series matches more than one matcher from the // query that it's exemplars are only included in the result a single time. func TestSelectExemplar_DuplicateSeries(t *testing.T) { - exs, err := NewCircularExemplarStorage(4, eMetrics) + exs, err := NewCircularExemplarStorage(4, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -286,7 +857,7 @@ func TestSelectExemplar_DuplicateSeries(t *testing.T) { } func TestIndexOverwrite(t *testing.T) { - exs, err := NewCircularExemplarStorage(2, eMetrics) + exs, err := NewCircularExemplarStorage(2, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -374,7 +945,7 @@ func TestResize(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics) + exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -386,7 +957,14 @@ func TestResize(t *testing.T) { require.NoError(t, err) } + if testing.Verbose() { + t.Logf("Buffer[before-resize]:\n%s", debugCircularBuffer(es)) + } resized := es.Resize(tc.newCount) + if testing.Verbose() { + t.Logf("Buffer[after-resize]:\n%s", debugCircularBuffer(es)) + } + require.Equal(t, tc.expectedMigrated, resized) q, err := es.Querier(context.TODO()) @@ -421,7 +999,7 @@ func BenchmarkAddExemplar(b *testing.B) { b.Run(fmt.Sprintf("%d/%d", n, capacity), func(b *testing.B) { for b.Loop() { b.StopTimer() - exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics) + exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics, 0) require.NoError(b, err) es := exs.(*CircularExemplarStorage) var l labels.Labels @@ -442,6 +1020,91 @@ func BenchmarkAddExemplar(b *testing.B) { } } +func BenchmarkAddExemplar_OutOfOrder(b *testing.B) { + // We need to include these labels since we do length calculation + // before adding. + exLabels := labels.FromStrings("trace_id", "89620921") + + const ( + capacity = 5000 + ) + + fillOneSeries := func(es *CircularExemplarStorage) { + for i := range capacity { + e := exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels} + if err := es.AddExemplar(exLabels, e); err != nil { + panic(err) + } + } + } + + fillMultipleSeries := func(es *CircularExemplarStorage) { + for i := range capacity { + l := labels.FromStrings("service", strconv.Itoa(i)) + e := exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: l} + if err := es.AddExemplar(l, e); err != nil { + panic(err) + } + } + } + + outOfOrder := func(ts *int64, _ *labels.Labels) { + switch *ts % 3 { + case 0: + return + case 1: + *ts = capacity - *ts + case 2: + *ts = (capacity - *ts) + 100 + } + } + + reverseOrder := func(ts *int64, _ *labels.Labels) { + *ts = capacity - *ts + } + + multipleSeries := func(f func(*int64, *labels.Labels)) func(*int64, *labels.Labels) { + return func(ts *int64, l *labels.Labels) { + f(ts, l) + *l = labels.FromStrings("service", strconv.Itoa(int(*ts))) + } + } + + for fillName, setup := range map[string]func(es *CircularExemplarStorage){ + "empty": func(*CircularExemplarStorage) {}, + "full-one": fillOneSeries, + "full-multiple": fillMultipleSeries, + } { + for orderName, forEach := range map[string]func(ts *int64, l *labels.Labels){ + "in-order": func(*int64, *labels.Labels) {}, + "reverse": reverseOrder, + "out-of-order": outOfOrder, + "multi-in-order": multipleSeries(func(*int64, *labels.Labels) {}), + "multi-reverse": multipleSeries(reverseOrder), + "multi-out-of-order": multipleSeries(outOfOrder), + } { + b.Run(fmt.Sprintf("%s/%s", fillName, orderName), func(b *testing.B) { + exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics, 100000) + require.NoError(b, err) + es := exs.(*CircularExemplarStorage) + l := labels.FromStrings("service", "0") + setup(es) + b.ResetTimer() + for b.Loop() { + for i := range capacity { + ts := int64(i) + forEach(&ts, &l) + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: ts, Labels: l}) + if err != nil { + b.Fatalf("Failed to insert item %d %s: %v", i, l, err) + } + } + } + }) + } + } +} + func BenchmarkResizeExemplars(b *testing.B) { testCases := []struct { name string @@ -479,7 +1142,7 @@ func BenchmarkResizeExemplars(b *testing.B) { b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(b *testing.B) { for b.Loop() { b.StopTimer() - exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics) + exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics, 0) require.NoError(b, err) es := exs.(*CircularExemplarStorage) @@ -504,7 +1167,7 @@ func BenchmarkResizeExemplars(b *testing.B) { // TestCircularExemplarStorage_Concurrent_AddExemplar_Resize tries to provoke a data race between AddExemplar and Resize. // Run with race detection enabled. func TestCircularExemplarStorage_Concurrent_AddExemplar_Resize(t *testing.T) { - exs, err := NewCircularExemplarStorage(0, eMetrics) + exs, err := NewCircularExemplarStorage(0, eMetrics, 0) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -537,3 +1200,30 @@ func TestCircularExemplarStorage_Concurrent_AddExemplar_Resize(t *testing.T) { } } } + +// debugCircularBuffer iterates all exemplars in the circular exemplar storage +// and returns them as a string. The textual representation contains index +// pointers and helps debugging exemplar storage. +func debugCircularBuffer(ce *CircularExemplarStorage) string { + var sb strings.Builder + for i, e := range ce.exemplars { + if e.ref == nil { + continue + } + sb.WriteString(fmt.Sprintf( + "i: %d, ts: %d, next: %d, prev: %d", + i, e.exemplar.Ts, e.next, e.prev, + )) + for _, idx := range ce.index { + if i == idx.newest { + sb.WriteString(" <- newest " + idx.seriesLabels.String()) + } + if i == idx.oldest { + sb.WriteString(" <- oldest " + idx.seriesLabels.String()) + } + } + sb.WriteString("\n") + } + sb.WriteString(fmt.Sprintf("Next index: %d\n", ce.nextIndex)) + return sb.String() +} diff --git a/tsdb/head.go b/tsdb/head.go index a4df208e6e..955c0ae5a7 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -327,7 +327,7 @@ func (h *Head) resetInMemoryState() error { if em == nil { em = NewExemplarMetrics(h.reg) } - es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em) + es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em, h.opts.OutOfOrderTimeWindow.Load()) if err != nil { return err } @@ -1037,6 +1037,8 @@ func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL) { return } + h.exemplars.(*CircularExemplarStorage).SetOutOfOrderTimeWindow(oooTimeWindow) + // Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage // to decide if it should pass exemplars along to its exemplar storage, so we // need to update opts.MaxExemplars here. diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index 30a63327ab..17efdda77d 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -65,7 +65,7 @@ func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) { reg := prometheus.NewRegistry() eMetrics := tsdb.NewExemplarMetrics(reg) - es, err := tsdb.NewCircularExemplarStorage(10, eMetrics) + es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow) if err != nil { return nil, fmt.Errorf("opening test exemplar storage: %w", err) }