manager: Fixed CT OMText conversion bug; Refactored tests.

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2024-10-01 13:12:46 +01:00
parent e62f0c8d5b
commit 5061ae7ec4
4 changed files with 171 additions and 204 deletions

View file

@ -91,10 +91,9 @@ func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bo
}
switch mediaType {
case "application/openmetrics-text":
opts := func(o *openMetricsParserOptions) {
return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) {
o.SkipCTSeries = skipOMCTSeries
}
return NewOpenMetricsParser(b, st, opts), nil
}), nil
case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms, st), nil
default:

View file

@ -297,7 +297,10 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
// CT line for a different series, for our series no CT.
return nil
}
ct := int64(peek.val)
// All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps
ct := int64(peek.val * 1000.0)
return &ct
}
}

View file

@ -69,23 +69,23 @@ testmetric{label="\"bar\""} 1
# HELP foo Counter with and without labels to certify CT is parsed for both cases
# TYPE foo counter
foo_total 17.0 1520879607.789 # {id="counter-test"} 5
foo_created 1000
foo_created 1520872607.123
foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5
foo_created{a="b"} 1000
foo_created{a="b"} 1520872607.123
# HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far
# TYPE bar summary
bar_count 17.0
bar_sum 324789.3
bar{quantile="0.95"} 123.7
bar{quantile="0.99"} 150.0
bar_created 1520430000
bar_created 1520872607.123
# HELP baz Histogram with the same objective as above's summary
# TYPE baz histogram
baz_bucket{le="0.0"} 0
baz_bucket{le="+Inf"} 17
baz_count 17
baz_sum 324789.3
baz_created 1520430000
baz_created 1520872607.123
# HELP fizz_created Gauge which shouldn't be parsed as CT
# TYPE fizz_created gauge
fizz_created 17.0`
@ -250,14 +250,14 @@ fizz_created 17.0`
lset: labels.FromStrings("__name__", "foo_total"),
t: int64p(1520879607789),
e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5},
ct: int64p(1000),
ct: int64p(1520872607123),
}, {
m: `foo_total{a="b"}`,
v: 17.0,
lset: labels.FromStrings("__name__", "foo_total", "a", "b"),
t: int64p(1520879607789),
e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5},
ct: int64p(1000),
ct: int64p(1520872607123),
}, {
m: "bar",
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
@ -268,22 +268,22 @@ fizz_created 17.0`
m: "bar_count",
v: 17.0,
lset: labels.FromStrings("__name__", "bar_count"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: "bar_sum",
v: 324789.3,
lset: labels.FromStrings("__name__", "bar_sum"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: `bar{quantile="0.95"}`,
v: 123.7,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: `bar{quantile="0.99"}`,
v: 150.0,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: "baz",
help: "Histogram with the same objective as above's summary",
@ -294,22 +294,22 @@ fizz_created 17.0`
m: `baz_bucket{le="0.0"}`,
v: 0,
lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: `baz_bucket{le="+Inf"}`,
v: 17,
lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: `baz_count`,
v: 17,
lset: labels.FromStrings("__name__", "baz_count"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: `baz_sum`,
v: 324789.3,
lset: labels.FromStrings("__name__", "baz_sum"),
ct: int64p(1520430000),
ct: int64p(1520872607123),
}, {
m: "fizz_created",
help: "Gauge which shouldn't be parsed as CT",
@ -346,7 +346,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
# UNIT "go.gc_duration_seconds" seconds
{"go.gc_duration_seconds",quantile="0"} 4.9351e-05
{"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05
{"go.gc_duration_seconds_created"} 12313
{"go.gc_duration_seconds_created"} 1520872607.123
{"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05
{"http.status",q="0.9",a="b"} 8.3835e-05
{"http.status",q="0.9",a="b"} 8.3835e-05
@ -370,12 +370,12 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
m: `{"go.gc_duration_seconds",quantile="0"}`,
v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"),
ct: int64p(12313),
ct: int64p(1520872607123),
}, {
m: `{"go.gc_duration_seconds",quantile="0.25"}`,
v: 7.424100000000001e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.25"),
ct: int64p(12313),
ct: int64p(1520872607123),
}, {
m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`,
v: 8.3835e-05,
@ -787,12 +787,12 @@ func TestCTParseFailures(t *testing.T) {
# TYPE something histogram
something_count 17
something_sum 324789.3
something_created 1520430001
something_created 1520872607.123
something_bucket{le="0.0"} 0
something_bucket{le="+Inf"} 17
# HELP thing Histogram with _created as first line
# TYPE thing histogram
thing_created 1520430002
thing_created 1520872607.123
thing_count 17
thing_sum 324789.3
thing_bucket{le="0.0"} 0
@ -801,12 +801,12 @@ thing_bucket{le="+Inf"} 17
# TYPE yum summary
yum_count 17.0
yum_sum 324789.3
yum_created 1520430003
yum_created 1520872607.123
yum{quantile="0.95"} 123.7
yum{quantile="0.99"} 150.0
# HELP foobar Summary with _created as the first line
# TYPE foobar summary
foobar_created 1520430004
foobar_created 1520872607.123
foobar_count 17.0
foobar_sum 324789.3
foobar{quantile="0.95"} 123.7
@ -835,19 +835,19 @@ foobar{quantile="0.99"} 150.0`
isErr: false,
}, {
m: `something_count`,
ct: int64p(1520430001),
ct: int64p(1520872607123),
isErr: false,
}, {
m: `something_sum`,
ct: int64p(1520430001),
ct: int64p(1520872607123),
isErr: false,
}, {
m: `something_bucket{le="0.0"}`,
ct: int64p(1520430001),
ct: int64p(1520872607123),
isErr: true,
}, {
m: `something_bucket{le="+Inf"}`,
ct: int64p(1520430001),
ct: int64p(1520872607123),
isErr: true,
}, {
m: "thing",
@ -859,19 +859,19 @@ foobar{quantile="0.99"} 150.0`
isErr: false,
}, {
m: `thing_count`,
ct: int64p(1520430002),
ct: int64p(1520872607123),
isErr: true,
}, {
m: `thing_sum`,
ct: int64p(1520430002),
ct: int64p(1520872607123),
isErr: true,
}, {
m: `thing_bucket{le="0.0"}`,
ct: int64p(1520430002),
ct: int64p(1520872607123),
isErr: true,
}, {
m: `thing_bucket{le="+Inf"}`,
ct: int64p(1520430002),
ct: int64p(1520872607123),
isErr: true,
}, {
m: "yum",
@ -883,19 +883,19 @@ foobar{quantile="0.99"} 150.0`
isErr: false,
}, {
m: "yum_count",
ct: int64p(1520430003),
ct: int64p(1520872607123),
isErr: false,
}, {
m: "yum_sum",
ct: int64p(1520430003),
ct: int64p(1520872607123),
isErr: false,
}, {
m: `yum{quantile="0.95"}`,
ct: int64p(1520430003),
ct: int64p(1520872607123),
isErr: true,
}, {
m: `yum{quantile="0.99"}`,
ct: int64p(1520430003),
ct: int64p(1520872607123),
isErr: true,
}, {
m: "foobar",

View file

@ -14,6 +14,7 @@
package scrape
import (
"bytes"
"context"
"fmt"
"net/http"
@ -30,7 +31,9 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2"
@ -721,7 +724,7 @@ scrape_configs:
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}
func setupScrapeManager(t *testing.T, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
func setupScrapeManager(t *testing.T, honorTimestamps bool, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
@ -742,7 +745,7 @@ func setupScrapeManager(t *testing.T, enableCTZeroIngestion bool) (*collectResul
ScrapeTimeout: model.Duration(5 * time.Second),
ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}},
}))
return app, scrapeManager
@ -771,183 +774,145 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server
return server
}
// prepareWriteData returns the payload a test server should expose for the
// content type typ.
//
// For the delimited protobuf content type, counterSampleProto is wrapped in a
// single-metric COUNTER family named mName and encoded with
// protoMarshalDelimited. For the OpenMetrics text content type,
// counterSampleText is returned as raw bytes. Any other content type fails the
// test right away instead of silently yielding a nil payload.
func prepareWriteData(t *testing.T, mName, typ string, counterSampleProto *dto.Counter, counterSampleText string) []byte {
	t.Helper()

	switch typ {
	case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
		ctrType := dto.MetricType_COUNTER
		return protoMarshalDelimited(t, &dto.MetricFamily{
			Name:   proto.String(mName),
			Type:   &ctrType,
			Metric: []*dto.Metric{{Counter: counterSampleProto}},
		})
	case "application/openmetrics-text; version=1.0.0; charset=utf-8":
		return []byte(counterSampleText)
	default:
		// An unknown content type is a mistake in the test table; surface it
		// here rather than letting the server expose an empty body.
		t.Fatalf("not implemented content type: %v", typ)
		return nil
	}
}
// TestManagerCTZeroIngestion tests scrape manager for CT cases.
// TestManagerCTZeroIngestion tests scrape manager for various CT cases.
func TestManagerCTZeroIngestion(t *testing.T) {
const mName = "expected_counter"
const (
// _total suffix is required, otherwise expfmt with OMText will mark metric as "unknown"
expectedMetricName = "expected_metric_total"
expectedCreatedMetricName = "expected_metric_created"
expectedSampleValue = 17.0
)
type expectCTLineAppended struct {
value float64
ts int64
}
for _, testFormat := range []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0} {
t.Run(fmt.Sprintf("format=%s", testFormat), func(t *testing.T) {
for _, testWithCT := range []bool{false, true} {
t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) {
for _, testCTZeroIngest := range []bool{false, true} {
t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) {
sampleTs := time.Now()
ctTs := time.Time{}
if testWithCT {
ctTs = sampleTs.Add(-2 * time.Minute)
}
for _, tc := range []struct {
name string
counterSampleProto *dto.Counter
counterSampleText string
enableCTZeroIngestion bool
expectCTLineAppended []expectCTLineAppended
typ string
}{
{
name: "Protobuf disabled with CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "Protobuf enabled with CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "Protobuf enabled without CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
},
enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "OMText disabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
expectCTLineAppended: []expectCTLineAppended{{
value: 17.0,
ts: 1520879607789,
}},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
{
name: "OMText enabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
enableCTZeroIngestion: true,
expectCTLineAppended: []expectCTLineAppended{
{
value: 0.0,
ts: 1000,
},
{
value: 17.0,
ts: 1520879607789,
},
},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
{
name: "OMText enabled without CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
# EOF`,
enableCTZeroIngestion: true,
expectCTLineAppended: []expectCTLineAppended{{
value: 17.0,
ts: 1520879607789,
}},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
} {
t.Run(tc.name, func(t *testing.T) {
app, scrapeManager := setupScrapeManager(t, tc.enableCTZeroIngestion)
// TODO(bwplotka): Add more types than just counter?
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs)
app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest)
toWrite := prepareWriteData(t, mName, tc.typ, tc.counterSampleProto, tc.counterSampleText)
// Perform the test.
doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded))
server := setupTestServer(t, tc.typ, toWrite)
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Verify results.
// Verify what we got vs expectations around CT injection.
samples := findSamplesForMetric(app.resultFloats, expectedMetricName)
if testWithCT && testCTZeroIngest {
require.Len(t, samples, 2)
require.Equal(t, 0.0, samples[0].f)
require.Equal(t, timestamp.FromTime(ctTs), samples[0].t)
require.Equal(t, expectedSampleValue, samples[1].f)
require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t)
} else {
require.Len(t, samples, 1)
require.Equal(t, expectedSampleValue, samples[0].f)
require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t)
}
// Add fake target directly into tsets + reload
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
var got []float64
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()
// Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug
// and it's not worth waiting.
for _, f := range app.resultFloats {
if f.metric.Get(model.MetricNameLabel) == mName {
got = append(got, f.f)
// Verify what we got vs expectations around additional _created series for OM text.
// enableCTZeroIngestion also kills that _created line.
createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName)
if testFormat == config.OpenMetricsText1_0_0 && testWithCT && !testCTZeroIngest {
// For OM Text, when counter has CT, and feature flag disabled we should see _created lines.
require.Len(t, createdSeriesSamples, 1)
// Conversion taken from common/expfmt.writeOpenMetricsFloat.
// We don't check the ct timestamp as explicit ts was not implemented in expfmt.Encoder,
// but exists in OM https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
// We can implement this, but we want to potentially get rid of OM 1.0 CT lines
require.Equal(t, float64(timestamppb.New(ctTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f)
} else {
require.Len(t, createdSeriesSamples, 0)
}
})
}
}
if len(app.resultFloats) > 0 {
return nil
}
return fmt.Errorf("expected some samples, got none")
}), "after 1 minute")
scrapeManager.Stop()
switch tc.typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
// Check for zero samples, assuming we only injected always one sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc.counterSampleProto.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Len(t, got, 2)
require.Equal(t, 0.0, got[0])
require.Equal(t, tc.counterSampleProto.GetValue(), got[1])
return
}
// Expect only one, valid sample.
require.Len(t, got, 1)
require.Equal(t, tc.counterSampleProto.GetValue(), got[0])
case "application/openmetrics-text; version=1.0.0; charset=utf-8":
require.Len(t, got, len(tc.expectCTLineAppended))
for i, e := range tc.expectCTLineAppended {
require.Equal(t, e.value, got[i])
}
// We expect _created lines to be appended as a new metric if ct ingestion is disabled
if !tc.enableCTZeroIngestion {
require.Equal(t, "expected_counter_created", app.resultFloats[1].metric.Get(model.MetricNameLabel))
}
})
}
})
}
}
// prepareTestEncodedCounter encodes a single counter sample in the requested
// scrape format and returns the resulting bytes.
//
// The counter is named mName, has value v and sample timestamp ts. A created
// timestamp is attached only when ct is not the zero time. Supported formats
// are config.PrometheusProto (encoded with protoMarshalDelimited) and
// config.OpenMetricsText1_0_0 (encoded with expfmt, followed by "# EOF").
// Any other format fails the test.
func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName string, v float64, ts time.Time, ct time.Time) (encoded []byte) {
	t.Helper()

	sample := &dto.Counter{Value: proto.Float64(v)}
	if hasCT := !ct.IsZero(); hasCT {
		sample.CreatedTimestamp = timestamppb.New(ct)
	}

	metricType := dto.MetricType_COUNTER
	family := &dto.MetricFamily{
		Name: proto.String(mName),
		Type: &metricType,
		Metric: []*dto.Metric{{
			TimestampMs: proto.Int64(timestamp.FromTime(ts)),
			Counter:     sample,
		}},
	}

	if format == config.PrometheusProto {
		return protoMarshalDelimited(t, family)
	}
	if format != config.OpenMetricsText1_0_0 {
		t.Fatalf("not implemented format: %v", format)
		return nil
	}

	// OpenMetrics text: encode the family, then append the terminator by hand.
	var out bytes.Buffer
	enc := expfmt.NewEncoder(&out, expfmt.NewFormat(expfmt.TypeOpenMetrics), expfmt.WithCreatedLines(), expfmt.WithUnit())
	require.NoError(t, enc.Encode(family))
	// The result of writing into the in-memory buffer is deliberately ignored.
	_, _ = out.WriteString("# EOF")

	t.Log("produced OM text to expose:", out.String())
	return out.Bytes()
}
// doOneScrape registers server as the only target of the "test" job on
// manager, reloads the manager, waits until appender holds at least one float
// sample, and stops the manager before returning.
//
// The wait is retried through runutil.Retry with a 100ms interval argument and
// is bounded by a one-minute context timeout; the test fails if no float
// sample shows up in that window.
func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) {
	t.Helper()

	serverURL, err := url.Parse(server.URL)
	require.NoError(t, err)

	// Deferred so the manager is also stopped when the wait below fails the
	// test; a trailing Stop call would be skipped on that path.
	defer manager.Stop()

	// Add fake target directly into tsets + reload.
	manager.updateTsets(map[string][]*targetgroup.Group{
		"test": {{
			Targets: []model.LabelSet{{
				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
				model.AddressLabel: model.LabelValue(serverURL.Host),
			}},
		}},
	})
	manager.reload()

	// Wait for one scrape.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()
	require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
		// resultFloats is read under the appender's mutex.
		appender.mtx.Lock()
		defer appender.mtx.Unlock()

		// Check if scrape happened and grab the relevant samples.
		if len(appender.resultFloats) > 0 {
			return nil
		}
		return fmt.Errorf("expected some float samples, got none")
	}), "after 1 minute")
}
// findSamplesForMetric returns, in their original order, the samples from
// floats whose metric name label equals metricName. The result is nil when
// nothing matches.
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
	for i := range floats {
		if name := floats[i].metric.Get(model.MetricNameLabel); name != metricName {
			continue
		}
		ret = append(ret, floats[i])
	}
	return ret
}
func TestUnregisterMetrics(t *testing.T) {
reg := prometheus.NewRegistry()
// Check that all metrics can be unregistered, allowing a second manager to be created.