mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-18 18:25:24 -05:00
tsdb: add support for OOO exemplars in CircularExemplarStorage (#17469)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
* tsdb: add support for OOO exemplars in CircularExemplarStorage Doubly linked exemplar storage resize. Split exemplar buffer resize into shrink and grow functions. Skip duplicate OOO exemplars, re-initialize emptied index after deleting its last exemplar. Signed-off-by: Julius Hinze <julius.hinze@grafana.com>
This commit is contained in:
parent
99c8351d0e
commit
22463b1e9f
4 changed files with 1018 additions and 136 deletions
372
tsdb/exemplar.go
372
tsdb/exemplar.go
|
|
@ -36,10 +36,11 @@ const (
|
|||
)
|
||||
|
||||
type CircularExemplarStorage struct {
|
||||
lock sync.RWMutex
|
||||
exemplars []circularBufferEntry
|
||||
nextIndex int
|
||||
metrics *ExemplarMetrics
|
||||
lock sync.RWMutex
|
||||
exemplars []circularBufferEntry
|
||||
nextIndex int
|
||||
metrics *ExemplarMetrics
|
||||
oooTimeWindowMillis int64
|
||||
|
||||
// Map of series labels as a string to index entry, which points to the first
|
||||
// and last exemplar for the series in the exemplars circular buffer.
|
||||
|
|
@ -55,6 +56,7 @@ type indexEntry struct {
|
|||
type circularBufferEntry struct {
|
||||
exemplar exemplar.Exemplar
|
||||
next int
|
||||
prev int
|
||||
ref *indexEntry
|
||||
}
|
||||
|
||||
|
|
@ -115,15 +117,19 @@ func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
|
|||
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
|
||||
// 1GB of extra memory, accounting for the fact that this is heap allocated space.
|
||||
// If len <= 0, then the exemplar storage is essentially a noop storage but can later be
|
||||
// resized to store exemplars.
|
||||
func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error) {
|
||||
// resized to store exemplars. If oooTimeWindowMillis <= 0, out-of-order exemplars are disabled.
|
||||
func NewCircularExemplarStorage(length int64, m *ExemplarMetrics, oooTimeWindowMillis int64) (ExemplarStorage, error) {
|
||||
if length < 0 {
|
||||
length = 0
|
||||
}
|
||||
if oooTimeWindowMillis < 0 {
|
||||
oooTimeWindowMillis = 0
|
||||
}
|
||||
c := &CircularExemplarStorage{
|
||||
exemplars: make([]circularBufferEntry, length),
|
||||
index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries),
|
||||
metrics: m,
|
||||
exemplars: make([]circularBufferEntry, length),
|
||||
index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries),
|
||||
metrics: m,
|
||||
oooTimeWindowMillis: oooTimeWindowMillis,
|
||||
}
|
||||
|
||||
c.metrics.maxExemplars.Set(float64(length))
|
||||
|
|
@ -171,6 +177,9 @@ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*label
|
|||
}
|
||||
se.SeriesLabels = idx.seriesLabels
|
||||
|
||||
// TODO: Since we maintain a doubly-linked-list, we can also iterate from head to tail
|
||||
// which might be more performant if the selected interval is skewed to the head.
|
||||
|
||||
// Loop through all exemplars in the circular buffer for the current series.
|
||||
for e.exemplar.Ts <= end {
|
||||
if e.exemplar.Ts >= start {
|
||||
|
|
@ -253,16 +262,12 @@ func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.
|
|||
return storage.ErrDuplicateExemplar
|
||||
}
|
||||
|
||||
// Since during the scrape the exemplars are sorted first by timestamp, then value, then labels,
|
||||
// if any of these conditions are true, we know that the exemplar is either a duplicate
|
||||
// of a previous one (but not the most recent one as that is checked above) or out of order.
|
||||
// We now allow exemplars with duplicate timestamps as long as they have different values and/or labels
|
||||
// since that can happen for different buckets of a native histogram.
|
||||
// We do not distinguish between duplicates and out of order as iterating through the exemplars
|
||||
// to check for that would be expensive (versus just comparing with the most recent one) especially
|
||||
// since this is run under a lock, and not worth it as we just need to return an error so we do not
|
||||
// append the exemplar.
|
||||
if e.Ts < newestExemplar.Ts ||
|
||||
// Reject exemplars older than the OOO time window relative to the newest exemplar.
|
||||
// Exemplars with the same timestamp are ordered by value then label hash to detect
|
||||
// duplicates without iterating through all stored exemplars, which would be too
|
||||
// expensive under lock. Exemplars with equal timestamps but different values or
|
||||
// labels are allowed to support multiple buckets of native histograms.
|
||||
if (e.Ts < newestExemplar.Ts && e.Ts <= newestExemplar.Ts-ce.oooTimeWindowMillis) ||
|
||||
(e.Ts == newestExemplar.Ts && e.Value < newestExemplar.Value) ||
|
||||
(e.Ts == newestExemplar.Ts && e.Value == newestExemplar.Value && e.Labels.Hash() < newestExemplar.Labels.Hash()) {
|
||||
if appended {
|
||||
|
|
@ -273,8 +278,19 @@ func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.
|
|||
return nil
|
||||
}
|
||||
|
||||
// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
|
||||
// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
|
||||
// SetOutOfOrderTimeWindow sets the out-of-order time window for exemplars in
|
||||
// milliseconds. Exemplars older than it are not added to the circular exemplar
|
||||
// buffer.
|
||||
func (ce *CircularExemplarStorage) SetOutOfOrderTimeWindow(d int64) {
|
||||
ce.lock.Lock()
|
||||
defer ce.lock.Unlock()
|
||||
ce.oooTimeWindowMillis = d
|
||||
}
|
||||
|
||||
// Resize changes the size of exemplar buffer by allocating a new buffer and
|
||||
// migrating data to it. Exemplars are kept when possible. Shrinking will discard
|
||||
// old data (in order of ingestion) as needed. Returns the number of migrated
|
||||
// exemplars.
|
||||
func (ce *CircularExemplarStorage) Resize(l int64) int {
|
||||
// Accept negative values as just 0 size.
|
||||
if l <= 0 {
|
||||
|
|
@ -284,65 +300,83 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
|
|||
ce.lock.Lock()
|
||||
defer ce.lock.Unlock()
|
||||
|
||||
if l == int64(len(ce.exemplars)) {
|
||||
return 0
|
||||
}
|
||||
|
||||
oldBuffer := ce.exemplars
|
||||
oldNextIndex := int64(ce.nextIndex)
|
||||
|
||||
ce.exemplars = make([]circularBufferEntry, l)
|
||||
ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries)
|
||||
ce.nextIndex = 0
|
||||
|
||||
// Replay as many entries as needed, starting with oldest first.
|
||||
count := min(l, int64(len(oldBuffer)))
|
||||
|
||||
oldSize := int64(len(ce.exemplars))
|
||||
migrated := 0
|
||||
|
||||
if l > 0 && len(oldBuffer) > 0 {
|
||||
// Rewind previous next index by count with wrap-around.
|
||||
// This math is essentially looking at nextIndex, where we would write the next exemplar to,
|
||||
// and find the index in the old exemplar buffer that we should start migrating exemplars from.
|
||||
// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
|
||||
startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer))
|
||||
|
||||
var buf [1024]byte
|
||||
for i := range count {
|
||||
idx := (startIndex + i) % int64(len(oldBuffer))
|
||||
if oldBuffer[idx].ref != nil {
|
||||
ce.migrate(&oldBuffer[idx], buf[:])
|
||||
migrated++
|
||||
}
|
||||
}
|
||||
switch {
|
||||
case l == oldSize:
|
||||
// NOOP.
|
||||
return migrated
|
||||
case l > oldSize:
|
||||
migrated = ce.grow(l)
|
||||
case l < oldSize:
|
||||
migrated = ce.shrink(l)
|
||||
}
|
||||
|
||||
ce.computeMetrics()
|
||||
ce.metrics.maxExemplars.Set(float64(l))
|
||||
|
||||
return migrated
|
||||
}
|
||||
|
||||
// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
|
||||
// external lock and does not compute metrics.
|
||||
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) {
|
||||
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0])
|
||||
|
||||
idx, ok := ce.index[string(seriesLabels)]
|
||||
if !ok {
|
||||
idx = entry.ref
|
||||
idx.oldest = ce.nextIndex
|
||||
ce.index[string(seriesLabels)] = idx
|
||||
} else {
|
||||
entry.ref = idx
|
||||
ce.exemplars[idx.newest].next = ce.nextIndex
|
||||
// grow the circular buffer to have size l by allocating a new slice and copying
|
||||
// the old data to it. After growing, ce.nextIndex points to the next free entry
|
||||
// in the buffer. This function must be called with the lock acquired.
|
||||
func (ce *CircularExemplarStorage) grow(l int64) int {
|
||||
oldSize := len(ce.exemplars)
|
||||
newSlice := make([]circularBufferEntry, l)
|
||||
ranges := []intRange{
|
||||
{from: ce.nextIndex, to: oldSize},
|
||||
{from: 0, to: ce.nextIndex},
|
||||
}
|
||||
idx.newest = ce.nextIndex
|
||||
ce.nextIndex = copyExemplarRanges(ce.index, newSlice, ce.exemplars, ranges)
|
||||
ce.exemplars = newSlice
|
||||
return oldSize
|
||||
}
|
||||
|
||||
entry.next = noExemplar
|
||||
ce.exemplars[ce.nextIndex] = *entry
|
||||
// shrink the circular buffer by either trimming from the right or deleting the
|
||||
// oldest samples to accommodate the new size l. This function must be called
|
||||
// with the lock acquired.
|
||||
func (ce *CircularExemplarStorage) shrink(l int64) (migrated int) {
|
||||
oldSize := len(ce.exemplars)
|
||||
diff := int(int64(oldSize) - l)
|
||||
deleteStart := ce.nextIndex
|
||||
deleteEnd := (deleteStart + diff) % oldSize
|
||||
|
||||
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
|
||||
// Remove items from the buffer starting from c.nextIndex. This drops older
|
||||
// entries first in the order of ingestion.
|
||||
for i := range diff {
|
||||
idx := (deleteStart + i) % oldSize
|
||||
ref := ce.exemplars[idx].ref
|
||||
if ce.removeExemplar(&ce.exemplars[idx]) {
|
||||
ce.removeIndex(ref)
|
||||
}
|
||||
}
|
||||
|
||||
newSlice := make([]circularBufferEntry, int(l))
|
||||
|
||||
switch {
|
||||
case deleteStart == deleteEnd:
|
||||
// The entire buffer was cleared (shrink to zero). Note that we don't have to
|
||||
// delete the index since removeExemplar already did. Simply remove all elements
|
||||
// and reset tracking pointers.
|
||||
ce.exemplars = newSlice
|
||||
ce.nextIndex = 0
|
||||
return 0
|
||||
case deleteStart < deleteEnd:
|
||||
// We delete an "inner" section of the circular buffer.
|
||||
migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{
|
||||
{from: deleteEnd, to: oldSize},
|
||||
{from: 0, to: deleteStart},
|
||||
})
|
||||
case deleteStart > deleteEnd:
|
||||
// We keep an "inner" section of the circular buffer.
|
||||
migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{
|
||||
{from: deleteEnd, to: deleteStart},
|
||||
})
|
||||
}
|
||||
|
||||
ce.nextIndex = migrated % int(l)
|
||||
ce.exemplars = newSlice
|
||||
return migrated
|
||||
}
|
||||
|
||||
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
|
||||
|
|
@ -358,7 +392,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
|||
var buf [1024]byte
|
||||
seriesLabels := l.Bytes(buf[:])
|
||||
|
||||
idx, ok := ce.index[string(seriesLabels)]
|
||||
idx, indexExists := ce.index[string(seriesLabels)]
|
||||
err := ce.validateExemplar(idx, e, true)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrDuplicateExemplar) {
|
||||
|
|
@ -368,32 +402,77 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
|||
return err
|
||||
}
|
||||
|
||||
if !ok {
|
||||
idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
|
||||
ce.index[string(seriesLabels)] = idx
|
||||
} else {
|
||||
ce.exemplars[idx.newest].next = ce.nextIndex
|
||||
}
|
||||
|
||||
if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil {
|
||||
// There exists an exemplar already on this ce.nextIndex entry,
|
||||
// drop it, to make place for others.
|
||||
if prev.next == noExemplar {
|
||||
// Last item for this series, remove index entry.
|
||||
var buf [1024]byte
|
||||
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
|
||||
delete(ce.index, string(prevLabels))
|
||||
} else {
|
||||
prev.ref.oldest = prev.next
|
||||
// If we insert an out-of-order exemplar, we preemptively find the insertion
|
||||
// index to check for duplicates.
|
||||
var insertionIndex int
|
||||
if indexExists {
|
||||
outOfOrder := e.Ts >= ce.exemplars[idx.oldest].exemplar.Ts && e.Ts < ce.exemplars[idx.newest].exemplar.Ts
|
||||
if outOfOrder {
|
||||
insertionIndex = ce.findInsertionIndex(e, idx)
|
||||
if ce.exemplars[insertionIndex].exemplar.Ts == e.Ts {
|
||||
// Assume duplicate exemplar, noop.
|
||||
// Native histograms will exercise this code path a lot due to
|
||||
// having multiple exemplars per series so checking the
|
||||
// value and labels would be too expensive.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
|
||||
// since this is the first exemplar stored for this series.
|
||||
ce.exemplars[ce.nextIndex].next = noExemplar
|
||||
// If the index didn't exist (new series), create one.
|
||||
if !indexExists {
|
||||
idx = &indexEntry{seriesLabels: l}
|
||||
ce.index[string(seriesLabels)] = idx
|
||||
}
|
||||
|
||||
// Remove entries if the buffer is full. Note that this doesn't invalidate the
|
||||
// insertion index since out-of-order exemplars cannot be the oldest exemplar.
|
||||
if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil {
|
||||
prevRef := prev.ref
|
||||
if ce.removeExemplar(prev) {
|
||||
if prevRef == idx {
|
||||
// Do not delete the indexEntry we're inserting to.
|
||||
indexExists = false
|
||||
} else {
|
||||
ce.removeIndex(prevRef)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We create a new entry in the linked list.
|
||||
ce.exemplars[ce.nextIndex].exemplar = e
|
||||
ce.exemplars[ce.nextIndex].ref = idx
|
||||
idx.newest = ce.nextIndex
|
||||
|
||||
switch {
|
||||
case !indexExists:
|
||||
// Add the first and only exemplar to the list.
|
||||
idx.oldest = ce.nextIndex
|
||||
idx.newest = ce.nextIndex
|
||||
ce.exemplars[ce.nextIndex].prev = noExemplar
|
||||
ce.exemplars[ce.nextIndex].next = noExemplar
|
||||
case e.Ts >= ce.exemplars[idx.newest].exemplar.Ts:
|
||||
// Add the exemplar at the tip (after newest).
|
||||
ce.exemplars[idx.newest].next = ce.nextIndex
|
||||
ce.exemplars[ce.nextIndex].prev = idx.newest
|
||||
ce.exemplars[ce.nextIndex].next = noExemplar
|
||||
idx.newest = ce.nextIndex
|
||||
case e.Ts < ce.exemplars[idx.oldest].exemplar.Ts:
|
||||
// Add the exemplar at the tail (before oldest).
|
||||
ce.exemplars[idx.oldest].prev = ce.nextIndex
|
||||
ce.exemplars[ce.nextIndex].prev = noExemplar
|
||||
ce.exemplars[ce.nextIndex].next = idx.oldest
|
||||
idx.oldest = ce.nextIndex
|
||||
default:
|
||||
// Insert the exemplar into the list by finding the most recent
|
||||
// in-order exemplar that precedes it, and placing it after.
|
||||
nextExemplar := ce.exemplars[insertionIndex].next
|
||||
ce.exemplars[ce.nextIndex].prev = insertionIndex
|
||||
ce.exemplars[ce.nextIndex].next = nextExemplar
|
||||
ce.exemplars[insertionIndex].next = ce.nextIndex
|
||||
if nextExemplar != noExemplar {
|
||||
ce.exemplars[nextExemplar].prev = ce.nextIndex
|
||||
}
|
||||
}
|
||||
|
||||
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
|
||||
|
||||
|
|
@ -402,6 +481,56 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
|||
return nil
|
||||
}
|
||||
|
||||
// removeExemplar removes the given entry from the circular buffer. Returns true
|
||||
// iff the deleted entry was the last entry (and the index is now empty).
|
||||
// This function must be called with the lock acquired.
|
||||
func (ce *CircularExemplarStorage) removeExemplar(entry *circularBufferEntry) bool {
|
||||
ref := entry.ref
|
||||
if ref == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if entry.prev != noExemplar {
|
||||
ce.exemplars[entry.prev].next = entry.next
|
||||
} else {
|
||||
ref.oldest = entry.next
|
||||
}
|
||||
|
||||
if entry.next != noExemplar {
|
||||
ce.exemplars[entry.next].prev = entry.prev
|
||||
} else {
|
||||
ref.newest = entry.prev
|
||||
}
|
||||
|
||||
// Mark this item as deleted.
|
||||
entry.ref = nil
|
||||
|
||||
return ref.oldest == noExemplar && ref.newest == noExemplar
|
||||
}
|
||||
|
||||
// removeIndex removes an indexEntry from the circular exemplar storage.
|
||||
// This function must be called with the lock acquired.
|
||||
func (ce *CircularExemplarStorage) removeIndex(ref *indexEntry) {
|
||||
var buf [1024]byte
|
||||
entryLabels := ref.seriesLabels.Bytes(buf[:])
|
||||
delete(ce.index, string(entryLabels))
|
||||
}
|
||||
|
||||
// findInsertionIndex finds the position at which e should be placed in the
|
||||
// doubly-linked list by traversing the linked list from idx.newest to idx.oldest
|
||||
// and following back links. Since out-of-order exemplars commonly lie close to
|
||||
// the newest entry, traversing from newest to oldest is usually faster.
|
||||
func (ce *CircularExemplarStorage) findInsertionIndex(e exemplar.Exemplar, idx *indexEntry) int {
|
||||
for i := idx.newest; i != noExemplar; {
|
||||
current := ce.exemplars[i]
|
||||
if current.exemplar.Ts <= e.Ts {
|
||||
return i
|
||||
}
|
||||
i = current.prev
|
||||
}
|
||||
return idx.oldest
|
||||
}
|
||||
|
||||
func (ce *CircularExemplarStorage) computeMetrics() {
|
||||
ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index)))
|
||||
|
||||
|
|
@ -443,3 +572,64 @@ func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.L
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type intRange struct {
|
||||
from, to int
|
||||
}
|
||||
|
||||
func (e intRange) contains(i int) bool {
|
||||
return i >= e.from && i < e.to
|
||||
}
|
||||
|
||||
// copyExemplarRanges copies non-overlapping ranges from src into dest and
|
||||
// adjusts list pointers in dest and index accordingly. Returns the number of
|
||||
// copied items.
|
||||
func copyExemplarRanges(
|
||||
index map[string]*indexEntry,
|
||||
dest, src []circularBufferEntry,
|
||||
ranges []intRange,
|
||||
) int {
|
||||
offsets := make([]int, len(ranges))
|
||||
n := 0
|
||||
for i, rng := range ranges {
|
||||
offsets[i] = n - rng.from
|
||||
n += copy(dest[n:], src[rng.from:rng.to])
|
||||
}
|
||||
migratedEntries := n
|
||||
for di := range n {
|
||||
e := &dest[di]
|
||||
if e.ref == nil {
|
||||
// We potentially copied empty entries. Subtract them now to correctly show the
|
||||
// number of "migrated" items.
|
||||
migratedEntries--
|
||||
continue
|
||||
}
|
||||
for i, rng := range ranges {
|
||||
if rng.contains(e.prev) {
|
||||
e.prev += offsets[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
for i, rng := range ranges {
|
||||
if rng.contains(e.next) {
|
||||
e.next += offsets[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, idx := range index {
|
||||
for i, rng := range ranges {
|
||||
if rng.contains(idx.oldest) {
|
||||
idx.oldest += offsets[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
for i, rng := range ranges {
|
||||
if rng.contains(idx.newest) {
|
||||
idx.newest += offsets[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return migratedEntries
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -35,7 +36,7 @@ var eMetrics = NewExemplarMetrics(prometheus.DefaultRegisterer)
|
|||
|
||||
// Tests the same exemplar cases as AddExemplar, but specifically the ValidateExemplar function so it can be relied on externally.
|
||||
func TestValidateExemplar(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(2, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(2, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -76,54 +77,624 @@ func TestValidateExemplar(t *testing.T) {
|
|||
require.Equal(t, storage.ErrExemplarLabelLength, es.ValidateExemplar(l, e4))
|
||||
}
|
||||
|
||||
func TestAddExemplar(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(2, eMetrics)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
func TestCircularExemplarStorage_AddExemplar(t *testing.T) {
|
||||
series1 := labels.FromStrings("trace_id", "foo")
|
||||
series2 := labels.FromStrings("trace_id", "bar")
|
||||
|
||||
l := labels.FromStrings("service", "asdf")
|
||||
e := exemplar.Exemplar{
|
||||
Labels: labels.FromStrings("trace_id", "qwerty"),
|
||||
Value: 0.1,
|
||||
Ts: 1,
|
||||
series1Matcher := []*labels.Matcher{{
|
||||
Type: labels.MatchEqual,
|
||||
Name: "trace_id",
|
||||
Value: series1.Get("trace_id"),
|
||||
}}
|
||||
|
||||
series2Matcher := []*labels.Matcher{{
|
||||
Type: labels.MatchEqual,
|
||||
Name: "trace_id",
|
||||
Value: series2.Get("trace_id"),
|
||||
}}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
size int64
|
||||
exemplars []exemplar.Exemplar
|
||||
wantExemplars []exemplar.Exemplar
|
||||
matcher []*labels.Matcher
|
||||
wantError error
|
||||
}{
|
||||
{
|
||||
name: "insert after newest",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert before oldest",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 2},
|
||||
{Labels: series1, Value: 0.2, Ts: 1},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 1},
|
||||
{Labels: series1, Value: 0.1, Ts: 2},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert in between",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
{Labels: series1, Value: 0.3, Ts: 2},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.3, Ts: 2},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert after newest with overflow",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert before oldest with overflow",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 0},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.4, Ts: 0},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert between with overflow",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
{Labels: series1, Value: 0.3, Ts: 4},
|
||||
{Labels: series1, Value: 0.4, Ts: 2},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.4, Ts: 2},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
{Labels: series1, Value: 0.3, Ts: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert out of the OOO window",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 200},
|
||||
{Labels: series1, Value: 0.2, Ts: 1},
|
||||
},
|
||||
wantError: storage.ErrOutOfOrderExemplar,
|
||||
},
|
||||
{
|
||||
name: "insert multiple series",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
{Labels: series2, Value: 0.3, Ts: 4},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "insert multiple series with overflow",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series2, Value: 0.1, Ts: 1},
|
||||
{Labels: series2, Value: 0.2, Ts: 2},
|
||||
{Labels: series2, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
matcher: series2Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series2, Value: 0.2, Ts: 2},
|
||||
{Labels: series2, Value: 0.3, Ts: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "series1 overflows series2 out-of-order",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series2, Value: 0.1, Ts: 3},
|
||||
{Labels: series2, Value: 0.2, Ts: 2},
|
||||
{Labels: series2, Value: 0.3, Ts: 4},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
{Labels: series1, Value: 0.5, Ts: 1},
|
||||
},
|
||||
matcher: series2Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series2, Value: 0.3, Ts: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ignore duplicate exemplars",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 3},
|
||||
{Labels: series1, Value: 0.1, Ts: 3},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ignore duplicate exemplars when buffer is full",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 3},
|
||||
{Labels: series1, Value: 0.2, Ts: 4},
|
||||
{Labels: series1, Value: 0.3, Ts: 5},
|
||||
{Labels: series1, Value: 0.3, Ts: 5},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 3},
|
||||
{Labels: series1, Value: 0.2, Ts: 4},
|
||||
{Labels: series1, Value: 0.3, Ts: 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty timestamps are valid",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 0},
|
||||
{Labels: series1, Value: 0.2, Ts: 0},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 0},
|
||||
{Labels: series1, Value: 0.2, Ts: 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exemplar label length exceeds maximum",
|
||||
size: 3,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("a", strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength)), Value: 0.1, Ts: 2},
|
||||
},
|
||||
wantError: storage.ErrExemplarLabelLength,
|
||||
},
|
||||
{
|
||||
name: "native histograms",
|
||||
size: 6,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "evict only exemplar for series then re-add",
|
||||
size: 2,
|
||||
exemplars: []exemplar.Exemplar{
|
||||
// series1 at index 0, series2 at index 1, then series1 evicts its own only exemplar
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series2, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
matcher: series1Matcher,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(tc.size, eMetrics, 100)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
// Add exemplars and compare tc.wantErr against the first exemplar failing.
|
||||
var addError error
|
||||
for i, ex := range tc.exemplars {
|
||||
addError = es.AddExemplar(ex.Labels, ex)
|
||||
if addError != nil {
|
||||
break
|
||||
}
|
||||
if testing.Verbose() {
|
||||
t.Logf("Buffer[%d]:\n%s", i, debugCircularBuffer(es))
|
||||
}
|
||||
}
|
||||
if tc.wantError == nil {
|
||||
require.NoError(t, addError)
|
||||
} else {
|
||||
require.ErrorIs(t, addError, tc.wantError)
|
||||
}
|
||||
if addError != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Ensure exemplars are returned correctly and in-order.
|
||||
gotExemplars, err := es.Select(0, 1000, tc.matcher)
|
||||
require.NoError(t, err)
|
||||
if len(tc.wantExemplars) == 0 {
|
||||
require.Empty(t, gotExemplars)
|
||||
} else {
|
||||
require.Len(t, gotExemplars, 1)
|
||||
require.Equal(t, tc.wantExemplars, gotExemplars[0].Exemplars)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCircularExemplarStorage_Resize(t *testing.T) {
|
||||
series1 := labels.FromStrings("trace_id", "foo")
|
||||
series2 := labels.FromStrings("trace_id", "bar")
|
||||
matcher1 := []*labels.Matcher{
|
||||
labels.MustNewMatcher(labels.MatchRegexp, "trace_id", "(foo|bar)"),
|
||||
}
|
||||
|
||||
require.NoError(t, es.AddExemplar(l, e))
|
||||
require.Equal(t, 0, es.index[string(l.Bytes(nil))].newest, "exemplar was not stored correctly")
|
||||
|
||||
e2 := exemplar.Exemplar{
|
||||
Labels: labels.FromStrings("trace_id", "zxcvb"),
|
||||
Value: 0.1,
|
||||
Ts: 2,
|
||||
testCases := []struct {
|
||||
name string
|
||||
exemplars []exemplar.Exemplar
|
||||
resize int64
|
||||
wantExemplars []exemplar.Exemplar
|
||||
wantNextIndex int
|
||||
wantError error
|
||||
}{
|
||||
{
|
||||
name: "in-order, grow",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
resize: 10,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
wantNextIndex: 2,
|
||||
},
|
||||
{
|
||||
name: "in-order, shrink",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
resize: 2,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "out-of-order, shrink",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
},
|
||||
resize: 2,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "out-of-order, grow",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
resize: 5,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
wantNextIndex: 2,
|
||||
},
|
||||
{
|
||||
name: "duplicate timestamps",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 1},
|
||||
{Labels: series1, Value: 0.3, Ts: 2},
|
||||
},
|
||||
resize: 3,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 1},
|
||||
{Labels: series1, Value: 0.3, Ts: 2},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty input, grow",
|
||||
exemplars: []exemplar.Exemplar{},
|
||||
resize: 10,
|
||||
wantExemplars: []exemplar.Exemplar{},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "empty input, shrink",
|
||||
exemplars: []exemplar.Exemplar{},
|
||||
resize: 1,
|
||||
wantExemplars: []exemplar.Exemplar{},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "shrink to zero",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
resize: 0,
|
||||
wantExemplars: []exemplar.Exemplar{},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "multiple series, shrink",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series2, Value: 1.1, Ts: 2},
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
{Labels: series2, Value: 1.2, Ts: 4},
|
||||
},
|
||||
resize: 2,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 3},
|
||||
{Labels: series2, Value: 1.2, Ts: 4},
|
||||
},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "shrink to one",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
resize: 1,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
{
|
||||
name: "shrink to two",
|
||||
exemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
},
|
||||
resize: 2,
|
||||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
},
|
||||
wantNextIndex: 1,
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, es.AddExemplar(l, e2))
|
||||
require.Equal(t, 1, es.index[string(l.Bytes(nil))].newest, "exemplar was not stored correctly, location of newest exemplar for series in index did not update")
|
||||
require.True(t, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar)
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(3, eMetrics, 100)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
require.NoError(t, es.AddExemplar(l, e2), "no error is expected attempting to add duplicate exemplar")
|
||||
for _, ex := range tc.exemplars {
|
||||
require.NoError(t, es.AddExemplar(ex.Labels, ex))
|
||||
}
|
||||
|
||||
e3 := e2
|
||||
e3.Ts = 3
|
||||
require.NoError(t, es.AddExemplar(l, e3), "no error is expected when attempting to add duplicate exemplar, even with different timestamp")
|
||||
// Resize the circular buffer.
|
||||
if testing.Verbose() {
|
||||
t.Logf("Buffer[before-resize]:\n%s", debugCircularBuffer(es))
|
||||
}
|
||||
es.Resize(tc.resize)
|
||||
if testing.Verbose() {
|
||||
t.Logf("Buffer[after-resize]:\n%s", debugCircularBuffer(es))
|
||||
}
|
||||
|
||||
e3.Ts = 1
|
||||
e3.Value = 0.3
|
||||
require.Equal(t, storage.ErrOutOfOrderExemplar, es.AddExemplar(l, e3))
|
||||
|
||||
e4 := exemplar.Exemplar{
|
||||
Labels: labels.FromStrings("a", strings.Repeat("b", exemplar.ExemplarMaxLabelSetLength)),
|
||||
Value: 0.1,
|
||||
Ts: 2,
|
||||
// Ensure exemplars are returned correctly and in-order.
|
||||
gotExemplars, err := es.Select(0, 1000, matcher1)
|
||||
require.NoError(t, err)
|
||||
flat := make([]exemplar.Exemplar, 0)
|
||||
for _, group := range gotExemplars {
|
||||
flat = append(flat, group.Exemplars...)
|
||||
}
|
||||
sort.Slice(flat, func(i, j int) bool {
|
||||
return flat[i].Ts < flat[j].Ts
|
||||
})
|
||||
require.Equal(t, tc.wantExemplars, flat, "exemplar mismatch")
|
||||
require.Equal(t, tc.wantNextIndex, es.nextIndex, "next index mismatch")
|
||||
})
|
||||
}
|
||||
|
||||
resizeTwiceCases := []struct {
|
||||
name string
|
||||
addExemplars1 []exemplar.Exemplar
|
||||
resize1 int64
|
||||
wantExemplars1 []exemplar.Exemplar
|
||||
resize2 int64
|
||||
addExemplars2 []exemplar.Exemplar
|
||||
wantExemplars2 []exemplar.Exemplar
|
||||
}{
|
||||
{
|
||||
name: "shrink then grow ordered",
|
||||
addExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
resize1: 2,
|
||||
wantExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
resize2: 5,
|
||||
addExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
},
|
||||
wantExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "shrink then grow out-of-order",
|
||||
addExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
resize1: 2,
|
||||
wantExemplars1: []exemplar.Exemplar{
|
||||
// We delete in the order of ingestion, not temporally.
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
resize2: 5,
|
||||
addExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
},
|
||||
wantExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "grow then shrink ordered",
|
||||
addExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
resize1: 5,
|
||||
wantExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
resize2: 2,
|
||||
addExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
},
|
||||
wantExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "grow then shrink out-of-order",
|
||||
addExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
resize1: 5,
|
||||
wantExemplars1: []exemplar.Exemplar{
|
||||
// We delete in the order of ingestion, not temporally.
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
resize2: 2,
|
||||
addExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.7, Ts: 7},
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
},
|
||||
wantExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.5, Ts: 5},
|
||||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range resizeTwiceCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(3, eMetrics, 100)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
for _, ex := range tc.addExemplars1 {
|
||||
require.NoError(t, es.AddExemplar(ex.Labels, ex))
|
||||
}
|
||||
es.Resize(tc.resize1)
|
||||
gotExemplars, err := es.Select(0, 1000, matcher1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gotExemplars, 1)
|
||||
require.Equal(t, tc.wantExemplars1, gotExemplars[0].Exemplars)
|
||||
es.Resize(tc.resize2)
|
||||
for _, ex := range tc.addExemplars2 {
|
||||
require.NoError(t, es.AddExemplar(ex.Labels, ex))
|
||||
}
|
||||
if testing.Verbose() {
|
||||
t.Logf("Buffer[after-resize2]:\n%s", debugCircularBuffer(es))
|
||||
}
|
||||
gotExemplars, err = es.Select(0, 1000, matcher1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gotExemplars, 1)
|
||||
require.Equal(t, tc.wantExemplars2, gotExemplars[0].Exemplars)
|
||||
})
|
||||
}
|
||||
require.Equal(t, storage.ErrExemplarLabelLength, es.AddExemplar(l, e4))
|
||||
}
|
||||
|
||||
func TestStorageOverflow(t *testing.T) {
|
||||
// Test that circular buffer index and assignment
|
||||
// works properly, adding more exemplars than can
|
||||
// be stored and then querying for them.
|
||||
exs, err := NewCircularExemplarStorage(5, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(5, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -152,7 +723,7 @@ func TestStorageOverflow(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSelectExemplar(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(5, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(5, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -179,7 +750,7 @@ func TestSelectExemplar(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSelectExemplar_MultiSeries(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(5, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(5, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -223,7 +794,7 @@ func TestSelectExemplar_MultiSeries(t *testing.T) {
|
|||
|
||||
func TestSelectExemplar_TimeRange(t *testing.T) {
|
||||
var lenEs int64 = 5
|
||||
exs, err := NewCircularExemplarStorage(lenEs, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(lenEs, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -251,7 +822,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) {
|
|||
// Test to ensure that even though a series matches more than one matcher from the
|
||||
// query that it's exemplars are only included in the result a single time.
|
||||
func TestSelectExemplar_DuplicateSeries(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(4, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(4, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -286,7 +857,7 @@ func TestSelectExemplar_DuplicateSeries(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIndexOverwrite(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(2, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(2, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -374,7 +945,7 @@ func TestResize(t *testing.T) {
|
|||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -386,7 +957,14 @@ func TestResize(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if testing.Verbose() {
|
||||
t.Logf("Buffer[before-resize]:\n%s", debugCircularBuffer(es))
|
||||
}
|
||||
resized := es.Resize(tc.newCount)
|
||||
if testing.Verbose() {
|
||||
t.Logf("Buffer[after-resize]:\n%s", debugCircularBuffer(es))
|
||||
}
|
||||
|
||||
require.Equal(t, tc.expectedMigrated, resized)
|
||||
|
||||
q, err := es.Querier(context.TODO())
|
||||
|
|
@ -421,7 +999,7 @@ func BenchmarkAddExemplar(b *testing.B) {
|
|||
b.Run(fmt.Sprintf("%d/%d", n, capacity), func(b *testing.B) {
|
||||
for b.Loop() {
|
||||
b.StopTimer()
|
||||
exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics, 0)
|
||||
require.NoError(b, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
var l labels.Labels
|
||||
|
|
@ -442,6 +1020,91 @@ func BenchmarkAddExemplar(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func BenchmarkAddExemplar_OutOfOrder(b *testing.B) {
|
||||
// We need to include these labels since we do length calculation
|
||||
// before adding.
|
||||
exLabels := labels.FromStrings("trace_id", "89620921")
|
||||
|
||||
const (
|
||||
capacity = 5000
|
||||
)
|
||||
|
||||
fillOneSeries := func(es *CircularExemplarStorage) {
|
||||
for i := range capacity {
|
||||
e := exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels}
|
||||
if err := es.AddExemplar(exLabels, e); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fillMultipleSeries := func(es *CircularExemplarStorage) {
|
||||
for i := range capacity {
|
||||
l := labels.FromStrings("service", strconv.Itoa(i))
|
||||
e := exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: l}
|
||||
if err := es.AddExemplar(l, e); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
outOfOrder := func(ts *int64, _ *labels.Labels) {
|
||||
switch *ts % 3 {
|
||||
case 0:
|
||||
return
|
||||
case 1:
|
||||
*ts = capacity - *ts
|
||||
case 2:
|
||||
*ts = (capacity - *ts) + 100
|
||||
}
|
||||
}
|
||||
|
||||
reverseOrder := func(ts *int64, _ *labels.Labels) {
|
||||
*ts = capacity - *ts
|
||||
}
|
||||
|
||||
multipleSeries := func(f func(*int64, *labels.Labels)) func(*int64, *labels.Labels) {
|
||||
return func(ts *int64, l *labels.Labels) {
|
||||
f(ts, l)
|
||||
*l = labels.FromStrings("service", strconv.Itoa(int(*ts)))
|
||||
}
|
||||
}
|
||||
|
||||
for fillName, setup := range map[string]func(es *CircularExemplarStorage){
|
||||
"empty": func(*CircularExemplarStorage) {},
|
||||
"full-one": fillOneSeries,
|
||||
"full-multiple": fillMultipleSeries,
|
||||
} {
|
||||
for orderName, forEach := range map[string]func(ts *int64, l *labels.Labels){
|
||||
"in-order": func(*int64, *labels.Labels) {},
|
||||
"reverse": reverseOrder,
|
||||
"out-of-order": outOfOrder,
|
||||
"multi-in-order": multipleSeries(func(*int64, *labels.Labels) {}),
|
||||
"multi-reverse": multipleSeries(reverseOrder),
|
||||
"multi-out-of-order": multipleSeries(outOfOrder),
|
||||
} {
|
||||
b.Run(fmt.Sprintf("%s/%s", fillName, orderName), func(b *testing.B) {
|
||||
exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics, 100000)
|
||||
require.NoError(b, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
l := labels.FromStrings("service", "0")
|
||||
setup(es)
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
for i := range capacity {
|
||||
ts := int64(i)
|
||||
forEach(&ts, &l)
|
||||
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: ts, Labels: l})
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to insert item %d %s: %v", i, l, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkResizeExemplars(b *testing.B) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
|
@ -479,7 +1142,7 @@ func BenchmarkResizeExemplars(b *testing.B) {
|
|||
b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(b *testing.B) {
|
||||
for b.Loop() {
|
||||
b.StopTimer()
|
||||
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics, 0)
|
||||
require.NoError(b, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -504,7 +1167,7 @@ func BenchmarkResizeExemplars(b *testing.B) {
|
|||
// TestCircularExemplarStorage_Concurrent_AddExemplar_Resize tries to provoke a data race between AddExemplar and Resize.
|
||||
// Run with race detection enabled.
|
||||
func TestCircularExemplarStorage_Concurrent_AddExemplar_Resize(t *testing.T) {
|
||||
exs, err := NewCircularExemplarStorage(0, eMetrics)
|
||||
exs, err := NewCircularExemplarStorage(0, eMetrics, 0)
|
||||
require.NoError(t, err)
|
||||
es := exs.(*CircularExemplarStorage)
|
||||
|
||||
|
|
@ -537,3 +1200,30 @@ func TestCircularExemplarStorage_Concurrent_AddExemplar_Resize(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// debugCircularBuffer iterates all exemplars in the circular exemplar storage
|
||||
// and returns them as a string. The textual representation contains index
|
||||
// pointers and helps debugging exemplar storage.
|
||||
func debugCircularBuffer(ce *CircularExemplarStorage) string {
|
||||
var sb strings.Builder
|
||||
for i, e := range ce.exemplars {
|
||||
if e.ref == nil {
|
||||
continue
|
||||
}
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
"i: %d, ts: %d, next: %d, prev: %d",
|
||||
i, e.exemplar.Ts, e.next, e.prev,
|
||||
))
|
||||
for _, idx := range ce.index {
|
||||
if i == idx.newest {
|
||||
sb.WriteString(" <- newest " + idx.seriesLabels.String())
|
||||
}
|
||||
if i == idx.oldest {
|
||||
sb.WriteString(" <- oldest " + idx.seriesLabels.String())
|
||||
}
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
sb.WriteString(fmt.Sprintf("Next index: %d\n", ce.nextIndex))
|
||||
return sb.String()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -327,7 +327,7 @@ func (h *Head) resetInMemoryState() error {
|
|||
if em == nil {
|
||||
em = NewExemplarMetrics(h.reg)
|
||||
}
|
||||
es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em)
|
||||
es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em, h.opts.OutOfOrderTimeWindow.Load())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -1037,6 +1037,8 @@ func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL) {
|
|||
return
|
||||
}
|
||||
|
||||
h.exemplars.(*CircularExemplarStorage).SetOutOfOrderTimeWindow(oooTimeWindow)
|
||||
|
||||
// Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage
|
||||
// to decide if it should pass exemplars along to its exemplar storage, so we
|
||||
// need to update opts.MaxExemplars here.
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
|
|||
reg := prometheus.NewRegistry()
|
||||
eMetrics := tsdb.NewExemplarMetrics(reg)
|
||||
|
||||
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics)
|
||||
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening test exemplar storage: %w", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue