// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package remote import ( "bytes" "errors" "fmt" "io" "math" "net/http" "net/http/httptest" "strconv" "strings" "testing" "time" remoteapi "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" ) type sample = teststorage.Sample func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) for _, tc := range []struct { name string reqHeaders map[string]string expectedCode int }{ // Generally Prometheus 1.0 Receiver never checked for existence of the headers, so // we keep things permissive. { name: "correct PRW 1.0 headers", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV1MessageType], "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, }, { name: "missing remote write version", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV1MessageType], "Content-Encoding": compression.Snappy, }, expectedCode: http.StatusNoContent, }, { name: "no headers", reqHeaders: map[string]string{}, expectedCode: http.StatusNoContent, }, { name: "missing content-type", reqHeaders: map[string]string{ "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, }, { name: "missing content-encoding", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV1MessageType], RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, }, { name: "wrong content-type", reqHeaders: map[string]string{ "Content-Type": "yolo", "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, }, { name: "wrong content-type2", reqHeaders: map[string]string{ "Content-Type": appProtoContentType + ";proto=yolo", "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, }, { name: "not supported content-encoding", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV1MessageType], "Content-Encoding": "zstd", RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, }, } { t.Run(tc.name, func(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) for k, v := range tc.reqHeaders { req.Header.Set(k, v) } handler := NewWriteHandler(promslog.NewNopLogger(), nil, teststorage.NewAppendable(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() out, err := io.ReadAll(resp.Body) require.NoError(t, err) _ = resp.Body.Close() require.Equal(t, tc.expectedCode, resp.StatusCode, string(out)) }) } } func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), writeV2RequestFixture.Timeseries, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) for _, tc := range []struct { name string reqHeaders map[string]string expectedCode int expectedError string }{ { name: "correct PRW 2.0 headers", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV2MessageType], "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, }, { name: "missing remote write version", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV2MessageType], "Content-Encoding": compression.Snappy, }, expectedCode: http.StatusNoContent, // We don't check for now. }, { name: "no headers", reqHeaders: map[string]string{}, expectedCode: http.StatusUnsupportedMediaType, expectedError: "prometheus.WriteRequest protobuf message is not accepted by this server; only accepts io.prometheus.write.v2.Request", }, { name: "missing content-type", reqHeaders: map[string]string{ "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, // This only gives 415, because we explicitly only support 2.0. If we supported both // (default) it would be empty message parsed and ok response. // This is perhaps better, than 415 for previously working 1.0 flow with // no content-type. expectedCode: http.StatusUnsupportedMediaType, expectedError: "prometheus.WriteRequest protobuf message is not accepted by this server; only accepts io.prometheus.write.v2.Request", }, { name: "missing content-encoding", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV2MessageType], RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusNoContent, // Similar to 1.0 impl, we default to Snappy, so it works. }, { name: "wrong content-type", reqHeaders: map[string]string{ "Content-Type": "yolo", "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, expectedError: "expected application/x-protobuf as the first (media) part, got yolo content-type", }, { name: "wrong content-type2", reqHeaders: map[string]string{ "Content-Type": appProtoContentType + ";proto=yolo", "Content-Encoding": compression.Snappy, RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, expectedError: "got application/x-protobuf;proto=yolo content type; unknown type for remote write protobuf message yolo, supported: prometheus.WriteRequest, io.prometheus.write.v2.Request", }, { name: "not supported content-encoding", reqHeaders: map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV2MessageType], "Content-Encoding": "zstd", RemoteWriteVersionHeader: RemoteWriteVersion20HeaderValue, }, expectedCode: http.StatusUnsupportedMediaType, expectedError: "zstd encoding (compression) is not accepted by this server; only snappy is acceptable", }, } { t.Run(tc.name, func(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) for k, v := range tc.reqHeaders { req.Header.Set(k, v) } appendable := teststorage.NewAppendable() handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() out, err := io.ReadAll(resp.Body) require.NoError(t, err) _ = resp.Body.Close() require.Equal(t, tc.expectedCode, resp.StatusCode, string(out)) if tc.expectedCode/100 == 2 { return } // Invalid request case - no samples should be written. require.Equal(t, tc.expectedError, strings.TrimSpace(string(out))) require.Empty(t, appendable.ResultSamples()) }) } t.Run("unsupported v1 request", func(t *testing.T) { payload, _, _, err := buildWriteRequest(promslog.NewNopLogger(), writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) for k, v := range map[string]string{ "Content-Type": remoteWriteContentTypeHeaders[remoteapi.WriteV1MessageType], "Content-Encoding": compression.Snappy, } { req.Header.Set(k, v) } appendable := teststorage.NewAppendable() handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() out, err := io.ReadAll(resp.Body) require.NoError(t, err) _ = resp.Body.Close() require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode, string(out)) require.Equal(t, "prometheus.WriteRequest protobuf message is not accepted by this server; only accepts io.prometheus.write.v2.Request", strings.TrimSpace(string(out))) require.Empty(t, appendable.ResultSamples()) }) } func TestRemoteWriteHandler_V1Message(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) // NOTE: Strictly speaking, even for 1.0 we require headers, but we never verified those // in Prometheus, so keeping like this to not break existing 1.0 clients. appendable := teststorage.NewAppendable() handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) testutil.RequireEqual(t, writeRequestFixtureSamples, appendable.ResultSamples()) } func expectHeaderValue(t testing.TB, expected int, got string) { t.Helper() require.NotEmpty(t, got) i, err := strconv.Atoi(got) require.NoError(t, err) require.Equal(t, expected, i) } func TestRemoteWriteHandler_V2Message(t *testing.T) { // V2 supports partial writes for non-retriable errors, so test them. for _, tc := range []struct { desc string input []writev2.TimeSeries symbols []string // Custom symbol table for tests that need it expectedCode int expectedRespBody string commitErr error appendSampleErr error appendExemplarErr error enableTypeAndUnitLabels bool expectedLabels labels.Labels // For verifying type/unit labels }{ { desc: "All timeseries accepted", input: writeV2RequestFixture.Timeseries, expectedCode: http.StatusNoContent, }, { desc: "Partial write; first series with invalid labels (no metric name)", input: append( // Series with test_metric1="test_metric1" labels. []writev2.TimeSeries{{LabelsRefs: []uint32{2, 2}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "invalid metric name or labels, got {test_metric1=\"test_metric1\"}\n", }, { desc: "Partial write; first series with invalid labels (empty metric name)", input: append( // Series with __name__="" labels. []writev2.TimeSeries{{LabelsRefs: []uint32{1, 0}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "invalid metric name or labels, got {__name__=\"\"}\n", }, { desc: "Partial write; first series with duplicate labels", input: append( // Series with __name__="test_metric1",test_metric1="test_metric1",test_metric1="test_metric1" labels. []writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 2, 2, 2, 2}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n", }, { desc: "Partial write; first series with odd number of label refs", input: append( []writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 3}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "parsing labels for series [1 2 3]: invalid labelRefs length 3\n", }, { desc: "Partial write; first series with out-of-bounds symbol references", input: append( []writev2.TimeSeries{{LabelsRefs: []uint32{1, 999}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "parsing labels for series [1 999]: labelRefs 1 (name) = 999 (value) outside of symbols table (size 18)\n", }, { desc: "Partial write; TimeSeries with only exemplars (no samples or histograms)", input: append( // Series with only exemplars, no samples or histograms. []writev2.TimeSeries{{ LabelsRefs: []uint32{1, 2}, Exemplars: []writev2.Exemplar{{ LabelsRefs: []uint32{}, Value: 1.0, Timestamp: 1, }}, }}, writeV2RequestFixture.Timeseries...), expectedCode: http.StatusBadRequest, expectedRespBody: "TimeSeries must contain at least one sample or histogram for series {__name__=\"test_metric1\"}\n", }, // Non retriable errors from various parts. { desc: "Internal sample append error; rollback triggered", input: writeV2RequestFixture.Timeseries, appendSampleErr: errors.New("some sample internal append error"), expectedCode: http.StatusInternalServerError, expectedRespBody: "some sample internal append error\n", }, { desc: "Partial write; skipped exemplar; exemplar storage errs are noop", input: writeV2RequestFixture.Timeseries, appendExemplarErr: errors.New("some exemplar internal append error"), expectedCode: http.StatusNoContent, }, { desc: "Internal commit error; rollback triggered", input: writeV2RequestFixture.Timeseries, commitErr: errors.New("storage error"), expectedCode: http.StatusInternalServerError, expectedRespBody: "storage error\n", }, // Type and unit labels tests { desc: "Type and unit labels enabled with counter and bytes unit", input: func() []writev2.TimeSeries { symbolTable := writev2.NewSymbolTable() labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) unitRef := symbolTable.Symbolize("bytes") return []writev2.TimeSeries{ { LabelsRefs: labelRefs, Metadata: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_COUNTER, UnitRef: unitRef, }, Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, }, } }(), symbols: func() []string { symbolTable := writev2.NewSymbolTable() symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) symbolTable.Symbolize("bytes") return symbolTable.Symbols() }(), expectedCode: http.StatusNoContent, enableTypeAndUnitLabels: true, expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "counter", "__unit__", "bytes", "foo", "bar"), }, { desc: "Type and unit labels enabled with gauge and seconds unit", input: func() []writev2.TimeSeries { symbolTable := writev2.NewSymbolTable() labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) unitRef := symbolTable.Symbolize("seconds") return []writev2.TimeSeries{ { LabelsRefs: labelRefs, Metadata: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: unitRef, }, Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, }, } }(), symbols: func() []string { symbolTable := writev2.NewSymbolTable() symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) symbolTable.Symbolize("seconds") return symbolTable.Symbols() }(), expectedCode: http.StatusNoContent, enableTypeAndUnitLabels: true, expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "gauge", "__unit__", "seconds", "foo", "bar"), }, { desc: "Type and unit labels disabled - no metadata labels", input: func() []writev2.TimeSeries { symbolTable := writev2.NewSymbolTable() labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) unitRef := symbolTable.Symbolize("bytes") return []writev2.TimeSeries{ { LabelsRefs: labelRefs, Metadata: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_COUNTER, UnitRef: unitRef, }, Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, }, } }(), symbols: func() []string { symbolTable := writev2.NewSymbolTable() symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) symbolTable.Symbolize("bytes") return symbolTable.Symbols() }(), expectedCode: http.StatusNoContent, enableTypeAndUnitLabels: false, expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"), }, { desc: "Type and unit labels enabled but no metadata", input: func() []writev2.TimeSeries { symbolTable := writev2.NewSymbolTable() labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) return []writev2.TimeSeries{ { LabelsRefs: labelRefs, Metadata: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, UnitRef: 0, }, Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, }, } }(), symbols: func() []string { symbolTable := writev2.NewSymbolTable() symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) return symbolTable.Symbols() }(), expectedCode: http.StatusNoContent, enableTypeAndUnitLabels: true, expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"), }, { desc: "Type and unit labels enabled with only unit (no type)", input: func() []writev2.TimeSeries { symbolTable := writev2.NewSymbolTable() labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) unitRef := symbolTable.Symbolize("milliseconds") return []writev2.TimeSeries{ { LabelsRefs: labelRefs, Metadata: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, UnitRef: unitRef, }, Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, }, } }(), symbols: func() []string { symbolTable := writev2.NewSymbolTable() symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) symbolTable.Symbolize("milliseconds") return symbolTable.Symbols() }(), expectedCode: http.StatusNoContent, enableTypeAndUnitLabels: true, expectedLabels: labels.FromStrings("__name__", "test_metric", "__unit__", "milliseconds", "foo", "bar"), }, } { t.Run(tc.desc, func(t *testing.T) { symbols := writeV2RequestFixture.Symbols if tc.symbols != nil { symbols = tc.symbols } payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), tc.input, symbols, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[remoteapi.WriteV2MessageType]) req.Header.Set("Content-Encoding", compression.Snappy) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) // Instead of simulating OOO handling in mock (which would cause this test to test mock not prod code), // use test storage, after mock. s := teststorage.New(t) t.Cleanup(func() { require.NoError(t, s.Close()) }) appendable := teststorage.NewAppendable(). Then(s). WithErrs( func(labels.Labels) error { return tc.appendSampleErr }, tc.appendExemplarErr, tc.commitErr, ) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, tc.enableTypeAndUnitLabels, true) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() respBody, err := io.ReadAll(resp.Body) require.NoError(t, err) require.Equal(t, tc.expectedRespBody, string(respBody)) require.Equal(t, tc.expectedCode, resp.StatusCode) if tc.expectedCode == http.StatusInternalServerError { // We don't expect writes for partial writes with retry-able code. expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenSamplesHeader)) expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenHistogramsHeader)) expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) require.Empty(t, appendable.ResultSamples()) return } got := appendable.ResultSamples() if !tc.expectedLabels.IsEmpty() { require.Len(t, appendable.ResultSamples(), 1) testutil.RequireEqual(t, tc.expectedLabels, got[0].L) return } // Double check mandatory 2.0 stats. expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader)) if tc.appendExemplarErr != nil { expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) } else { expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenExemplarsHeader)) } testutil.RequireEqual(t, len(writeV2RequestFixtureSamples), len(got)) if tc.appendExemplarErr != nil { for i := range got { got[i].ES = writeV2RequestFixtureSamples[i].ES // Exemplars were not ingested, so we inject them from fixture. } } testutil.RequireEqual(t, writeV2RequestFixtureSamples, got) }) } } // TestRemoteWriteHandler_V2Message_NoDuplicateTypeAndUnitLabels verifies that when // type-and-unit-labels feature is enabled, the receiver correctly handles cases where // __type__ and __unit__ labels are already present in the incoming labels. // Regression test for https://github.com/prometheus/prometheus/issues/17480. func TestRemoteWriteHandler_V2Message_NoDuplicateTypeAndUnitLabels(t *testing.T) { for _, tc := range []struct { desc string labelsToSend labels.Labels metadataToSend writev2.Metadata expectedLabels labels.Labels }{ { desc: "Labels with __type__ and __unit__ should not be duplicated", labelsToSend: labels.FromStrings("__name__", "node_cpu_seconds_total", "__type__", "counter", "__unit__", "seconds", "cpu", "0", "mode", "idle"), metadataToSend: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_COUNTER, }, expectedLabels: labels.FromStrings("__name__", "node_cpu_seconds_total", "__type__", "counter", "__unit__", "seconds", "cpu", "0", "mode", "idle"), }, { desc: "Labels with __type__ only should not be duplicated", labelsToSend: labels.FromStrings("__name__", "test_gauge", "__type__", "gauge", "instance", "localhost"), metadataToSend: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_GAUGE, }, expectedLabels: labels.FromStrings("__name__", "test_gauge", "__type__", "gauge", "instance", "localhost"), }, { desc: "Labels with __unit__ only should not be duplicated when metadata has unit", labelsToSend: labels.FromStrings("__name__", "test_metric", "__unit__", "bytes", "job", "test"), metadataToSend: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_GAUGE, }, expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "gauge", "__unit__", "bytes", "job", "test"), }, { desc: "Metadata type and unit override labels", labelsToSend: labels.FromStrings("__name__", "test_metric", "__type__", "counter", "__unit__", "seconds", "job", "test"), metadataToSend: writev2.Metadata{ Type: writev2.Metadata_METRIC_TYPE_GAUGE, }, expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "gauge", "__unit__", "seconds", "job", "test"), }, } { t.Run(tc.desc, func(t *testing.T) { symbolTable := writev2.NewSymbolTable() labelRefs := symbolTable.SymbolizeLabels(tc.labelsToSend, nil) var unitRef uint32 if unit := tc.labelsToSend.Get("__unit__"); unit != "" { unitRef = symbolTable.Symbolize(unit) } ts := []writev2.TimeSeries{ { LabelsRefs: labelRefs, Metadata: writev2.Metadata{ Type: tc.metadataToSend.Type, UnitRef: unitRef, }, Samples: []writev2.Sample{{Value: 42.0, Timestamp: 1000}}, }, } payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), ts, symbolTable.Symbols(), nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[remoteapi.WriteV2MessageType]) req.Header.Set("Content-Encoding", compression.Snappy) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := teststorage.NewAppendable() handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, true, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() require.Equal(t, http.StatusNoContent, resp.StatusCode) got := appendable.ResultSamples() require.Len(t, got, 1) receivedLabels := got[0].L duplicateLabel, hasDuplicate := receivedLabels.HasDuplicateLabelNames() require.False(t, hasDuplicate, "Labels should NOT contain duplicates, but found duplicate label: %s\nReceived labels: %s", duplicateLabel, receivedLabels.String()) require.Equal(t, tc.expectedLabels.String(), receivedLabels.String(), "Labels should match expected") if tc.expectedLabels.Get("__type__") != "" { require.NotEmpty(t, receivedLabels.Get("__type__"), "__type__ should be present in labels") } }) } } // NOTE: V2 Message is tested in TestRemoteWriteHandler_V2Message. func TestOutOfOrderSample_V1Message(t *testing.T) { for _, tc := range []struct { Name string Timestamp int64 }{ { Name: "historic", Timestamp: 0, }, { Name: "future", Timestamp: math.MaxInt64, }, } { t.Run(tc.Name, func(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{ { Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: timestamp.FromTime(time.Now())}}, }, { Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: tc.Timestamp}}, }, }, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) // Instead of simulating OOO handling in mock (which would cause this test to test mock not prod code), // use test storage. s := teststorage.New(t) t.Cleanup(func() { require.NoError(t, s.Close()) }) handler := NewWriteHandler(promslog.NewNopLogger(), nil, s, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() require.Equal(t, http.StatusBadRequest, resp.StatusCode) }) } } // This test case currently aims to verify that the WriteHandler endpoint // don't fail on exemplar ingestion errors since the exemplar storage is // still experimental. // NOTE: V2 Message is tested in TestRemoteWriteHandler_V2Message. func TestOutOfOrderExemplar_V1Message(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 1}}, }}, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) appendable := teststorage.NewAppendable().WithErrs(nil, errors.New("ingestion exemplar error"), nil) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. require.Equal(t, http.StatusNoContent, resp.StatusCode) } // NOTE: V2 Message is tested in TestRemoteWriteHandler_V2Message. func TestOutOfOrderHistogram_V1Message(t *testing.T) { for _, tc := range []struct { Name string Timestamp int64 }{ { Name: "historic", Timestamp: 0, }, { Name: "future", Timestamp: math.MaxInt64, }, } { t.Run(tc.Name, func(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{ { Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Histograms: []prompb.Histogram{ prompb.FromIntHistogram(timestamp.FromTime(time.Now()), &testHistogram), prompb.FromFloatHistogram(tc.Timestamp, testHistogram.ToFloat(nil)), }, }, }, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) // Instead of simulating OOO handling in mock (which would cause this test to test mock not prod code), // use test storage. s := teststorage.New(t) t.Cleanup(func() { require.NoError(t, s.Close()) }) handler := NewWriteHandler(promslog.NewNopLogger(), nil, s, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() require.Equal(t, http.StatusBadRequest, resp.StatusCode) }) } } func BenchmarkRemoteWriteHandler(b *testing.B) { labelStrings := []string{ "__name__", "test_metric", "test_label_name", "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte", } v1Labels := prompb.FromLabels(labels.FromStrings(labelStrings...), nil) testCases := []struct { name string payloadFunc func() ([]byte, error) protoFormat remoteapi.WriteMessageType }{ { name: "V1 Write", payloadFunc: func() ([]byte, error) { buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: v1Labels, Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram)}, }}, nil, nil, nil, nil, "snappy") return buf, err }, protoFormat: remoteapi.WriteV1MessageType, }, { name: "V2 Write", payloadFunc: func() ([]byte, error) { buf, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), []writev2.TimeSeries{{ LabelsRefs: []uint32{0, 1, 2, 3}, Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram)}, }}, labelStrings, nil, nil, nil, "snappy") return buf, err }, protoFormat: remoteapi.WriteV2MessageType, }, } for _, tc := range testCases { b.Run(tc.name, func(b *testing.B) { handler := NewWriteHandler(promslog.NewNopLogger(), nil, teststorage.NewAppendable(), []remoteapi.WriteMessageType{tc.protoFormat}, false, false, false) b.ResetTimer() for b.Loop() { b.StopTimer() buf, err := tc.payloadFunc() require.NoError(b, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(buf)) require.NoError(b, err) b.StartTimer() recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) } }) } } // V2Message commitErr case is tested in TestRemoteWriteHandler_V2Message. func TestCommitErr_V1Message(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) appendable := teststorage.NewAppendable().WithErrs(nil, nil, errors.New("commit error")) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() body, err := io.ReadAll(resp.Body) require.NoError(t, err) require.Equal(t, http.StatusInternalServerError, resp.StatusCode) require.Equal(t, "commit error\n", string(body)) } // Regression test for https://github.com/prometheus/prometheus/issues/17206. func TestHistogramValidationErrorHandling(t *testing.T) { testCases := []struct { desc string hist histogram.Histogram expected string }{ { desc: "count mismatch", hist: histogram.Histogram{ Schema: 2, ZeroThreshold: 1e-128, ZeroCount: 1, Count: 10, Sum: 20, PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, PositiveBuckets: []int64{2}, NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, NegativeBuckets: []int64{3}, // Total: 1 (zero) + 2 (positive) + 3 (negative) = 6, but Count = 10 }, expected: "histogram's observation count should equal", }, { desc: "custom buckets zero count", hist: histogram.Histogram{ Schema: histogram.CustomBucketsSchema, Count: 10, Sum: 20, ZeroCount: 1, // Invalid: custom buckets must have zero count of 0 PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, PositiveBuckets: []int64{10}, CustomValues: []float64{1.0}, }, expected: "custom buckets: must have zero count of 0", }, } for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { protoName := "V1" if protoMsg == remoteapi.WriteV2MessageType { protoName = "V2" } for _, tc := range testCases { t.Run(fmt.Sprintf("desc=%s/proto=%s", tc.desc, protoName), func(t *testing.T) { s := teststorage.New(t) t.Cleanup(func() { require.NoError(t, s.Close()) }) handler := NewWriteHandler(promslog.NewNopLogger(), nil, s.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false, false) recorder := httptest.NewRecorder() var ( buf []byte err error ) if protoMsg == remoteapi.WriteV1MessageType { ts := []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test"}}, Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, &tc.hist)}, }} buf, _, _, err = buildWriteRequest(nil, ts, nil, nil, nil, nil, "snappy") } else { st := writev2.NewSymbolTable() ts := []writev2.TimeSeries{{ LabelsRefs: st.SymbolizeLabels(labels.FromStrings("__name__", "test"), nil), Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &tc.hist)}, }} buf, _, _, err = buildV2WriteRequest(promslog.NewNopLogger(), ts, st.Symbols(), nil, nil, nil, "snappy") } require.NoError(t, err) req := httptest.NewRequest(http.MethodPost, "/api/v1/write", bytes.NewReader(buf)) req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[protoMsg]) req.Header.Set("Content-Encoding", "snappy") handler.ServeHTTP(recorder, req) require.Equal(t, http.StatusBadRequest, recorder.Code) require.Contains(t, recorder.Body.String(), tc.expected) }) } } } func BenchmarkRemoteWriteOOOSamples(b *testing.B) { b.Skip("Not a valid benchmark (does not count to b.N)") dir := b.TempDir() opts := tsdb.DefaultOptions() opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() db, err := tsdb.Open(dir, nil, nil, opts, nil) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(buf)) require.NoError(b, err) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) require.Equal(b, http.StatusNoContent, recorder.Code) require.Equal(b, uint64(1000), db.Head().NumSeries()) var bufRequests [][]byte for i := range 100 { buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) bufRequests = append(bufRequests, buf) } b.ResetTimer() for i := range 100 { req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i])) require.NoError(b, err) recorder = httptest.NewRecorder() handler.ServeHTTP(recorder, req) require.Equal(b, http.StatusNoContent, recorder.Code) require.Equal(b, uint64(1000), db.Head().NumSeries()) } } func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries { var series []prompb.TimeSeries for i := range numSeries { s := prompb.TimeSeries{ Labels: []prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}}, Samples: []prompb.Sample{{Value: float64(i), Timestamp: ts}}, } series = append(series, s) } return series } var ( highSchemaHistogram = &histogram.Histogram{ Schema: 10, PositiveSpans: []histogram.Span{ { Offset: -1, Length: 2, }, }, PositiveBuckets: []int64{1, 2}, NegativeSpans: []histogram.Span{ { Offset: 0, Length: 1, }, }, NegativeBuckets: []int64{1}, } reducedSchemaHistogram = &histogram.Histogram{ Schema: 8, PositiveSpans: []histogram.Span{ { Offset: 0, Length: 1, }, }, PositiveBuckets: []int64{4}, NegativeSpans: []histogram.Span{ { Offset: 0, Length: 1, }, }, NegativeBuckets: []int64{1}, } ) func TestHistogramsReduction(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(string(protoMsg), func(t *testing.T) { appendable := teststorage.NewAppendable() handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false, false) var ( err error payload []byte ) if protoMsg == remoteapi.WriteV1MessageType { payload, _, _, err = buildWriteRequest(nil, []prompb.TimeSeries{ { Labels: []prompb.Label{{Name: "__name__", Value: "test_metric1"}}, Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, highSchemaHistogram)}, }, { Labels: []prompb.Label{{Name: "__name__", Value: "test_metric2"}}, Histograms: []prompb.Histogram{prompb.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))}, }, }, nil, nil, nil, nil, "snappy") } else { payload, _, _, err = buildV2WriteRequest(promslog.NewNopLogger(), []writev2.TimeSeries{ { LabelsRefs: []uint32{0, 1}, Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, highSchemaHistogram)}, }, { LabelsRefs: []uint32{0, 2}, Histograms: []writev2.Histogram{writev2.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))}, }, }, []string{"__name__", "test_metric1", "test_metric2"}, nil, nil, nil, "snappy") } require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, "", bytes.NewReader(payload)) require.NoError(t, err) req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[protoMsg]) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) resp := recorder.Result() body, err := io.ReadAll(resp.Body) require.NoError(t, err) require.Equal(t, http.StatusNoContent, resp.StatusCode) require.Empty(t, body) got := appendable.ResultSamples() require.Len(t, got, 2) require.Equal(t, int64(1), got[0].T) require.Equal(t, reducedSchemaHistogram, got[0].H) require.Equal(t, int64(2), got[1].T) require.Equal(t, reducedSchemaHistogram.ToFloat(nil), got[1].FH) }) } } // Regression test for https://github.com/prometheus/prometheus/issues/17659 func TestRemoteWriteHandler_ResponseStats(t *testing.T) { payloadV1, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err) payloadV2, _, _, err := buildV2WriteRequest(nil, writeV2RequestFixture.Timeseries, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy") require.NoError(t, err) for _, tt := range []struct { msgType remoteapi.WriteMessageType payload []byte forceInjectHeaders bool expectHeaders bool }{ { msgType: remoteapi.WriteV1MessageType, payload: payloadV1, }, { msgType: remoteapi.WriteV1MessageType, payload: payloadV1, forceInjectHeaders: true, expectHeaders: true, }, { msgType: remoteapi.WriteV2MessageType, payload: payloadV2, expectHeaders: true, }, } { t.Run(fmt.Sprintf("msg=%v/force-inject-headers=%v", tt.msgType, tt.forceInjectHeaders), func(t *testing.T) { // Setup server side. appendable := teststorage.NewAppendable() handler := NewWriteHandler( promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType}, false, false, false, ) if tt.forceInjectHeaders { base := handler handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Inject response header. This simulates PRWv1 server that uses PRWv2 response headers // for confirmation of samples. This is not against spec and we support it. w.Header().Set(rw20WrittenSamplesHeader, "2") w.Header().Set(rw20WrittenHistogramsHeader, "4") base.ServeHTTP(w, r) }) } srv := httptest.NewServer(handler) // Send message and do the parse response flow. c := &Client{Client: srv.Client(), urlString: srv.URL, timeout: 5 * time.Minute, writeProtoMsg: tt.msgType} stats, err := c.Store(t.Context(), tt.payload, 0) require.NoError(t, err) if tt.expectHeaders { require.True(t, stats.Confirmed) require.Equal(t, len(appendable.ResultSamples()), stats.AllSamples()) } else { require.False(t, stats.Confirmed) } }) } }