refactor(appenderV2): add AppendableV2 support for scrape
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-01-15 10:01:28 +00:00
parent a362f690d7
commit 96bdd68589
8 changed files with 507 additions and 3557 deletions

View file

@ -50,13 +50,19 @@ func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
}
}
func withAppendableV2(appendableV2 storage.AppendableV2) func(sl *scrapeLoop) {
return func(sl *scrapeLoop) {
sl.appendableV2 = appendableV2
}
}
// newTestScrapeLoop is the initial scrape loop for all tests.
// It returns scrapeLoop and mock scraper you can customize.
//
// It's recommended to use withXYZ functions for simple option customizations, e.g:
//
// appTest := teststorage.NewAppendable()
// sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
// sl, _ := newTestScrapeLoop(t, withAppendableV2(appTest))
//
// However, when changing more than one scrapeLoop options it's more readable to have one explicit opt function:
//
@ -64,7 +70,7 @@ func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
// appTest := teststorage.NewAppendable()
// sl, scraper := newTestScrapeLoop(t, func(sl *scrapeLoop) {
// sl.ctx = ctx
// sl.appendable = appTest
// sl.appendableV2 = appTest
// // Since we're writing samples directly below we need to provide a protocol fallback.
// sl.fallbackScrapeProtocol = "text/plain"
// })
@ -84,8 +90,6 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
timeout: 1 * time.Hour,
sampleMutator: nopMutator,
reportSampleMutator: nopMutator,
appendable: teststorage.NewAppendable(),
buffers: pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) }),
metrics: metrics,
maxSchema: histogram.ExponentialSchemaMax,
@ -98,6 +102,7 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
for _, o := range opts {
o(sl)
}
// Validate user opts for convenience.
require.Nil(t, sl.parentCtx, "newTestScrapeLoop does not support injecting non-nil parent context")
require.Nil(t, sl.appenderCtx, "newTestScrapeLoop does not support injecting non-nil appender context")

View file

@ -114,7 +114,8 @@ type Manager struct {
opts *Options
logger *slog.Logger
appendable storage.Appendable
appendable storage.Appendable
appendableV2 storage.AppendableV2
graceShut chan struct{}
@ -196,7 +197,7 @@ func (m *Manager) reload() {
continue
}
m.metrics.targetScrapePools.Inc()
sp, err := newScrapePool(scrapeConfig, m.appendable, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
sp, err := newScrapePool(scrapeConfig, m.appendable, m.appendableV2, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
if err != nil {
m.metrics.targetScrapePoolsFailed.Inc()
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)

View file

@ -82,11 +82,12 @@ type FailureLogger interface {
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable storage.Appendable
logger *slog.Logger
ctx context.Context
cancel context.CancelFunc
options *Options
appendable storage.Appendable
appendableV2 storage.AppendableV2
logger *slog.Logger
ctx context.Context
cancel context.CancelFunc
options *Options
// mtx must not be taken after targetMtx.
mtx sync.Mutex
@ -139,6 +140,7 @@ type scrapeLoopAppendAdapter interface {
func newScrapePool(
cfg *config.ScrapeConfig,
appendable storage.Appendable,
appendableV2 storage.AppendableV2,
offsetSeed uint64,
logger *slog.Logger,
buffers *pool.Pool,
@ -171,6 +173,7 @@ func newScrapePool(
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
appendable: appendable,
appendableV2: appendableV2,
logger: logger,
ctx: ctx,
cancel: cancel,
@ -842,11 +845,12 @@ type scrapeLoop struct {
scraper scraper
// Static params per scrapePool.
appendable storage.Appendable
buffers *pool.Pool
offsetSeed uint64
symbolTable *labels.SymbolTable
metrics *scrapeMetrics
appendable storage.Appendable
appendableV2 storage.AppendableV2
buffers *pool.Pool
offsetSeed uint64
symbolTable *labels.SymbolTable
metrics *scrapeMetrics
// Options from config.ScrapeConfig.
sampleLimit int
@ -1190,11 +1194,12 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
scraper: opts.scraper,
// Static params per scrapePool.
appendable: opts.sp.appendable,
buffers: opts.sp.buffers,
offsetSeed: opts.sp.offsetSeed,
symbolTable: opts.sp.symbolTable,
metrics: opts.sp.metrics,
appendable: opts.sp.appendable,
appendableV2: opts.sp.appendableV2,
buffers: opts.sp.buffers,
offsetSeed: opts.sp.offsetSeed,
symbolTable: opts.sp.symbolTable,
metrics: opts.sp.metrics,
// config.ScrapeConfig.
sampleLimit: int(opts.sp.config.SampleLimit),
@ -1303,7 +1308,9 @@ mainLoop:
}
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {
// NOTE(bwplotka): Add AppenderV2 implementation, see https://github.com/prometheus/prometheus/issues/17632.
if sl.appendableV2 != nil {
return &scrapeLoopAppenderV2{scrapeLoop: sl, AppenderV2: sl.appendableV2.AppenderV2(sl.appenderCtx)}
}
return &scrapeLoopAppender{scrapeLoop: sl, Appender: sl.appendable.Appender(sl.appenderCtx)}
}
@ -1637,7 +1644,7 @@ loop:
break
}
switch et {
// TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram()
// TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram()`
// otherwise we can expose metadata without series on metadata API.
case textparse.EntryType:
// TODO(bwplotka): Build meta entry directly instead of locking and updating the map. This will
@ -1753,7 +1760,7 @@ loop:
}
}
sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
sampleAdded, err = sl.checkAddError(met, nil, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
sl.l.Debug("Unexpected error", "series", string(met), "err", err)
@ -1942,7 +1949,8 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
// Current case ordering prevents exercising other cases when limits are exceeded.
// Remaining error cases typically occur only a few times, often during initial setup.
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
func (sl *scrapeLoop) checkAddError(met []byte, exemplars []exemplar.Exemplar, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
var pErr *storage.AppendPartialError
switch {
case err == nil:
return true, nil
@ -1973,6 +1981,23 @@ func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucke
return false, nil
case errors.Is(err, storage.ErrNotFound):
return false, storage.ErrNotFound
case errors.As(err, &pErr):
outOfOrderExemplars := 0
for _, e := range pErr.ExemplarErrors {
if errors.Is(e, storage.ErrOutOfOrderExemplar) {
outOfOrderExemplars++
}
// Since exemplar storage is still experimental, we don't fail or check other errors.
// Debug log is emitted in TSDB already.
}
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(exemplars) {
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
// to some existing set of exemplars.
appErrs.numExemplarOutOfOrder += outOfOrderExemplars
sl.l.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", exemplars[len(exemplars)-1]))
sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
}
return true, nil
default:
return false, err
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -94,7 +94,7 @@ func TestNewScrapePool(t *testing.T) {
MetricNameValidationScheme: model.UTF8Validation,
MetricNameEscapingScheme: model.AllowUTF8,
}
sp, err = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err = newScrapePool(cfg, app, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
)
require.NoError(t, err)
@ -337,7 +337,7 @@ func TestDroppedTargetsList(t *testing.T) {
},
},
}
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
expectedLength = 2
)
@ -821,7 +821,7 @@ func TestScrapePoolRaces(t *testing.T) {
MetricNameEscapingScheme: model.AllowUTF8,
}
}
sp, _ := newScrapePool(newConfig(), teststorage.NewAppendable(), 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(newConfig(), teststorage.NewAppendable(), nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{
@ -1007,7 +1007,7 @@ func TestScrapeLoopRun(t *testing.T) {
)
ctx, cancel := context.WithCancel(t.Context())
sl, scraper := newTestScrapeLoop(t, withCtx(ctx))
sl, scraper := newTestScrapeLoop(t, withCtx(ctx), withAppendable(teststorage.NewAppendable()))
// The loop must terminate during the initial offset if the context
// is canceled.
scraper.offsetDur = time.Hour
@ -1031,6 +1031,7 @@ func TestScrapeLoopRun(t *testing.T) {
ctx, cancel = context.WithCancel(t.Context())
sl, scraper = newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
sl.ctx = ctx
sl.timeout = 100 * time.Millisecond
})
@ -1082,7 +1083,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
)
ctx, cancel := context.WithCancel(t.Context())
sl, scraper := newTestScrapeLoop(t, withCtx(ctx))
sl, scraper := newTestScrapeLoop(t, withCtx(ctx), withAppendable(teststorage.NewAppendable()))
forcedErr := errors.New("forced err")
sl.setForcedError(forcedErr)
@ -1122,7 +1123,7 @@ func TestScrapeLoopRun_ContextCancelTerminatesBlockedSend(t *testing.T) {
)
ctx, cancel := context.WithCancel(t.Context())
sl, scraper := newTestScrapeLoop(t, withCtx(ctx))
sl, scraper := newTestScrapeLoop(t, withCtx(ctx), withAppendable(teststorage.NewAppendable()))
forcedErr := errors.New("forced err")
sl.setForcedError(forcedErr)
@ -1149,7 +1150,7 @@ func TestScrapeLoopRun_ContextCancelTerminatesBlockedSend(t *testing.T) {
}
func TestScrapeLoopMetadata(t *testing.T) {
sl, _ := newTestScrapeLoop(t)
sl, _ := newTestScrapeLoop(t, withAppendable(teststorage.NewAppendable()))
app := sl.appender()
total, _, _, err := app.append([]byte(`# TYPE test_metric counter
@ -1183,7 +1184,7 @@ test_metric_total 1
}
func TestScrapeLoopSeriesAdded(t *testing.T) {
sl, _ := newTestScrapeLoop(t)
sl, _ := newTestScrapeLoop(t, withAppendable(teststorage.NewAppendable()))
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("test_metric 1\n"), "text/plain", time.Time{})
@ -1214,6 +1215,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
NameValidationScheme: model.UTF8Validation,
}}
sl, _ := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
sl.sampleMutator = func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, target, true, relabelConfig)
}
@ -1230,6 +1232,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
sl, _ := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
sl.validationScheme = model.LegacyValidation
})
@ -1243,6 +1246,7 @@ func TestScrapeLoopFailLegacyUnderUTF8(t *testing.T) {
// When scrapeloop has validation set to UTF-8, the metric is allowed.
sl, _ = newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
sl.validationScheme = model.UTF8Validation
})
@ -1772,6 +1776,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
sl, scraper := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable().Then(s)
sl.ctx = ctx
})
numScrapes := 0
@ -2231,6 +2236,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
func TestScrapeLoopAppendFailsWithNoContentType(t *testing.T) {
sl, _ := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
// Explicitly setting the lack of fallback protocol here to make it obvious.
sl.fallbackScrapeProtocol = ""
})
@ -2245,6 +2251,7 @@ func TestScrapeLoopAppendFailsWithNoContentType(t *testing.T) {
// TestScrapeLoopAppendEmptyWithNoContentType ensures we there are no errors when we get a blank scrape or just want to append a stale marker.
func TestScrapeLoopAppendEmptyWithNoContentType(t *testing.T) {
sl, _ := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
// Explicitly setting the lack of fallback protocol here to make it obvious.
sl.fallbackScrapeProtocol = ""
})
@ -3651,7 +3658,7 @@ func TestReuseScrapeCache(t *testing.T) {
MetricNameValidationScheme: model.UTF8Validation,
MetricNameEscapingScheme: model.AllowUTF8,
}
sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, app, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
labels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
scrapeConfig: &config.ScrapeConfig{
@ -3858,7 +3865,7 @@ func TestReuseCacheRace(t *testing.T) {
MetricNameEscapingScheme: model.AllowUTF8,
}
buffers = pool.New(1e3, 100e6, 3, func(sz int) any { return make([]byte, 0, sz) })
sp, _ = newScrapePool(cfg, teststorage.NewAppendable(), 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
sp, _ = newScrapePool(cfg, teststorage.NewAppendable(), nil, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
t1 = &Target{
labels: labels.FromStrings("labelNew", "nameNew"),
scrapeConfig: &config.ScrapeConfig{},
@ -3888,7 +3895,7 @@ func TestCheckAddError(t *testing.T) {
var appErrs appendErrors
sl, _ := newTestScrapeLoop(t)
// TODO: Check err etc
_, _ = sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
_, _ = sl.checkAddError(nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
require.Equal(t, 1, appErrs.numOutOfOrder)
// TODO(bwplotka): Test partial error check and other cases
@ -3967,7 +3974,7 @@ func TestScrapeReportLimit(t *testing.T) {
ts, scrapedTwice := newScrapableServer("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4021,7 +4028,7 @@ func TestScrapeUTF8(t *testing.T) {
ts, scrapedTwice := newScrapableServer("{\"with.dots\"} 42\n")
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4121,6 +4128,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
}
sl, _ := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
sl.sampleMutator = func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, discoveryLabels, false, nil)
}
@ -4170,7 +4178,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
},
},
}
sp, _ := newScrapePool(cfg, teststorage.NewAppendable(), 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, _ := newScrapePool(cfg, teststorage.NewAppendable(), nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
@ -4256,7 +4264,7 @@ test_summary_count 199
ts, scrapedTwice := newScrapableServer(metricsText)
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4837,7 +4845,7 @@ disk_usage_bytes 456
ts, scrapedTwice := newScrapableServer(metricsText)
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -4961,7 +4969,7 @@ func TestScrapeLoopCompression(t *testing.T) {
MetricNameEscapingScheme: model.AllowUTF8,
}
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -5133,7 +5141,7 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
// When a scrape contains multiple instances for the same time series we should increment
// prometheus_target_scrapes_sample_duplicate_timestamp_total metric.
func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
sl, _ := newTestScrapeLoop(t)
sl, _ := newTestScrapeLoop(t, withAppendable(teststorage.NewAppendable()))
app := sl.appender()
total, added, seriesAdded, err := app.append([]byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "text/plain", time.Time{})
@ -5346,7 +5354,7 @@ func TestTargetScrapeConfigWithLabels(t *testing.T) {
}
}
sp, err := newScrapePool(cfg, teststorage.NewAppendable(), 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, teststorage.NewAppendable(), nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
t.Cleanup(sp.stop)
@ -5509,7 +5517,7 @@ func TestScrapePoolScrapeAfterReload(t *testing.T) {
},
}
p, err := newScrapePool(cfg, teststorage.NewAppendable(), 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
p, err := newScrapePool(cfg, teststorage.NewAppendable(), nil, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
require.NoError(t, err)
t.Cleanup(p.stop)
@ -5832,6 +5840,7 @@ func BenchmarkScrapePoolRestartLoops(b *testing.B) {
ScrapeTimeout: model.Duration(1 * time.Hour),
},
nil,
nil,
0,
nil,
nil,
@ -5900,7 +5909,7 @@ func TestNewScrapeLoopHonorLabelsWiring(t *testing.T) {
MetricNameValidationScheme: model.UTF8Validation,
}
sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
sp, err := newScrapePool(cfg, s, nil, 0, nil, nil, &Options{skipOffsetting: true}, newTestScrapeMetrics(t))
require.NoError(t, err)
defer sp.stop()
@ -5950,6 +5959,7 @@ func TestDropsSeriesFromMetricRelabeling(t *testing.T) {
},
}
sl, _ := newTestScrapeLoop(t, func(sl *scrapeLoop) {
sl.appendable = teststorage.NewAppendable()
sl.sampleMutator = func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, target, true, relabelConfig)
}

View file

@ -454,6 +454,105 @@ func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels
return ref, nil
}
// limitAppender limits the number of total appended samples in a batch.
type limitAppenderV2 struct {
storage.AppenderV2
limit int
i int
}
func (app *limitAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
// This ensures that if a series is already in TSDB then we always write the marker.
if ref == 0 || !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return 0, errSampleLimit
}
}
return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
type timeLimitAppenderV2 struct {
storage.AppenderV2
maxTime int64
}
func (app *timeLimitAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, storage.ErrOutOfBounds
}
return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
// bucketLimitAppender limits the number of total appended samples in a batch.
type bucketLimitAppenderV2 struct {
storage.AppenderV2
limit int
}
func (app *bucketLimitAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) {
if h != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
return 0, errBucketLimit
}
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
if h.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
if err = h.ReduceResolution(h.Schema - 1); err != nil {
return 0, err
}
}
}
if fh != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
return 0, errBucketLimit
}
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
if fh.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
if err = fh.ReduceResolution(fh.Schema - 1); err != nil {
return 0, err
}
}
}
return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
type maxSchemaAppenderV2 struct {
storage.AppenderV2
maxSchema int32
}
func (app *maxSchemaAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) {
if h != nil {
if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
if err = h.ReduceResolution(app.maxSchema); err != nil {
return 0, err
}
}
}
if fh != nil {
if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
if err = fh.ReduceResolution(app.maxSchema); err != nil {
return 0, err
}
}
}
return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
// PopulateDiscoveredLabels sets base labels on lb from target and group labels and scrape configuration, before relabeling.
func PopulateDiscoveredLabels(lb *labels.Builder, cfg *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) {
lb.Reset(labels.EmptyLabels())

View file

@ -332,7 +332,8 @@ func computeOrCheckRef(ref storage.SeriesRef, ls labels.Labels) (storage.SeriesR
}
if storage.SeriesRef(h) != ref {
// Check for buggy ref while we at it.
// Check for buggy ref while we are at it. This only makes sense for cases without .Then*, because further appendable
// might have a different ref computation logic e.g. TSDB uses atomic increments.
return 0, errors.New("teststorage.appender: found input ref not matching labels; potential bug in Appendable usage")
}
return ref, nil
@ -498,13 +499,14 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
if a.next != nil {
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
if err != nil {
return 0, err
}
} else {
ref, err = computeOrCheckRef(ref, ls)
if err != nil {
return ref, err
}
}
ref, err = computeOrCheckRef(ref, ls)
if err != nil {
return ref, err
}
return ref, partialErr
}