diff --git a/compliance/remote_write_sender_test.go b/compliance/remote_write_sender_test.go index 6840132bd3..9822e2d3e6 100644 --- a/compliance/remote_write_sender_test.go +++ b/compliance/remote_write_sender_test.go @@ -53,7 +53,9 @@ scrape_configs: var scrapeConfigTmpl = template.Must(template.New("config").Parse(scrapeConfigTemplate)) -type internalPrometheus struct{} +type internalPrometheus struct { + agentMode bool +} func (p internalPrometheus) Name() string { return "internal-prometheus" } @@ -74,20 +76,33 @@ func (p internalPrometheus) Run(ctx context.Context, opts sender.Options) error } defer os.RemoveAll(dir) - return sender.RunCommand(ctx, "../cmd/prometheus", nil, - "go", "run", ".", + args := []string{ + "run", ".", "--web.listen-address=0.0.0.0:0", - fmt.Sprintf("--storage.tsdb.path=%v", dir), fmt.Sprintf("--config.file=%s", configFile), // Set important flags for the full remote write compliance: "--enable-feature=st-storage", - ) + } + if p.agentMode { + args = append(args, fmt.Sprintf("--storage.agent.path=%v", dir), "--agent") + } else { + args = append(args, fmt.Sprintf("--storage.tsdb.path=%v", dir)) + } + return sender.RunCommand(ctx, "../cmd/prometheus", nil, "go", args...) } var _ sender.Sender = internalPrometheus{} // TestRemoteWriteSender runs remote write sender compliance tests defined in -// https://github.com/prometheus/compliance/tree/main/remotewrite/sender +// https://github.com/prometheus/compliance/tree/main/remotewrite/sender against +// both agent and server modes. func TestRemoteWriteSender(t *testing.T) { - sender.RunTests(t, internalPrometheus{}, sender.ComplianceTests()) + t.Run("mode=server", func(t *testing.T) { + t.Parallel() + sender.RunTests(t, internalPrometheus{}, sender.ComplianceTests()) + }) + t.Run("mode=agent", func(t *testing.T) { + t.Parallel() + sender.RunTests(t, internalPrometheus{agentMode: true}, sender.ComplianceTests()) + }) } diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 3f79d9176a..a5d0879ed9 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -95,7 +95,9 @@ type Options struct { // EnableSTStorage determines whether agent DB should write a Start Timestamp (ST) // per sample to WAL. - // TODO(bwplotka): Implement this option as per PROM-60, currently it's noop. + // Controlled by the `--enable-feature=st-storage` CLI flag; when enabled, ST is + // persisted to the WAL for samples that include a non-zero start timestamp in + // supported record types. EnableSTStorage bool } diff --git a/tsdb/agent/db_append_v2.go b/tsdb/agent/db_append_v2.go index bb2601e1e3..b963608637 100644 --- a/tsdb/agent/db_append_v2.go +++ b/tsdb/agent/db_append_v2.go @@ -72,7 +72,6 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 lastTS := s.lastTs s.Unlock() - // TODO(bwplotka): Handle ST natively (as per PROM-60). if a.opts.EnableSTAsZeroSample && st != 0 { a.bestEffortAppendSTZeroSample(s, ls, lastTS, st, t, h, fh) } @@ -86,6 +85,7 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 case fh != nil: isStale = value.IsStaleNaN(fh.Sum) // NOTE: always modify pendingFloatHistograms and floatHistogramSeries together + // TODO(krajorama,ywwg,bwplotka): Pass ST when available in WAL. a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: t, @@ -95,6 +95,7 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 case h != nil: isStale = value.IsStaleNaN(h.Sum) // NOTE: always modify pendingHistograms and histogramSeries together + // TODO(krajorama,ywwg,bwplotka): Pass ST when available in WAL. a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{ Ref: s.ref, T: t, @@ -107,6 +108,7 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 // NOTE: always modify pendingSamples and sampleSeries together. a.pendingSamples = append(a.pendingSamples, record.RefSample{ Ref: s.ref, + ST: st, T: t, V: v, }) diff --git a/tsdb/agent/db_append_v2_test.go b/tsdb/agent/db_append_v2_test.go index cbe9b09374..92a5bb8f35 100644 --- a/tsdb/agent/db_append_v2_test.go +++ b/tsdb/agent/db_append_v2_test.go @@ -90,6 +90,9 @@ func TestDB_InvalidSeries_AppendV2(t *testing.T) { }) } +// TestCommit_AppendV2 tests Appender commit. +// TODO(bwplotka): Rewrite this so Refs are generated, then appended, then expected so we test the +// exact data durability. func TestCommit_AppendV2(t *testing.T) { const ( numDatapoints = 1000 @@ -102,15 +105,24 @@ func TestCommit_AppendV2(t *testing.T) { opts.EnableSTStorage = enableSTStorage s := createTestAgentDB(t, nil, opts) - app := s.AppenderV2(context.TODO()) + var ( + expectedSampleSTs []int64 + gotSampleSTs []int64 + ) + if enableSTStorage { + expectedSampleSTs = make([]int64, 0, numSeries*numDatapoints) + gotSampleSTs = make([]int64, 0, numSeries*numDatapoints) + } + app := s.AppenderV2(t.Context()) lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { lset := labels.New(l...) for i := range numDatapoints { sample := chunks.GenerateSamples(0, 1) - _, err := app.Append(0, lset, int64(i), sample[0].T()+2000, sample[0].F(), nil, nil, storage.AOptions{ + st := int64(i + 1234) + _, err := app.Append(0, lset, st, sample[0].T()+2000, sample[0].F(), nil, nil, storage.AOptions{ Exemplars: []exemplar.Exemplar{{ Labels: lset, Ts: sample[0].T() + int64(i) + 2000, @@ -119,6 +131,9 @@ func TestCommit_AppendV2(t *testing.T) { }}, }) require.NoError(t, err) + if enableSTStorage { + expectedSampleSTs = append(expectedSampleSTs, st) + } } } @@ -129,7 +144,7 @@ func TestCommit_AppendV2(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := range numHistograms { - _, err := app.Append(0, lset, int64(i), int64(i+2000), 0, histograms[i], nil, storage.AOptions{}) + _, err := app.Append(0, lset, int64(i+2234), int64(i+2000), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -141,7 +156,7 @@ func TestCommit_AppendV2(t *testing.T) { customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := range numHistograms { - _, err := app.Append(0, lset, int64(i), int64(i+2000), 0, customBucketHistograms[i], nil, storage.AOptions{}) + _, err := app.Append(0, lset, int64(i+3234), int64(i+2000), 0, customBucketHistograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -153,7 +168,7 @@ func TestCommit_AppendV2(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.Append(0, lset, int64(i), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{}) + _, err := app.Append(0, lset, int64(i+4234), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -165,7 +180,7 @@ func TestCommit_AppendV2(t *testing.T) { customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.Append(0, lset, int64(i), int64(i+2000), 0, nil, customBucketFloatHistograms[i], storage.AOptions{}) + _, err := app.Append(0, lset, int64(i+5234), int64(i+2000), 0, nil, customBucketFloatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -203,7 +218,6 @@ func TestCommit_AppendV2(t *testing.T) { samples, err = dec.Samples(rec, samples) require.NoError(t, err) walSamplesCount += len(samples) - case record.SamplesV2: if !enableSTStorage { t.Errorf("Got V2 Samples when ST disabled") @@ -211,6 +225,10 @@ func TestCommit_AppendV2(t *testing.T) { var samples []record.RefSample samples, err = dec.Samples(rec, samples) require.NoError(t, err) + + for _, s := range samples { + gotSampleSTs = append(gotSampleSTs, s.ST) + } walSamplesCount += len(samples) case record.HistogramSamples, record.CustomBucketsHistogramSamples: @@ -238,13 +256,15 @@ func TestCommit_AppendV2(t *testing.T) { // Check that the WAL contained the same number of committed series/samples/exemplars. require.Equal(t, numSeries*5, walSeriesCount, "unexpected number of series") require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples") + require.Equal(t, expectedSampleSTs, gotSampleSTs, "unexpected STs received") require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars") require.Equal(t, numSeries*numHistograms*2, walHistogramCount, "unexpected number of histograms") require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms") - // Check that we can still create both kinds of Appender - see https://github.com/prometheus/prometheus/issues/17800. - _ = s.Appender(context.TODO()) - _ = s.AppenderV2(context.TODO()) + // Check that we can still create both kinds of Appender. + // Regression test against https://github.com/prometheus/prometheus/issues/17800. + _ = s.Appender(t.Context()) + _ = s.AppenderV2(t.Context()) }) } }