From 0e2569ad33a1ea98ddc8309501d02752eaf74806 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Mon, 12 Jan 2026 08:20:01 +0000 Subject: [PATCH] feat(teststorage)[PART4a]: Add AppendableV2 support for mock Appendable Signed-off-by: bwplotka --- util/teststorage/appender.go | 113 +++++++++++++++++++++++++++--- util/teststorage/appender_test.go | 38 ++++++++++ 2 files changed, 143 insertions(+), 8 deletions(-) diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index 058a09561c..795c9a4f47 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -65,13 +65,17 @@ func (s Sample) String() string { // Print all value types on purpose, to catch bugs for appending multiple sample types at once. h := "" if s.H != nil { - h = s.H.String() + h = " " + s.H.String() } fh := "" if s.FH != nil { - fh = s.FH.String() + fh = " " + s.FH.String() } - b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T)) + b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v", s.L.String(), s.V, h, fh, s.ST, s.T)) + if len(s.ES) > 0 { + b.WriteString(fmt.Sprintf(" %v", s.ES)) + } + b.WriteString("\n") return b.String() } @@ -104,7 +108,8 @@ type Appendable struct { rolledbackSamples []Sample // Optional chain (Appender will collect samples, then run next). - next storage.Appendable + next storage.Appendable + nextV2 storage.AppendableV2 } // NewAppendable returns mock Appendable. @@ -112,12 +117,18 @@ func NewAppendable() *Appendable { return &Appendable{} } -// Then chains another appender from the provided appendable for the Appender calls. +// Then chains another appender from the provided Appendable for the Appender calls. func (a *Appendable) Then(appendable storage.Appendable) *Appendable { a.next = appendable return a } +// ThenV2 chains another appenderV2 from the provided AppendableV2 for the AppenderV2 calls. +func (a *Appendable) ThenV2(appendable storage.AppendableV2) *Appendable { + a.nextV2 = appendable + return a +} + // WithErrs allows injecting errors to the appender. func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable { a.appendErrFn = appendErrFn @@ -130,6 +141,9 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx func (a *Appendable) PendingSamples() []Sample { a.mtx.Lock() defer a.mtx.Unlock() + if len(a.pendingSamples) == 0 { + return nil + } ret := make([]Sample, len(a.pendingSamples)) copy(ret, a.pendingSamples) @@ -140,6 +154,9 @@ func (a *Appendable) PendingSamples() []Sample { func (a *Appendable) ResultSamples() []Sample { a.mtx.Lock() defer a.mtx.Unlock() + if len(a.resultSamples) == 0 { + return nil + } ret := make([]Sample, len(a.resultSamples)) copy(ret, a.resultSamples) @@ -150,6 +167,9 @@ func (a *Appendable) ResultSamples() []Sample { func (a *Appendable) RolledbackSamples() []Sample { a.mtx.Lock() defer a.mtx.Unlock() + if len(a.rolledbackSamples) == 0 { + return nil + } ret := make([]Sample, len(a.rolledbackSamples)) copy(ret, a.rolledbackSamples) @@ -206,10 +226,10 @@ func (a *Appendable) String() string { var errClosedAppender = errors.New("appender was already committed/rolledback") type appender struct { - err error - next storage.Appender + err error - a *Appendable + next storage.Appender + a *Appendable } func (a *appender) checkErr() error { @@ -227,6 +247,11 @@ func (a *Appendable) Appender(ctx context.Context) storage.Appender { if a.next != nil { ret.next = a.next.Appender(ctx) + } else if a.nextV2 != nil { + ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .ThenV2 but no .Then was supplied; likely bug")) + } + if a.appendExemplarsError != nil { + ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .WithErr exemplar partial error injection. This is not supported for Appender V1 flow")) } return ret } @@ -397,3 +422,75 @@ func (a *appender) Rollback() error { } return nil } + +type appenderV2 struct { + appender + + next storage.AppenderV2 +} + +func (a *Appendable) AppenderV2(ctx context.Context) storage.AppenderV2 { + ret := &appenderV2{appender: appender{a: a}} + if a.openAppenders.Inc() > 1 { + ret.err = errors.New("teststorage.Appendable.AppenderV2() concurrent use is not supported; attempted opening new AppenderV2() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed") + } + + if a.nextV2 != nil { + ret.next = a.nextV2.AppenderV2(ctx) + } else if a.next != nil { + ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.AppenderV2() invoked with .Then but no .ThenV2 was supplied; likely bug")) + } + return ret +} + +func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) { + if err := a.checkErr(); err != nil { + return 0, err + } + + if a.a.appendErrFn != nil { + if err := a.a.appendErrFn(ls); err != nil { + return 0, err + } + } + + a.a.mtx.Lock() + var es []exemplar.Exemplar + if len(opts.Exemplars) > 0 { + // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. + es = make([]exemplar.Exemplar, len(opts.Exemplars)) + copy(es, opts.Exemplars) + } + a.a.pendingSamples = append(a.a.pendingSamples, Sample{ + MF: opts.MetricFamilyName, + M: opts.Metadata, + L: ls, + ST: st, T: t, + V: v, H: h, FH: fh, + ES: es, + }) + a.a.mtx.Unlock() + + var partialErr error + if a.a.appendExemplarsError != nil { + var exErrs []error + for range opts.Exemplars { + exErrs = append(exErrs, a.a.appendExemplarsError) + } + if len(exErrs) > 0 { + partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} + } + } + + if a.next != nil { + ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts) + if err != nil { + return ref, err + } + } + ref, err = computeOrCheckRef(ref, ls) + if err != nil { + return ref, err + } + return ref, partialErr +} diff --git a/util/teststorage/appender_test.go b/util/teststorage/appender_test.go index 8c2a825c3a..31cdca50b4 100644 --- a/util/teststorage/appender_test.go +++ b/util/teststorage/appender_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/testutil" ) @@ -129,3 +130,40 @@ func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) { require.Error(t, app.Commit()) require.Error(t, app.Rollback()) } + +func TestConcurrentAppenderV2_ReturnsErrAppender(t *testing.T) { + a := NewAppendable() + + // Non-concurrent multiple use if fine. + app := a.AppenderV2(t.Context()) + require.Equal(t, int32(1), a.openAppenders.Load()) + require.NoError(t, app.Commit()) + // Repeated commit fails. + require.Error(t, app.Commit()) + + app = a.AppenderV2(t.Context()) + require.NoError(t, app.Rollback()) + // Commit after rollback fails. + require.Error(t, app.Commit()) + + a.WithErrs( + nil, + nil, + errors.New("commit err"), + ) + app = a.AppenderV2(t.Context()) + require.Error(t, app.Commit()) + + a.WithErrs(nil, nil, nil) + app = a.AppenderV2(t.Context()) + require.NoError(t, app.Commit()) + require.Equal(t, int32(0), a.openAppenders.Load()) + + // Concurrent use should return appender that errors. + _ = a.AppenderV2(t.Context()) + app = a.AppenderV2(t.Context()) + _, err := app.Append(0, labels.EmptyLabels(), 0, 0, 0, nil, nil, storage.AOptions{}) + require.Error(t, err) + require.Error(t, app.Commit()) + require.Error(t, app.Rollback()) +}