diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index e5ad4028bb..7f3b2a5968 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -343,7 +343,7 @@ func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) ( // original deltas to a new set of deltas to match a new span layout that adds // buckets, we simply need to generate a list of inserts. // -// Note: Within expandSpansForward we don't have to worry about the changes to the +// Note: Within expandFloatSpansAndBuckets we don't have to worry about the changes to the // spans themselves, thanks to the iterators we get to work with the more useful // bucket indices (which of course directly correspond to the buckets we have to // adjust). @@ -378,6 +378,48 @@ func expandFloatSpansAndBuckets(a, b []histogram.Span, aBuckets []xorValue, bBuc bCount = bBuckets[bCountIdx] } + // addInsert updates the current Insert with a new insert at the given + // bucket index (otherIdx). + addInsert := func(inserts []Insert, insert *Insert, otherIdx int) []Insert { + if insert.num == 0 { + // First insert. + insert.bucketIdx = otherIdx + } else if insert.bucketIdx+insert.num != otherIdx { + // Insert is not continuous from previous insert. + inserts = append(inserts, *insert) + insert.num = 0 + insert.bucketIdx = otherIdx + } + insert.num++ + return inserts + } + + advanceA := func() { + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + aIdx, aOK = ai.Next() + aInter.pos++ + aCountIdx++ + if aOK { + aCount = aBuckets[aCountIdx].value + } + } + + advanceB := func() { + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + bIdx, bOK = bi.Next() + bInter.pos++ + bCountIdx++ + if bOK { + bCount = bBuckets[bCountIdx] + } + } + loop: for { switch { @@ -389,105 +431,37 @@ loop: return nil, nil, false } - // Finish WIP insert for a and reset. - if aInter.num > 0 { - aInserts = append(aInserts, aInter) - aInter.num = 0 - } - - // Finish WIP insert for b and reset. - if bInter.num > 0 { - bInserts = append(bInserts, bInter) - bInter.num = 0 - } - - aIdx, aOK = ai.Next() - bIdx, bOK = bi.Next() - aInter.pos++ // Advance potential insert position. - aCountIdx++ // Advance absolute bucket count index for a. - if aOK { - aCount = aBuckets[aCountIdx].value - } - bInter.pos++ // Advance potential insert position. - bCountIdx++ // Advance absolute bucket count index for b. - if bOK { - bCount = bBuckets[bCountIdx] - } + advanceA() + advanceB() continue case aIdx < bIdx: // b misses a bucket index that is in a. // This is ok if the count in a is 0, in which case we make a note to // fill in the bucket in b and advance a. if aCount == 0 { - bInter.num++ // Mark that we need to insert a bucket in b. - bInter.bucketIdx = aIdx - // Advance a - if aInter.num > 0 { - aInserts = append(aInserts, aInter) - aInter.num = 0 - } - aIdx, aOK = ai.Next() - aInter.pos++ - aCountIdx++ - if aOK { - aCount = aBuckets[aCountIdx].value - } + bInserts = addInsert(bInserts, &bInter, aIdx) + advanceA() continue } // Otherwise we are missing a bucket that was in use in a, which is a reset. return nil, nil, false case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare. - aInter.num++ - bInter.bucketIdx = bIdx - // Advance b - if bInter.num > 0 { - bInserts = append(bInserts, bInter) - bInter.num = 0 - } - bIdx, bOK = bi.Next() - bInter.pos++ - bCountIdx++ - if bOK { - bCount = bBuckets[bCountIdx] - } + aInserts = addInsert(aInserts, &aInter, bIdx) + advanceB() } case aOK && !bOK: // b misses a value that is in a. // This is ok if the count in a is 0, in which case we make a note to // fill in the bucket in b and advance a. if aCount == 0 { - bInter.num++ - bInter.bucketIdx = aIdx - // Advance a - if aInter.num > 0 { - aInserts = append(aInserts, aInter) - aInter.num = 0 - } - aIdx, aOK = ai.Next() - aInter.pos++ // Advance potential insert position. - // Update absolute bucket counts for a. - aCountIdx++ - if aOK { - aCount = aBuckets[aCountIdx].value - } + bInserts = addInsert(bInserts, &bInter, aIdx) + advanceA() continue } // Otherwise we are missing a bucket that was in use in a, which is a reset. return nil, nil, false case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. - aInter.num++ - bInter.bucketIdx = bIdx - // Advance b - if bInter.num > 0 { - bInserts = append(bInserts, bInter) - bInter.num = 0 - } - bIdx, bOK = bi.Next() - bInter.pos++ // Advance potential insert position. - // Update absolute bucket counts for b. - bCountIdx++ - if bOK { - bCount = bBuckets[bCountIdx] - } + aInserts = addInsert(aInserts, &aInter, bIdx) + advanceB() default: // Both iterators ran out. We're done. if aInter.num > 0 { aInserts = append(aInserts, aInter) @@ -782,7 +756,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend // The histogram needs to be expanded to have the extra empty buckets // of the chunk. if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 { - // No new chunks from the histogram, so the spans of the appender can accommodate the new buckets. + // No new buckets from the histogram, so the spans of the appender can accommodate the new buckets. // However we need to make a copy in case the input is sharing spans from an iterator. h.PositiveSpans = make([]histogram.Span, len(a.pSpans)) copy(h.PositiveSpans, a.pSpans) diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index e99fd1c160..f715716b65 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -195,7 +195,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) } -// Mimics the scenario described for expandSpansForward. +// Mimics the scenario described for expandFloatSpansAndBuckets. func TestFloatHistogramChunkBucketChanges(t *testing.T) { c := Chunk(NewFloatHistogramChunk()) @@ -1420,3 +1420,45 @@ func assertFirstFloatHistogramSampleHint(t *testing.T, chunk Chunk, expected his _, v := it.AtFloatHistogram(nil) require.Equal(t, expected, v.CounterResetHint) } + +func TestFloatHistogramEmptyBucketsWithGaps(t *testing.T) { + h1 := &histogram.FloatHistogram{ + PositiveSpans: []histogram.Span{ + {Offset: -19, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{0, 0, 0, 0}, + } + require.NoError(t, h1.Validate()) + + c := NewFloatHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + _, _, _, err = app.AppendFloatHistogram(nil, 1, h1, false) + require.NoError(t, err) + + h2 := &histogram.FloatHistogram{ + PositiveSpans: []histogram.Span{ + {Offset: -19, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 3, Length: 1}, + }, + PositiveBuckets: []float64{0, 0, 0}, + } + require.NoError(t, h2.Validate()) + + newC, recoded, _, err := app.AppendFloatHistogram(nil, 2, h2, false) + require.NoError(t, err) + require.True(t, recoded) + require.NotNil(t, newC) + + it := newC.Iterator(nil) + require.Equal(t, ValFloatHistogram, it.Next()) + _, h := it.AtFloatHistogram(nil) + require.NoError(t, h.Validate()) + require.Equal(t, ValFloatHistogram, it.Next()) + _, h = it.AtFloatHistogram(nil) + require.NoError(t, h.Validate()) + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 0f54eb6928..4ba0c467d8 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -374,7 +374,7 @@ func (a *HistogramAppender) appendable(h *histogram.Histogram) ( // original deltas to a new set of deltas to match a new span layout that adds // buckets, we simply need to generate a list of inserts. // -// Note: Within expandSpansForward we don't have to worry about the changes to the +// Note: Within expandIntSpansAndBuckets we don't have to worry about the changes to the // spans themselves, thanks to the iterators we get to work with the more useful // bucket indices (which of course directly correspond to the buckets we have to // adjust). @@ -409,6 +409,48 @@ func expandIntSpansAndBuckets(a, b []histogram.Span, aBuckets, bBuckets []int64) bCount = bBuckets[bCountIdx] } + // addInsert updates the current Insert with a new insert at the given + // bucket index (otherIdx). + addInsert := func(inserts []Insert, insert *Insert, otherIdx int) []Insert { + if insert.num == 0 { + // First insert. + insert.bucketIdx = otherIdx + } else if insert.bucketIdx+insert.num != otherIdx { + // Insert is not continuous from previous insert. + inserts = append(inserts, *insert) + insert.num = 0 + insert.bucketIdx = otherIdx + } + insert.num++ + return inserts + } + + advanceA := func() { + if aInter.num > 0 { + aInserts = append(aInserts, aInter) + aInter.num = 0 + } + aIdx, aOK = ai.Next() + aInter.pos++ + aCountIdx++ + if aOK { + aCount += aBuckets[aCountIdx] + } + } + + advanceB := func() { + if bInter.num > 0 { + bInserts = append(bInserts, bInter) + bInter.num = 0 + } + bIdx, bOK = bi.Next() + bInter.pos++ + bCountIdx++ + if bOK { + bCount += bBuckets[bCountIdx] + } + } + loop: for { switch { @@ -420,105 +462,37 @@ loop: return nil, nil, false } - // Finish WIP insert for a and reset. - if aInter.num > 0 { - aInserts = append(aInserts, aInter) - aInter.num = 0 - } - - // Finish WIP insert for b and reset. - if bInter.num > 0 { - bInserts = append(bInserts, bInter) - bInter.num = 0 - } - - aIdx, aOK = ai.Next() - bIdx, bOK = bi.Next() - aInter.pos++ // Advance potential insert position. - aCountIdx++ // Advance absolute bucket count index for a. - if aOK { - aCount += aBuckets[aCountIdx] - } - bInter.pos++ // Advance potential insert position. - bCountIdx++ // Advance absolute bucket count index for b. - if bOK { - bCount += bBuckets[bCountIdx] - } + advanceA() + advanceB() continue case aIdx < bIdx: // b misses a bucket index that is in a. // This is ok if the count in a is 0, in which case we make a note to // fill in the bucket in b and advance a. if aCount == 0 { - bInter.num++ // Mark that we need to insert a bucket in b. - bInter.bucketIdx = aIdx - // Advance a - if aInter.num > 0 { - aInserts = append(aInserts, aInter) - aInter.num = 0 - } - aIdx, aOK = ai.Next() - aInter.pos++ - aCountIdx++ - if aOK { - aCount += aBuckets[aCountIdx] - } + bInserts = addInsert(bInserts, &bInter, aIdx) + advanceA() continue } // Otherwise we are missing a bucket that was in use in a, which is a reset. return nil, nil, false case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare. - aInter.num++ - aInter.bucketIdx = bIdx - // Advance b - if bInter.num > 0 { - bInserts = append(bInserts, bInter) - bInter.num = 0 - } - bIdx, bOK = bi.Next() - bInter.pos++ - bCountIdx++ - if bOK { - bCount += bBuckets[bCountIdx] - } + aInserts = addInsert(aInserts, &aInter, bIdx) + advanceB() } case aOK && !bOK: // b misses a value that is in a. // This is ok if the count in a is 0, in which case we make a note to // fill in the bucket in b and advance a. if aCount == 0 { - bInter.num++ - bInter.bucketIdx = aIdx - // Advance a - if aInter.num > 0 { - aInserts = append(aInserts, aInter) - aInter.num = 0 - } - aIdx, aOK = ai.Next() - aInter.pos++ // Advance potential insert position. - // Update absolute bucket counts for a. - aCountIdx++ - if aOK { - aCount += aBuckets[aCountIdx] - } + bInserts = addInsert(bInserts, &bInter, aIdx) + advanceA() continue } // Otherwise we are missing a bucket that was in use in a, which is a reset. return nil, nil, false case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. - aInter.num++ - aInter.bucketIdx = bIdx - // Advance b - if bInter.num > 0 { - bInserts = append(bInserts, bInter) - bInter.num = 0 - } - bIdx, bOK = bi.Next() - bInter.pos++ // Advance potential insert position. - // Update absolute bucket counts for b. - bCountIdx++ - if bOK { - bCount += bBuckets[bCountIdx] - } + aInserts = addInsert(aInserts, &aInter, bIdx) + advanceB() default: // Both iterators ran out. We're done. if aInter.num > 0 { aInserts = append(aInserts, aInter) @@ -823,7 +797,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h // The histogram needs to be expanded to have the extra empty buckets // of the chunk. if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 { - // No new chunks from the histogram, so the spans of the appender can accommodate the new buckets. + // No new buckets from the histogram, so the spans of the appender can accommodate the new buckets. // However we need to make a copy in case the input is sharing spans from an iterator. h.PositiveSpans = make([]histogram.Span, len(a.pSpans)) copy(h.PositiveSpans, a.pSpans) diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 7bb31acf00..5ee783fd68 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -284,101 +284,12 @@ type Insert struct { bucketIdx int } -// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or -// expandFloatSpansAndBuckets instead. -// expandSpansForward is left here for reference. -// expandSpansForward returns the inserts to expand the bucket spans 'a' so that -// they match the spans in 'b'. 'b' must cover the same or more buckets than -// 'a', otherwise the function will return false. -// -// Example: -// -// Let's say the old buckets look like this: -// -// span syntax: [offset, length] -// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1] -// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15] -// raw values 6 3 3 2 4 5 1 -// deltas 6 -3 0 -1 2 1 -4 -// -// But now we introduce a new bucket layout. (Carefully chosen example where we -// have a span appended, one unchanged[*], one prepended, and two merge - in -// that order.) -// -// [*] unchanged in terms of which bucket indices they represent. but to achieve -// that, their offset needs to change if "disrupted" by spans changing ahead of -// them -// -// \/ this one is "unchanged" -// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ] -// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15] -// raw values 6 3 0 3 0 0 2 4 5 0 1 -// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 -// delta mods: / \ / \ / \ -// -// Note for histograms with delta-encoded buckets: Whenever any new buckets are -// introduced, the subsequent "old" bucket needs to readjust its delta to the -// new base of 0. Thus, for the caller who wants to transform the set of -// original deltas to a new set of deltas to match a new span layout that adds -// buckets, we simply need to generate a list of inserts. -// -// Note: Within expandSpansForward we don't have to worry about the changes to the -// spans themselves, thanks to the iterators we get to work with the more useful -// bucket indices (which of course directly correspond to the buckets we have to -// adjust). -func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) { - ai := newBucketIterator(a) - bi := newBucketIterator(b) - - var inserts []Insert - - // When inter.num becomes > 0, this becomes a valid insert that should - // be yielded when we finish a streak of new buckets. - var inter Insert - - av, aOK := ai.Next() - bv, bOK := bi.Next() -loop: - for { - switch { - case aOK && bOK: - switch { - case av == bv: // Both have an identical value. move on! - // Finish WIP insert and reset. - if inter.num > 0 { - inserts = append(inserts, inter) - } - inter.num = 0 - av, aOK = ai.Next() - bv, bOK = bi.Next() - inter.pos++ - case av < bv: // b misses a value that is in a. - return inserts, false - case av > bv: // a misses a value that is in b. Forward b and recompare. - inter.num++ - bv, bOK = bi.Next() - } - case aOK && !bOK: // b misses a value that is in a. - return inserts, false - case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. - inter.num++ - bv, bOK = bi.Next() - default: // Both iterators ran out. We're done. - if inter.num > 0 { - inserts = append(inserts, inter) - } - break loop - } - } - - return inserts, true -} - -// expandSpansBothWays is similar to expandSpansForward, but now b may also -// cover an entirely different set of buckets. The function returns the -// “forward” inserts to expand 'a' to also cover all the buckets exclusively -// covered by 'b', and it returns the “backward” inserts to expand 'b' to also -// cover all the buckets exclusively covered by 'a'. +// expandSpansBothWays is similar to expandFloatSpansAndBuckets and +// expandIntSpansAndBuckets, but now b may also cover an entirely different set +// of buckets and counter resets are ignored. The function returns the “forward” +// inserts to expand 'a' to also cover all the buckets exclusively covered by +// 'b', and it returns the “backward” inserts to expand 'b' to also cover all +// the buckets exclusively covered by 'a'. func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) { ai := newBucketIterator(a) bi := newBucketIterator(b) @@ -488,14 +399,24 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV { ii int // The next insert to process. ) for i, d := range in { - if ii < len(inserts) && i == inserts[ii].pos { + if ii >= len(inserts) || i != inserts[ii].pos { + // No inserts at this position, the original delta is still valid. + out[oi] = d + oi++ + v += d + continue + } + // Process inserts. + firstInsert := true + for ii < len(inserts) && i == inserts[ii].pos { // We have an insert! // Add insert.num new delta values such that their // bucket values equate 0. When deltas==false, it means // that it is an absolute value. So we set it to 0 // directly. - if deltas { + if deltas && firstInsert { out[oi] = -v + firstInsert = false // No need to go to 0 in further inserts. } else { out[oi] = 0 } @@ -505,32 +426,30 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV { oi++ } ii++ - - // Now save the value from the input. The delta value we - // should save is the original delta value + the last - // value of the point before the insert (to undo the - // delta that was introduced by the insert). When - // deltas==false, it means that it is an absolute value, - // so we set it directly to the value in the 'in' slice. - if deltas { - out[oi] = d + v - } else { - out[oi] = d - } - oi++ - v = d + v - continue } - // If there was no insert, the original delta is still valid. - out[oi] = d + // Now save the value from the input. The delta value we + // should save is the original delta value + the last + // value of the point before the insert (to undo the + // delta that was introduced by the insert). When + // deltas==false, it means that it is an absolute value, + // so we set it directly to the value in the 'in' slice. + if deltas { + out[oi] = d + v + } else { + out[oi] = d + } oi++ v += d } - switch ii { - case len(inserts): - // All inserts processed. Nothing more to do. - case len(inserts) - 1: - // One more insert to process at the end. + // Insert empty buckets at the end. + for ii < len(inserts) { + if inserts[ii].pos < len(in) { + panic("leftover inserts must be after the current buckets") + } + // Add insert.num new delta values such that their + // bucket values equate 0. When deltas==false, it means + // that it is an absolute value. So we set it to 0 + // directly. if deltas { out[oi] = -v } else { @@ -541,8 +460,8 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV { out[oi] = 0 oi++ } - default: - panic("unprocessed inserts left") + ii++ + v = 0 } return out } @@ -628,7 +547,7 @@ func adjustForInserts(spans []histogram.Span, inserts []Insert) (mergedSpans []h } } for i < len(inserts) { - addBucket(inserts[i].bucketIdx) + addBucket(insertIdx) consumeInsert() } return diff --git a/tsdb/chunkenc/histogram_meta_test.go b/tsdb/chunkenc/histogram_meta_test.go index 1774dee867..d3aa979b5e 100644 --- a/tsdb/chunkenc/histogram_meta_test.go +++ b/tsdb/chunkenc/histogram_meta_test.go @@ -78,7 +78,7 @@ func TestBucketIterator(t *testing.T) { }, idxs: []int{100, 101, 102, 103, 112, 113, 114, 115, 116, 117, 118, 119}, }, - // The below 2 sets ore the ones described in expandSpansForward's comments. + // The below 2 sets ore the ones described in expandFloatSpansAndBuckets's comments. { spans: []histogram.Span{ {Offset: 0, Length: 2}, @@ -111,12 +111,13 @@ func TestBucketIterator(t *testing.T) { } } -func TestCompareSpansAndInsert(t *testing.T) { +func TestExpandSpansBothWaysAndInsert(t *testing.T) { scenarios := []struct { description string spansA, spansB []histogram.Span fInserts, bInserts []Insert bucketsIn, bucketsOut []int64 + mergedSpans []histogram.Span }{ { description: "single prepend at the beginning", @@ -134,6 +135,9 @@ func TestCompareSpansAndInsert(t *testing.T) { }, bucketsIn: []int64{6, -3, 0}, bucketsOut: []int64{0, 6, -3, 0}, + mergedSpans: []histogram.Span{ + {Offset: -11, Length: 4}, + }, }, { description: "single append at the end", @@ -151,6 +155,9 @@ func TestCompareSpansAndInsert(t *testing.T) { }, bucketsIn: []int64{6, -3, 0}, bucketsOut: []int64{6, -3, 0, -3}, + mergedSpans: []histogram.Span{ + {Offset: -10, Length: 4}, + }, }, { description: "double prepend at the beginning", @@ -168,6 +175,9 @@ func TestCompareSpansAndInsert(t *testing.T) { }, bucketsIn: []int64{6, -3, 0}, bucketsOut: []int64{0, 0, 6, -3, 0}, + mergedSpans: []histogram.Span{ + {Offset: -12, Length: 5}, + }, }, { description: "double append at the end", @@ -185,6 +195,9 @@ func TestCompareSpansAndInsert(t *testing.T) { }, bucketsIn: []int64{6, -3, 0}, bucketsOut: []int64{6, -3, 0, -3, 0}, + mergedSpans: []histogram.Span{ + {Offset: -10, Length: 5}, + }, }, { description: "double prepond at the beginning and double append at the end", @@ -206,6 +219,9 @@ func TestCompareSpansAndInsert(t *testing.T) { }, bucketsIn: []int64{6, -3, 0}, bucketsOut: []int64{0, 0, 6, -3, 0, -3, 0}, + mergedSpans: []histogram.Span{ + {Offset: -12, Length: 7}, + }, }, { description: "single removal of bucket at the start", @@ -218,6 +234,11 @@ func TestCompareSpansAndInsert(t *testing.T) { bInserts: []Insert{ {pos: 0, num: 1}, }, + bucketsIn: []int64{1, 2, -1, 2}, + bucketsOut: []int64{1, 2, -1, 2}, + mergedSpans: []histogram.Span{ + {Offset: -10, Length: 4}, + }, }, { description: "single removal of bucket in the middle", @@ -231,6 +252,11 @@ func TestCompareSpansAndInsert(t *testing.T) { bInserts: []Insert{ {pos: 2, num: 1}, }, + bucketsIn: []int64{1, 2, -1, 2}, + bucketsOut: []int64{1, 2, -1, 2}, + mergedSpans: []histogram.Span{ + {Offset: -10, Length: 4}, + }, }, { description: "single removal of bucket at the end", @@ -243,6 +269,11 @@ func TestCompareSpansAndInsert(t *testing.T) { bInserts: []Insert{ {pos: 3, num: 1}, }, + mergedSpans: []histogram.Span{ + {Offset: -10, Length: 4}, + }, + bucketsIn: []int64{1, 2, -1, 2}, + bucketsOut: []int64{1, 2, -1, 2}, }, { description: "as described in doc comment", @@ -275,6 +306,12 @@ func TestCompareSpansAndInsert(t *testing.T) { }, bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4}, bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}, + mergedSpans: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + }, }, { description: "both forward and backward inserts, complex case", @@ -324,27 +361,51 @@ func TestCompareSpansAndInsert(t *testing.T) { num: 1, }, }, + bucketsIn: []int64{1, 2, -1, 2, 0, 3, 1}, + bucketsOut: []int64{1, 2, -3, 2, -2, 0, 4, 0, 3, -7, 8}, + mergedSpans: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + }, + }, + { + description: "inserts with gaps", + spansA: []histogram.Span{ + {Offset: -19, Length: 2}, + {Offset: 1, Length: 2}, + }, + spansB: []histogram.Span{ + {Offset: -19, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 3, Length: 1}, + }, + fInserts: []Insert{ + {pos: 4, num: 2}, + }, + bInserts: []Insert{ + {pos: 1, num: 3}, + }, + bucketsIn: []int64{1, 2, -1, 1}, + bucketsOut: []int64{1, 2, -1, 1, -3, 0}, + mergedSpans: []histogram.Span{ + {Offset: -19, Length: 2}, + {Offset: 1, Length: 3}, + {Offset: 3, Length: 1}, + }, }, } for _, s := range scenarios { t.Run(s.description, func(t *testing.T) { - if len(s.bInserts) > 0 { - fInserts, bInserts, _ := expandSpansBothWays(s.spansA, s.spansB) - require.Equal(t, s.fInserts, fInserts) - require.Equal(t, s.bInserts, bInserts) - } - - inserts, valid := expandSpansForward(s.spansA, s.spansB) - if len(s.bInserts) > 0 { - require.False(t, valid, "compareScan unexpectedly returned true") - return - } - require.True(t, valid, "compareScan unexpectedly returned false") - require.Equal(t, s.fInserts, inserts) + fInserts, bInserts, m := expandSpansBothWays(s.spansA, s.spansB) + require.Equal(t, s.fInserts, fInserts) + require.Equal(t, s.bInserts, bInserts) + require.Equal(t, s.mergedSpans, m) gotBuckets := make([]int64, len(s.bucketsOut)) - insert(s.bucketsIn, gotBuckets, inserts, true) + insert(s.bucketsIn, gotBuckets, fInserts, true) require.Equal(t, s.bucketsOut, gotBuckets) floatBucketsIn := make([]float64, len(s.bucketsIn)) @@ -362,7 +423,7 @@ func TestCompareSpansAndInsert(t *testing.T) { floatBucketsOut[i] = float64(last) } gotFloatBuckets := make([]float64, len(floatBucketsOut)) - insert(floatBucketsIn, gotFloatBuckets, inserts, false) + insert(floatBucketsIn, gotFloatBuckets, fInserts, false) require.Equal(t, floatBucketsOut, gotFloatBuckets) }) } @@ -599,3 +660,259 @@ func TestSpansFromBidirectionalCompareSpans(t *testing.T) { require.Equal(t, c.exp, act) } } + +func TestExpandIntOrFloatSpansAndBuckets(t *testing.T) { + testCases := map[string]struct { + spansA []histogram.Span + bucketsA []int64 + spansB []histogram.Span + bucketsB []int64 + + expectReset bool + expectForwardInserts []Insert + expectBackwardInserts []Insert + expectMergedSpans []histogram.Span + expectBucketsA []int64 + expectBucketsB []int64 + }{ + "empty": { + spansA: []histogram.Span{}, + bucketsA: []int64{}, + spansB: []histogram.Span{}, + bucketsB: []int64{}, + expectReset: false, + expectForwardInserts: nil, + expectBackwardInserts: nil, + expectMergedSpans: []histogram.Span{}, + expectBucketsA: []int64{}, + expectBucketsB: []int64{}, + }, + "single bucket reset to none": { + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{1}, + spansB: []histogram.Span{}, + bucketsB: []int64{}, + expectReset: true, + }, + "single bucket reset to lower": { + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{2}, + spansB: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsB: []int64{1}, + expectReset: true, + }, + "single bucket increase": { + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{1}, + spansB: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsB: []int64{2}, + expectReset: false, + expectForwardInserts: nil, + expectBackwardInserts: nil, + expectMergedSpans: []histogram.Span{{Offset: 1, Length: 1}}, + expectBucketsA: []int64{1}, + expectBucketsB: []int64{2}, + }, + "distinct new buckets and increase": { + // A: ___1_____ + // B: 22_22___2 + // B': 22_22___2 + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{1}, + spansB: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + bucketsB: []int64{2, 0, 0, 0, 0}, + expectReset: false, + expectForwardInserts: []Insert{{pos: 0, num: 2, bucketIdx: -2}, {pos: 1, num: 1, bucketIdx: 2}, {pos: 1, num: 1, bucketIdx: 6}}, + expectBackwardInserts: nil, + expectMergedSpans: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + expectBucketsA: []int64{0, 0, 1, -1, 0}, + expectBucketsB: []int64{2, 0, 0, 0, 0}, + }, + "distinct new buckets but reset": { + // A: ___2_____ + // B: 11_11___1 + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{2}, + spansB: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + bucketsB: []int64{1, 0, 0, 0, 0}, + expectReset: true, + }, + "distinct new buckets but missing": { + // A: ___2_____ + // B: 11__1___1 + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{2}, + spansB: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 2, Length: 1}, {Offset: 3, Length: 1}}, + bucketsB: []int64{1, 0, 0, 0}, + expectReset: true, + }, + "distinct new buckets and missing an empty bucket": { + // A: _0__ + // B: ___1 + // B': _0_1 + spansA: []histogram.Span{{Offset: 1, Length: 1}}, + bucketsA: []int64{0}, + spansB: []histogram.Span{{Offset: 3, Length: 1}}, + bucketsB: []int64{1}, + expectReset: false, + expectForwardInserts: []Insert{{pos: 1, num: 1, bucketIdx: 3}}, + expectBackwardInserts: []Insert{{pos: 0, num: 1, bucketIdx: 1}}, + expectMergedSpans: []histogram.Span{{Offset: 1, Length: 1}, {Offset: 1, Length: 1}}, + expectBucketsA: []int64{0, 0}, + expectBucketsB: []int64{0, 1}, + }, + "distinct new buckets and missing multiple empty buckets": { + // Idx: 01234567890123 + // A: _000_00__0__00 + // B; ________1_____ + // B': _000_00_10__00 + spansA: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 2, Length: 1}, {Offset: 2, Length: 2}}, + bucketsA: []int64{0, 0, 0, 0, 0, 0, 0, 0}, + spansB: []histogram.Span{{Offset: 8, Length: 1}}, + bucketsB: []int64{1}, + expectReset: false, + expectForwardInserts: []Insert{{pos: 5, num: 1, bucketIdx: 8}}, + expectBackwardInserts: []Insert{{pos: 0, num: 3, bucketIdx: 1}, {pos: 0, num: 2, bucketIdx: 5}, {pos: 1, num: 1, bucketIdx: 9}, {pos: 1, num: 2, bucketIdx: 12}}, + expectMergedSpans: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}}, + expectBucketsA: []int64{0, 0, 0, 0, 0, 0, 0, 0, 0}, + expectBucketsB: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0}, + }, + "overlap new buckets and missing multiple empty buckets": { + // Idx: 01234567890123 + // A: _000_00_10__00 + // B; ________2_____ + // B': _000_00_20__00 + spansA: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}}, + bucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0}, + spansB: []histogram.Span{{Offset: 8, Length: 1}}, + bucketsB: []int64{2}, + expectReset: false, + expectForwardInserts: nil, + expectBackwardInserts: []Insert{{pos: 0, num: 3, bucketIdx: 1}, {pos: 0, num: 2, bucketIdx: 5}, {pos: 1, num: 1, bucketIdx: 9}, {pos: 1, num: 2, bucketIdx: 12}}, + expectMergedSpans: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}}, + expectBucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0}, + expectBucketsB: []int64{0, 0, 0, 0, 0, 2, -2, 0, 0}, + }, + "overlap new buckets and missing multiple empty buckets with 0 length/offset spans": { + // Idx: 01234567890123 + // A: _000_00_10__00 + // B; ________2_____ + // B': _000_00_20__00 + spansA: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 1, Length: 0}, {Offset: 1, Length: 2}}, + bucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0}, + spansB: []histogram.Span{{Offset: 1, Length: 0}, {Offset: 7, Length: 1}}, + bucketsB: []int64{2}, + expectReset: false, + expectForwardInserts: nil, + expectBackwardInserts: []Insert{{pos: 0, num: 3, bucketIdx: 1}, {pos: 0, num: 2, bucketIdx: 5}, {pos: 1, num: 1, bucketIdx: 9}, {pos: 1, num: 2, bucketIdx: 12}}, + expectMergedSpans: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}}, + expectBucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0}, + expectBucketsB: []int64{0, 0, 0, 0, 0, 2, -2, 0, 0}, + }, + "new empty buckets between filled buckets": { + // A: 11212332____1__1 + // B: 122323321__11__1 + // A': 112123320__01__1 + // B': 122323321__11__1 + spansA: []histogram.Span{{Offset: -51, Length: 8}, {Offset: 11, Length: 1}, {Offset: 14, Length: 1}}, + bucketsA: []int64{1, 0, 1, -1, 1, 1, 0, -1, -1, 0}, + spansB: []histogram.Span{{Offset: -51, Length: 9}, {Offset: 9, Length: 2}, {Offset: 14, Length: 1}}, + bucketsB: []int64{1, 1, 0, 1, -1, 1, 0, -1, -1, 0, 0, 0}, + expectReset: false, + expectForwardInserts: []Insert{{pos: 8, num: 1, bucketIdx: -43}, {pos: 8, num: 1, bucketIdx: -33}}, + expectMergedSpans: []histogram.Span{{Offset: -51, Length: 9}, {Offset: 9, Length: 2}, {Offset: 14, Length: 1}}, + expectBucketsA: []int64{1, 0, 1, -1, 1, 1, 0, -1, -2, 0, 1, 0}, + // 1 0 1 -1 1 1 0 -1 -2 -2 1 0 + + expectBucketsB: []int64{1, 1, 0, 1, -1, 1, 0, -1, -1, 0, 0, 0}, + }, + "real example 1": { + // I- 6543210987654321 + // A: 0__2_______0__ + // B: _0130_____00_0 + // A': 00020_____00_0 + // B': 00130_____00_0 + spansA: []histogram.Span{{Offset: -16, Length: 1}, {Offset: 2, Length: 1}, {Offset: 7, Length: 1}}, + bucketsA: []int64{0, 2, -2}, + spansB: []histogram.Span{{Offset: -15, Length: 4}, {Offset: 5, Length: 2}, {Offset: 1, Length: 1}}, + bucketsB: []int64{0, 1, 2, -3, 0, 0, 0}, + expectReset: false, + expectForwardInserts: []Insert{{pos: 1, num: 2, bucketIdx: -15}, {pos: 2, num: 1, bucketIdx: -12}, {pos: 2, num: 1, bucketIdx: -6}, {pos: 3, num: 1, bucketIdx: -3}}, + expectBackwardInserts: []Insert{{pos: 0, num: 1, bucketIdx: -16}}, + expectMergedSpans: []histogram.Span{{Offset: -16, Length: 5}, {Offset: 5, Length: 2}, {Offset: 1, Length: 1}}, + expectBucketsA: []int64{0, 0, 0, 2, -2, 0, 0, 0}, + expectBucketsB: []int64{0, 0, 1, 2, -3, 0, 0, 0}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + // Sanity check. + require.Len(t, tc.bucketsA, countSpans(tc.spansA)) + require.Len(t, tc.bucketsB, countSpans(tc.spansB)) + require.Len(t, tc.expectBucketsA, countSpans(tc.expectMergedSpans)) + require.Len(t, tc.expectBucketsB, countSpans(tc.expectMergedSpans)) + + t.Run("integers", func(t *testing.T) { + fInserts, bInserts, ok := expandIntSpansAndBuckets(tc.spansA, tc.spansB, tc.bucketsA, tc.bucketsB) + if tc.expectReset { + require.False(t, ok) + return + } + require.Equal(t, tc.expectForwardInserts, fInserts, "forward inserts") + require.Equal(t, tc.expectBackwardInserts, bInserts, "backward inserts") + + gotBspans := adjustForInserts(tc.spansB, bInserts) + require.Equal(t, tc.expectMergedSpans, gotBspans) + + gotAbuckets := make([]int64, len(tc.expectBucketsA)) + insert(tc.bucketsA, gotAbuckets, fInserts, true) + require.Equal(t, tc.expectBucketsA, gotAbuckets) + + gotBbuckets := make([]int64, len(tc.expectBucketsB)) + insert(tc.bucketsB, gotBbuckets, bInserts, true) + require.Equal(t, tc.expectBucketsB, gotBbuckets) + }) + + t.Run("floats", func(t *testing.T) { + aXorValues := make([]xorValue, len(tc.bucketsA)) + absolute := float64(0) + for i, v := range tc.bucketsA { + absolute += float64(v) + aXorValues[i].value = absolute + } + + makeFloatBuckets := func(in []int64) []float64 { + out := make([]float64, len(in)) + absolute = float64(0) + for i, v := range in { + absolute += float64(v) + out[i] = absolute + } + return out + } + + bFloatBuckets := makeFloatBuckets(tc.bucketsB) + + fInserts, bInserts, ok := expandFloatSpansAndBuckets(tc.spansA, tc.spansB, aXorValues, bFloatBuckets) + if tc.expectReset { + require.False(t, ok) + return + } + require.Equal(t, tc.expectForwardInserts, fInserts, "forward inserts") + require.Equal(t, tc.expectBackwardInserts, bInserts, "backward inserts") + + gotBspans := adjustForInserts(tc.spansB, bInserts) + require.Equal(t, tc.expectMergedSpans, gotBspans) + + gotAbuckets := make([]float64, len(tc.expectBucketsA)) + insert(makeFloatBuckets(tc.bucketsA), gotAbuckets, fInserts, false) + require.Equal(t, makeFloatBuckets(tc.expectBucketsA), gotAbuckets) + + gotBbuckets := make([]float64, len(tc.expectBucketsB)) + insert(makeFloatBuckets(tc.bucketsB), gotBbuckets, bInserts, false) + require.Equal(t, makeFloatBuckets(tc.expectBucketsB), gotBbuckets) + }) + }) + } +} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 304a9858f1..4233f0bae1 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -206,7 +206,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) } -// Mimics the scenario described for expandSpansForward. +// Mimics the scenario described for expandIntSpansAndBuckets. func TestHistogramChunkBucketChanges(t *testing.T) { c := Chunk(NewHistogramChunk()) @@ -1776,3 +1776,45 @@ func assertFirstIntHistogramSampleHint(t *testing.T, chunk Chunk, expected histo _, v := it.AtHistogram(nil) require.Equal(t, expected, v.CounterResetHint) } + +func TestIntHistogramEmptyBucketsWithGaps(t *testing.T) { + h1 := &histogram.Histogram{ + PositiveSpans: []histogram.Span{ + {Offset: -19, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{0, 0, 0, 0}, + } + require.NoError(t, h1.Validate()) + + c := NewHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + _, _, _, err = app.AppendHistogram(nil, 1, h1, false) + require.NoError(t, err) + + h2 := &histogram.Histogram{ + PositiveSpans: []histogram.Span{ + {Offset: -19, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 3, Length: 1}, + }, + PositiveBuckets: []int64{0, 0, 0}, + } + require.NoError(t, h2.Validate()) + + newC, recoded, _, err := app.AppendHistogram(nil, 2, h2, false) + require.NoError(t, err) + require.True(t, recoded) + require.NotNil(t, newC) + + it := newC.Iterator(nil) + require.Equal(t, ValHistogram, it.Next()) + _, h := it.AtFloatHistogram(nil) + require.NoError(t, h.Validate()) + require.Equal(t, ValHistogram, it.Next()) + _, h = it.AtFloatHistogram(nil) + require.NoError(t, h.Validate()) + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) +}