mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-09 08:32:26 -04:00
promtool: Add Remote Write 2.0 support to push metrics command (#17417)
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
* add feature flag for remote write v2 Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> * change from number to protobuf_message Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> * fix test Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> * fix name Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> * run make cli-documentation Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> * fix help Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> * run make cli-documentation Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com> --------- Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
parent
c8f1de18a7
commit
9f93c2d2e1
4 changed files with 85 additions and 30 deletions
|
|
@ -222,6 +222,7 @@ func main() {
|
|||
pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap()
|
||||
pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration()
|
||||
pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap()
|
||||
pushMetricsProtoMsg := pushMetricsCmd.Flag("protobuf_message", "Protobuf message to use when writing (prometheus.WriteRequest or io.prometheus.write.v2.Request).").Default("prometheus.WriteRequest").String()
|
||||
|
||||
testCmd := app.Command("test", "Unit testing.")
|
||||
junitOutFile := testCmd.Flag("junit", "File path to store JUnit XML test results.").OpenFile(os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
|
||||
|
|
@ -377,7 +378,7 @@ func main() {
|
|||
os.Exit(CheckMetrics(*checkMetricsExtended))
|
||||
|
||||
case pushMetricsCmd.FullCommand():
|
||||
os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsLabels, *metricFiles...))
|
||||
os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsProtoMsg, *pushMetricsLabels, *metricFiles...))
|
||||
|
||||
case queryInstantCmd.FullCommand():
|
||||
os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p))
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import (
|
|||
)
|
||||
|
||||
// PushMetrics to a prometheus remote write (for testing purpose only).
|
||||
func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int {
|
||||
func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, protoMsg string, labels map[string]string, files ...string) int {
|
||||
addressURL, err := url.Parse(url.String())
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
|
|
@ -52,6 +52,18 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
|
|||
return failureExitCode
|
||||
}
|
||||
|
||||
// Determine message type based on protobuf_message flag.
|
||||
var messageType remoteapi.WriteMessageType
|
||||
switch protoMsg {
|
||||
case "io.prometheus.write.v2.Request":
|
||||
messageType = remoteapi.WriteV2MessageType
|
||||
case "prometheus.WriteRequest":
|
||||
messageType = remoteapi.WriteV1MessageType
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, " FAILED: invalid protobuf message %q, must be prometheus.WriteRequest or io.prometheus.write.v2.Request\n", protoMsg)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
var data []byte
|
||||
var failed bool
|
||||
|
||||
|
|
@ -62,7 +74,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
|
|||
return failureExitCode
|
||||
}
|
||||
fmt.Printf("Parsing standard input\n")
|
||||
if parseAndPushMetrics(writeAPI, data, labels) {
|
||||
if parseAndPushMetrics(writeAPI, messageType, data, labels) {
|
||||
fmt.Printf(" SUCCESS: metrics pushed to remote write.\n")
|
||||
return successExitCode
|
||||
}
|
||||
|
|
@ -78,7 +90,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
|
|||
}
|
||||
|
||||
fmt.Printf("Parsing metrics file %s\n", file)
|
||||
if parseAndPushMetrics(writeAPI, data, labels) {
|
||||
if parseAndPushMetrics(writeAPI, messageType, data, labels) {
|
||||
fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file)
|
||||
continue
|
||||
}
|
||||
|
|
@ -92,7 +104,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
|
|||
return successExitCode
|
||||
}
|
||||
|
||||
func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string]string) bool {
|
||||
func parseAndPushMetrics(writeAPI *remoteapi.API, messageType remoteapi.WriteMessageType, data []byte, labels map[string]string) bool {
|
||||
metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, " FAILED:", err)
|
||||
|
|
@ -100,8 +112,7 @@ func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string
|
|||
}
|
||||
|
||||
// Use remoteapi.Write which handles marshaling and compression internally.
|
||||
// TODO: Add feature flags to support V2.
|
||||
_, err = writeAPI.Write(context.Background(), remoteapi.WriteV1MessageType, metricsData)
|
||||
_, err = writeAPI.Write(context.Background(), messageType, metricsData)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, " FAILED:", err)
|
||||
return false
|
||||
|
|
|
|||
|
|
@ -26,64 +26,88 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// setupTestServer creates a test server with a mock storage for remote write testing.
|
||||
func setupTestServer(t *testing.T, supportedTypes remoteapi.MessageTypes) (*httptest.Server, *mockStorage, *url.URL) {
|
||||
t.Helper()
|
||||
|
||||
store := &mockStorage{}
|
||||
handler := remoteapi.NewWriteHandler(store, supportedTypes)
|
||||
server := httptest.NewServer(handler)
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
serverURL, err := url.Parse(server.URL)
|
||||
require.NoError(t, err)
|
||||
|
||||
return server, store, serverURL
|
||||
}
|
||||
|
||||
// createMetricsFile creates a temporary file with the given metrics data.
|
||||
func createMetricsFile(t *testing.T, metricsData string) string {
|
||||
t.Helper()
|
||||
|
||||
tmpFile := t.TempDir() + "/metrics.txt"
|
||||
err := os.WriteFile(tmpFile, []byte(metricsData), 0o644)
|
||||
require.NoError(t, err)
|
||||
|
||||
return tmpFile
|
||||
}
|
||||
|
||||
func TestPushMetrics(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
metricsData string
|
||||
protoMsg string
|
||||
serverTypes remoteapi.MessageTypes
|
||||
}{
|
||||
{
|
||||
name: "successful push with gauge metrics",
|
||||
name: "successful push with gauge metrics (V1)",
|
||||
metricsData: `# HELP test_metric A test metric
|
||||
# TYPE test_metric gauge
|
||||
test_metric{label="value1"} 42.0
|
||||
test_metric{label="value2"} 43.0
|
||||
`,
|
||||
protoMsg: "prometheus.WriteRequest",
|
||||
serverTypes: remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
|
||||
},
|
||||
{
|
||||
name: "successful push with counter metrics",
|
||||
name: "successful push with counter metrics (V1)",
|
||||
metricsData: `# HELP test_counter A test counter
|
||||
# TYPE test_counter counter
|
||||
test_counter 100
|
||||
`,
|
||||
protoMsg: "prometheus.WriteRequest",
|
||||
serverTypes: remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
|
||||
},
|
||||
{
|
||||
name: "successful push with gauge metrics (V2)",
|
||||
metricsData: `# HELP test_metric A test metric
|
||||
# TYPE test_metric gauge
|
||||
test_metric{label="value1"} 42.0
|
||||
test_metric{label="value2"} 43.0
|
||||
`,
|
||||
protoMsg: "io.prometheus.write.v2.Request",
|
||||
serverTypes: remoteapi.MessageTypes{remoteapi.WriteV2MessageType},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Create test server using client_golang's remote write handler.
|
||||
store := &mockStorage{}
|
||||
handler := remoteapi.NewWriteHandler(
|
||||
store,
|
||||
remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
|
||||
)
|
||||
_, store, serverURL := setupTestServer(t, tc.serverTypes)
|
||||
tmpFile := createMetricsFile(t, tc.metricsData)
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
serverURL, err := url.Parse(server.URL)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a temp file with metrics data.
|
||||
tmpFile := t.TempDir() + "/metrics.txt"
|
||||
err = os.WriteFile(tmpFile, []byte(tc.metricsData), 0o644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Call PushMetrics.
|
||||
status := PushMetrics(
|
||||
serverURL,
|
||||
http.DefaultTransport,
|
||||
map[string]string{},
|
||||
30*time.Second,
|
||||
tc.protoMsg,
|
||||
map[string]string{"job": "test"},
|
||||
tmpFile,
|
||||
)
|
||||
|
||||
require.Equal(t, successExitCode, status)
|
||||
// Verify that the handler received and processed the request.
|
||||
require.True(t, store.called, "Handler should have been called")
|
||||
require.NoError(t, store.lastErr, "Handler should not have returned an error")
|
||||
|
||||
// Verify proper data propagation.
|
||||
require.NotEmpty(t, store.receivedData, "Request should contain data (compression and decompression successful)")
|
||||
require.Contains(t, store.receivedContentType, "application/x-protobuf", "Content-Type should be protobuf")
|
||||
})
|
||||
|
|
@ -118,3 +142,21 @@ func (m *mockStorage) Store(req *http.Request, _ remoteapi.WriteMessageType) (*r
|
|||
resp.SetStatusCode(http.StatusNoContent)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func TestPushMetricsInvalidProtoMsg(t *testing.T) {
|
||||
_, store, serverURL := setupTestServer(t, remoteapi.MessageTypes{remoteapi.WriteV1MessageType})
|
||||
tmpFile := createMetricsFile(t, "test_metric 1\n")
|
||||
|
||||
status := PushMetrics(
|
||||
serverURL,
|
||||
http.DefaultTransport,
|
||||
map[string]string{},
|
||||
30*time.Second,
|
||||
"blablalba", // Invalid protoMsg.
|
||||
map[string]string{"job": "test"},
|
||||
tmpFile,
|
||||
)
|
||||
|
||||
require.Equal(t, failureExitCode, status)
|
||||
require.False(t, store.called, "Handler should not have been called for invalid protoMsg")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -422,6 +422,7 @@ Push metrics to a prometheus remote write (for testing purpose only).
|
|||
| <code class="text-nowrap">--label</code> | Label to attach to metrics. Can be specified multiple times. | `job=promtool` |
|
||||
| <code class="text-nowrap">--timeout</code> | The time to wait for pushing metrics. | `30s` |
|
||||
| <code class="text-nowrap">--header</code> | Prometheus remote write header. | |
|
||||
| <code class="text-nowrap">--protobuf_message</code> | Protobuf message to use when writing (prometheus.WriteRequest or io.prometheus.write.v2.Request). | `prometheus.WriteRequest` |
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue