diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index c2ec7184e3..0ced162129 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -104,7 +104,10 @@ func printV2(req *writev2.Request) error { if err != nil { return err } - m := ts.ToMetadata(req.Symbols) + m, err := ts.ToMetadata(req.Symbols) + if err != nil { + return err + } fmt.Println(l, m) for _, s := range ts.Samples { diff --git a/prompb/io/prometheus/write/v2/codec.go b/prompb/io/prometheus/write/v2/codec.go index ae4d0f635a..034ac84da3 100644 --- a/prompb/io/prometheus/write/v2/codec.go +++ b/prompb/io/prometheus/write/v2/codec.go @@ -14,6 +14,8 @@ package writev2 import ( + "fmt" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" @@ -29,8 +31,8 @@ func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) (labels return desymbolizeLabels(b, m.GetLabelsRefs(), symbols) } -// ToMetadata return model metadata from timeseries' remote metadata. -func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata { +// ToMetadata returns model metadata from timeseries' remote metadata. +func (m TimeSeries) ToMetadata(symbols []string) (metadata.Metadata, error) { typ := model.MetricTypeUnknown switch m.Metadata.Type { case Metadata_METRIC_TYPE_COUNTER: @@ -48,11 +50,17 @@ func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata { case Metadata_METRIC_TYPE_STATESET: typ = model.MetricTypeStateset } + if int(m.Metadata.UnitRef) >= len(symbols) { + return metadata.Metadata{}, fmt.Errorf("metadata unit_ref %d outside of symbols table (size %d)", m.Metadata.UnitRef, len(symbols)) + } + if int(m.Metadata.HelpRef) >= len(symbols) { + return metadata.Metadata{}, fmt.Errorf("metadata help_ref %d outside of symbols table (size %d)", m.Metadata.HelpRef, len(symbols)) + } return metadata.Metadata{ Type: typ, Unit: symbols[m.Metadata.UnitRef], Help: symbols[m.Metadata.HelpRef], - } + }, nil } // FromMetadataType transforms a Prometheus metricType into writev2 metricType. diff --git a/prompb/rwcommon/codec_test.go b/prompb/rwcommon/codec_test.go index ee92581f59..3b80580600 100644 --- a/prompb/rwcommon/codec_test.go +++ b/prompb/rwcommon/codec_test.go @@ -130,9 +130,31 @@ func TestToMetadata(t *testing.T) { } { t.Run("", func(t *testing.T) { ts := writev2.TimeSeries{Metadata: tc.input} - require.Equal(t, tc.expected, ts.ToMetadata(sym.Symbols())) + meta, err := ts.ToMetadata(sym.Symbols()) + require.NoError(t, err) + require.Equal(t, tc.expected, meta) }) } + + t.Run("out of bounds unit ref", func(t *testing.T) { + ts := writev2.TimeSeries{Metadata: writev2.Metadata{UnitRef: 999}} + _, err := ts.ToMetadata(sym.Symbols()) + require.Error(t, err) + require.Contains(t, err.Error(), "metadata unit_ref 999 outside of symbols table") + }) + + t.Run("out of bounds help ref", func(t *testing.T) { + ts := writev2.TimeSeries{Metadata: writev2.Metadata{HelpRef: 999}} + _, err := ts.ToMetadata(sym.Symbols()) + require.Error(t, err) + require.Contains(t, err.Error(), "metadata help_ref 999 outside of symbols table") + }) + + t.Run("empty symbols table", func(t *testing.T) { + ts := writev2.TimeSeries{Metadata: writev2.Metadata{}} + _, err := ts.ToMetadata([]string{}) + require.Error(t, err) + }) } func TestToHistogram_Empty(t *testing.T) { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 061f72c92c..74e5a52920 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1216,7 +1216,10 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro if err != nil { return nil, fmt.Errorf("failed to convert metadata labels: %w", err) } - metadata := rts.ToMetadata(v2Req.Symbols) + metadata, err := rts.ToMetadata(v2Req.Symbols) + if err != nil { + return nil, fmt.Errorf("failed to convert metadata: %w", err) + } metricFamilyName := labels.String() diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 9fdd750692..5e392da335 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -319,7 +319,11 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * continue } - m := ts.ToMetadata(req.Symbols) + m, err := ts.ToMetadata(req.Symbols) + if err != nil { + badRequestErrs = append(badRequestErrs, fmt.Errorf("parsing metadata for series %v: %w", ts.LabelsRefs, err)) + continue + } if h.enableTypeAndUnitLabels && (m.Type != model.MetricTypeUnknown || m.Unit != "") { slb := labels.NewScratchBuilder(ls.Len() + 2) // +2 for __type__ and __unit__ ls.Range(func(l labels.Label) { @@ -446,7 +450,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * continue } // TODO(bwplotka): Add strict mode which would trigger rollback of everything if needed. - // For now we keep the previously released flow (just error not debug leve) of dropping them without rollback and 5xx. + // For now we keep the previously released flow (just error not debug level) of dropping them without rollback and 5xx. h.logger.Error("failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 2cf1217933..0163a311be 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -422,6 +422,36 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { expectedCode: http.StatusBadRequest, expectedRespBody: "parsing labels for series [1 999]: labelRefs 1 (name) = 999 (value) outside of symbols table (size 18)\n", }, + { + desc: "Partial write; first series with out-of-bounds metadata unit ref", + input: append( + []writev2.TimeSeries{{ + LabelsRefs: []uint32{1, 2}, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + UnitRef: 999, + }, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }}, + writeV2RequestFixture.Timeseries...), + expectedCode: http.StatusBadRequest, + expectedRespBody: "parsing metadata for series [1 2]: metadata unit_ref 999 outside of symbols table (size 18)\n", + }, + { + desc: "Partial write; first series with out-of-bounds metadata help ref", + input: append( + []writev2.TimeSeries{{ + LabelsRefs: []uint32{1, 2}, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + HelpRef: 999, + }, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }}, + writeV2RequestFixture.Timeseries...), + expectedCode: http.StatusBadRequest, + expectedRespBody: "parsing metadata for series [1 2]: metadata help_ref 999 outside of symbols table (size 18)\n", + }, { desc: "Partial write; TimeSeries with only exemplars (no samples or histograms)", input: append( @@ -791,7 +821,8 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { } } if tc.appendMetadata && tc.updateMetadataErr == nil { - expectedMeta := ts.ToMetadata(writeV2RequestFixture.Symbols) + expectedMeta, err := ts.ToMetadata(writeV2RequestFixture.Symbols) + require.NoError(t, err) requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m]) m++ }