mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-18 18:25:24 -05:00
fix: appenderV2 mock exemplar appendErr injection before appending it
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
1d3c6210f3
commit
f61a83bcd6
4 changed files with 63 additions and 31 deletions
|
|
@ -301,16 +301,19 @@ type fanoutAppenderV2 struct {
|
|||
|
||||
func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) {
|
||||
ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts)
|
||||
if err != nil {
|
||||
var partialErr AppendPartialError
|
||||
if partialErr.Handle(err) != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil {
|
||||
return 0, err
|
||||
if partialErr.Handle(err) != nil {
|
||||
return ref, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return ref, nil
|
||||
return ref, partialErr.ErrOrNil()
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Commit() (err error) {
|
||||
|
|
|
|||
|
|
@ -21,17 +21,14 @@ import (
|
|||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestFanout_SelectSorted(t *testing.T) {
|
||||
|
|
@ -416,7 +413,7 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
|
|||
},
|
||||
{
|
||||
name: "primary errors",
|
||||
primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr),
|
||||
primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr),
|
||||
secondary: teststorage.NewAppendable(),
|
||||
|
||||
expectAppendErr: true,
|
||||
|
|
@ -424,9 +421,9 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
|
|||
expectCommitError: true,
|
||||
},
|
||||
{
|
||||
name: "primary exemplar errors",
|
||||
primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return nil }, exErr, nil),
|
||||
secondary: teststorage.NewAppendable(),
|
||||
name: "exemplar errors",
|
||||
primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil),
|
||||
secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil),
|
||||
|
||||
expectAppendErr: false,
|
||||
expectExemplarError: true,
|
||||
|
|
@ -438,7 +435,7 @@ func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
|
|||
{
|
||||
name: "secondary errors",
|
||||
primary: teststorage.NewAppendable(),
|
||||
secondary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr),
|
||||
secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr),
|
||||
|
||||
expectAppendErr: true,
|
||||
expectExemplarError: true,
|
||||
|
|
@ -533,9 +530,12 @@ func TestFanoutAppenderV2(t *testing.T) {
|
|||
case tt.expectAppendErr:
|
||||
require.Error(t, err)
|
||||
case tt.expectExemplarError:
|
||||
pErr := &storage.AppendPartialError{}
|
||||
var pErr *storage.AppendPartialError
|
||||
require.ErrorAs(t, err, &pErr)
|
||||
require.Len(t, pErr.ExemplarErrors, 1)
|
||||
// One for primary, one for secondary.
|
||||
// This is because in V2 flow we must append sample even when first append partially failed with exemplars.
|
||||
// Filtering out exemplars is neither feasible, nor important.
|
||||
require.Len(t, pErr.ExemplarErrors, 2)
|
||||
default:
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,6 +97,31 @@ func (e *AppendPartialError) Error() string {
|
|||
return errs.Error()
|
||||
}
|
||||
|
||||
// ErrOrNil returns AppendPartialError as error, returning nil
|
||||
// if there are no errors.
|
||||
func (e *AppendPartialError) ErrOrNil() error {
|
||||
if len(e.ExemplarErrors) == 0 {
|
||||
return nil
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Handle handles the given err that may be an AppendPartialError.
|
||||
// If the err is nil or not an AppendPartialError it returns err.
|
||||
// Otherwise, partial errors are aggregated.
|
||||
func (e *AppendPartialError) Handle(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var pErr *AppendPartialError
|
||||
if !errors.As(err, &pErr) {
|
||||
return err
|
||||
}
|
||||
e.ExemplarErrors = append(e.ExemplarErrors, pErr.ExemplarErrors...)
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ error = &AppendPartialError{}
|
||||
|
||||
// AppenderV2 provides appends against a storage for all types of samples.
|
||||
|
|
|
|||
|
|
@ -464,13 +464,28 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
var es []exemplar.Exemplar
|
||||
var (
|
||||
es []exemplar.Exemplar
|
||||
partialErr error
|
||||
)
|
||||
|
||||
if len(opts.Exemplars) > 0 {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
} else {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
|
||||
MF: opts.MetricFamilyName,
|
||||
M: opts.Metadata,
|
||||
|
|
@ -481,17 +496,6 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
})
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
var partialErr error
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue