add extra fanout test

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-01-15 13:12:17 +00:00
parent 8a2921e385
commit 1d3c6210f3
2 changed files with 218 additions and 2 deletions

View file

@ -21,6 +21,12 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -355,3 +361,213 @@ func (errQuerier) Close() error { return nil }
func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
return storage.ErrChunkSeriesSet(errSelect)
}
type mockStorage struct {
app storage.Appendable
appV2 storage.AppendableV2
storage.Storage
}
func (m mockStorage) Appender(ctx context.Context) storage.Appender {
return m.app.Appender(ctx)
}
func (m mockStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
return m.appV2.AppenderV2(ctx)
}
type sample = teststorage.Sample
func withoutExemplars(s []sample) (ret []sample) {
ret = make([]sample, len(s))
copy(ret, s)
for i := range ret {
ret[i].ES = nil
}
return ret
}
type fanoutAppenderTestCase struct {
name string
primary *teststorage.Appendable
secondary *teststorage.Appendable
expectAppendErr bool
expectExemplarError bool
expectCommitError bool
expectPrimarySamples []sample
expectSecondarySamples []sample
}
func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
appErr := errors.New("append test error")
exErr := errors.New("exemplar test error")
commitErr := errors.New("commit test error")
return []fanoutAppenderTestCase{
{
name: "both works",
primary: teststorage.NewAppendable(),
secondary: teststorage.NewAppendable(),
expectPrimarySamples: expected,
expectSecondarySamples: expected,
},
{
name: "primary errors",
primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr),
secondary: teststorage.NewAppendable(),
expectAppendErr: true,
expectExemplarError: true,
expectCommitError: true,
},
{
name: "primary exemplar errors",
primary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return nil }, exErr, nil),
secondary: teststorage.NewAppendable(),
expectAppendErr: false,
expectExemplarError: true,
expectCommitError: false,
expectPrimarySamples: withoutExemplars(expected),
expectSecondarySamples: withoutExemplars(expected),
},
{
name: "secondary errors",
primary: teststorage.NewAppendable(),
secondary: teststorage.NewAppendable().WithErrs(func(ls labels.Labels) error { return appErr }, exErr, commitErr),
expectAppendErr: true,
expectExemplarError: true,
expectCommitError: true,
expectPrimarySamples: expected,
},
}
}
func TestFanoutAppender(t *testing.T) {
h := tsdbutil.GenerateTestHistogram(0)
fh := tsdbutil.GenerateTestFloatHistogram(0)
ex := exemplar.Exemplar{Value: 1}
expected := []sample{
{L: labels.FromStrings(model.MetricNameLabel, "metric1"), V: 1, ES: []exemplar.Exemplar{ex}},
{L: labels.FromStrings(model.MetricNameLabel, "metric2"), T: 1, H: h},
{L: labels.FromStrings(model.MetricNameLabel, "metric3"), T: 2, FH: fh},
}
for _, tt := range fanoutAppenderTestCases(expected) {
t.Run(tt.name, func(t *testing.T) {
f := storage.NewFanout(nil, mockStorage{app: tt.primary}, mockStorage{app: tt.secondary})
app := f.Appender(t.Context())
ref, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), 0, 1)
if tt.expectAppendErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = app.AppendExemplar(ref, labels.FromStrings(model.MetricNameLabel, "metric1"), ex)
if tt.expectExemplarError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric2"), 1, h, nil)
if tt.expectAppendErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric3"), 2, nil, fh)
if tt.expectAppendErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
err = app.Commit()
if tt.expectCommitError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Nil(t, tt.primary.PendingSamples())
testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples())
require.Nil(t, tt.primary.RolledbackSamples())
require.Nil(t, tt.secondary.PendingSamples())
testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples())
require.Nil(t, tt.secondary.RolledbackSamples())
})
}
}
func TestFanoutAppenderV2(t *testing.T) {
h := tsdbutil.GenerateTestHistogram(0)
fh := tsdbutil.GenerateTestFloatHistogram(0)
ex := exemplar.Exemplar{Value: 1}
expected := []sample{
{L: labels.FromStrings(model.MetricNameLabel, "metric1"), ST: -1, V: 1, ES: []exemplar.Exemplar{ex}},
{L: labels.FromStrings(model.MetricNameLabel, "metric2"), ST: -2, T: 1, H: h},
{L: labels.FromStrings(model.MetricNameLabel, "metric3"), ST: -3, T: 2, FH: fh},
}
for _, tt := range fanoutAppenderTestCases(expected) {
t.Run(tt.name, func(t *testing.T) {
f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})
app := f.AppenderV2(t.Context())
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), -1, 0, 1, nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{ex},
})
switch {
case tt.expectAppendErr:
require.Error(t, err)
case tt.expectExemplarError:
pErr := &storage.AppendPartialError{}
require.ErrorAs(t, err, &pErr)
require.Len(t, pErr.ExemplarErrors, 1)
default:
require.NoError(t, err)
}
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric2"), -2, 1, 0, h, nil, storage.AOptions{})
if tt.expectAppendErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric3"), -3, 2, 0, nil, fh, storage.AOptions{})
if tt.expectAppendErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
err = app.Commit()
if tt.expectCommitError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Nil(t, tt.primary.PendingSamples())
testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples())
require.Nil(t, tt.primary.RolledbackSamples())
require.Nil(t, tt.secondary.PendingSamples())
testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples())
require.Nil(t, tt.secondary.RolledbackSamples())
})
}
}

View file

@ -374,7 +374,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
i := len(a.a.pendingSamples) - 1
for ; i >= 0; i-- { // Attach exemplars to the last matching sample.
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
if labels.Equal(l, a.a.pendingSamples[i].L) {
a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e)
appended = true
break
@ -415,7 +415,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
i := len(a.a.pendingSamples) - 1
for ; i >= 0; i-- { // Attach metadata to the last matching sample.
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
if labels.Equal(l, a.a.pendingSamples[i].L) {
a.a.pendingSamples[i].M = m
updated = true
break