diff --git a/storage/fanout_test.go b/storage/fanout_test.go index fb2f8dd553..92a20a3ecb 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -21,6 +21,12 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/util/testutil" + + "github.com/prometheus/prometheus/tsdb/tsdbutil" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -355,3 +361,213 @@ func (errQuerier) Close() error { return nil } func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet { return storage.ErrChunkSeriesSet(errSelect) } + +type mockStorage struct { + app storage.Appendable + appV2 storage.AppendableV2 + storage.Storage +} + +func (m mockStorage) Appender(ctx context.Context) storage.Appender { + return m.app.Appender(ctx) +} + +func (m mockStorage) AppenderV2(ctx context.Context) storage.AppenderV2 { + return m.appV2.AppenderV2(ctx) +} + +type sample = teststorage.Sample + +func withoutExemplars(s []sample) (ret []sample) { + ret = make([]sample, len(s)) + copy(ret, s) + for i := range ret { + ret[i].ES = nil + } + return ret +} + +type fanoutAppenderTestCase struct { + name string + primary *teststorage.Appendable + secondary *teststorage.Appendable + + expectAppendErr bool + expectExemplarError bool + expectCommitError bool + + expectPrimarySamples []sample + expectSecondarySamples []sample +} + +func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase { + appErr := errors.New("append test error") + exErr := errors.New("exemplar test error") + commitErr := errors.New("commit test error") + + return []fanoutAppenderTestCase{ + { + name: "both works", + primary: teststorage.NewAppendable(), + secondary: teststorage.NewAppendable(), + + expectPrimarySamples: expected, + expectSecondarySamples: expected, + }, + { + name: "primary errors", + primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + secondary: teststorage.NewAppendable(), + + expectAppendErr: true, + expectExemplarError: true, + expectCommitError: true, + }, + { + name: "primary exemplar errors", + primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return nil }, exErr, nil), + secondary: teststorage.NewAppendable(), + + expectAppendErr: false, + expectExemplarError: true, + expectCommitError: false, + + expectPrimarySamples: withoutExemplars(expected), + expectSecondarySamples: withoutExemplars(expected), + }, + { + name: "secondary errors", + primary: teststorage.NewAppendable(), + secondary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr), + + expectAppendErr: true, + expectExemplarError: true, + expectCommitError: true, + + expectPrimarySamples: expected, + }, + } +} + +func TestFanoutAppender(t *testing.T) { + h := tsdbutil.GenerateTestHistogram(0) + fh := tsdbutil.GenerateTestFloatHistogram(0) + ex := exemplar.Exemplar{Value: 1} + + expected := []sample{ + {L: labels.FromStrings(model.MetricNameLabel, "metric1"), V: 1, ES: []exemplar.Exemplar{ex}}, + {L: labels.FromStrings(model.MetricNameLabel, "metric2"), T: 1, H: h}, + {L: labels.FromStrings(model.MetricNameLabel, "metric3"), T: 2, FH: fh}, + } + for _, tt := range fanoutAppenderTestCases(expected) { + t.Run(tt.name, func(t *testing.T) { + f := storage.NewFanout(nil, mockStorage{app: tt.primary}, mockStorage{app: tt.secondary}) + + app := f.Appender(t.Context()) + ref, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), 0, 1) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.AppendExemplar(ref, labels.FromStrings(model.MetricNameLabel, "metric1"), ex) + if tt.expectExemplarError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric2"), 1, h, nil) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric3"), 2, nil, fh) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + err = app.Commit() + if tt.expectCommitError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + require.Nil(t, tt.primary.PendingSamples()) + testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples()) + require.Nil(t, tt.primary.RolledbackSamples()) + + require.Nil(t, tt.secondary.PendingSamples()) + testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples()) + require.Nil(t, tt.secondary.RolledbackSamples()) + }) + } +} + +func TestFanoutAppenderV2(t *testing.T) { + h := tsdbutil.GenerateTestHistogram(0) + fh := tsdbutil.GenerateTestFloatHistogram(0) + ex := exemplar.Exemplar{Value: 1} + + expected := []sample{ + {L: labels.FromStrings(model.MetricNameLabel, "metric1"), ST: -1, V: 1, ES: []exemplar.Exemplar{ex}}, + {L: labels.FromStrings(model.MetricNameLabel, "metric2"), ST: -2, T: 1, H: h}, + {L: labels.FromStrings(model.MetricNameLabel, "metric3"), ST: -3, T: 2, FH: fh}, + } + + for _, tt := range fanoutAppenderTestCases(expected) { + t.Run(tt.name, func(t *testing.T) { + f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary}) + + app := f.AppenderV2(t.Context()) + _, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), -1, 0, 1, nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{ex}, + }) + switch { + case tt.expectAppendErr: + require.Error(t, err) + case tt.expectExemplarError: + pErr := &storage.AppendPartialError{} + require.ErrorAs(t, err, &pErr) + require.Len(t, pErr.ExemplarErrors, 1) + default: + require.NoError(t, err) + } + + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric2"), -2, 1, 0, h, nil, storage.AOptions{}) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric3"), -3, 2, 0, nil, fh, storage.AOptions{}) + if tt.expectAppendErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + err = app.Commit() + if tt.expectCommitError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + require.Nil(t, tt.primary.PendingSamples()) + testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples()) + require.Nil(t, tt.primary.RolledbackSamples()) + + require.Nil(t, tt.secondary.PendingSamples()) + testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples()) + require.Nil(t, tt.secondary.RolledbackSamples()) + }) + } +} diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index a98ff9c48f..fff9833b60 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -374,7 +374,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem // with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632 i := len(a.a.pendingSamples) - 1 for ; i >= 0; i-- { // Attach exemplars to the last matching sample. - if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) { + if labels.Equal(l, a.a.pendingSamples[i].L) { a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e) appended = true break @@ -415,7 +415,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta // with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632 i := len(a.a.pendingSamples) - 1 for ; i >= 0; i-- { // Attach metadata to the last matching sample. - if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) { + if labels.Equal(l, a.a.pendingSamples[i].L) { a.a.pendingSamples[i].M = m updated = true break