diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 9d6d864971..08d56d2072 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1742,6 +1742,14 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender { return notReadyAppender{} } +// AppenderV2 implements the Storage interface. +func (s *readyStorage) AppenderV2(ctx context.Context) storage.AppenderV2 { + if x := s.get(); x != nil { + return x.AppenderV2(ctx) + } + return notReadyAppenderV2{} +} + type notReadyAppender struct{} // SetOptions does nothing in this appender implementation. @@ -1775,6 +1783,15 @@ func (notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (notReadyAppender) Rollback() error { return tsdb.ErrNotReady } +type notReadyAppenderV2 struct{} + +func (notReadyAppenderV2) Append(storage.SeriesRef, labels.Labels, int64, int64, float64, *histogram.Histogram, *histogram.FloatHistogram, storage.AOptions) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} +func (notReadyAppenderV2) Commit() error { return tsdb.ErrNotReady } + +func (notReadyAppenderV2) Rollback() error { return tsdb.ErrNotReady } + // Close implements the Storage interface. func (s *readyStorage) Close() error { if x := s.get(); x != nil { diff --git a/storage/fanout.go b/storage/fanout.go index a699a97b02..33c609029b 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -1,4 +1,4 @@ -// Copyright 2017 The Prometheus Authors +// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -130,6 +130,20 @@ func (f *fanout) Appender(ctx context.Context) Appender { } } +// AppenderV2 returns an AppenderV2 backed by one appender from the primary storage and one from each secondary storage. +func (f *fanout) AppenderV2(ctx context.Context) AppenderV2 { + primary := f.primary.AppenderV2(ctx) + secondaries := make([]AppenderV2, 0, len(f.secondaries)) + for _, storage := range f.secondaries { + secondaries = append(secondaries, storage.AppenderV2(ctx)) + } + return &fanoutAppenderV2{ + logger: f.logger, + primary: primary, + secondaries: secondaries, + } +} + // Close closes the storage and all its underlying resources. func (f *fanout) Close() error { errs := tsdb_errors.NewMulti(f.primary.Close()) @@ -270,3 +284,65 @@ func (f *fanoutAppender) Rollback() (err error) { } return nil } + +// fanoutAppenderV2 forwards each AppenderV2 call to the primary appender first +// and then to every secondary appender. +type fanoutAppenderV2 struct { + logger *slog.Logger + + primary AppenderV2 + secondaries []AppenderV2 +} + +// Append appends to the primary and, if that succeeds, to each secondary using +// the reference returned by the primary. A primary error is returned with the +// primary's ref; the first secondary error is returned with a zero ref. +func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) { + ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil { + return 0, err + } + } + return ref, nil +} + +// Commit commits the primary, then each secondary in order. Once any commit has +// failed, the remaining secondaries are rolled back instead and their rollback +// errors are only logged. The first commit error is returned. +func (f *fanoutAppenderV2) Commit() (err error) { + err = f.primary.Commit() + + for _, appender := range f.secondaries { + if err == nil { + err = appender.Commit() + } else { + if rollbackErr := appender.Rollback(); rollbackErr != nil { + f.logger.Error("Squashed rollback error on commit", "err", rollbackErr) + } + } + } + return err +} + +// Rollback rolls back the primary and every secondary. The first error seen is +// returned (the named result must not be dropped by returning nil); any later +// rollback errors are only logged. +func (f *fanoutAppenderV2) Rollback() (err error) { + err = f.primary.Rollback() + + for _, appender := range f.secondaries { + rollbackErr := appender.Rollback() + switch { + case err == nil: + err = rollbackErr + case rollbackErr != nil: + f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr) + } + } + return err +} diff --git 
a/storage/fanout_test.go b/storage/fanout_test.go index b1762ec555..7b63aab9b9 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The Prometheus Authors +// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -224,9 +224,10 @@ type errChunkQuerier struct{ errQuerier } func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) { return errChunkQuerier{}, nil } -func (errStorage) Appender(context.Context) storage.Appender { return nil } -func (errStorage) StartTime() (int64, error) { return 0, nil } -func (errStorage) Close() error { return nil } +func (errStorage) Appender(context.Context) storage.Appender { return nil } +func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil } +func (errStorage) StartTime() (int64, error) { return 0, nil } +func (errStorage) Close() error { return nil } func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return storage.ErrSeriesSet(errSelect) diff --git a/storage/interface.go b/storage/interface.go index f7d7953de4..0ea114d246 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -1,4 +1,4 @@ -// Copyright 2014 The Prometheus Authors +// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -61,7 +61,8 @@ type SeriesRef uint64 // Appendable allows creating Appender. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). 
type Appendable interface { // Appender returns a new appender for the storage. // @@ -77,10 +78,15 @@ type SampleAndChunkQueryable interface { } // Storage ingests and manages samples, along with various indexes. All methods -// are goroutine-safe. Storage implements storage.Appender. +// are goroutine-safe. type Storage interface { SampleAndChunkQueryable + + // Appendable allows appending to storage. + // WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). + // Appendable will be removed soon (ETA: Q2 2026). Appendable + AppendableV2 // StartTime returns the oldest timestamp stored in the storage. StartTime() (int64, error) @@ -261,7 +267,8 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) { // AppendOptions provides options for implementations of the Appender interface. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type AppendOptions struct { // DiscardOutOfOrder tells implementation that this append should not be out // of order. An OOO append MUST be rejected with storage.ErrOutOfOrderSample @@ -278,7 +285,8 @@ type AppendOptions struct { // series. I.e. samples are not reordered per timestamp, or by float/histogram // type. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type Appender interface { AppenderTransaction @@ -315,7 +323,8 @@ type GetRef interface { // ExemplarAppender provides an interface for adding samples to exemplar storage, which // within Prometheus is in-memory only. // -// WARNING: Work AppendableV2 is in progress. 
Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type ExemplarAppender interface { // AppendExemplar adds an exemplar for the given series labels. // An optional reference number can be provided to accelerate calls. @@ -333,7 +342,8 @@ type ExemplarAppender interface { // HistogramAppender provides an interface for appending histograms to the storage. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type HistogramAppender interface { // AppendHistogram adds a histogram for the given series labels. An // optional reference number can be provided to accelerate calls. A @@ -365,7 +375,8 @@ type HistogramAppender interface { // MetadataUpdater provides an interface for associating metadata to stored series. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). type MetadataUpdater interface { // UpdateMetadata updates a metadata entry for the given series and labels. // A series reference number is returned which can be used to modify the @@ -379,7 +390,8 @@ type MetadataUpdater interface { // StartTimestampAppender provides an interface for appending ST to storage. // -// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026). +// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// Appendable will be removed soon (ETA: Q2 2026). 
type StartTimestampAppender interface { // AppendSTZeroSample adds synthetic zero sample for the given st timestamp, // which will be associated with given series, labels and the incoming diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 648c91c955..4c13eb25fd 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -63,6 +63,8 @@ type Storage struct { localStartTimeCallback startTimeCallback } +var _ storage.Storage = &Storage{} + // NewStorage returns a remote.Storage. func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage { if l == nil { @@ -193,6 +195,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender { return s.rws.Appender(ctx) } +// AppenderV2 implements storage.Storage. +func (s *Storage) AppenderV2(ctx context.Context) storage.AppenderV2 { + return s.rws.AppenderV2(ctx) +} + // LowestSentTimestamp returns the lowest sent timestamp across all queues. func (s *Storage) LowestSentTimestamp() int64 { return s.rws.LowestSentTimestamp() diff --git a/storage/remote/write.go b/storage/remote/write.go index 1a036c1795..9bb443bd46 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -238,8 +238,20 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { // Appender implements storage.Storage. func (rws *WriteStorage) Appender(context.Context) storage.Appender { return &timestampTracker{ - writeStorage: rws, - highestRecvTimestamp: rws.highestTimestamp, + baseTimestampTracker: baseTimestampTracker{ + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, + }, + } +} + +// AppenderV2 implements storage.Storage. 
+func (rws *WriteStorage) AppenderV2(context.Context) storage.AppenderV2 { + return &timestampTrackerV2{ + baseTimestampTracker: baseTimestampTracker{ + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, + }, } } @@ -282,9 +294,9 @@ func (rws *WriteStorage) Close() error { return nil } -type timestampTracker struct { - writeStorage *WriteStorage - appendOptions *storage.AppendOptions +type baseTimestampTracker struct { + writeStorage *WriteStorage + samples int64 exemplars int64 histograms int64 @@ -292,6 +304,12 @@ type timestampTracker struct { highestRecvTimestamp *maxTimestamp } +type timestampTracker struct { + baseTimestampTracker + + appendOptions *storage.AppendOptions +} + func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) { t.appendOptions = opts } @@ -345,7 +363,7 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada } // Commit implements storage.Appender. -func (t *timestampTracker) Commit() error { +func (t *baseTimestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) samplesIn.Add(float64(t.samples)) @@ -356,6 +374,24 @@ func (t *timestampTracker) Commit() error { } // Rollback implements storage.Appender. 
-func (*timestampTracker) Rollback() error { +func (*baseTimestampTracker) Rollback() error { return nil } + +type timestampTrackerV2 struct { + baseTimestampTracker +} + +func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { + switch { + case fh != nil, h != nil: + t.histograms++ + default: + t.samples++ + } + if ts > t.highestTimestamp { + t.highestTimestamp = ts + } + t.exemplars += int64(len(opts.Exemplars)) + return ref, nil +} diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index e8c4c495c1..73d4f4944d 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -65,13 +65,17 @@ func (s Sample) String() string { // Print all value types on purpose, to catch bugs for appending multiple sample types at once. h := "" if s.H != nil { - h = s.H.String() + h = " " + s.H.String() } fh := "" if s.FH != nil { - fh = s.FH.String() + fh = " " + s.FH.String() } - b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T)) + b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v", s.L.String(), s.V, h, fh, s.ST, s.T)) + if len(s.ES) > 0 { + b.WriteString(fmt.Sprintf(" %v", s.ES)) + } + b.WriteString("\n") return b.String() } @@ -104,7 +108,8 @@ type Appendable struct { rolledbackSamples []Sample // Optional chain (Appender will collect samples, then run next). - next storage.Appendable + next storage.Appendable + nextV2 storage.AppendableV2 } // NewAppendable returns mock Appendable. @@ -112,12 +117,18 @@ func NewAppendable() *Appendable { return &Appendable{} } -// Then chains another appender from the provided appendable for the Appender calls. +// Then chains another appender from the provided Appendable for the Appender calls. 
func (a *Appendable) Then(appendable storage.Appendable) *Appendable { a.next = appendable return a } +// ThenV2 chains another appenderV2 from the provided AppendableV2 for the AppenderV2 calls. +func (a *Appendable) ThenV2(appendable storage.AppendableV2) *Appendable { + a.nextV2 = appendable + return a +} + // WithErrs allows injecting errors to the appender. func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable { a.appendErrFn = appendErrFn @@ -130,6 +141,9 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx func (a *Appendable) PendingSamples() []Sample { a.mtx.Lock() defer a.mtx.Unlock() + if len(a.pendingSamples) == 0 { + return nil + } ret := make([]Sample, len(a.pendingSamples)) copy(ret, a.pendingSamples) @@ -140,6 +154,9 @@ func (a *Appendable) PendingSamples() []Sample { func (a *Appendable) ResultSamples() []Sample { a.mtx.Lock() defer a.mtx.Unlock() + if len(a.resultSamples) == 0 { + return nil + } ret := make([]Sample, len(a.resultSamples)) copy(ret, a.resultSamples) @@ -149,7 +166,11 @@ func (a *Appendable) ResultSamples() []Sample { // RolledbackSamples returns rolled back samples. 
func (a *Appendable) RolledbackSamples() []Sample { a.mtx.Lock() + defer a.mtx.Unlock() + if len(a.rolledbackSamples) == 0 { + return nil + } ret := make([]Sample, len(a.rolledbackSamples)) copy(ret, a.rolledbackSamples) @@ -206,10 +227,10 @@ func (a *Appendable) String() string { var errCloseAppender = errors.New("appender was already committed/rolledback") type appender struct { - err error - next storage.Appender + err error - a *Appendable + next storage.Appender + a *Appendable } func (a *appender) checkErr() error { @@ -227,6 +248,8 @@ func (a *Appendable) Appender(ctx context.Context) storage.Appender { if a.next != nil { ret.next = a.next.Appender(ctx) + } else if a.nextV2 != nil { + ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .ThenV2 but no .Then was supplied; likely bug")) } return ret } @@ -397,3 +420,73 @@ func (a *appender) Rollback() error { } return nil } + +type appenderV2 struct { + appender + + next storage.AppenderV2 +} + +func (a *Appendable) AppenderV2(ctx context.Context) storage.AppenderV2 { + ret := &appenderV2{appender: appender{a: a}} + if a.openAppenders.Inc() > 1 { + ret.err = errors.New("teststorage.Appendable.AppenderV2() concurrent use is not supported; attempted opening new AppenderV2() without Commit/Rollback of the previous one. 
Extend the implementation if concurrent mock is needed") + } + + if a.nextV2 != nil { + ret.next = a.nextV2.AppenderV2(ctx) + } else if a.next != nil { + ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.AppenderV2() invoked with .Then but no .ThenV2 was supplied; likely bug")) + } + return ret +} + +func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { + if err := a.checkErr(); err != nil { + return 0, err + } + + if a.a.appendErrFn != nil { + if err := a.a.appendErrFn(ls); err != nil { + return 0, err + } + } + + a.a.mtx.Lock() + var es []exemplar.Exemplar + if len(opts.Exemplars) > 0 { + // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. + es = make([]exemplar.Exemplar, len(opts.Exemplars)) + copy(es, opts.Exemplars) + } + a.a.pendingSamples = append(a.a.pendingSamples, Sample{ + MF: opts.MetricFamilyName, + M: opts.Metadata, + L: ls, + ST: st, T: t, + V: v, H: h, FH: fh, + ES: es, + }) + a.a.mtx.Unlock() + + var partialErr error + if a.a.appendExemplarsError != nil { + var exErrs []error + for range opts.Exemplars { + exErrs = append(exErrs, a.a.appendExemplarsError) + } + if len(exErrs) > 0 { + partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} + } + } + + if a.next != nil && partialErr == nil { + return a.next.Append(ref, ls, st, t, v, h, fh, opts) + } + + if ref == 0 { + // Use labels hash as a stand-in for unique series reference, to avoid having to track all series. 
+ ref = storage.SeriesRef(ls.Hash()) + } + return ref, partialErr +} diff --git a/util/teststorage/appender_test.go b/util/teststorage/appender_test.go index fcbb9ddf71..31cdca50b4 100644 --- a/util/teststorage/appender_test.go +++ b/util/teststorage/appender_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/testutil" ) @@ -91,7 +92,7 @@ func requireNotEqual(t testing.TB, a, b any) { "b: %s", a, b)) } -func TestConcurrentAppender_Panic(t *testing.T) { +func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) { a := NewAppendable() // Non-concurrent multiple use if fine. @@ -129,3 +130,40 @@ func TestConcurrentAppender_Panic(t *testing.T) { require.Error(t, app.Commit()) require.Error(t, app.Rollback()) } + +func TestConcurrentAppenderV2_ReturnsErrAppender(t *testing.T) { + a := NewAppendable() + + // Non-concurrent multiple use if fine. + app := a.AppenderV2(t.Context()) + require.Equal(t, int32(1), a.openAppenders.Load()) + require.NoError(t, app.Commit()) + // Repeated commit fails. + require.Error(t, app.Commit()) + + app = a.AppenderV2(t.Context()) + require.NoError(t, app.Rollback()) + // Commit after rollback fails. + require.Error(t, app.Commit()) + + a.WithErrs( + nil, + nil, + errors.New("commit err"), + ) + app = a.AppenderV2(t.Context()) + require.Error(t, app.Commit()) + + a.WithErrs(nil, nil, nil) + app = a.AppenderV2(t.Context()) + require.NoError(t, app.Commit()) + require.Equal(t, int32(0), a.openAppenders.Load()) + + // Concurrent use should return appender that errors. + _ = a.AppenderV2(t.Context()) + app = a.AppenderV2(t.Context()) + _, err := app.Append(0, labels.EmptyLabels(), 0, 0, 0, nil, nil, storage.AOptions{}) + require.Error(t, err) + require.Error(t, app.Commit()) + require.Error(t, app.Rollback()) +}