From 9f93c2d2e13798c22229347d2606a1e7108ad27b Mon Sep 17 00:00:00 2001 From: Minh Nguyen <148210689+pipiland2612@users.noreply.github.com> Date: Fri, 31 Oct 2025 13:38:40 +0200 Subject: [PATCH] promtool: Add Remote Write 2.0 support to push metrics command (#17417) * add feature flag for remote write v2 Signed-off-by: pipiland2612 * change from number to protobuf_message Signed-off-by: pipiland2612 * fix test Signed-off-by: pipiland2612 * fix name Signed-off-by: pipiland2612 * run make cli-documentation Signed-off-by: pipiland2612 * fix help Signed-off-by: pipiland2612 * run make cli-documentation Signed-off-by: pipiland2612 --------- Signed-off-by: pipiland2612 --- cmd/promtool/main.go | 3 +- cmd/promtool/metrics.go | 23 ++++++--- cmd/promtool/metrics_test.go | 88 ++++++++++++++++++++++++++--------- docs/command-line/promtool.md | 1 + 4 files changed, 85 insertions(+), 30 deletions(-) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 0c64b19736..7364a5b4b5 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -222,6 +222,7 @@ func main() { pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap() pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap() + pushMetricsProtoMsg := pushMetricsCmd.Flag("protobuf_message", "Protobuf message to use when writing (prometheus.WriteRequest or io.prometheus.write.v2.Request).").Default("prometheus.WriteRequest").String() testCmd := app.Command("test", "Unit testing.") junitOutFile := testCmd.Flag("junit", "File path to store JUnit XML test results.").OpenFile(os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) @@ -377,7 +378,7 @@ func main() { os.Exit(CheckMetrics(*checkMetricsExtended)) case pushMetricsCmd.FullCommand(): - os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsLabels, *metricFiles...)) + os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsProtoMsg, *pushMetricsLabels, *metricFiles...)) case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p)) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index ee9c03d61c..c21ef15fd8 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -29,7 +29,7 @@ import ( ) // PushMetrics to a prometheus remote write (for testing purpose only). -func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int { +func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, protoMsg string, labels map[string]string, files ...string) int { addressURL, err := url.Parse(url.String()) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -52,6 +52,18 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return failureExitCode } + // Determine message type based on protobuf_message flag. + var messageType remoteapi.WriteMessageType + switch protoMsg { + case "io.prometheus.write.v2.Request": + messageType = remoteapi.WriteV2MessageType + case "prometheus.WriteRequest": + messageType = remoteapi.WriteV1MessageType + default: + fmt.Fprintf(os.Stderr, " FAILED: invalid protobuf message %q, must be prometheus.WriteRequest or io.prometheus.write.v2.Request\n", protoMsg) + return failureExitCode + } + var data []byte var failed bool @@ -62,7 +74,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return failureExitCode } fmt.Printf("Parsing standard input\n") - if parseAndPushMetrics(writeAPI, data, labels) { + if parseAndPushMetrics(writeAPI, messageType, data, labels) { fmt.Printf(" SUCCESS: metrics pushed to remote write.\n") return successExitCode } @@ -78,7 +90,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin } fmt.Printf("Parsing metrics file %s\n", file) - if parseAndPushMetrics(writeAPI, data, labels) { + if parseAndPushMetrics(writeAPI, messageType, data, labels) { fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file) continue } @@ -92,7 +104,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return successExitCode } -func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string]string) bool { +func parseAndPushMetrics(writeAPI *remoteapi.API, messageType remoteapi.WriteMessageType, data []byte, labels map[string]string) bool { metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) @@ -100,8 +112,7 @@ func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string } // Use remoteapi.Write which handles marshaling and compression internally. - // TODO: Add feature flags to support V2. - _, err = writeAPI.Write(context.Background(), remoteapi.WriteV1MessageType, metricsData) + _, err = writeAPI.Write(context.Background(), messageType, metricsData) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false diff --git a/cmd/promtool/metrics_test.go b/cmd/promtool/metrics_test.go index c745869a31..938f1cadfd 100644 --- a/cmd/promtool/metrics_test.go +++ b/cmd/promtool/metrics_test.go @@ -26,64 +26,88 @@ import ( "github.com/stretchr/testify/require" ) +// setupTestServer creates a test server with a mock storage for remote write testing. +func setupTestServer(t *testing.T, supportedTypes remoteapi.MessageTypes) (*httptest.Server, *mockStorage, *url.URL) { + t.Helper() + + store := &mockStorage{} + handler := remoteapi.NewWriteHandler(store, supportedTypes) + server := httptest.NewServer(handler) + t.Cleanup(server.Close) + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + return server, store, serverURL +} + +// createMetricsFile creates a temporary file with the given metrics data. +func createMetricsFile(t *testing.T, metricsData string) string { + t.Helper() + + tmpFile := t.TempDir() + "/metrics.txt" + err := os.WriteFile(tmpFile, []byte(metricsData), 0o644) + require.NoError(t, err) + + return tmpFile +} + func TestPushMetrics(t *testing.T) { tests := []struct { name string metricsData string + protoMsg string + serverTypes remoteapi.MessageTypes }{ { - name: "successful push with gauge metrics", + name: "successful push with gauge metrics (V1)", metricsData: `# HELP test_metric A test metric # TYPE test_metric gauge test_metric{label="value1"} 42.0 test_metric{label="value2"} 43.0 `, + protoMsg: "prometheus.WriteRequest", + serverTypes: remoteapi.MessageTypes{remoteapi.WriteV1MessageType}, }, { - name: "successful push with counter metrics", + name: "successful push with counter metrics (V1)", metricsData: `# HELP test_counter A test counter # TYPE test_counter counter test_counter 100 `, + protoMsg: "prometheus.WriteRequest", + serverTypes: remoteapi.MessageTypes{remoteapi.WriteV1MessageType}, + }, + { + name: "successful push with gauge metrics (V2)", + metricsData: `# HELP test_metric A test metric +# TYPE test_metric gauge +test_metric{label="value1"} 42.0 +test_metric{label="value2"} 43.0 +`, + protoMsg: "io.prometheus.write.v2.Request", + serverTypes: remoteapi.MessageTypes{remoteapi.WriteV2MessageType}, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - // Create test server using client_golang's remote write handler. - store := &mockStorage{} - handler := remoteapi.NewWriteHandler( - store, - remoteapi.MessageTypes{remoteapi.WriteV1MessageType}, - ) + _, store, serverURL := setupTestServer(t, tc.serverTypes) + tmpFile := createMetricsFile(t, tc.metricsData) - server := httptest.NewServer(handler) - defer server.Close() - - serverURL, err := url.Parse(server.URL) - require.NoError(t, err) - - // Create a temp file with metrics data. - tmpFile := t.TempDir() + "/metrics.txt" - err = os.WriteFile(tmpFile, []byte(tc.metricsData), 0o644) - require.NoError(t, err) - - // Call PushMetrics. status := PushMetrics( serverURL, http.DefaultTransport, map[string]string{}, 30*time.Second, + tc.protoMsg, map[string]string{"job": "test"}, tmpFile, ) require.Equal(t, successExitCode, status) - // Verify that the handler received and processed the request. require.True(t, store.called, "Handler should have been called") require.NoError(t, store.lastErr, "Handler should not have returned an error") - - // Verify proper data propagation. require.NotEmpty(t, store.receivedData, "Request should contain data (compression and decompression successful)") require.Contains(t, store.receivedContentType, "application/x-protobuf", "Content-Type should be protobuf") }) @@ -118,3 +142,21 @@ func (m *mockStorage) Store(req *http.Request, _ remoteapi.WriteMessageType) (*r resp.SetStatusCode(http.StatusNoContent) return resp, nil } + +func TestPushMetricsInvalidProtoMsg(t *testing.T) { + _, store, serverURL := setupTestServer(t, remoteapi.MessageTypes{remoteapi.WriteV1MessageType}) + tmpFile := createMetricsFile(t, "test_metric 1\n") + + status := PushMetrics( + serverURL, + http.DefaultTransport, + map[string]string{}, + 30*time.Second, + "blablalba", // Invalid protoMsg. + map[string]string{"job": "test"}, + tmpFile, + ) + + require.Equal(t, failureExitCode, status) + require.False(t, store.called, "Handler should not have been called for invalid protoMsg") +} diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index ec9e5d62af..69fcd265f2 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -422,6 +422,7 @@ Push metrics to a prometheus remote write (for testing purpose only). | --label | Label to attach to metrics. Can be specified multiple times. | `job=promtool` | | --timeout | The time to wait for pushing metrics. | `30s` | | --header | Prometheus remote write header. | | +| --protobuf_message | Protobuf message to use when writing (prometheus.WriteRequest or io.prometheus.write.v2.Request). | `prometheus.WriteRequest` |