diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index 0c64b19736..7364a5b4b5 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -222,6 +222,7 @@ func main() {
pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap()
pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration()
pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap()
+ pushMetricsProtoMsg := pushMetricsCmd.Flag("protobuf_message", "Protobuf message to use when writing (prometheus.WriteRequest or io.prometheus.write.v2.Request).").Default("prometheus.WriteRequest").String()
testCmd := app.Command("test", "Unit testing.")
junitOutFile := testCmd.Flag("junit", "File path to store JUnit XML test results.").OpenFile(os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
@@ -377,7 +378,7 @@ func main() {
os.Exit(CheckMetrics(*checkMetricsExtended))
case pushMetricsCmd.FullCommand():
- os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsLabels, *metricFiles...))
+ os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsProtoMsg, *pushMetricsLabels, *metricFiles...))
case queryInstantCmd.FullCommand():
os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p))
diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go
index ee9c03d61c..c21ef15fd8 100644
--- a/cmd/promtool/metrics.go
+++ b/cmd/promtool/metrics.go
@@ -29,7 +29,7 @@ import (
)
// PushMetrics to a prometheus remote write (for testing purpose only).
-func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int {
+func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, protoMsg string, labels map[string]string, files ...string) int {
addressURL, err := url.Parse(url.String())
if err != nil {
fmt.Fprintln(os.Stderr, err)
@@ -52,6 +52,18 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return failureExitCode
}
+ // Determine message type based on protobuf_message flag.
+ var messageType remoteapi.WriteMessageType
+ switch protoMsg {
+ case "io.prometheus.write.v2.Request":
+ messageType = remoteapi.WriteV2MessageType
+ case "prometheus.WriteRequest":
+ messageType = remoteapi.WriteV1MessageType
+ default:
+ fmt.Fprintf(os.Stderr, " FAILED: invalid protobuf message %q, must be prometheus.WriteRequest or io.prometheus.write.v2.Request\n", protoMsg)
+ return failureExitCode
+ }
+
var data []byte
var failed bool
@@ -62,7 +74,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return failureExitCode
}
fmt.Printf("Parsing standard input\n")
- if parseAndPushMetrics(writeAPI, data, labels) {
+ if parseAndPushMetrics(writeAPI, messageType, data, labels) {
fmt.Printf(" SUCCESS: metrics pushed to remote write.\n")
return successExitCode
}
@@ -78,7 +90,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
}
fmt.Printf("Parsing metrics file %s\n", file)
- if parseAndPushMetrics(writeAPI, data, labels) {
+ if parseAndPushMetrics(writeAPI, messageType, data, labels) {
fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file)
continue
}
@@ -92,7 +104,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return successExitCode
}
-func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string]string) bool {
+func parseAndPushMetrics(writeAPI *remoteapi.API, messageType remoteapi.WriteMessageType, data []byte, labels map[string]string) bool {
metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
@@ -100,8 +112,7 @@ func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string
}
// Use remoteapi.Write which handles marshaling and compression internally.
- // TODO: Add feature flags to support V2.
- _, err = writeAPI.Write(context.Background(), remoteapi.WriteV1MessageType, metricsData)
+ _, err = writeAPI.Write(context.Background(), messageType, metricsData)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
diff --git a/cmd/promtool/metrics_test.go b/cmd/promtool/metrics_test.go
index c745869a31..938f1cadfd 100644
--- a/cmd/promtool/metrics_test.go
+++ b/cmd/promtool/metrics_test.go
@@ -26,64 +26,88 @@ import (
"github.com/stretchr/testify/require"
)
+// setupTestServer creates a test server with a mock storage for remote write testing.
+func setupTestServer(t *testing.T, supportedTypes remoteapi.MessageTypes) (*httptest.Server, *mockStorage, *url.URL) {
+ t.Helper()
+
+ store := &mockStorage{}
+ handler := remoteapi.NewWriteHandler(store, supportedTypes)
+ server := httptest.NewServer(handler)
+ t.Cleanup(server.Close)
+
+ serverURL, err := url.Parse(server.URL)
+ require.NoError(t, err)
+
+ return server, store, serverURL
+}
+
+// createMetricsFile creates a temporary file with the given metrics data.
+func createMetricsFile(t *testing.T, metricsData string) string {
+ t.Helper()
+
+ tmpFile := t.TempDir() + "/metrics.txt"
+ err := os.WriteFile(tmpFile, []byte(metricsData), 0o644)
+ require.NoError(t, err)
+
+ return tmpFile
+}
+
func TestPushMetrics(t *testing.T) {
tests := []struct {
name string
metricsData string
+ protoMsg string
+ serverTypes remoteapi.MessageTypes
}{
{
- name: "successful push with gauge metrics",
+ name: "successful push with gauge metrics (V1)",
metricsData: `# HELP test_metric A test metric
# TYPE test_metric gauge
test_metric{label="value1"} 42.0
test_metric{label="value2"} 43.0
`,
+ protoMsg: "prometheus.WriteRequest",
+ serverTypes: remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
},
{
- name: "successful push with counter metrics",
+ name: "successful push with counter metrics (V1)",
metricsData: `# HELP test_counter A test counter
# TYPE test_counter counter
test_counter 100
`,
+ protoMsg: "prometheus.WriteRequest",
+ serverTypes: remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
+ },
+ {
+ name: "successful push with gauge metrics (V2)",
+ metricsData: `# HELP test_metric A test metric
+# TYPE test_metric gauge
+test_metric{label="value1"} 42.0
+test_metric{label="value2"} 43.0
+`,
+ protoMsg: "io.prometheus.write.v2.Request",
+ serverTypes: remoteapi.MessageTypes{remoteapi.WriteV2MessageType},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
- // Create test server using client_golang's remote write handler.
- store := &mockStorage{}
- handler := remoteapi.NewWriteHandler(
- store,
- remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
- )
+ _, store, serverURL := setupTestServer(t, tc.serverTypes)
+ tmpFile := createMetricsFile(t, tc.metricsData)
- server := httptest.NewServer(handler)
- defer server.Close()
-
- serverURL, err := url.Parse(server.URL)
- require.NoError(t, err)
-
- // Create a temp file with metrics data.
- tmpFile := t.TempDir() + "/metrics.txt"
- err = os.WriteFile(tmpFile, []byte(tc.metricsData), 0o644)
- require.NoError(t, err)
-
- // Call PushMetrics.
status := PushMetrics(
serverURL,
http.DefaultTransport,
map[string]string{},
30*time.Second,
+ tc.protoMsg,
map[string]string{"job": "test"},
tmpFile,
)
require.Equal(t, successExitCode, status)
- // Verify that the handler received and processed the request.
require.True(t, store.called, "Handler should have been called")
require.NoError(t, store.lastErr, "Handler should not have returned an error")
-
- // Verify proper data propagation.
require.NotEmpty(t, store.receivedData, "Request should contain data (compression and decompression successful)")
require.Contains(t, store.receivedContentType, "application/x-protobuf", "Content-Type should be protobuf")
})
@@ -118,3 +142,21 @@ func (m *mockStorage) Store(req *http.Request, _ remoteapi.WriteMessageType) (*r
resp.SetStatusCode(http.StatusNoContent)
return resp, nil
}
+
+func TestPushMetricsInvalidProtoMsg(t *testing.T) {
+ _, store, serverURL := setupTestServer(t, remoteapi.MessageTypes{remoteapi.WriteV1MessageType})
+ tmpFile := createMetricsFile(t, "test_metric 1\n")
+
+ status := PushMetrics(
+ serverURL,
+ http.DefaultTransport,
+ map[string]string{},
+ 30*time.Second,
+ "blablalba", // Invalid protoMsg.
+ map[string]string{"job": "test"},
+ tmpFile,
+ )
+
+ require.Equal(t, failureExitCode, status)
+ require.False(t, store.called, "Handler should not have been called for invalid protoMsg")
+}
diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md
index ec9e5d62af..69fcd265f2 100644
--- a/docs/command-line/promtool.md
+++ b/docs/command-line/promtool.md
@@ -422,6 +422,7 @@ Push metrics to a prometheus remote write (for testing purpose only).
| --label | Label to attach to metrics. Can be specified multiple times. | `job=promtool` |
| --timeout | The time to wait for pushing metrics. | `30s` |
| --header | Prometheus remote write header. | |
+| --protobuf_message | Protobuf message to use when writing (prometheus.WriteRequest or io.prometheus.write.v2.Request). | `prometheus.WriteRequest` |