feat(teststorage)[PART4a]: Add AppendableV2 support for mock Appendable

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-01-12 08:20:01 +00:00
parent 9cb3641ccd
commit 0e2569ad33
2 changed files with 143 additions and 8 deletions

View file

@ -65,13 +65,17 @@ func (s Sample) String() string {
// Print all value types on purpose, to catch bugs for appending multiple sample types at once.
h := ""
if s.H != nil {
h = s.H.String()
h = " " + s.H.String()
}
fh := ""
if s.FH != nil {
fh = s.FH.String()
fh = " " + s.FH.String()
}
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T))
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v", s.L.String(), s.V, h, fh, s.ST, s.T))
if len(s.ES) > 0 {
b.WriteString(fmt.Sprintf(" %v", s.ES))
}
b.WriteString("\n")
return b.String()
}
@ -104,7 +108,8 @@ type Appendable struct {
rolledbackSamples []Sample
// Optional chain (Appender will collect samples, then run next).
next storage.Appendable
next storage.Appendable
nextV2 storage.AppendableV2
}
// NewAppendable returns mock Appendable.
@ -112,12 +117,18 @@ func NewAppendable() *Appendable {
return &Appendable{}
}
// Then chains another appender from the provided appendable for the Appender calls.
// Then chains another appender from the provided Appendable for the Appender calls.
func (a *Appendable) Then(appendable storage.Appendable) *Appendable {
a.next = appendable
return a
}
// ThenV2 chains another appenderV2 from the provided AppendableV2 for the AppenderV2 calls.
func (a *Appendable) ThenV2(appendable storage.AppendableV2) *Appendable {
a.nextV2 = appendable
return a
}
// WithErrs allows injecting errors to the appender.
func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable {
a.appendErrFn = appendErrFn
@ -130,6 +141,9 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx
func (a *Appendable) PendingSamples() []Sample {
a.mtx.Lock()
defer a.mtx.Unlock()
if len(a.pendingSamples) == 0 {
return nil
}
ret := make([]Sample, len(a.pendingSamples))
copy(ret, a.pendingSamples)
@ -140,6 +154,9 @@ func (a *Appendable) PendingSamples() []Sample {
func (a *Appendable) ResultSamples() []Sample {
a.mtx.Lock()
defer a.mtx.Unlock()
if len(a.resultSamples) == 0 {
return nil
}
ret := make([]Sample, len(a.resultSamples))
copy(ret, a.resultSamples)
@ -150,6 +167,9 @@ func (a *Appendable) ResultSamples() []Sample {
func (a *Appendable) RolledbackSamples() []Sample {
a.mtx.Lock()
defer a.mtx.Unlock()
if len(a.rolledbackSamples) == 0 {
return nil
}
ret := make([]Sample, len(a.rolledbackSamples))
copy(ret, a.rolledbackSamples)
@ -206,10 +226,10 @@ func (a *Appendable) String() string {
var errClosedAppender = errors.New("appender was already committed/rolledback")
type appender struct {
err error
next storage.Appender
err error
a *Appendable
next storage.Appender
a *Appendable
}
func (a *appender) checkErr() error {
@ -227,6 +247,11 @@ func (a *Appendable) Appender(ctx context.Context) storage.Appender {
if a.next != nil {
ret.next = a.next.Appender(ctx)
} else if a.nextV2 != nil {
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .ThenV2 but no .Then was supplied; likely bug"))
}
if a.appendExemplarsError != nil {
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .WithErr exemplar partial error injection. This is not supported for Appender V1 flow"))
}
return ret
}
@ -397,3 +422,75 @@ func (a *appender) Rollback() error {
}
return nil
}
type appenderV2 struct {
appender
next storage.AppenderV2
}
func (a *Appendable) AppenderV2(ctx context.Context) storage.AppenderV2 {
ret := &appenderV2{appender: appender{a: a}}
if a.openAppenders.Inc() > 1 {
ret.err = errors.New("teststorage.Appendable.AppenderV2() concurrent use is not supported; attempted opening new AppenderV2() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
}
if a.nextV2 != nil {
ret.next = a.nextV2.AppenderV2(ctx)
} else if a.next != nil {
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.AppenderV2() invoked with .Then but no .ThenV2 was supplied; likely bug"))
}
return ret
}
func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) {
if err := a.checkErr(); err != nil {
return 0, err
}
if a.a.appendErrFn != nil {
if err := a.a.appendErrFn(ls); err != nil {
return 0, err
}
}
a.a.mtx.Lock()
var es []exemplar.Exemplar
if len(opts.Exemplars) > 0 {
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
es = make([]exemplar.Exemplar, len(opts.Exemplars))
copy(es, opts.Exemplars)
}
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
MF: opts.MetricFamilyName,
M: opts.Metadata,
L: ls,
ST: st, T: t,
V: v, H: h, FH: fh,
ES: es,
})
a.a.mtx.Unlock()
var partialErr error
if a.a.appendExemplarsError != nil {
var exErrs []error
for range opts.Exemplars {
exErrs = append(exErrs, a.a.appendExemplarsError)
}
if len(exErrs) > 0 {
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
}
}
if a.next != nil {
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
if err != nil {
return ref, err
}
}
ref, err = computeOrCheckRef(ref, ls)
if err != nil {
return ref, err
}
return ref, partialErr
}

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/testutil"
)
@ -129,3 +130,40 @@ func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) {
require.Error(t, app.Commit())
require.Error(t, app.Rollback())
}
func TestConcurrentAppenderV2_ReturnsErrAppender(t *testing.T) {
a := NewAppendable()
// Non-concurrent multiple use if fine.
app := a.AppenderV2(t.Context())
require.Equal(t, int32(1), a.openAppenders.Load())
require.NoError(t, app.Commit())
// Repeated commit fails.
require.Error(t, app.Commit())
app = a.AppenderV2(t.Context())
require.NoError(t, app.Rollback())
// Commit after rollback fails.
require.Error(t, app.Commit())
a.WithErrs(
nil,
nil,
errors.New("commit err"),
)
app = a.AppenderV2(t.Context())
require.Error(t, app.Commit())
a.WithErrs(nil, nil, nil)
app = a.AppenderV2(t.Context())
require.NoError(t, app.Commit())
require.Equal(t, int32(0), a.openAppenders.Load())
// Concurrent use should return appender that errors.
_ = a.AppenderV2(t.Context())
app = a.AppenderV2(t.Context())
_, err := app.Append(0, labels.EmptyLabels(), 0, 0, 0, nil, nil, storage.AOptions{})
require.Error(t, err)
require.Error(t, app.Commit())
require.Error(t, app.Rollback())
}