From f61a83bcd69f4a2e728b55472ecbddd5965583aa Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 15 Jan 2026 14:14:31 +0000 Subject: [PATCH] fix: appenderV2 mock exemplar appendErr injection before appending it Signed-off-by: bwplotka --- storage/fanout.go | 9 ++++++--- storage/fanout_test.go | 24 ++++++++++++------------ storage/interface_append.go | 25 +++++++++++++++++++++++++ util/teststorage/appender.go | 36 ++++++++++++++++++++---------------- 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index 95c6499952..9baa31d9af 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -301,16 +301,19 @@ type fanoutAppenderV2 struct { func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) { ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts) - if err != nil { + var partialErr AppendPartialError + if partialErr.Handle(err) != nil { return ref, err } for _, appender := range f.secondaries { if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil { - return 0, err + if partialErr.Handle(err) != nil { + return ref, err + } } } - return ref, nil + return ref, partialErr.ErrOrNil() } func (f *fanoutAppenderV2) Commit() (err error) { diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 92a20a3ecb..25f61341cd 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -21,17 +21,14 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/prometheus/prometheus/util/testutil" - - "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/teststorage" + "github.com/prometheus/prometheus/util/testutil" ) func TestFanout_SelectSorted(t *testing.T) { @@ -416,7 +413,7 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { }, { name: "primary errors", - primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr), secondary: teststorage.NewAppendable(), expectAppendErr: true, @@ -424,9 +421,9 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { expectCommitError: true, }, { - name: "primary exemplar errors", - primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return nil }, exErr, nil), - secondary: teststorage.NewAppendable(), + name: "exemplar errors", + primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil), + secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil), expectAppendErr: false, expectExemplarError: true, @@ -438,7 +435,7 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { { name: "secondary errors", primary: teststorage.NewAppendable(), - secondary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr), expectAppendErr: true, expectExemplarError: true, @@ -533,9 +530,12 @@ func TestFanoutAppenderV2(t *testing.T) { case tt.expectAppendErr: require.Error(t, err) case tt.expectExemplarError: - pErr := &storage.AppendPartialError{} + var pErr *storage.AppendPartialError require.ErrorAs(t, err, &pErr) - require.Len(t, pErr.ExemplarErrors, 1) + // One for primary, one for secondary. + // This is because in V2 flow we must append sample even when first append partially failed with exemplars. + // Filtering out exemplars is neither feasible, nor important. + require.Len(t, pErr.ExemplarErrors, 2) default: require.NoError(t, err) } diff --git a/storage/interface_append.go b/storage/interface_append.go index f2dce8e52e..aa4ae84152 100644 --- a/storage/interface_append.go +++ b/storage/interface_append.go @@ -97,6 +97,31 @@ func (e *AppendPartialError) Error() string { return errs.Error() } +// ErrOrNil returns AppendPartialError as error, returning nil +// if there are no errors. +func (e *AppendPartialError) ErrOrNil() error { + if len(e.ExemplarErrors) == 0 { + return nil + } + return e +} + +// Handle handles the given err that may be an AppendPartialError. +// If the err is nil or not an AppendPartialError it returns err. +// Otherwise, partial errors are aggregated. +func (e *AppendPartialError) Handle(err error) error { + if err == nil { + return nil + } + + var pErr *AppendPartialError + if !errors.As(err, &pErr) { + return err + } + e.ExemplarErrors = append(e.ExemplarErrors, pErr.ExemplarErrors...) + return nil +} + var _ error = &AppendPartialError{} // AppenderV2 provides appends against a storage for all types of samples. diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index fff9833b60..55cb727ee0 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -464,13 +464,28 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 } } - a.a.mtx.Lock() - var es []exemplar.Exemplar + var ( + es []exemplar.Exemplar + partialErr error + ) + if len(opts.Exemplars) > 0 { - // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. - es = make([]exemplar.Exemplar, len(opts.Exemplars)) - copy(es, opts.Exemplars) + if a.a.appendExemplarsError != nil { + var exErrs []error + for range opts.Exemplars { + exErrs = append(exErrs, a.a.appendExemplarsError) + } + if len(exErrs) > 0 { + partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} + } + } else { + // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. + es = make([]exemplar.Exemplar, len(opts.Exemplars)) + copy(es, opts.Exemplars) + } } + + a.a.mtx.Lock() a.a.pendingSamples = append(a.a.pendingSamples, Sample{ MF: opts.MetricFamilyName, M: opts.Metadata, @@ -481,17 +496,6 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 }) a.a.mtx.Unlock() - var partialErr error - if a.a.appendExemplarsError != nil { - var exErrs []error - for range opts.Exemplars { - exErrs = append(exErrs, a.a.appendExemplarsError) - } - if len(exErrs) > 0 { - partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} - } - } - if a.next != nil { ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts) if err != nil {