From 5061ae7ec4aa53254784c21f0f5d4e7972922b89 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Tue, 1 Oct 2024 13:12:46 +0100 Subject: [PATCH] manager: Fixed CT OMText conversion bug; Refactored tests. Signed-off-by: bwplotka --- model/textparse/interface.go | 5 +- model/textparse/openmetricsparse.go | 5 +- model/textparse/openmetricsparse_test.go | 66 ++--- scrape/manager_test.go | 299 ++++++++++------------- 4 files changed, 171 insertions(+), 204 deletions(-) diff --git a/model/textparse/interface.go b/model/textparse/interface.go index 1e1c3fd59e..7de88a4869 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -91,10 +91,9 @@ func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bo } switch mediaType { case "application/openmetrics-text": - opts := func(o *openMetricsParserOptions) { + return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) { o.SkipCTSeries = skipOMCTSeries - } - return NewOpenMetricsParser(b, st, opts), nil + }), nil case "application/vnd.google.protobuf": return NewProtobufParser(b, parseClassicHistograms, st), nil default: diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index ea7607c3a7..8ec1b62ffb 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -297,7 +297,10 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { // CT line for a different series, for our series no CT. return nil } - ct := int64(peek.val) + + // All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds. + // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps + ct := int64(peek.val * 1000.0) return &ct } } diff --git a/model/textparse/openmetricsparse_test.go b/model/textparse/openmetricsparse_test.go index bcbdb85276..6af5721b51 100644 --- a/model/textparse/openmetricsparse_test.go +++ b/model/textparse/openmetricsparse_test.go @@ -69,23 +69,23 @@ testmetric{label="\"bar\""} 1 # HELP foo Counter with and without labels to certify CT is parsed for both cases # TYPE foo counter foo_total 17.0 1520879607.789 # {id="counter-test"} 5 -foo_created 1000 +foo_created 1520872607.123 foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5 -foo_created{a="b"} 1000 +foo_created{a="b"} 1520872607.123 # HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far # TYPE bar summary bar_count 17.0 bar_sum 324789.3 bar{quantile="0.95"} 123.7 bar{quantile="0.99"} 150.0 -bar_created 1520430000 +bar_created 1520872607.123 # HELP baz Histogram with the same objective as above's summary # TYPE baz histogram baz_bucket{le="0.0"} 0 baz_bucket{le="+Inf"} 17 baz_count 17 baz_sum 324789.3 -baz_created 1520430000 +baz_created 1520872607.123 # HELP fizz_created Gauge which shouldn't be parsed as CT # TYPE fizz_created gauge fizz_created 17.0` @@ -250,14 +250,14 @@ fizz_created 17.0` lset: labels.FromStrings("__name__", "foo_total"), t: int64p(1520879607789), e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5}, - ct: int64p(1000), + ct: int64p(1520872607123), }, { m: `foo_total{a="b"}`, v: 17.0, lset: labels.FromStrings("__name__", "foo_total", "a", "b"), t: int64p(1520879607789), e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5}, - ct: int64p(1000), + ct: int64p(1520872607123), }, { m: "bar", help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far", @@ -268,22 +268,22 @@ fizz_created 17.0` m: "bar_count", v: 17.0, lset: labels.FromStrings("__name__", "bar_count"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: "bar_sum", v: 324789.3, lset: labels.FromStrings("__name__", "bar_sum"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `bar{quantile="0.95"}`, v: 123.7, lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `bar{quantile="0.99"}`, v: 150.0, lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: "baz", help: "Histogram with the same objective as above's summary", @@ -294,22 +294,22 @@ fizz_created 17.0` m: `baz_bucket{le="0.0"}`, v: 0, lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `baz_bucket{le="+Inf"}`, v: 17, lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `baz_count`, v: 17, lset: labels.FromStrings("__name__", "baz_count"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `baz_sum`, v: 324789.3, lset: labels.FromStrings("__name__", "baz_sum"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: "fizz_created", help: "Gauge which shouldn't be parsed as CT", @@ -346,7 +346,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) { # UNIT "go.gc_duration_seconds" seconds {"go.gc_duration_seconds",quantile="0"} 4.9351e-05 {"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05 -{"go.gc_duration_seconds_created"} 12313 +{"go.gc_duration_seconds_created"} 1520872607.123 {"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05 {"http.status",q="0.9",a="b"} 8.3835e-05 {"http.status",q="0.9",a="b"} 8.3835e-05 @@ -370,12 +370,12 @@ func TestUTF8OpenMetricsParse(t *testing.T) { m: `{"go.gc_duration_seconds",quantile="0"}`, v: 4.9351e-05, lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"), - ct: int64p(12313), + ct: int64p(1520872607123), }, { m: `{"go.gc_duration_seconds",quantile="0.25"}`, v: 7.424100000000001e-05, lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.25"), - ct: int64p(12313), + ct: int64p(1520872607123), }, { m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`, v: 8.3835e-05, @@ -787,12 +787,12 @@ func TestCTParseFailures(t *testing.T) { # TYPE something histogram something_count 17 something_sum 324789.3 -something_created 1520430001 +something_created 1520872607.123 something_bucket{le="0.0"} 0 something_bucket{le="+Inf"} 17 # HELP thing Histogram with _created as first line # TYPE thing histogram -thing_created 1520430002 +thing_created 1520872607.123 thing_count 17 thing_sum 324789.3 thing_bucket{le="0.0"} 0 @@ -801,12 +801,12 @@ thing_bucket{le="+Inf"} 17 # TYPE yum summary yum_count 17.0 yum_sum 324789.3 -yum_created 1520430003 +yum_created 1520872607.123 yum{quantile="0.95"} 123.7 yum{quantile="0.99"} 150.0 # HELP foobar Summary with _created as the first line # TYPE foobar summary -foobar_created 1520430004 +foobar_created 1520872607.123 foobar_count 17.0 foobar_sum 324789.3 foobar{quantile="0.95"} 123.7 @@ -835,19 +835,19 @@ foobar{quantile="0.99"} 150.0` isErr: false, }, { m: `something_count`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: false, }, { m: `something_sum`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: false, }, { m: `something_bucket{le="0.0"}`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: true, }, { m: `something_bucket{le="+Inf"}`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: true, }, { m: "thing", @@ -859,19 +859,19 @@ foobar{quantile="0.99"} 150.0` isErr: false, }, { m: `thing_count`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: `thing_sum`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: `thing_bucket{le="0.0"}`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: `thing_bucket{le="+Inf"}`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: "yum", @@ -883,19 +883,19 @@ foobar{quantile="0.99"} 150.0` isErr: false, }, { m: "yum_count", - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: false, }, { m: "yum_sum", - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: false, }, { m: `yum{quantile="0.95"}`, - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: true, }, { m: `yum{quantile="0.99"}`, - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: true, }, { m: "foobar", diff --git a/scrape/manager_test.go b/scrape/manager_test.go index b759d97872..63a99a9b76 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -14,6 +14,7 @@ package scrape import ( + "bytes" "context" "fmt" "net/http" @@ -30,7 +31,9 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/timestamp" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/yaml.v2" @@ -721,7 +724,7 @@ scrape_configs: require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } -func setupScrapeManager(t *testing.T, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) { +func setupScrapeManager(t *testing.T, honorTimestamps bool, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) { app := &collectResultAppender{} scrapeManager, err := NewManager( &Options{ @@ -742,7 +745,7 @@ func setupScrapeManager(t *testing.T, enableCTZeroIngestion bool) (*collectResul ScrapeTimeout: model.Duration(5 * time.Second), ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto}, }, - ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}}, })) return app, scrapeManager @@ -771,183 +774,145 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server return server } -func prepareWriteData(t *testing.T, mName, typ string, counterSampleProto *dto.Counter, counterSampleText string) []byte { - var toWrite []byte - switch typ { - case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited": - ctrType := dto.MetricType_COUNTER - toWrite = protoMarshalDelimited(t, &dto.MetricFamily{ - Name: proto.String(mName), - Type: &ctrType, - Metric: []*dto.Metric{{Counter: counterSampleProto}}, - }) - case "application/openmetrics-text; version=1.0.0; charset=utf-8": - toWrite = []byte(counterSampleText) - } - return toWrite -} - -// TestManagerCTZeroIngestion tests scrape manager for CT cases. +// TestManagerCTZeroIngestion tests scrape manager for various CT cases. func TestManagerCTZeroIngestion(t *testing.T) { - const mName = "expected_counter" + const ( + // _total suffix is required, otherwise expfmt with OMText will mark metric as "unknown" + expectedMetricName = "expected_metric_total" + expectedCreatedMetricName = "expected_metric_created" + expectedSampleValue = 17.0 + ) - type expectCTLineAppended struct { - value float64 - ts int64 - } + for _, testFormat := range []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0} { + t.Run(fmt.Sprintf("format=%s", testFormat), func(t *testing.T) { + for _, testWithCT := range []bool{false, true} { + t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) { + for _, testCTZeroIngest := range []bool{false, true} { + t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) { + sampleTs := time.Now() + ctTs := time.Time{} + if testWithCT { + ctTs = sampleTs.Add(-2 * time.Minute) + } - for _, tc := range []struct { - name string - counterSampleProto *dto.Counter - counterSampleText string - enableCTZeroIngestion bool - expectCTLineAppended []expectCTLineAppended - typ string - }{ - { - name: "Protobuf disabled with CT on counter", - counterSampleProto: &dto.Counter{ - Value: proto.Float64(1.0), - // Timestamp does not matter as long as it exists in this test. - CreatedTimestamp: timestamppb.Now(), - }, - typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited", - }, - { - name: "Protobuf enabled with CT on counter", - counterSampleProto: &dto.Counter{ - Value: proto.Float64(1.0), - // Timestamp does not matter as long as it exists in this test. - CreatedTimestamp: timestamppb.Now(), - }, - enableCTZeroIngestion: true, - typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited", - }, - { - name: "Protobuf enabled without CT on counter", - counterSampleProto: &dto.Counter{ - Value: proto.Float64(1.0), - }, - enableCTZeroIngestion: true, - typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited", - }, - { - name: "OMText disabled with CT on counter", - counterSampleText: `# TYPE expected_counter counter -expected_counter 17.0 1520879607.789 -expected_counter_created 1000 -# EOF`, - expectCTLineAppended: []expectCTLineAppended{{ - value: 17.0, - ts: 1520879607789, - }}, - typ: "application/openmetrics-text; version=1.0.0; charset=utf-8", - }, - { - name: "OMText enabled with CT on counter", - counterSampleText: `# TYPE expected_counter counter -expected_counter 17.0 1520879607.789 -expected_counter_created 1000 -# EOF`, - enableCTZeroIngestion: true, - expectCTLineAppended: []expectCTLineAppended{ - { - value: 0.0, - ts: 1000, - }, - { - value: 17.0, - ts: 1520879607789, - }, - }, - typ: "application/openmetrics-text; version=1.0.0; charset=utf-8", - }, - { - name: "OMText enabled without CT on counter", - counterSampleText: `# TYPE expected_counter counter -expected_counter 17.0 1520879607.789 -# EOF`, - enableCTZeroIngestion: true, - expectCTLineAppended: []expectCTLineAppended{{ - value: 17.0, - ts: 1520879607789, - }}, - typ: "application/openmetrics-text; version=1.0.0; charset=utf-8", - }, - } { - t.Run(tc.name, func(t *testing.T) { - app, scrapeManager := setupScrapeManager(t, tc.enableCTZeroIngestion) + // TODO(bwplotka): Add more types than just counter? + encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs) + app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest) - toWrite := prepareWriteData(t, mName, tc.typ, tc.counterSampleProto, tc.counterSampleText) + // Perform the test. + doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)) - server := setupTestServer(t, tc.typ, toWrite) - serverURL, err := url.Parse(server.URL) - require.NoError(t, err) + // Verify results. + // Verify what we got vs expectations around CT injection. + samples := findSamplesForMetric(app.resultFloats, expectedMetricName) + if testWithCT && testCTZeroIngest { + require.Len(t, samples, 2) + require.Equal(t, 0.0, samples[0].f) + require.Equal(t, timestamp.FromTime(ctTs), samples[0].t) + require.Equal(t, expectedSampleValue, samples[1].f) + require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t) + } else { + require.Len(t, samples, 1) + require.Equal(t, expectedSampleValue, samples[0].f) + require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t) + } - // Add fake target directly into tsets + reload - scrapeManager.updateTsets(map[string][]*targetgroup.Group{ - "test": {{ - Targets: []model.LabelSet{{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }}, - }}, - }) - scrapeManager.reload() - - var got []float64 - // Wait for one scrape. - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { - app.mtx.Lock() - defer app.mtx.Unlock() - - // Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug - // and it's not worth waiting. - for _, f := range app.resultFloats { - if f.metric.Get(model.MetricNameLabel) == mName { - got = append(got, f.f) + // Verify what we got vs expectations around additional _created series for OM text. + // enableCTZeroInjection also kills that _created line. + createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName) + if testFormat == config.OpenMetricsText1_0_0 && testWithCT && !testCTZeroIngest { + // For OM Text, when counter has CT, and feature flag disabled we should see _created lines. + require.Len(t, createdSeriesSamples, 1) + // Conversion taken from common/expfmt.writeOpenMetricsFloat. + // We don't check the ct timestamp as explicit ts was not implemented in expfmt.Encoder, + // but exists in OM https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created + // We can implement this, but we want to potentially get rid of OM 1.0 CT lines + require.Equal(t, float64(timestamppb.New(ctTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f) + } else { + require.Len(t, createdSeriesSamples, 0) + } + }) } - } - - if len(app.resultFloats) > 0 { - return nil - } - return fmt.Errorf("expected some samples, got none") - }), "after 1 minute") - scrapeManager.Stop() - - switch tc.typ { - case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited": - // Check for zero samples, assuming we only injected always one sample. - // Did it contain CT to inject? If yes, was CT zero enabled? - if tc.counterSampleProto.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion { - require.Len(t, got, 2) - require.Equal(t, 0.0, got[0]) - require.Equal(t, tc.counterSampleProto.GetValue(), got[1]) - return - } - - // Expect only one, valid sample. - require.Len(t, got, 1) - require.Equal(t, tc.counterSampleProto.GetValue(), got[0]) - - case "application/openmetrics-text; version=1.0.0; charset=utf-8": - require.Len(t, got, len(tc.expectCTLineAppended)) - for i, e := range tc.expectCTLineAppended { - require.Equal(t, e.value, got[i]) - } - - // We expect _created lines to be appended as a new metric if ct ingestion is disabled - if !tc.enableCTZeroIngestion { - require.Equal(t, "expected_counter_created", app.resultFloats[1].metric.Get(model.MetricNameLabel)) - } + }) } }) } } +func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName string, v float64, ts time.Time, ct time.Time) (encoded []byte) { + t.Helper() + + counter := &dto.Counter{Value: proto.Float64(v)} + if !ct.IsZero() { + counter.CreatedTimestamp = timestamppb.New(ct) + } + ctrType := dto.MetricType_COUNTER + inputMetric := &dto.MetricFamily{ + Name: proto.String(mName), + Type: &ctrType, + Metric: []*dto.Metric{{ + TimestampMs: proto.Int64(timestamp.FromTime(ts)), + Counter: counter, + }}, + } + switch format { + case config.PrometheusProto: + return protoMarshalDelimited(t, inputMetric) + case config.OpenMetricsText1_0_0: + buf := &bytes.Buffer{} + require.NoError(t, expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeOpenMetrics), expfmt.WithCreatedLines(), expfmt.WithUnit()).Encode(inputMetric)) + _, _ = buf.WriteString("# EOF") + + t.Log("produced OM text to expose:", buf.String()) + return buf.Bytes() + default: + t.Fatalf("not implemented format: %v", format) + return nil + } +} + +func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) { + t.Helper() + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + // Add fake target directly into tsets + reload + manager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }}, + }) + manager.reload() + + // Wait for one scrape. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + appender.mtx.Lock() + defer appender.mtx.Unlock() + + // Check if scrape happened and grab the relevant samples. + if len(appender.resultFloats) > 0 { + return nil + } + return fmt.Errorf("expected some float samples, got none") + }), "after 1 minute") + manager.Stop() +} + +func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) { + for _, f := range floats { + if f.metric.Get(model.MetricNameLabel) == metricName { + ret = append(ret, f) + } + } + return ret +} + func TestUnregisterMetrics(t *testing.T) { reg := prometheus.NewRegistry() // Check that all metrics can be unregistered, allowing a second manager to be created.