mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
Merge pull request #18270 from prometheus/agentrw
feat(agent): fix ST append; add compliance RW sender test for agent
This commit is contained in:
commit
a3217fe94f
4 changed files with 58 additions and 19 deletions
|
|
@ -53,7 +53,9 @@ scrape_configs:
|
|||
|
||||
var scrapeConfigTmpl = template.Must(template.New("config").Parse(scrapeConfigTemplate))
|
||||
|
||||
type internalPrometheus struct{}
|
||||
type internalPrometheus struct {
|
||||
agentMode bool
|
||||
}
|
||||
|
||||
func (p internalPrometheus) Name() string { return "internal-prometheus" }
|
||||
|
||||
|
|
@ -74,20 +76,33 @@ func (p internalPrometheus) Run(ctx context.Context, opts sender.Options) error
|
|||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
return sender.RunCommand(ctx, "../cmd/prometheus", nil,
|
||||
"go", "run", ".",
|
||||
args := []string{
|
||||
"run", ".",
|
||||
"--web.listen-address=0.0.0.0:0",
|
||||
fmt.Sprintf("--storage.tsdb.path=%v", dir),
|
||||
fmt.Sprintf("--config.file=%s", configFile),
|
||||
// Set important flags for the full remote write compliance:
|
||||
"--enable-feature=st-storage",
|
||||
)
|
||||
}
|
||||
if p.agentMode {
|
||||
args = append(args, fmt.Sprintf("--storage.agent.path=%v", dir), "--agent")
|
||||
} else {
|
||||
args = append(args, fmt.Sprintf("--storage.tsdb.path=%v", dir))
|
||||
}
|
||||
return sender.RunCommand(ctx, "../cmd/prometheus", nil, "go", args...)
|
||||
}
|
||||
|
||||
var _ sender.Sender = internalPrometheus{}
|
||||
|
||||
// TestRemoteWriteSender runs remote write sender compliance tests defined in
|
||||
// https://github.com/prometheus/compliance/tree/main/remotewrite/sender
|
||||
// https://github.com/prometheus/compliance/tree/main/remotewrite/sender against
|
||||
// both agent and server modes.
|
||||
func TestRemoteWriteSender(t *testing.T) {
|
||||
sender.RunTests(t, internalPrometheus{}, sender.ComplianceTests())
|
||||
t.Run("mode=server", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
sender.RunTests(t, internalPrometheus{}, sender.ComplianceTests())
|
||||
})
|
||||
t.Run("mode=agent", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
sender.RunTests(t, internalPrometheus{agentMode: true}, sender.ComplianceTests())
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,7 +95,9 @@ type Options struct {
|
|||
|
||||
// EnableSTStorage determines whether agent DB should write a Start Timestamp (ST)
|
||||
// per sample to WAL.
|
||||
// TODO(bwplotka): Implement this option as per PROM-60, currently it's noop.
|
||||
// Controlled by the `--enable-feature=st-storage` CLI flag; when enabled, ST is
|
||||
// persisted to the WAL for samples that include a non-zero start timestamp in
|
||||
// supported record types.
|
||||
EnableSTStorage bool
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -72,7 +72,6 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
lastTS := s.lastTs
|
||||
s.Unlock()
|
||||
|
||||
// TODO(bwplotka): Handle ST natively (as per PROM-60).
|
||||
if a.opts.EnableSTAsZeroSample && st != 0 {
|
||||
a.bestEffortAppendSTZeroSample(s, ls, lastTS, st, t, h, fh)
|
||||
}
|
||||
|
|
@ -86,6 +85,7 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
case fh != nil:
|
||||
isStale = value.IsStaleNaN(fh.Sum)
|
||||
// NOTE: always modify pendingFloatHistograms and floatHistogramSeries together
|
||||
// TODO(krajorama,ywwg,bwplotka): Pass ST when available in WAL.
|
||||
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
|
|
@ -95,6 +95,7 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
case h != nil:
|
||||
isStale = value.IsStaleNaN(h.Sum)
|
||||
// NOTE: always modify pendingHistograms and histogramSeries together
|
||||
// TODO(krajorama,ywwg,bwplotka): Pass ST when available in WAL.
|
||||
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
|
|
@ -107,6 +108,7 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
// NOTE: always modify pendingSamples and sampleSeries together.
|
||||
a.pendingSamples = append(a.pendingSamples, record.RefSample{
|
||||
Ref: s.ref,
|
||||
ST: st,
|
||||
T: t,
|
||||
V: v,
|
||||
})
|
||||
|
|
|
|||
|
|
@ -90,6 +90,9 @@ func TestDB_InvalidSeries_AppendV2(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// TestCommit_AppendV2 tests Appender commit.
|
||||
// TODO(bwplotka): Rewrite this so Refs are generated, then appended, then expected so we test the
|
||||
// exact data durability.
|
||||
func TestCommit_AppendV2(t *testing.T) {
|
||||
const (
|
||||
numDatapoints = 1000
|
||||
|
|
@ -102,15 +105,24 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
opts.EnableSTStorage = enableSTStorage
|
||||
s := createTestAgentDB(t, nil, opts)
|
||||
|
||||
app := s.AppenderV2(context.TODO())
|
||||
var (
|
||||
expectedSampleSTs []int64
|
||||
gotSampleSTs []int64
|
||||
)
|
||||
if enableSTStorage {
|
||||
expectedSampleSTs = make([]int64, 0, numSeries*numDatapoints)
|
||||
gotSampleSTs = make([]int64, 0, numSeries*numDatapoints)
|
||||
}
|
||||
|
||||
app := s.AppenderV2(t.Context())
|
||||
lbls := labelsForTest(t.Name(), numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
for i := range numDatapoints {
|
||||
sample := chunks.GenerateSamples(0, 1)
|
||||
_, err := app.Append(0, lset, int64(i), sample[0].T()+2000, sample[0].F(), nil, nil, storage.AOptions{
|
||||
st := int64(i + 1234)
|
||||
_, err := app.Append(0, lset, st, sample[0].T()+2000, sample[0].F(), nil, nil, storage.AOptions{
|
||||
Exemplars: []exemplar.Exemplar{{
|
||||
Labels: lset,
|
||||
Ts: sample[0].T() + int64(i) + 2000,
|
||||
|
|
@ -119,6 +131,9 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
}},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
if enableSTStorage {
|
||||
expectedSampleSTs = append(expectedSampleSTs, st)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -129,7 +144,7 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
|
||||
|
||||
for i := range numHistograms {
|
||||
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, histograms[i], nil, storage.AOptions{})
|
||||
_, err := app.Append(0, lset, int64(i+2234), int64(i+2000), 0, histograms[i], nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
@ -141,7 +156,7 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
|
||||
|
||||
for i := range numHistograms {
|
||||
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, customBucketHistograms[i], nil, storage.AOptions{})
|
||||
_, err := app.Append(0, lset, int64(i+3234), int64(i+2000), 0, customBucketHistograms[i], nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
@ -153,7 +168,7 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
|
||||
|
||||
for i := range numHistograms {
|
||||
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{})
|
||||
_, err := app.Append(0, lset, int64(i+4234), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
@ -165,7 +180,7 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
|
||||
|
||||
for i := range numHistograms {
|
||||
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, nil, customBucketFloatHistograms[i], storage.AOptions{})
|
||||
_, err := app.Append(0, lset, int64(i+5234), int64(i+2000), 0, nil, customBucketFloatHistograms[i], storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
@ -203,7 +218,6 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
samples, err = dec.Samples(rec, samples)
|
||||
require.NoError(t, err)
|
||||
walSamplesCount += len(samples)
|
||||
|
||||
case record.SamplesV2:
|
||||
if !enableSTStorage {
|
||||
t.Errorf("Got V2 Samples when ST disabled")
|
||||
|
|
@ -211,6 +225,10 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
var samples []record.RefSample
|
||||
samples, err = dec.Samples(rec, samples)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, s := range samples {
|
||||
gotSampleSTs = append(gotSampleSTs, s.ST)
|
||||
}
|
||||
walSamplesCount += len(samples)
|
||||
|
||||
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
|
||||
|
|
@ -238,13 +256,15 @@ func TestCommit_AppendV2(t *testing.T) {
|
|||
// Check that the WAL contained the same number of committed series/samples/exemplars.
|
||||
require.Equal(t, numSeries*5, walSeriesCount, "unexpected number of series")
|
||||
require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
|
||||
require.Equal(t, expectedSampleSTs, gotSampleSTs, "unexpected STs received")
|
||||
require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
|
||||
require.Equal(t, numSeries*numHistograms*2, walHistogramCount, "unexpected number of histograms")
|
||||
require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms")
|
||||
|
||||
// Check that we can still create both kinds of Appender - see https://github.com/prometheus/prometheus/issues/17800.
|
||||
_ = s.Appender(context.TODO())
|
||||
_ = s.AppenderV2(context.TODO())
|
||||
// Check that we can still create both kinds of Appender.
|
||||
// Regression test against https://github.com/prometheus/prometheus/issues/17800.
|
||||
_ = s.Appender(t.Context())
|
||||
_ = s.AppenderV2(t.Context())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue