mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-18 18:25:24 -05:00
Merge pull request #16929 from prometheus/codesome/stale-series-compaction
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
tsdb: Early compaction of stale series Closes #13616 Based on https://github.com/prometheus/proposals/pull/55 Stale series tracking was added in #16925. This PR compacts the stale series into its own block before the normal compaction hits. Here is how the settings: stale_series_compaction_threshold: As soon as the ratio of stale series in the head block crosses StaleSeriesImmediateCompactionThreshold, TSDB performs a stale series compaction and puts all the stale series into a block and removed it from the head, but it does not remove it from the WAL. (technically this condition is checked every minute and not exactly immediate) Additional details WAL replay: after a stale series compaction, tombstones are added with (MinInt64, MaxInt64) for all these stale series. During WAL replay we add a special condition where when we find such tombstone, it immediately removes the series from the memory instead of storing the tombstone. This is required so that we don't spike up memory during WAL replay and also don't keep the compacted stale series in the memory. Head block truncation ignores this block via the added metadata, similar to out-of-order blocks.
This commit is contained in:
commit
9eb78735cf
11 changed files with 751 additions and 19 deletions
|
|
@ -692,6 +692,7 @@ func main() {
|
|||
}
|
||||
if cfgFile.StorageConfig.TSDBConfig != nil {
|
||||
cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
|
||||
cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold
|
||||
if cfgFile.StorageConfig.TSDBConfig.Retention != nil {
|
||||
if cfgFile.StorageConfig.TSDBConfig.Retention.Time > 0 {
|
||||
cfg.tsdb.RetentionDuration = cfgFile.StorageConfig.TSDBConfig.Retention.Time
|
||||
|
|
@ -1943,6 +1944,7 @@ type tsdbOptions struct {
|
|||
UseUncachedIO bool
|
||||
BlockCompactionExcludeFunc tsdb.BlockExcludeFilterFunc
|
||||
BlockReloadInterval model.Duration
|
||||
StaleSeriesCompactionThreshold float64
|
||||
}
|
||||
|
||||
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||
|
|
@ -1969,6 +1971,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
|||
BlockCompactionExcludeFunc: opts.BlockCompactionExcludeFunc,
|
||||
BlockReloadInterval: time.Duration(opts.BlockReloadInterval),
|
||||
FeatureRegistry: features.DefaultRegistry,
|
||||
StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1107,6 +1107,10 @@ type TSDBConfig struct {
|
|||
// This should not be used directly and must be converted into OutOfOrderTimeWindow.
|
||||
OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"`
|
||||
|
||||
// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
|
||||
// the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
|
||||
StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"`
|
||||
|
||||
Retention *TSDBRetentionConfig `yaml:"retention,omitempty"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -447,7 +447,17 @@ func (e errChunksIterator) Err() error { return e.err }
|
|||
// ExpandSamples iterates over all samples in the iterator, buffering all in slice.
|
||||
// Optionally it takes samples constructor, useful when you want to compare sample slices with different
|
||||
// sample implementations. if nil, sample type from this package will be used.
|
||||
// For float sample, NaN values are replaced with -42.
|
||||
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
|
||||
return expandSamples(iter, true, newSampleFn)
|
||||
}
|
||||
|
||||
// ExpandSamplesWithoutReplacingNaNs is same as ExpandSamples but it does not replace float sample NaN values with anything.
|
||||
func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
|
||||
return expandSamples(iter, false, newSampleFn)
|
||||
}
|
||||
|
||||
func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
|
||||
if newSampleFn == nil {
|
||||
newSampleFn = func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
|
||||
switch {
|
||||
|
|
@ -470,7 +480,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float
|
|||
t, f := iter.At()
|
||||
st := iter.AtST()
|
||||
// NaNs can't be compared normally, so substitute for another value.
|
||||
if math.IsNaN(f) {
|
||||
if replaceNaN && math.IsNaN(f) {
|
||||
f = -42
|
||||
}
|
||||
result = append(result, newSampleFn(st, t, f, nil, nil))
|
||||
|
|
|
|||
|
|
@ -228,6 +228,18 @@ func (bm *BlockMetaCompaction) FromOutOfOrder() bool {
|
|||
return slices.Contains(bm.Hints, CompactionHintFromOutOfOrder)
|
||||
}
|
||||
|
||||
func (bm *BlockMetaCompaction) SetStaleSeries() {
|
||||
if bm.FromStaleSeries() {
|
||||
return
|
||||
}
|
||||
bm.Hints = append(bm.Hints, CompactionHintFromStaleSeries)
|
||||
slices.Sort(bm.Hints)
|
||||
}
|
||||
|
||||
func (bm *BlockMetaCompaction) FromStaleSeries() bool {
|
||||
return slices.Contains(bm.Hints, CompactionHintFromStaleSeries)
|
||||
}
|
||||
|
||||
const (
|
||||
indexFilename = "index"
|
||||
metaFilename = "meta.json"
|
||||
|
|
@ -236,6 +248,10 @@ const (
|
|||
// CompactionHintFromOutOfOrder is a hint noting that the block
|
||||
// was created from out-of-order chunks.
|
||||
CompactionHintFromOutOfOrder = "from-out-of-order"
|
||||
|
||||
// CompactionHintFromStaleSeries is a hint noting that the block
|
||||
// was created from stale series.
|
||||
CompactionHintFromStaleSeries = "from-stale-series"
|
||||
)
|
||||
|
||||
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
|
||||
|
|
|
|||
|
|
@ -598,6 +598,9 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b
|
|||
if base.Compaction.FromOutOfOrder() {
|
||||
meta.Compaction.SetOutOfOrder()
|
||||
}
|
||||
if base.Compaction.FromStaleSeries() {
|
||||
meta.Compaction.SetStaleSeries()
|
||||
}
|
||||
}
|
||||
|
||||
err := c.write(dest, meta, DefaultBlockPopulator{}, b)
|
||||
|
|
|
|||
90
tsdb/db.go
90
tsdb/db.go
|
|
@ -100,6 +100,10 @@ func DefaultOptions() *Options {
|
|||
|
||||
// Options of the DB storage.
|
||||
type Options struct {
|
||||
// staleSeriesCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
|
||||
// This is the one that must be used by the code.
|
||||
staleSeriesCompactionThreshold atomic.Float64
|
||||
|
||||
// Segments (wal files) max size.
|
||||
// WALSegmentSize = 0, segment size is default size.
|
||||
// WALSegmentSize > 0, segment size is WALSegmentSize.
|
||||
|
|
@ -245,6 +249,10 @@ type Options struct {
|
|||
|
||||
// FeatureRegistry is used to register TSDB features.
|
||||
FeatureRegistry features.Collector
|
||||
|
||||
// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
|
||||
// the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
|
||||
StaleSeriesCompactionThreshold float64
|
||||
}
|
||||
|
||||
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
|
||||
|
|
@ -305,6 +313,10 @@ type DB struct {
|
|||
// out-of-order compaction and vertical queries.
|
||||
oooWasEnabled atomic.Bool
|
||||
|
||||
// lastHeadCompactionTime is the last wall clock time when the head block compaction was started,
|
||||
// irrespective of success or failure. This does not include out-of-order compaction and stale series compaction.
|
||||
lastHeadCompactionTime time.Time
|
||||
|
||||
writeNotified wlog.WriteNotified
|
||||
|
||||
registerer prometheus.Registerer
|
||||
|
|
@ -857,6 +869,8 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
|
|||
// configured maximum block duration.
|
||||
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
|
||||
}
|
||||
|
||||
opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold)
|
||||
return opts, rngs
|
||||
}
|
||||
|
||||
|
|
@ -1151,6 +1165,28 @@ func (db *DB) run(ctx context.Context) {
|
|||
}
|
||||
// We attempt mmapping of head chunks regularly.
|
||||
db.head.mmapHeadChunks()
|
||||
|
||||
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
|
||||
staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
|
||||
if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
|
||||
staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
|
||||
nextCompactionIsSoon := false
|
||||
if !db.lastHeadCompactionTime.IsZero() {
|
||||
compactionInterval := time.Duration(db.head.chunkRange.Load()) * time.Millisecond
|
||||
nextEstimatedCompactionTime := db.lastHeadCompactionTime.Add(compactionInterval)
|
||||
if time.Now().Add(10 * time.Minute).After(nextEstimatedCompactionTime) {
|
||||
// Next compaction is starting within next 10 mins.
|
||||
nextCompactionIsSoon = true
|
||||
}
|
||||
}
|
||||
|
||||
if !nextCompactionIsSoon {
|
||||
if err := db.CompactStaleHead(); err != nil {
|
||||
db.logger.Error("immediate stale series compaction failed", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case <-db.compactc:
|
||||
db.metrics.compactionsTriggered.Inc()
|
||||
|
||||
|
|
@ -1203,7 +1239,7 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
|
|||
oooTimeWindow := int64(0)
|
||||
if conf.StorageConfig.TSDBConfig != nil {
|
||||
oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
|
||||
|
||||
db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold)
|
||||
// Update retention configuration if provided.
|
||||
if conf.StorageConfig.TSDBConfig.Retention != nil {
|
||||
db.retentionMtx.Lock()
|
||||
|
|
@ -1217,6 +1253,8 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
|
|||
}
|
||||
db.retentionMtx.Unlock()
|
||||
}
|
||||
} else {
|
||||
db.opts.staleSeriesCompactionThreshold.Store(0)
|
||||
}
|
||||
if oooTimeWindow < 0 {
|
||||
oooTimeWindow = 0
|
||||
|
|
@ -1560,6 +1598,8 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
|
|||
// compactHead compacts the given RangeHead.
|
||||
// The db.cmtx should be held before calling this method.
|
||||
func (db *DB) compactHead(head *RangeHead) error {
|
||||
db.lastHeadCompactionTime = time.Now()
|
||||
|
||||
uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("persist head block: %w", err)
|
||||
|
|
@ -1583,6 +1623,52 @@ func (db *DB) compactHead(head *RangeHead) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) CompactStaleHead() error {
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
|
||||
db.logger.Info("Starting stale series compaction")
|
||||
start := time.Now()
|
||||
|
||||
// We get the stale series reference first because this list can change during the compaction below.
|
||||
// It is more efficient and easier to provide an index interface for the stale series when we have a static list.
|
||||
staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta := &BlockMeta{}
|
||||
meta.Compaction.SetStaleSeries()
|
||||
mint, maxt := db.head.opts.ChunkRange*(db.head.MinTime()/db.head.opts.ChunkRange), db.head.MaxTime()
|
||||
for ; mint < maxt; mint += db.head.chunkRange.Load() {
|
||||
staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load()-1, staleSeriesRefs)
|
||||
|
||||
uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), meta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("persist stale head: %w", err)
|
||||
}
|
||||
|
||||
db.logger.Info("Stale series block created", "ulids", fmt.Sprintf("%v", uids), "min_time", mint, "max_time", maxt)
|
||||
|
||||
if err := db.reloadBlocks(); err != nil {
|
||||
errs := []error{fmt.Errorf("reloadBlocks blocks: %w", err)}
|
||||
for _, uid := range uids {
|
||||
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
|
||||
errs = append(errs, fmt.Errorf("delete persisted stale head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
|
||||
}
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.head.truncateStaleSeries(staleSeriesRefs, maxt); err != nil {
|
||||
return fmt.Errorf("head truncate: %w", err)
|
||||
}
|
||||
db.head.RebuildSymbolTable(db.logger)
|
||||
|
||||
db.logger.Info("Ending stale series compaction", "num_series", meta.Stats.NumSeries, "duration", time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
||||
// compactBlocks compacts all the eligible on-disk blocks.
|
||||
// The db.cmtx should be held before calling this method.
|
||||
func (db *DB) compactBlocks() (err error) {
|
||||
|
|
@ -2042,7 +2128,7 @@ func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) {
|
|||
maxt, ok = int64(math.MinInt64), false
|
||||
// If blocks are overlapping, last block might not have the max time. So check all blocks.
|
||||
for _, b := range db.Blocks() {
|
||||
if !b.meta.Compaction.FromOutOfOrder() && b.meta.MaxTime > maxt {
|
||||
if !b.meta.Compaction.FromOutOfOrder() && !b.meta.Compaction.FromStaleSeries() && b.meta.MaxTime > maxt {
|
||||
ok = true
|
||||
maxt = b.meta.MaxTime
|
||||
}
|
||||
|
|
|
|||
242
tsdb/db_test.go
242
tsdb/db_test.go
|
|
@ -52,6 +52,7 @@ import (
|
|||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
|
|
@ -145,6 +146,16 @@ func TestDBClose_AfterClose(t *testing.T) {
|
|||
|
||||
// query runs a matcher query against the querier and fully expands its data.
|
||||
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
|
||||
return queryHelper(t, q, true, matchers...)
|
||||
}
|
||||
|
||||
// queryWithoutReplacingNaNs runs a matcher query against the querier and fully expands its data.
|
||||
func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
|
||||
return queryHelper(t, q, false, matchers...)
|
||||
}
|
||||
|
||||
// queryHelper runs a matcher query against the querier and fully expands its data.
|
||||
func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample {
|
||||
ss := q.Select(context.Background(), false, nil, matchers...)
|
||||
defer func() {
|
||||
require.NoError(t, q.Close())
|
||||
|
|
@ -156,7 +167,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
|
|||
series := ss.At()
|
||||
|
||||
it = series.Iterator(it)
|
||||
samples, err := storage.ExpandSamples(it, newSample)
|
||||
var samples []chunks.Sample
|
||||
var err error
|
||||
if withNaNReplacement {
|
||||
samples, err = storage.ExpandSamples(it, newSample)
|
||||
} else {
|
||||
samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, it.Err())
|
||||
|
||||
|
|
@ -2610,7 +2627,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
|||
db.DisableCompactions()
|
||||
app := db.Appender(ctx)
|
||||
maxt = 1000
|
||||
for i := 0; i < maxt; i++ {
|
||||
for i := range maxt {
|
||||
_, err := app.Append(0, labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
@ -9323,3 +9340,224 @@ func TestBlockReloadInterval(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStaleSeriesCompaction(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.MinBlockDuration = 1000
|
||||
opts.MaxBlockDuration = 1000
|
||||
db := newTestDB(t, withOpts(opts))
|
||||
db.DisableCompactions()
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
|
||||
var (
|
||||
nonStaleSeries, staleSeries,
|
||||
nonStaleHist, staleHist,
|
||||
nonStaleFHist, staleFHist,
|
||||
staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary []labels.Labels
|
||||
numSeriesPerCategory = 1
|
||||
)
|
||||
for i := range numSeriesPerCategory {
|
||||
nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i)))
|
||||
nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i)))
|
||||
nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i)))
|
||||
|
||||
staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i)))
|
||||
staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i)))
|
||||
staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i)))
|
||||
|
||||
staleSeriesCrossingBoundary = append(staleSeriesCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 7000+i)))
|
||||
staleHistCrossingBoundary = append(staleHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 8000+i)))
|
||||
staleFHistCrossingBoundary = append(staleFHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 9000+i)))
|
||||
}
|
||||
|
||||
var (
|
||||
v = 10.0
|
||||
staleV = math.Float64frombits(value.StaleNaN)
|
||||
h = tsdbutil.GenerateTestHistograms(1)[0]
|
||||
fh = tsdbutil.GenerateTestFloatHistograms(1)[0]
|
||||
staleH = &histogram.Histogram{Sum: staleV}
|
||||
staleFH = &histogram.FloatHistogram{Sum: staleV}
|
||||
)
|
||||
|
||||
addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
|
||||
app := db.Appender(context.Background())
|
||||
for i := range len(floatSeries) {
|
||||
_, err := app.Append(0, floatSeries[i], ts, v)
|
||||
require.NoError(t, err)
|
||||
_, err = app.AppendHistogram(0, histSeries[i], ts, h, nil)
|
||||
require.NoError(t, err)
|
||||
_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
|
||||
app := db.Appender(context.Background())
|
||||
for i := range len(floatSeries) {
|
||||
_, err := app.Append(0, floatSeries[i], ts, staleV)
|
||||
require.NoError(t, err)
|
||||
_, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil)
|
||||
require.NoError(t, err)
|
||||
_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
// Normal sample for all.
|
||||
addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist)
|
||||
addNormalSamples(100, staleSeries, staleHist, staleFHist)
|
||||
|
||||
// Stale sample for the stale series. Normal sample for the non-stale series.
|
||||
addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist)
|
||||
addStaleSamples(200, staleSeries, staleHist, staleFHist)
|
||||
|
||||
// Normal samples for the non-stale series later
|
||||
addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist)
|
||||
|
||||
require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries())
|
||||
require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries())
|
||||
|
||||
// Series crossing block boundary and gets stale.
|
||||
addNormalSamples(300, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
|
||||
addNormalSamples(700, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
|
||||
addNormalSamples(1100, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
|
||||
addStaleSamples(1200, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
|
||||
|
||||
require.NoError(t, db.CompactStaleHead())
|
||||
|
||||
require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries())
|
||||
require.Equal(t, uint64(0), db.Head().NumStaleSeries())
|
||||
|
||||
require.Len(t, db.Blocks(), 2)
|
||||
m := db.Blocks()[0].Meta()
|
||||
require.Equal(t, int64(0), m.MinTime)
|
||||
require.Equal(t, int64(1000), m.MaxTime)
|
||||
require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
|
||||
m = db.Blocks()[1].Meta()
|
||||
require.Equal(t, int64(1000), m.MinTime)
|
||||
require.Equal(t, int64(2000), m.MaxTime)
|
||||
require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
|
||||
|
||||
// To make sure that Head is not truncated based on stale series block.
|
||||
require.NoError(t, db.reload())
|
||||
|
||||
nonFirstH := h.Copy()
|
||||
nonFirstH.CounterResetHint = histogram.NotCounterReset
|
||||
nonFirstFH := fh.Copy()
|
||||
nonFirstFH.CounterResetHint = histogram.NotCounterReset
|
||||
|
||||
// Verify head block.
|
||||
verifyHeadBlock := func() {
|
||||
require.Equal(t, uint64(3), db.head.NumSeries())
|
||||
require.Equal(t, uint64(0), db.head.NumStaleSeries())
|
||||
|
||||
expHeadQuery := make(map[string][]chunks.Sample)
|
||||
for i := range numSeriesPerCategory {
|
||||
expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v},
|
||||
}
|
||||
expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH},
|
||||
}
|
||||
expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH},
|
||||
}
|
||||
}
|
||||
|
||||
querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
querier.Close()
|
||||
})
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
|
||||
require.Equal(t, expHeadQuery, seriesSet)
|
||||
}
|
||||
|
||||
verifyHeadBlock()
|
||||
|
||||
// Verify blocks from stale series.
|
||||
{
|
||||
expBlockQuery := make(map[string][]chunks.Sample)
|
||||
for i := range numSeriesPerCategory {
|
||||
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 100, f: v}, sample{t: 200, f: staleV},
|
||||
}
|
||||
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 100, h: h}, sample{t: 200, h: staleH},
|
||||
}
|
||||
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH},
|
||||
}
|
||||
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeriesCrossingBoundary[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 300, f: v}, sample{t: 700, f: v}, sample{t: 1100, f: v}, sample{t: 1200, f: staleV},
|
||||
}
|
||||
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 300, h: h}, sample{t: 700, h: nonFirstH}, sample{t: 1100, h: h}, sample{t: 1200, h: staleH},
|
||||
}
|
||||
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
|
||||
sample{t: 300, fh: fh}, sample{t: 700, fh: nonFirstFH}, sample{t: 1100, fh: fh}, sample{t: 1200, fh: staleFH},
|
||||
}
|
||||
}
|
||||
|
||||
querier, err := NewBlockQuerier(db.Blocks()[0], 0, 1000)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
querier.Close()
|
||||
})
|
||||
seriesSet := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
|
||||
|
||||
querier, err = NewBlockQuerier(db.Blocks()[1], 1000, 2000)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
querier.Close()
|
||||
})
|
||||
seriesSet2 := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
|
||||
for k, v := range seriesSet2 {
|
||||
seriesSet[k] = append(seriesSet[k], v...)
|
||||
}
|
||||
|
||||
require.Len(t, seriesSet, len(expBlockQuery))
|
||||
|
||||
// Compare all the samples except the stale value that needs special handling.
|
||||
for _, category := range [][]labels.Labels{
|
||||
staleSeries, staleHist, staleFHist,
|
||||
staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary,
|
||||
} {
|
||||
for i := range numSeriesPerCategory {
|
||||
seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name"))
|
||||
samples := expBlockQuery[seriesKey]
|
||||
actSamples, exists := seriesSet[seriesKey]
|
||||
require.Truef(t, exists, "series not found in result %s", seriesKey)
|
||||
require.Len(t, actSamples, len(samples))
|
||||
|
||||
for i := range len(samples) - 1 {
|
||||
require.Equal(t, samples[i], actSamples[i])
|
||||
}
|
||||
|
||||
l := len(samples) - 1
|
||||
require.Equal(t, samples[l].T(), actSamples[l].T())
|
||||
switch {
|
||||
case value.IsStaleNaN(samples[l].F()):
|
||||
require.True(t, value.IsStaleNaN(actSamples[l].F()))
|
||||
case samples[l].H() != nil:
|
||||
require.True(t, value.IsStaleNaN(actSamples[l].H().Sum))
|
||||
default:
|
||||
require.True(t, value.IsStaleNaN(actSamples[l].FH().Sum))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Restart DB and verify that stale series were discarded from WAL replay.
|
||||
require.NoError(t, db.Close())
|
||||
var err error
|
||||
db, err = Open(db.Dir(), db.logger, db.registerer, db.opts, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyHeadBlock()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
256
tsdb/head.go
256
tsdb/head.go
|
|
@ -1203,6 +1203,36 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
|||
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
|
||||
}
|
||||
|
||||
// truncateStaleSeries removes the provided series as long as they are still stale.
|
||||
func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error {
|
||||
h.chunkSnapshotMtx.Lock()
|
||||
defer h.chunkSnapshotMtx.Unlock()
|
||||
|
||||
if h.MinTime() >= maxt {
|
||||
return nil
|
||||
}
|
||||
|
||||
h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
|
||||
|
||||
deleted := h.gcStaleSeries(seriesRefs, maxt)
|
||||
|
||||
// Record these stale series refs in the WAL so that we can ignore them during replay.
|
||||
if h.wal != nil {
|
||||
stones := make([]tombstones.Stone, 0, len(seriesRefs))
|
||||
for ref := range deleted {
|
||||
stones = append(stones, tombstones.Stone{
|
||||
Ref: ref,
|
||||
Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
|
||||
})
|
||||
}
|
||||
var enc record.Encoder
|
||||
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
|
||||
// The query timeout limits the max wait time of this function implicitly.
|
||||
// The mint is inclusive and maxt is the truncation time hence exclusive.
|
||||
|
|
@ -1556,6 +1586,53 @@ func (h *RangeHead) String() string {
|
|||
return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
|
||||
}
|
||||
|
||||
// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader.
|
||||
// Used only for compactions.
|
||||
type StaleHead struct {
|
||||
RangeHead
|
||||
staleSeriesRefs []storage.SeriesRef
|
||||
}
|
||||
|
||||
// NewStaleHead returns a *StaleHead.
|
||||
func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead {
|
||||
return &StaleHead{
|
||||
RangeHead: RangeHead{
|
||||
head: head,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
},
|
||||
staleSeriesRefs: staleSeriesRefs,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *StaleHead) Index() (_ IndexReader, err error) {
|
||||
return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs)
|
||||
}
|
||||
|
||||
func (h *StaleHead) NumSeries() uint64 {
|
||||
return h.head.NumStaleSeries()
|
||||
}
|
||||
|
||||
var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD")
|
||||
|
||||
func (h *StaleHead) Meta() BlockMeta {
|
||||
return BlockMeta{
|
||||
MinTime: h.MinTime(),
|
||||
MaxTime: h.MaxTime(),
|
||||
ULID: staleHeadULID,
|
||||
Stats: BlockStats{
|
||||
NumSeries: h.NumSeries(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// String returns an human readable representation of the stake head. It's important to
|
||||
// keep this function in order to avoid the struct dump when the head is stringified in
|
||||
// errors or logs.
|
||||
func (h *StaleHead) String() string {
|
||||
return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
|
||||
}
|
||||
|
||||
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
||||
// label matchers.
|
||||
func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
|
||||
|
|
@ -1625,13 +1702,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
|
|||
|
||||
// Drop old chunks and remember series IDs and hashes if they can be
|
||||
// deleted entirely.
|
||||
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
|
||||
deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
|
||||
seriesRemoved := len(deleted)
|
||||
|
||||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||||
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
|
||||
h.metrics.chunks.Sub(float64(chunksRemoved))
|
||||
h.numSeries.Sub(uint64(seriesRemoved))
|
||||
h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
|
||||
|
||||
// Remove deleted series IDs from the postings lists.
|
||||
h.postings.Delete(deleted, affected)
|
||||
|
|
@ -1948,13 +2026,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
|
|||
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
|
||||
// and there's no easy way to cast maps.
|
||||
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
|
||||
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
|
||||
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) {
|
||||
var (
|
||||
deleted = map[storage.SeriesRef]struct{}{}
|
||||
affected = map[labels.Label]struct{}{}
|
||||
rmChunks = 0
|
||||
actualMint int64 = math.MaxInt64
|
||||
minOOOTime int64 = math.MaxInt64
|
||||
deleted = map[storage.SeriesRef]struct{}{}
|
||||
affected = map[labels.Label]struct{}{}
|
||||
rmChunks = 0
|
||||
staleSeriesDeleted = 0
|
||||
actualMint int64 = math.MaxInt64
|
||||
minOOOTime int64 = math.MaxInt64
|
||||
)
|
||||
minMmapFile = math.MaxInt32
|
||||
|
||||
|
|
@ -2009,7 +2088,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
|
|||
if value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
|
||||
numStaleSeries.Dec()
|
||||
staleSeriesDeleted++
|
||||
}
|
||||
|
||||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||
|
|
@ -2025,7 +2104,166 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
|
|||
actualMint = mint
|
||||
}
|
||||
|
||||
return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
|
||||
return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
|
||||
}
|
||||
|
||||
// gcStaleSeries removes all the provided series as long as they are still stale
|
||||
// and the series maxt is <= the given max.
|
||||
// The returned references are the series that got deleted.
|
||||
func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) map[storage.SeriesRef]struct{} {
|
||||
// Drop old chunks and remember series IDs and hashes if they can be
|
||||
// deleted entirely.
|
||||
deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt)
|
||||
seriesRemoved := len(deleted)
|
||||
|
||||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||||
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
|
||||
h.metrics.chunks.Sub(float64(chunksRemoved))
|
||||
h.numSeries.Sub(uint64(seriesRemoved))
|
||||
h.numStaleSeries.Sub(uint64(seriesRemoved))
|
||||
|
||||
// Remove deleted series IDs from the postings lists.
|
||||
h.postings.Delete(deleted, affected)
|
||||
|
||||
// Remove tombstones referring to the deleted series.
|
||||
h.tombstones.DeleteTombstones(deleted)
|
||||
|
||||
if h.wal != nil {
|
||||
_, last, _ := wlog.Segments(h.wal.Dir())
|
||||
h.walExpiriesMtx.Lock()
|
||||
// Keep series records until we're past segment 'last'
|
||||
// because the WAL will still have samples records with
|
||||
// this ref ID. If we didn't keep these series records then
|
||||
// on start up when we replay the WAL, or any other code
|
||||
// that reads the WAL, wouldn't be able to use those
|
||||
// samples since we would have no labels for that ref ID.
|
||||
for ref := range deleted {
|
||||
h.walExpiries[chunks.HeadSeriesRef(ref)] = int64(last)
|
||||
}
|
||||
h.walExpiriesMtx.Unlock()
|
||||
}
|
||||
|
||||
return deleted
|
||||
}
|
||||
|
||||
// deleteSeriesByID deletes the series with the given reference.
|
||||
// Only used for WAL replay.
|
||||
func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
|
||||
var (
|
||||
deleted = map[storage.SeriesRef]struct{}{}
|
||||
affected = map[labels.Label]struct{}{}
|
||||
staleSeriesDeleted = 0
|
||||
chunksRemoved = 0
|
||||
)
|
||||
|
||||
for _, ref := range refs {
|
||||
refShard := int(ref) & (h.series.size - 1)
|
||||
h.series.locks[refShard].Lock()
|
||||
|
||||
// Copying getByID here to avoid locking and unlocking twice.
|
||||
series := h.series.series[refShard][ref]
|
||||
if series == nil {
|
||||
h.series.locks[refShard].Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
|
||||
staleSeriesDeleted++
|
||||
}
|
||||
|
||||
hash := series.lset.Hash()
|
||||
hashShard := int(hash) & (h.series.size - 1)
|
||||
|
||||
chunksRemoved += len(series.mmappedChunks)
|
||||
if series.headChunks != nil {
|
||||
chunksRemoved += series.headChunks.len()
|
||||
}
|
||||
|
||||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
|
||||
h.series.hashes[hashShard].del(hash, series.ref)
|
||||
delete(h.series.series[refShard], series.ref)
|
||||
|
||||
h.series.locks[refShard].Unlock()
|
||||
}
|
||||
|
||||
h.metrics.seriesRemoved.Add(float64(len(deleted)))
|
||||
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
|
||||
h.metrics.chunks.Sub(float64(chunksRemoved))
|
||||
h.numSeries.Sub(uint64(len(deleted)))
|
||||
h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
|
||||
|
||||
// Remove deleted series IDs from the postings lists.
|
||||
h.postings.Delete(deleted, affected)
|
||||
|
||||
// Remove tombstones referring to the deleted series.
|
||||
h.tombstones.DeleteTombstones(deleted)
|
||||
}
|
||||
|
||||
// gcStaleSeries removes all the stale series provided that they are still stale
|
||||
// and the series maxt is <= the given max.
|
||||
func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
|
||||
var (
|
||||
deleted = map[storage.SeriesRef]struct{}{}
|
||||
affected = map[labels.Label]struct{}{}
|
||||
rmChunks = 0
|
||||
)
|
||||
|
||||
staleSeriesMap := map[storage.SeriesRef]struct{}{}
|
||||
for _, ref := range seriesRefs {
|
||||
staleSeriesMap[ref] = struct{}{}
|
||||
}
|
||||
|
||||
check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
|
||||
if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists {
|
||||
// This series was not compacted. Skip it.
|
||||
return
|
||||
}
|
||||
|
||||
series.Lock()
|
||||
defer series.Unlock()
|
||||
|
||||
if series.maxTime() > maxt {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the series is still stale.
|
||||
isStale := value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
|
||||
|
||||
if !isStale {
|
||||
return
|
||||
}
|
||||
|
||||
if series.headChunks != nil {
|
||||
rmChunks += series.headChunks.len()
|
||||
}
|
||||
rmChunks += len(series.mmappedChunks)
|
||||
|
||||
// The series is gone entirely. We need to keep the series lock
|
||||
// and make sure we have acquired the stripe locks for hash and ID of the
|
||||
// series alike.
|
||||
// If we don't hold them all, there's a very small chance that a series receives
|
||||
// samples again while we are half-way into deleting it.
|
||||
refShard := int(series.ref) & (s.size - 1)
|
||||
if hashShard != refShard {
|
||||
s.locks[refShard].Lock()
|
||||
defer s.locks[refShard].Unlock()
|
||||
}
|
||||
|
||||
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
|
||||
s.hashes[hashShard].del(hash, series.ref)
|
||||
delete(s.series[refShard], series.ref)
|
||||
deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
|
||||
}
|
||||
|
||||
s.iterForDeletion(check)
|
||||
|
||||
return deleted, affected, rmChunks
|
||||
}
|
||||
|
||||
// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
|
|
@ -201,6 +202,112 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) {
|
||||
return &headStaleIndexReader{
|
||||
headIndexReader: h.indexRange(mint, maxt),
|
||||
staleSeriesRefs: staleSeriesRefs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// headStaleIndexReader gives the stale series that have no out-of-order data.
|
||||
// This is only used for stale series compaction at the moment, that will only ask for all
|
||||
// the series during compaction. So to make that efficient, this index reader requires the
|
||||
// pre-calculated list of stale series refs that can be returned without re-reading the Head.
|
||||
type headStaleIndexReader struct {
|
||||
*headIndexReader
|
||||
staleSeriesRefs []storage.SeriesRef
|
||||
}
|
||||
|
||||
func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
|
||||
// If all postings are requested, return the precalculated list.
|
||||
k, v := index.AllPostingsKey()
|
||||
if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v {
|
||||
return index.NewListPostings(h.staleSeriesRefs), nil
|
||||
}
|
||||
seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...))
|
||||
if err != nil {
|
||||
return index.ErrPostings(err), err
|
||||
}
|
||||
return index.NewListPostings(seriesRefs), nil
|
||||
}
|
||||
|
||||
func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
|
||||
// Unused for compaction, so we don't need to optimise.
|
||||
seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match))
|
||||
if err != nil {
|
||||
return index.ErrPostings(err)
|
||||
}
|
||||
return index.NewListPostings(seriesRefs)
|
||||
}
|
||||
|
||||
func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
|
||||
// Unused for compaction, so we don't need to optimise.
|
||||
seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name))
|
||||
if err != nil {
|
||||
return index.ErrPostings(err)
|
||||
}
|
||||
return index.NewListPostings(seriesRefs)
|
||||
}
|
||||
|
||||
// filterStaleSeriesAndSortPostings returns the stale series references from the given postings
|
||||
// that also do not have any out-of-order data.
|
||||
func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
|
||||
series := make([]*memSeries, 0, 1024)
|
||||
|
||||
notFoundSeriesCount := 0
|
||||
for p.Next() {
|
||||
s := h.series.getByID(chunks.HeadSeriesRef(p.At()))
|
||||
if s == nil {
|
||||
notFoundSeriesCount++
|
||||
continue
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
if s.ooo != nil {
|
||||
// Has out-of-order data; skip it because we cannot determine if a series
|
||||
// is stale when it's getting out-of-order data.
|
||||
s.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if value.IsStaleNaN(s.lastValue) ||
|
||||
(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
|
||||
(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
|
||||
series = append(series, s)
|
||||
}
|
||||
s.Unlock()
|
||||
}
|
||||
if notFoundSeriesCount > 0 {
|
||||
h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount)
|
||||
}
|
||||
if err := p.Err(); err != nil {
|
||||
return nil, fmt.Errorf("expand postings: %w", err)
|
||||
}
|
||||
|
||||
slices.SortFunc(series, func(a, b *memSeries) int {
|
||||
return labels.Compare(a.labels(), b.labels())
|
||||
})
|
||||
|
||||
refs := make([]storage.SeriesRef, 0, len(series))
|
||||
for _, p := range series {
|
||||
refs = append(refs, storage.SeriesRef(p.ref))
|
||||
}
|
||||
return refs, nil
|
||||
}
|
||||
|
||||
// SortedPostings returns the postings as it is because we expect any postings obtained via
|
||||
// headStaleIndexReader to be already sorted.
|
||||
func (*headStaleIndexReader) SortedPostings(p index.Postings) index.Postings {
|
||||
// All the postings function above already give the sorted list of postings.
|
||||
return p
|
||||
}
|
||||
|
||||
// SortedStaleSeriesRefsNoOOOData returns all the series refs of the stale series that do not have any out-of-order data.
|
||||
func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.SeriesRef, error) {
|
||||
k, v := index.AllPostingsKey()
|
||||
return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v))
|
||||
}
|
||||
|
||||
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
|
||||
for i, c := range s.mmappedChunks {
|
||||
// Do not expose chunks that are outside of the specified range.
|
||||
|
|
|
|||
|
|
@ -6519,7 +6519,7 @@ func TestStripeSeries_gc(t *testing.T) {
|
|||
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
|
||||
hash := ms1.lset.Hash()
|
||||
|
||||
s.gc(0, 0, nil)
|
||||
s.gc(0, 0)
|
||||
|
||||
// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
|
||||
got := s.getByHash(hash, ms1.lset)
|
||||
|
|
|
|||
|
|
@ -308,7 +308,21 @@ Outer:
|
|||
}
|
||||
h.wlReplaySamplesPool.Put(v)
|
||||
case []tombstones.Stone:
|
||||
// Tombstone records will be fairly rare, so not trying to optimise the allocations here.
|
||||
deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
|
||||
for _, s := range v {
|
||||
if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 {
|
||||
// This series was fully deleted at this point. This record is only done for stale series at the moment.
|
||||
mod := uint64(s.Ref) % uint64(concurrency)
|
||||
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref))
|
||||
|
||||
// If the series is with a different reference, try deleting that.
|
||||
if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
|
||||
mod := uint64(r) % uint64(concurrency)
|
||||
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], r)
|
||||
}
|
||||
continue
|
||||
}
|
||||
for _, itv := range s.Intervals {
|
||||
if itv.Maxt < h.minValidTime.Load() {
|
||||
continue
|
||||
|
|
@ -326,6 +340,14 @@ Outer:
|
|||
h.tombstones.AddInterval(s.Ref, itv)
|
||||
}
|
||||
}
|
||||
|
||||
for i := range concurrency {
|
||||
if len(deleteSeriesShards[i]) > 0 {
|
||||
processors[i].input <- walSubsetProcessorInputItem{deletedSeriesRefs: deleteSeriesShards[i]}
|
||||
deleteSeriesShards[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
h.wlReplaytStonesPool.Put(v)
|
||||
case []record.RefExemplar:
|
||||
for _, e := range v {
|
||||
|
|
@ -558,10 +580,11 @@ type walSubsetProcessor struct {
|
|||
}
|
||||
|
||||
type walSubsetProcessorInputItem struct {
|
||||
samples []record.RefSample
|
||||
histogramSamples []histogramRecord
|
||||
existingSeries *memSeries
|
||||
walSeriesRef chunks.HeadSeriesRef
|
||||
samples []record.RefSample
|
||||
histogramSamples []histogramRecord
|
||||
existingSeries *memSeries
|
||||
walSeriesRef chunks.HeadSeriesRef
|
||||
deletedSeriesRefs []chunks.HeadSeriesRef
|
||||
}
|
||||
|
||||
func (wp *walSubsetProcessor) setup() {
|
||||
|
|
@ -712,6 +735,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
|||
case wp.histogramsOutput <- in.histogramSamples:
|
||||
default:
|
||||
}
|
||||
|
||||
if len(in.deletedSeriesRefs) > 0 {
|
||||
h.deleteSeriesByID(in.deletedSeriesRefs)
|
||||
}
|
||||
}
|
||||
h.updateMinMaxTime(mint, maxt)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue