mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-04 22:22:39 -04:00
refactor(storage): add AppendableV2 support to Storage interface
Signed-off-by: bwplotka <bwplotka@gmail.com> comm Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
c97952c21f
commit
c869386039
8 changed files with 298 additions and 30 deletions
|
|
@ -1742,6 +1742,14 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
|
|||
return notReadyAppender{}
|
||||
}
|
||||
|
||||
// AppenderV2 implements the Storage interface.
|
||||
func (s *readyStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
if x := s.get(); x != nil {
|
||||
return x.AppenderV2(ctx)
|
||||
}
|
||||
return notReadyAppenderV2{}
|
||||
}
|
||||
|
||||
type notReadyAppender struct{}
|
||||
|
||||
// SetOptions does nothing in this appender implementation.
|
||||
|
|
@ -1775,6 +1783,15 @@ func (notReadyAppender) Commit() error { return tsdb.ErrNotReady }
|
|||
|
||||
func (notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
|
||||
|
||||
type notReadyAppenderV2 struct{}
|
||||
|
||||
func (notReadyAppenderV2) Append(storage.SeriesRef, labels.Labels, int64, int64, float64, *histogram.Histogram, *histogram.FloatHistogram, storage.AOptions) (storage.SeriesRef, error) {
|
||||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
func (notReadyAppenderV2) Commit() error { return tsdb.ErrNotReady }
|
||||
|
||||
func (notReadyAppenderV2) Rollback() error { return tsdb.ErrNotReady }
|
||||
|
||||
// Close implements the Storage interface.
|
||||
func (s *readyStorage) Close() error {
|
||||
if x := s.get(); x != nil {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
|
@ -130,6 +130,19 @@ func (f *fanout) Appender(ctx context.Context) Appender {
|
|||
}
|
||||
}
|
||||
|
||||
func (f *fanout) AppenderV2(ctx context.Context) AppenderV2 {
|
||||
primary := f.primary.AppenderV2(ctx)
|
||||
secondaries := make([]AppenderV2, 0, len(f.secondaries))
|
||||
for _, storage := range f.secondaries {
|
||||
secondaries = append(secondaries, storage.AppenderV2(ctx))
|
||||
}
|
||||
return &fanoutAppenderV2{
|
||||
logger: f.logger,
|
||||
primary: primary,
|
||||
secondaries: secondaries,
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the storage and all its underlying resources.
|
||||
func (f *fanout) Close() error {
|
||||
errs := tsdb_errors.NewMulti(f.primary.Close())
|
||||
|
|
@ -270,3 +283,54 @@ func (f *fanoutAppender) Rollback() (err error) {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type fanoutAppenderV2 struct {
|
||||
logger *slog.Logger
|
||||
|
||||
primary AppenderV2
|
||||
secondaries []AppenderV2
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) {
|
||||
ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Commit() (err error) {
|
||||
err = f.primary.Commit()
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if err == nil {
|
||||
err = appender.Commit()
|
||||
} else {
|
||||
if rollbackErr := appender.Rollback(); rollbackErr != nil {
|
||||
f.logger.Error("Squashed rollback error on commit", "err", rollbackErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Rollback() (err error) {
|
||||
err = f.primary.Rollback()
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
rollbackErr := appender.Rollback()
|
||||
switch {
|
||||
case err == nil:
|
||||
err = rollbackErr
|
||||
case rollbackErr != nil:
|
||||
f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
|
@ -224,9 +224,10 @@ type errChunkQuerier struct{ errQuerier }
|
|||
func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) {
|
||||
return errChunkQuerier{}, nil
|
||||
}
|
||||
func (errStorage) Appender(context.Context) storage.Appender { return nil }
|
||||
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
||||
func (errStorage) Close() error { return nil }
|
||||
func (errStorage) Appender(context.Context) storage.Appender { return nil }
|
||||
func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil }
|
||||
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
||||
func (errStorage) Close() error { return nil }
|
||||
|
||||
func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.ErrSeriesSet(errSelect)
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2014 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
|
@ -61,7 +61,8 @@ type SeriesRef uint64
|
|||
|
||||
// Appendable allows creating Appender.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type Appendable interface {
|
||||
// Appender returns a new appender for the storage.
|
||||
//
|
||||
|
|
@ -77,10 +78,15 @@ type SampleAndChunkQueryable interface {
|
|||
}
|
||||
|
||||
// Storage ingests and manages samples, along with various indexes. All methods
|
||||
// are goroutine-safe. Storage implements storage.Appender.
|
||||
// are goroutine-safe.
|
||||
type Storage interface {
|
||||
SampleAndChunkQueryable
|
||||
|
||||
// Appendable allows appending to storage.
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
Appendable
|
||||
AppendableV2
|
||||
|
||||
// StartTime returns the oldest timestamp stored in the storage.
|
||||
StartTime() (int64, error)
|
||||
|
|
@ -261,7 +267,8 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
|
|||
|
||||
// AppendOptions provides options for implementations of the Appender interface.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type AppendOptions struct {
|
||||
// DiscardOutOfOrder tells implementation that this append should not be out
|
||||
// of order. An OOO append MUST be rejected with storage.ErrOutOfOrderSample
|
||||
|
|
@ -278,7 +285,8 @@ type AppendOptions struct {
|
|||
// series. I.e. samples are not reordered per timestamp, or by float/histogram
|
||||
// type.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type Appender interface {
|
||||
AppenderTransaction
|
||||
|
||||
|
|
@ -315,7 +323,8 @@ type GetRef interface {
|
|||
// ExemplarAppender provides an interface for adding samples to exemplar storage, which
|
||||
// within Prometheus is in-memory only.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type ExemplarAppender interface {
|
||||
// AppendExemplar adds an exemplar for the given series labels.
|
||||
// An optional reference number can be provided to accelerate calls.
|
||||
|
|
@ -333,7 +342,8 @@ type ExemplarAppender interface {
|
|||
|
||||
// HistogramAppender provides an interface for appending histograms to the storage.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type HistogramAppender interface {
|
||||
// AppendHistogram adds a histogram for the given series labels. An
|
||||
// optional reference number can be provided to accelerate calls. A
|
||||
|
|
@ -365,7 +375,8 @@ type HistogramAppender interface {
|
|||
|
||||
// MetadataUpdater provides an interface for associating metadata to stored series.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type MetadataUpdater interface {
|
||||
// UpdateMetadata updates a metadata entry for the given series and labels.
|
||||
// A series reference number is returned which can be used to modify the
|
||||
|
|
@ -379,7 +390,8 @@ type MetadataUpdater interface {
|
|||
|
||||
// StartTimestampAppender provides an interface for appending ST to storage.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type StartTimestampAppender interface {
|
||||
// AppendSTZeroSample adds synthetic zero sample for the given st timestamp,
|
||||
// which will be associated with given series, labels and the incoming
|
||||
|
|
|
|||
|
|
@ -63,6 +63,8 @@ type Storage struct {
|
|||
localStartTimeCallback startTimeCallback
|
||||
}
|
||||
|
||||
var _ storage.Storage = &Storage{}
|
||||
|
||||
// NewStorage returns a remote.Storage.
|
||||
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage {
|
||||
if l == nil {
|
||||
|
|
@ -193,6 +195,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender {
|
|||
return s.rws.Appender(ctx)
|
||||
}
|
||||
|
||||
// AppenderV2 implements storage.Storage.
|
||||
func (s *Storage) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
return s.rws.AppenderV2(ctx)
|
||||
}
|
||||
|
||||
// LowestSentTimestamp returns the lowest sent timestamp across all queues.
|
||||
func (s *Storage) LowestSentTimestamp() int64 {
|
||||
return s.rws.LowestSentTimestamp()
|
||||
|
|
|
|||
|
|
@ -238,8 +238,20 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
|||
// Appender implements storage.Storage.
|
||||
func (rws *WriteStorage) Appender(context.Context) storage.Appender {
|
||||
return ×tampTracker{
|
||||
writeStorage: rws,
|
||||
highestRecvTimestamp: rws.highestTimestamp,
|
||||
baseTimestampTracker: baseTimestampTracker{
|
||||
writeStorage: rws,
|
||||
highestRecvTimestamp: rws.highestTimestamp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// AppenderV2 implements storage.Storage.
|
||||
func (rws *WriteStorage) AppenderV2(context.Context) storage.AppenderV2 {
|
||||
return ×tampTrackerV2{
|
||||
baseTimestampTracker: baseTimestampTracker{
|
||||
writeStorage: rws,
|
||||
highestRecvTimestamp: rws.highestTimestamp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -282,9 +294,9 @@ func (rws *WriteStorage) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type timestampTracker struct {
|
||||
writeStorage *WriteStorage
|
||||
appendOptions *storage.AppendOptions
|
||||
type baseTimestampTracker struct {
|
||||
writeStorage *WriteStorage
|
||||
|
||||
samples int64
|
||||
exemplars int64
|
||||
histograms int64
|
||||
|
|
@ -292,6 +304,12 @@ type timestampTracker struct {
|
|||
highestRecvTimestamp *maxTimestamp
|
||||
}
|
||||
|
||||
type timestampTracker struct {
|
||||
baseTimestampTracker
|
||||
|
||||
appendOptions *storage.AppendOptions
|
||||
}
|
||||
|
||||
func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) {
|
||||
t.appendOptions = opts
|
||||
}
|
||||
|
|
@ -345,7 +363,7 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada
|
|||
}
|
||||
|
||||
// Commit implements storage.Appender.
|
||||
func (t *timestampTracker) Commit() error {
|
||||
func (t *baseTimestampTracker) Commit() error {
|
||||
t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
|
||||
|
||||
samplesIn.Add(float64(t.samples))
|
||||
|
|
@ -356,6 +374,24 @@ func (t *timestampTracker) Commit() error {
|
|||
}
|
||||
|
||||
// Rollback implements storage.Appender.
|
||||
func (*timestampTracker) Rollback() error {
|
||||
func (*baseTimestampTracker) Rollback() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type timestampTrackerV2 struct {
|
||||
baseTimestampTracker
|
||||
}
|
||||
|
||||
func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
|
||||
switch {
|
||||
case fh != nil, h != nil:
|
||||
t.histograms++
|
||||
default:
|
||||
t.samples++
|
||||
}
|
||||
if ts > t.highestTimestamp {
|
||||
t.highestTimestamp = ts
|
||||
}
|
||||
t.exemplars += int64(len(opts.Exemplars))
|
||||
return ref, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,13 +65,17 @@ func (s Sample) String() string {
|
|||
// Print all value types on purpose, to catch bugs for appending multiple sample types at once.
|
||||
h := ""
|
||||
if s.H != nil {
|
||||
h = s.H.String()
|
||||
h = "" + s.H.String()
|
||||
}
|
||||
fh := ""
|
||||
if s.FH != nil {
|
||||
fh = s.FH.String()
|
||||
fh = " " + s.FH.String()
|
||||
}
|
||||
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T))
|
||||
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v", s.L.String(), s.V, h, fh, s.ST, s.T))
|
||||
if len(s.ES) > 0 {
|
||||
b.WriteString(fmt.Sprintf(" %v", s.ES))
|
||||
}
|
||||
b.WriteString("\n")
|
||||
return b.String()
|
||||
}
|
||||
|
||||
|
|
@ -104,7 +108,8 @@ type Appendable struct {
|
|||
rolledbackSamples []Sample
|
||||
|
||||
// Optional chain (Appender will collect samples, then run next).
|
||||
next storage.Appendable
|
||||
next storage.Appendable
|
||||
nextV2 storage.AppendableV2
|
||||
}
|
||||
|
||||
// NewAppendable returns mock Appendable.
|
||||
|
|
@ -112,12 +117,18 @@ func NewAppendable() *Appendable {
|
|||
return &Appendable{}
|
||||
}
|
||||
|
||||
// Then chains another appender from the provided appendable for the Appender calls.
|
||||
// Then chains another appender from the provided Appendable for the Appender calls.
|
||||
func (a *Appendable) Then(appendable storage.Appendable) *Appendable {
|
||||
a.next = appendable
|
||||
return a
|
||||
}
|
||||
|
||||
// ThenV2 chains another appenderV2 from the provided AppendableV2 for the AppenderV2 calls.
|
||||
func (a *Appendable) ThenV2(appendable storage.AppendableV2) *Appendable {
|
||||
a.nextV2 = appendable
|
||||
return a
|
||||
}
|
||||
|
||||
// WithErrs allows injecting errors to the appender.
|
||||
func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable {
|
||||
a.appendErrFn = appendErrFn
|
||||
|
|
@ -130,6 +141,9 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx
|
|||
func (a *Appendable) PendingSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
if len(a.pendingSamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]Sample, len(a.pendingSamples))
|
||||
copy(ret, a.pendingSamples)
|
||||
|
|
@ -140,6 +154,9 @@ func (a *Appendable) PendingSamples() []Sample {
|
|||
func (a *Appendable) ResultSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
if len(a.resultSamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]Sample, len(a.resultSamples))
|
||||
copy(ret, a.resultSamples)
|
||||
|
|
@ -149,7 +166,11 @@ func (a *Appendable) ResultSamples() []Sample {
|
|||
// RolledbackSamples returns rolled back samples.
|
||||
func (a *Appendable) RolledbackSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
|
||||
defer a.mtx.Unlock()
|
||||
if len(a.rolledbackSamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]Sample, len(a.rolledbackSamples))
|
||||
copy(ret, a.rolledbackSamples)
|
||||
|
|
@ -206,10 +227,10 @@ func (a *Appendable) String() string {
|
|||
var errCloseAppender = errors.New("appender was already committed/rolledback")
|
||||
|
||||
type appender struct {
|
||||
err error
|
||||
next storage.Appender
|
||||
err error
|
||||
|
||||
a *Appendable
|
||||
next storage.Appender
|
||||
a *Appendable
|
||||
}
|
||||
|
||||
func (a *appender) checkErr() error {
|
||||
|
|
@ -227,6 +248,8 @@ func (a *Appendable) Appender(ctx context.Context) storage.Appender {
|
|||
|
||||
if a.next != nil {
|
||||
ret.next = a.next.Appender(ctx)
|
||||
} else if a.nextV2 != nil {
|
||||
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .ThenV2 but no .Then was supplied; likely bug"))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
|
@ -397,3 +420,73 @@ func (a *appender) Rollback() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type appenderV2 struct {
|
||||
appender
|
||||
|
||||
next storage.AppenderV2
|
||||
}
|
||||
|
||||
func (a *Appendable) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
ret := &appenderV2{appender: appender{a: a}}
|
||||
if a.openAppenders.Inc() > 1 {
|
||||
ret.err = errors.New("teststorage.Appendable.AppenderV2() concurrent use is not supported; attempted opening new AppenderV2() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
|
||||
}
|
||||
|
||||
if a.nextV2 != nil {
|
||||
ret.next = a.nextV2.AppenderV2(ctx)
|
||||
} else if a.next != nil {
|
||||
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.AppenderV2() invoked with .Then but no .ThenV2 was supplied; likely bug"))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if a.a.appendErrFn != nil {
|
||||
if err := a.a.appendErrFn(ls); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
var es []exemplar.Exemplar
|
||||
if len(opts.Exemplars) > 0 {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
}
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
|
||||
MF: opts.MetricFamilyName,
|
||||
M: opts.Metadata,
|
||||
L: ls,
|
||||
ST: st, T: t,
|
||||
V: v, H: h, FH: fh,
|
||||
ES: es,
|
||||
})
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
var partialErr error
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
}
|
||||
|
||||
if a.next != nil && partialErr == nil {
|
||||
return a.next.Append(ref, ls, st, t, v, h, fh, opts)
|
||||
}
|
||||
|
||||
if ref == 0 {
|
||||
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
|
||||
ref = storage.SeriesRef(ls.Hash())
|
||||
}
|
||||
return ref, partialErr
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
|
|
@ -91,7 +92,7 @@ func requireNotEqual(t testing.TB, a, b any) {
|
|||
"b: %s", a, b))
|
||||
}
|
||||
|
||||
func TestConcurrentAppender_Panic(t *testing.T) {
|
||||
func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) {
|
||||
a := NewAppendable()
|
||||
|
||||
// Non-concurrent multiple use if fine.
|
||||
|
|
@ -129,3 +130,40 @@ func TestConcurrentAppender_Panic(t *testing.T) {
|
|||
require.Error(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
}
|
||||
|
||||
func TestConcurrentAppenderV2_ReturnsErrAppender(t *testing.T) {
|
||||
a := NewAppendable()
|
||||
|
||||
// Non-concurrent multiple use if fine.
|
||||
app := a.AppenderV2(t.Context())
|
||||
require.Equal(t, int32(1), a.openAppenders.Load())
|
||||
require.NoError(t, app.Commit())
|
||||
// Repeated commit fails.
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
app = a.AppenderV2(t.Context())
|
||||
require.NoError(t, app.Rollback())
|
||||
// Commit after rollback fails.
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
a.WithErrs(
|
||||
nil,
|
||||
nil,
|
||||
errors.New("commit err"),
|
||||
)
|
||||
app = a.AppenderV2(t.Context())
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
a.WithErrs(nil, nil, nil)
|
||||
app = a.AppenderV2(t.Context())
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, int32(0), a.openAppenders.Load())
|
||||
|
||||
// Concurrent use should return appender that errors.
|
||||
_ = a.AppenderV2(t.Context())
|
||||
app = a.AppenderV2(t.Context())
|
||||
_, err := app.Append(0, labels.EmptyLabels(), 0, 0, 0, nil, nil, storage.AOptions{})
|
||||
require.Error(t, err)
|
||||
require.Error(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue