From 49c3aea56d37197e5575c336e60a3ee1c9d8a076 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Mon, 12 Jan 2026 08:45:26 +0000 Subject: [PATCH 1/4] feat(storage)[PART4b]: add AppenderV2 to the rest of storage.Storage impl Signed-off-by: bwplotka --- cmd/prometheus/main.go | 17 ++++++ storage/fanout.go | 64 ++++++++++++++++++++ storage/fanout_test.go | 116 +++++++++++++++++++++++++++++++++++- storage/interface.go | 29 ++++++--- storage/interface_append.go | 1 + storage/remote/storage.go | 7 +++ storage/remote/write.go | 50 +++++++++++++--- tsdb/head_append_v2.go | 1 + 8 files changed, 267 insertions(+), 18 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ee60e58b2e..8b82049f50 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1746,6 +1746,14 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender { return notReadyAppender{} } +// AppenderV2 implements the Storage interface. +func (s *readyStorage) AppenderV2(ctx context.Context) storage.AppenderV2 { + if x := s.get(); x != nil { + return x.AppenderV2(ctx) + } + return notReadyAppenderV2{} +} + type notReadyAppender struct{} // SetOptions does nothing in this appender implementation. @@ -1779,6 +1787,15 @@ func (notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (notReadyAppender) Rollback() error { return tsdb.ErrNotReady } +type notReadyAppenderV2 struct{} + +func (notReadyAppenderV2) Append(storage.SeriesRef, labels.Labels, int64, int64, float64, *histogram.Histogram, *histogram.FloatHistogram, storage.AOptions) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} +func (notReadyAppenderV2) Commit() error { return tsdb.ErrNotReady } + +func (notReadyAppenderV2) Rollback() error { return tsdb.ErrNotReady } + // Close implements the Storage interface. func (s *readyStorage) Close() error { if x := s.get(); x != nil { diff --git a/storage/fanout.go b/storage/fanout.go index afcf993b3f..c5102b442f 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -136,6 +136,19 @@ func (f *fanout) Appender(ctx context.Context) Appender { } } +func (f *fanout) AppenderV2(ctx context.Context) AppenderV2 { + primary := f.primary.AppenderV2(ctx) + secondaries := make([]AppenderV2, 0, len(f.secondaries)) + for _, storage := range f.secondaries { + secondaries = append(secondaries, storage.AppenderV2(ctx)) + } + return &fanoutAppenderV2{ + logger: f.logger, + primary: primary, + secondaries: secondaries, + } +} + // Close closes the storage and all its underlying resources. func (f *fanout) Close() error { errs := []error{ @@ -278,3 +291,54 @@ func (f *fanoutAppender) Rollback() (err error) { } return nil } + +type fanoutAppenderV2 struct { + logger *slog.Logger + + primary AppenderV2 + secondaries []AppenderV2 +} + +func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) { + ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil { + return 0, err + } + } + return ref, nil +} + +func (f *fanoutAppenderV2) Commit() (err error) { + err = f.primary.Commit() + + for _, appender := range f.secondaries { + if err == nil { + err = appender.Commit() + } else { + if rollbackErr := appender.Rollback(); rollbackErr != nil { + f.logger.Error("Squashed rollback error on commit", "err", rollbackErr) + } + } + } + return err +} + +func (f *fanoutAppenderV2) Rollback() (err error) { + err = f.primary.Rollback() + + for _, appender := range f.secondaries { + rollbackErr := appender.Rollback() + switch { + case err == nil: + err = rollbackErr + case rollbackErr != nil: + f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr) + } + } + return nil +} diff --git a/storage/fanout_test.go b/storage/fanout_test.go index ed4cf17696..fb2f8dd553 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -132,6 +132,115 @@ func TestFanout_SelectSorted(t *testing.T) { }) } +func TestFanout_SelectSorted_AppenderV2(t *testing.T) { + inputLabel := labels.FromStrings(model.MetricNameLabel, "a") + outputLabel := labels.FromStrings(model.MetricNameLabel, "a") + + inputTotalSize := 0 + + priStorage := teststorage.New(t) + defer priStorage.Close() + app1 := priStorage.AppenderV2(t.Context()) + _, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app1.Append(0, inputLabel, 0, 1000, 1, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app1.Append(0, inputLabel, 0, 2000, 2, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + require.NoError(t, app1.Commit()) + + remoteStorage1 := teststorage.New(t) + defer remoteStorage1.Close() + app2 := remoteStorage1.AppenderV2(t.Context()) + _, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app2.Append(0, inputLabel, 0, 4000, 4, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app2.Append(0, inputLabel, 0, 5000, 5, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + require.NoError(t, app2.Commit()) + + remoteStorage2 := teststorage.New(t) + defer remoteStorage2.Close() + + app3 := remoteStorage2.AppenderV2(t.Context()) + _, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app3.Append(0, inputLabel, 0, 7000, 7, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + _, err = app3.Append(0, inputLabel, 0, 8000, 8, nil, nil, storage.AOptions{}) + require.NoError(t, err) + inputTotalSize++ + + require.NoError(t, app3.Commit()) + + fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2) + + t.Run("querier", func(t *testing.T) { + querier, err := fanoutStorage.Querier(0, 8000) + require.NoError(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + require.NoError(t, err) + + seriesSet := querier.Select(t.Context(), true, nil, matcher) + + result := make(map[int64]float64) + var labelsResult labels.Labels + var iterator chunkenc.Iterator + for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator(iterator) + for iterator.Next() == chunkenc.ValFloat { + timestamp, value := iterator.At() + result[timestamp] = value + } + } + + require.Equal(t, labelsResult, outputLabel) + require.Len(t, result, inputTotalSize) + }) + t.Run("chunk querier", func(t *testing.T) { + querier, err := fanoutStorage.ChunkQuerier(0, 8000) + require.NoError(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + require.NoError(t, err) + + seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(t.Context(), true, nil, matcher)) + + result := make(map[int64]float64) + var labelsResult labels.Labels + var iterator chunkenc.Iterator + for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator(iterator) + for iterator.Next() == chunkenc.ValFloat { + timestamp, value := iterator.At() + result[timestamp] = value + } + } + + require.NoError(t, seriesSet.Err()) + require.Equal(t, labelsResult, outputLabel) + require.Len(t, result, inputTotalSize) + }) +} + func TestFanoutErrors(t *testing.T) { workingStorage := teststorage.New(t) defer workingStorage.Close() @@ -224,9 +333,10 @@ type errChunkQuerier struct{ errQuerier } func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) { return errChunkQuerier{}, nil } -func (errStorage) Appender(context.Context) storage.Appender { return nil } -func (errStorage) StartTime() (int64, error) { return 0, nil } -func (errStorage) Close() error { return nil } +func (errStorage) Appender(context.Context) storage.Appender { return nil } +func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil } +func (errStorage) StartTime() (int64, error) { return 0, nil } +func (errStorage) Close() error { return nil } func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return storage.ErrSeriesSet(errSelect) diff --git a/storage/interface.go b/storage/interface.go index 23b8b48a0c..d6ce895d58 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -61,7 +61,8 @@ type SeriesRef uint64 // Appendable allows creating Appender. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type Appendable interface { // Appender returns a new appender for the storage. // @@ -77,10 +78,16 @@ type SampleAndChunkQueryable interface { } // Storage ingests and manages samples, along with various indexes. All methods -// are goroutine-safe. Storage implements storage.Appender. +// are goroutine-safe. type Storage interface { SampleAndChunkQueryable + + // Appendable allows appending to storage. + // WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). + // Appendable will be removed soon (ETA: Q2 2026). Appendable + // AppendableV2 allows appending to storage. + AppendableV2 // StartTime returns the oldest timestamp stored in the storage. StartTime() (int64, error) @@ -261,7 +268,8 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) { // AppendOptions provides options for implementations of the Appender interface. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// AppendOptions will be removed soon (ETA: Q2 2026). type AppendOptions struct { // DiscardOutOfOrder tells implementation that this append should not be out // of order. An OOO append MUST be rejected with storage.ErrOutOfOrderSample @@ -278,7 +286,8 @@ type AppendOptions struct { // I.e. timestamp order within batch is not validated, samples are not reordered per timestamp or by float/histogram // type. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appender will be removed soon (ETA: Q2 2026). type Appender interface { AppenderTransaction @@ -315,7 +324,8 @@ type GetRef interface { // ExemplarAppender provides an interface for adding samples to exemplar storage, which // within Prometheus is in-memory only. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// ExemplarAppender will be removed soon (ETA: Q2 2026). type ExemplarAppender interface { // AppendExemplar adds an exemplar for the given series labels. // An optional reference number can be provided to accelerate calls. @@ -333,7 +343,8 @@ type ExemplarAppender interface { // HistogramAppender provides an interface for appending histograms to the storage. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// HistogramAppender will be removed soon (ETA: Q2 2026). type HistogramAppender interface { // AppendHistogram adds a histogram for the given series labels. An // optional reference number can be provided to accelerate calls. A @@ -365,7 +376,8 @@ type HistogramAppender interface { // MetadataUpdater provides an interface for associating metadata to stored series. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// MetadataUpdater will be removed soon (ETA: Q2 2026). type MetadataUpdater interface { // UpdateMetadata updates a metadata entry for the given series and labels. // A series reference number is returned which can be used to modify the @@ -379,7 +391,8 @@ type MetadataUpdater interface { // StartTimestampAppender provides an interface for appending ST to storage. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// StartTimestampAppender will be removed soon (ETA: Q2 2026). type StartTimestampAppender interface { // AppendSTZeroSample adds synthetic zero sample for the given st timestamp, // which will be associated with given series, labels and the incoming diff --git a/storage/interface_append.go b/storage/interface_append.go index cc7045dbd5..f2dce8e52e 100644 --- a/storage/interface_append.go +++ b/storage/interface_append.go @@ -69,6 +69,7 @@ type AppendV2Options struct { // Exemplars (optional) attached to the appended sample. // Exemplar slice MUST be sorted by Exemplar.TS. // Exemplar slice is unsafe for reuse. + // Duplicate exemplars errors MUST be ignored by implementations. Exemplars []exemplar.Exemplar // RejectOutOfOrder tells implementation that this append should not be out diff --git a/storage/remote/storage.go b/storage/remote/storage.go index f482597249..be75d23383 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -63,6 +63,8 @@ type Storage struct { localStartTimeCallback startTimeCallback } +var _ storage.Storage = &Storage{} + // NewStorage returns a remote.Storage. func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage { if l == nil { @@ -193,6 +195,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender { return s.rws.Appender(ctx) } +// AppenderV2 implements storage.Storage. +func (s *Storage) AppenderV2(ctx context.Context) storage.AppenderV2 { + return s.rws.AppenderV2(ctx) +} + // LowestSentTimestamp returns the lowest sent timestamp across all queues. func (s *Storage) LowestSentTimestamp() int64 { return s.rws.LowestSentTimestamp() diff --git a/storage/remote/write.go b/storage/remote/write.go index 92f447d624..91000a1d25 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -238,8 +238,20 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { // Appender implements storage.Storage. func (rws *WriteStorage) Appender(context.Context) storage.Appender { return ×tampTracker{ - writeStorage: rws, - highestRecvTimestamp: rws.highestTimestamp, + baseTimestampTracker: baseTimestampTracker{ + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, + }, + } +} + +// AppenderV2 implements storage.Storage. +func (rws *WriteStorage) AppenderV2(context.Context) storage.AppenderV2 { + return ×tampTrackerV2{ + baseTimestampTracker: baseTimestampTracker{ + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, + }, } } @@ -282,9 +294,9 @@ func (rws *WriteStorage) Close() error { return nil } -type timestampTracker struct { - writeStorage *WriteStorage - appendOptions *storage.AppendOptions +type baseTimestampTracker struct { + writeStorage *WriteStorage + samples int64 exemplars int64 histograms int64 @@ -292,6 +304,12 @@ type timestampTracker struct { highestRecvTimestamp *maxTimestamp } +type timestampTracker struct { + baseTimestampTracker + + appendOptions *storage.AppendOptions +} + func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) { t.appendOptions = opts } @@ -345,7 +363,7 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada } // Commit implements storage.Appender. -func (t *timestampTracker) Commit() error { +func (t *baseTimestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) samplesIn.Add(float64(t.samples)) @@ -356,6 +374,24 @@ func (t *timestampTracker) Commit() error { } // Rollback implements storage.Appender. -func (*timestampTracker) Rollback() error { +func (*baseTimestampTracker) Rollback() error { return nil } + +type timestampTrackerV2 struct { + baseTimestampTracker +} + +func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { + switch { + case fh != nil, h != nil: + t.histograms++ + default: + t.samples++ + } + if ts > t.highestTimestamp { + t.highestTimestamp = ts + } + t.exemplars += int64(len(opts.Exemplars)) + return ref, nil +} diff --git a/tsdb/head_append_v2.go b/tsdb/head_append_v2.go index 241fb42e97..4a62d56741 100644 --- a/tsdb/head_append_v2.go +++ b/tsdb/head_append_v2.go @@ -323,6 +323,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp if err := a.head.exemplars.ValidateExemplar(s.labels(), e); err != nil { if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) { // Except duplicates, return partial errors. + // TODO(bwplotka): Add exemplar info into error. errs = append(errs, err) continue } From 8a2921e3851d886367308aaedc7545ee4cd8138a Mon Sep 17 00:00:00 2001 From: bwplotka Date: Wed, 14 Jan 2026 13:57:48 +0000 Subject: [PATCH 2/4] addressed feedback Signed-off-by: bwplotka --- storage/fanout.go | 4 ++-- storage/remote/write.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index c5102b442f..95c6499952 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -289,7 +289,7 @@ func (f *fanoutAppender) Rollback() (err error) { f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr) } } - return nil + return err } type fanoutAppenderV2 struct { @@ -340,5 +340,5 @@ func (f *fanoutAppenderV2) Rollback() (err error) { f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr) } } - return nil + return err } diff --git a/storage/remote/write.go b/storage/remote/write.go index 91000a1d25..6a336dc06b 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -382,6 +382,7 @@ type timestampTrackerV2 struct { baseTimestampTracker } +// Append implements storage.AppenderV2. func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { switch { case fh != nil, h != nil: From 1d3c6210f3320747d24b8a66987e19b5720b2184 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 15 Jan 2026 13:12:17 +0000 Subject: [PATCH 3/4] add extra fanout test Signed-off-by: bwplotka --- storage/fanout_test.go | 216 +++++++++++++++++++++++++++++++++++ util/teststorage/appender.go | 4 +- 2 files changed, 218 insertions(+), 2 deletions(-) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index fb2f8dd553..92a20a3ecb 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -21,6 +21,12 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/util/testutil" + + "github.com/prometheus/prometheus/tsdb/tsdbutil" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -355,3 +361,213 @@ func (errQuerier) Close() error { return nil } func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet { return storage.ErrChunkSeriesSet(errSelect) } + +type mockStorage struct { + app storage.Appendable + appV2 storage.AppendableV2 + storage.Storage +} + +func (m mockStorage) Appender(ctx context.Context) storage.Appender { + return m.app.Appender(ctx) +} + +func (m mockStorage) AppenderV2(ctx context.Context) storage.AppenderV2 { + return m.appV2.AppenderV2(ctx) +} + +type sample = teststorage.Sample + +func withoutExemplars(s []sample) (ret []sample) { + ret = make([]sample, len(s)) + copy(ret, s) + for i := range ret { + ret[i].ES = nil + } + return ret +} + +type fanoutAppenderTestCase struct { + name string + primary *teststorage.Appendable + secondary *teststorage.Appendable + + expectAppendErr bool + expectExemplarError bool + expectCommitError bool + + expectPrimarySamples []sample + expectSecondarySamples []sample +} + +func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { + appErr := errors.New("append test error") + exErr := errors.New("exemplar test error") + commitErr := errors.New("commit test error") + + return []fanoutAppenderTestCase{ + { + name: "both works", + primary: teststorage.NewAppendable(), + secondary: teststorage.NewAppendable(), + + expectPrimarySamples: expected, + expectSecondarySamples: expected, + }, + { + name: "primary errors", + primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + secondary: teststorage.NewAppendable(), + + expectAppendErr: true, + expectExemplarError: true, + expectCommitError: true, + }, + { + name: "primary exemplar errors", + primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return nil }, exErr, nil), + secondary: teststorage.NewAppendable(), + + expectAppendErr: false, + expectExemplarError: true, + expectCommitError: false, + + expectPrimarySamples: withoutExemplars(expected), + expectSecondarySamples: withoutExemplars(expected), + }, + { + name: "secondary errors", + primary: teststorage.NewAppendable(), + secondary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + + expectAppendErr: true, + expectExemplarError: true, + expectCommitError: true, + + expectPrimarySamples: expected, + }, + } +} + +func TestFanoutAppender(t *testing.T) { + h := tsdbutil.GenerateTestHistogram(0) + fh := tsdbutil.GenerateTestFloatHistogram(0) + ex := exemplar.Exemplar{Value: 1} + + expected := []sample{ + {L: labels.FromStrings(model.MetricNameLabel, "metric1"), V: 1, ES: []exemplar.Exemplar{ex}}, + {L: labels.FromStrings(model.MetricNameLabel, "metric2"), T: 1, H: h}, + {L: labels.FromStrings(model.MetricNameLabel, "metric3"), T: 2, FH: fh}, + } + for _, tt := range fanoutAppenderTestCases(expected) { + t.Run(tt.name, func(t *testing.T) { + f := storage.NewFanout(nil, mockStorage{app: tt.primary}, mockStorage{app: tt.secondary}) + + app := f.Appender(t.Context()) + ref, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), 0, 1) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.AppendExemplar(ref, labels.FromStrings(model.MetricNameLabel, "metric1"), ex) + if tt.expectExemplarError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric2"), 1, h, nil) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric3"), 2, nil, fh) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + err = app.Commit() + if tt.expectCommitError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + require.Nil(t, tt.primary.PendingSamples()) + testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples()) + require.Nil(t, tt.primary.RolledbackSamples()) + + require.Nil(t, tt.secondary.PendingSamples()) + testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples()) + require.Nil(t, tt.secondary.RolledbackSamples()) + }) + } +} + +func TestFanoutAppenderV2(t *testing.T) { + h := tsdbutil.GenerateTestHistogram(0) + fh := tsdbutil.GenerateTestFloatHistogram(0) + ex := exemplar.Exemplar{Value: 1} + + expected := []sample{ + {L: labels.FromStrings(model.MetricNameLabel, "metric1"), ST: -1, V: 1, ES: []exemplar.Exemplar{ex}}, + {L: labels.FromStrings(model.MetricNameLabel, "metric2"), ST: -2, T: 1, H: h}, + {L: labels.FromStrings(model.MetricNameLabel, "metric3"), ST: -3, T: 2, FH: fh}, + } + + for _, tt := range fanoutAppenderTestCases(expected) { + t.Run(tt.name, func(t *testing.T) { + f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary}) + + app := f.AppenderV2(t.Context()) + _, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), -1, 0, 1, nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{ex}, + }) + switch { + case tt.expectAppendErr: + require.Error(t, err) + case tt.expectExemplarError: + pErr := &storage.AppendPartialError{} + require.ErrorAs(t, err, &pErr) + require.Len(t, pErr.ExemplarErrors, 1) + default: + require.NoError(t, err) + } + + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric2"), -2, 1, 0, h, nil, storage.AOptions{}) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric3"), -3, 2, 0, nil, fh, storage.AOptions{}) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + err = app.Commit() + if tt.expectCommitError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + require.Nil(t, tt.primary.PendingSamples()) + testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples()) + require.Nil(t, tt.primary.RolledbackSamples()) + + require.Nil(t, tt.secondary.PendingSamples()) + testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples()) + require.Nil(t, tt.secondary.RolledbackSamples()) + }) + } +} diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index a98ff9c48f..fff9833b60 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -374,7 +374,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem // with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632 i := len(a.a.pendingSamples) - 1 for ; i >= 0; i-- { // Attach exemplars to the last matching sample. - if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) { + if labels.Equal(l, a.a.pendingSamples[i].L) { a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e) appended = true break @@ -415,7 +415,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta // with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632 i := len(a.a.pendingSamples) - 1 for ; i >= 0; i-- { // Attach metadata to the last matching sample. - if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) { + if labels.Equal(l, a.a.pendingSamples[i].L) { a.a.pendingSamples[i].M = m updated = true break From f61a83bcd69f4a2e728b55472ecbddd5965583aa Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 15 Jan 2026 14:14:31 +0000 Subject: [PATCH 4/4] fix: appenderV2 mock exemplar appendErr injection before appending it Signed-off-by: bwplotka --- storage/fanout.go | 9 ++++++--- storage/fanout_test.go | 24 ++++++++++++------------ storage/interface_append.go | 25 +++++++++++++++++++++++++ util/teststorage/appender.go | 36 ++++++++++++++++++++---------------- 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index 95c6499952..9baa31d9af 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -301,16 +301,19 @@ type fanoutAppenderV2 struct { func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) { ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts) - if err != nil { + var partialErr AppendPartialError + if partialErr.Handle(err) != nil { return ref, err } for _, appender := range f.secondaries { if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil { - return 0, err + if partialErr.Handle(err) != nil { + return ref, err + } } } - return ref, nil + return ref, partialErr.ErrOrNil() } func (f *fanoutAppenderV2) Commit() (err error) { diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 92a20a3ecb..25f61341cd 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -21,17 +21,14 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/prometheus/prometheus/util/testutil" - - "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/teststorage" + "github.com/prometheus/prometheus/util/testutil" ) func TestFanout_SelectSorted(t *testing.T) { @@ -416,7 +413,7 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { }, { name: "primary errors", - primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr), secondary: teststorage.NewAppendable(), expectAppendErr: true, @@ -424,9 +421,9 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { expectCommitError: true, }, { - name: "primary exemplar errors", - primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return nil }, exErr, nil), - secondary: teststorage.NewAppendable(), + name: "exemplar errors", + primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil), + secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil), expectAppendErr: false, expectExemplarError: true, @@ -438,7 +435,7 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { { name: "secondary errors", primary: teststorage.NewAppendable(), - secondary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr), expectAppendErr: true, expectExemplarError: true, @@ -533,9 +530,12 @@ func TestFanoutAppenderV2(t *testing.T) { case tt.expectAppendErr: require.Error(t, err) case tt.expectExemplarError: - pErr := &storage.AppendPartialError{} + var pErr *storage.AppendPartialError require.ErrorAs(t, err, &pErr) - require.Len(t, pErr.ExemplarErrors, 1) + // One for primary, one for secondary. + // This is because in V2 flow we must append sample even when first append partially failed with exemplars. + // Filtering out exemplars is neither feasible, nor important. + require.Len(t, pErr.ExemplarErrors, 2) default: require.NoError(t, err) } diff --git a/storage/interface_append.go b/storage/interface_append.go index f2dce8e52e..aa4ae84152 100644 --- a/storage/interface_append.go +++ b/storage/interface_append.go @@ -97,6 +97,31 @@ func (e *AppendPartialError) Error() string { return errs.Error() } +// ErrOrNil returns AppendPartialError as error, returning nil +// if there are no errors. +func (e *AppendPartialError) ErrOrNil() error { + if len(e.ExemplarErrors) == 0 { + return nil + } + return e +} + +// Handle handles the given err that may be an AppendPartialError. +// If the err is nil or not an AppendPartialError it returns err. +// Otherwise, partial errors are aggregated. +func (e *AppendPartialError) Handle(err error) error { + if err == nil { + return nil + } + + var pErr *AppendPartialError + if !errors.As(err, &pErr) { + return err + } + e.ExemplarErrors = append(e.ExemplarErrors, pErr.ExemplarErrors...) + return nil +} + var _ error = &AppendPartialError{} // AppenderV2 provides appends against a storage for all types of samples. diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index fff9833b60..55cb727ee0 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -464,13 +464,28 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 } } - a.a.mtx.Lock() - var es []exemplar.Exemplar + var ( + es []exemplar.Exemplar + partialErr error + ) + if len(opts.Exemplars) > 0 { - // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. - es = make([]exemplar.Exemplar, len(opts.Exemplars)) - copy(es, opts.Exemplars) + if a.a.appendExemplarsError != nil { + var exErrs []error + for range opts.Exemplars { + exErrs = append(exErrs, a.a.appendExemplarsError) + } + if len(exErrs) > 0 { + partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} + } + } else { + // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. + es = make([]exemplar.Exemplar, len(opts.Exemplars)) + copy(es, opts.Exemplars) + } } + + a.a.mtx.Lock() a.a.pendingSamples = append(a.a.pendingSamples, Sample{ MF: opts.MetricFamilyName, M: opts.Metadata, @@ -481,17 +496,6 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 }) a.a.mtx.Unlock() - var partialErr error - if a.a.appendExemplarsError != nil { - var exErrs []error - for range opts.Exemplars { - exErrs = append(exErrs, a.a.appendExemplarsError) - } - if len(exErrs) > 0 { - partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} - } - } - if a.next != nil { ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts) if err != nil {