mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
tsdb: fix grow/shrink nextIndex calculation (#17863)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Signed-off-by: Julius Hinze <julius.hinze@grafana.com>
This commit is contained in:
parent
3374d2e56f
commit
ccb7468b09
2 changed files with 57 additions and 13 deletions
|
|
@ -327,7 +327,8 @@ func (ce *CircularExemplarStorage) grow(l int64) int {
|
|||
{from: ce.nextIndex, to: oldSize},
|
||||
{from: 0, to: ce.nextIndex},
|
||||
}
|
||||
ce.nextIndex = copyExemplarRanges(ce.index, newSlice, ce.exemplars, ranges)
|
||||
totalCopied, _ := copyExemplarRanges(ce.index, newSlice, ce.exemplars, ranges)
|
||||
ce.nextIndex = totalCopied
|
||||
ce.exemplars = newSlice
|
||||
return oldSize
|
||||
}
|
||||
|
|
@ -353,6 +354,7 @@ func (ce *CircularExemplarStorage) shrink(l int64) (migrated int) {
|
|||
|
||||
newSlice := make([]circularBufferEntry, int(l))
|
||||
|
||||
var totalCopied int
|
||||
switch {
|
||||
case deleteStart == deleteEnd:
|
||||
// The entire buffer was cleared (shrink to zero). Note that we don't have to
|
||||
|
|
@ -363,18 +365,18 @@ func (ce *CircularExemplarStorage) shrink(l int64) (migrated int) {
|
|||
return 0
|
||||
case deleteStart < deleteEnd:
|
||||
// We delete an "inner" section of the circular buffer.
|
||||
migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{
|
||||
totalCopied, migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{
|
||||
{from: deleteEnd, to: oldSize},
|
||||
{from: 0, to: deleteStart},
|
||||
})
|
||||
case deleteStart > deleteEnd:
|
||||
// We keep an "inner" section of the circular buffer.
|
||||
migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{
|
||||
totalCopied, migrated = copyExemplarRanges(ce.index, newSlice, ce.exemplars, []intRange{
|
||||
{from: deleteEnd, to: deleteStart},
|
||||
})
|
||||
}
|
||||
|
||||
ce.nextIndex = migrated % int(l)
|
||||
ce.nextIndex = totalCopied % int(l)
|
||||
ce.exemplars = newSlice
|
||||
return migrated
|
||||
}
|
||||
|
|
@ -582,20 +584,21 @@ func (e intRange) contains(i int) bool {
|
|||
}
|
||||
|
||||
// copyExemplarRanges copies non-overlapping ranges from src into dest and
|
||||
// adjusts list pointers in dest and index accordingly. Returns the number of
|
||||
// copied items.
|
||||
// adjusts list pointers in dest and index accordingly. Returns the total
|
||||
// number of slots copied (for nextIndex) and the number of non-empty entries
|
||||
// migrated.
|
||||
func copyExemplarRanges(
|
||||
index map[string]*indexEntry,
|
||||
dest, src []circularBufferEntry,
|
||||
ranges []intRange,
|
||||
) int {
|
||||
) (totalCopied, migratedEntries int) {
|
||||
offsets := make([]int, len(ranges))
|
||||
n := 0
|
||||
for i, rng := range ranges {
|
||||
offsets[i] = n - rng.from
|
||||
n += copy(dest[n:], src[rng.from:rng.to])
|
||||
}
|
||||
migratedEntries := n
|
||||
migratedEntries = n
|
||||
for di := range n {
|
||||
e := &dest[di]
|
||||
if e.ref == nil {
|
||||
|
|
@ -631,5 +634,5 @@ func copyExemplarRanges(
|
|||
}
|
||||
}
|
||||
}
|
||||
return migratedEntries
|
||||
return n, migratedEntries
|
||||
}
|
||||
|
|
|
|||
|
|
@ -390,7 +390,7 @@ func TestCircularExemplarStorage_Resize(t *testing.T) {
|
|||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
wantNextIndex: 2,
|
||||
wantNextIndex: 3,
|
||||
},
|
||||
{
|
||||
name: "in-order, shrink",
|
||||
|
|
@ -431,7 +431,7 @@ func TestCircularExemplarStorage_Resize(t *testing.T) {
|
|||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
},
|
||||
wantNextIndex: 2,
|
||||
wantNextIndex: 3,
|
||||
},
|
||||
{
|
||||
name: "duplicate timestamps",
|
||||
|
|
@ -452,7 +452,7 @@ func TestCircularExemplarStorage_Resize(t *testing.T) {
|
|||
exemplars: []exemplar.Exemplar{},
|
||||
resize: 10,
|
||||
wantExemplars: []exemplar.Exemplar{},
|
||||
wantNextIndex: 0,
|
||||
wantNextIndex: 3,
|
||||
},
|
||||
{
|
||||
name: "empty input, shrink",
|
||||
|
|
@ -507,7 +507,7 @@ func TestCircularExemplarStorage_Resize(t *testing.T) {
|
|||
wantExemplars: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
},
|
||||
wantNextIndex: 1,
|
||||
wantNextIndex: 0,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -660,6 +660,47 @@ func TestCircularExemplarStorage_Resize(t *testing.T) {
|
|||
{Labels: series1, Value: 0.6, Ts: 6},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "grow non-full buffer then add entries",
|
||||
addExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
resize1: 10,
|
||||
wantExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
resize2: 10,
|
||||
addExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
wantExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
{Labels: series1, Value: 0.3, Ts: 3},
|
||||
{Labels: series1, Value: 0.4, Ts: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "shrink non-full buffer then add entries",
|
||||
addExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
},
|
||||
resize1: 2,
|
||||
wantExemplars1: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
},
|
||||
resize2: 2,
|
||||
addExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
wantExemplars2: []exemplar.Exemplar{
|
||||
{Labels: series1, Value: 0.1, Ts: 1},
|
||||
{Labels: series1, Value: 0.2, Ts: 2},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range resizeTwiceCases {
|
||||
|
|
|
|||
Loading…
Reference in a new issue