diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 11501aa15f..3a75a64940 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -291,6 +291,9 @@ func TestSampleDelivery(t *testing.T) { c.expectExemplars(exemplars[:len(exemplars)/2], series) c.expectHistograms(histograms[:len(histograms)/2], series) c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series) + if tc.protoMsg == config.RemoteWriteProtoMsgV2 && len(metadata) > 0 { + c.expectMetadataForBatch(metadata, series, samples[:len(samples)/2], exemplars[:len(exemplars)/2], histograms[:len(histograms)/2], floatHistograms[:len(floatHistograms)/2]) + } qm.Append(samples[:len(samples)/2]) qm.AppendExemplars(exemplars[:len(exemplars)/2]) qm.AppendHistograms(histograms[:len(histograms)/2]) @@ -961,6 +964,7 @@ type TestWriteClient struct { expectedHistograms map[string][]prompb.Histogram expectedFloatHistograms map[string][]prompb.Histogram receivedMetadata map[string][]prompb.MetricMetadata + expectedMetadata map[string][]prompb.MetricMetadata writesReceived int mtx sync.Mutex protoMsg config.RemoteWriteProtoMsg @@ -979,6 +983,7 @@ func NewTestWriteClient(protoMsg config.RemoteWriteProtoMsg) *TestWriteClient { receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, receivedMetadata: map[string][]prompb.MetricMetadata{}, + expectedMetadata: map[string][]prompb.MetricMetadata{}, protoMsg: protoMsg, storeWait: 0, returnError: nil, @@ -1051,6 +1056,43 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa } } +func (c *TestWriteClient) expectMetadataForBatch(metadata []record.RefMetadata, series []record.RefSeries, samples []record.RefSample, exemplars []record.RefExemplar, histograms []record.RefHistogramSample, floatHistograms []record.RefFloatHistogramSample) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.expectedMetadata = map[string][]prompb.MetricMetadata{} + c.receivedMetadata = map[string][]prompb.MetricMetadata{} + + // Collect refs that have data in this batch. + refsWithData := make(map[chunks.HeadSeriesRef]struct{}) + for _, s := range samples { + refsWithData[s.Ref] = struct{}{} + } + for _, e := range exemplars { + refsWithData[e.Ref] = struct{}{} + } + for _, h := range histograms { + refsWithData[h.Ref] = struct{}{} + } + for _, fh := range floatHistograms { + refsWithData[fh.Ref] = struct{}{} + } + + // Only expect metadata for series that have data in this batch. + for _, m := range metadata { + if _, ok := refsWithData[m.Ref]; !ok { + continue + } + tsID := getSeriesIDFromRef(series[m.Ref]) + c.expectedMetadata[tsID] = append(c.expectedMetadata[tsID], prompb.MetricMetadata{ + MetricFamilyName: tsID, + Type: prompb.FromMetadataType(record.ToMetricType(m.Type)), + Help: m.Help, + Unit: m.Unit, + }) + } +} + func deepLen[M any](ms ...map[string][]M) int { l := 0 for _, m := range ms { @@ -1068,12 +1110,17 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Durati defer cancel() if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error { c.mtx.Lock() - exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms) - got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms) + exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms) + len(c.expectedMetadata) + got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms) + func() int { + if len(c.receivedMetadata) == 0 { + return 0 + } + return len(c.expectedMetadata) // Count unique series that have metadata. + }() c.mtx.Unlock() if got < exp { - return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms, got %v", exp, got) + return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms/metadata, got %v", exp, got) } return nil }); err != nil { @@ -1095,6 +1142,12 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Durati for ts, expectedFloatHistogram := range c.expectedFloatHistograms { require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts) } + for ts, expectedMetadata := range c.expectedMetadata { + require.NotEmpty(tb, c.receivedMetadata[ts], "No metadata received for series %s", ts) + // For metadata, we only check that we got at least one entry with the right content + // since v2 protocol sends metadata with each data point + require.Equal(tb, expectedMetadata[0], c.receivedMetadata[ts][0], ts) + } } func (c *TestWriteClient) SetStoreWait(w time.Duration) { @@ -1193,7 +1246,7 @@ func (c *TestWriteClient) Endpoint() string { func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) { req := &prompb.WriteRequest{ Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)), - // TODO handle metadata? + Metadata: []prompb.MetricMetadata{}, } b := labels.NewScratchBuilder(0) for i, rts := range v2Req.Timeseries { @@ -1231,6 +1284,21 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro } req.Timeseries[i].Histograms[j] = prompb.FromIntHistogram(h.Timestamp, h.ToIntHistogram()) } + + // Convert v2 metadata to v1 format. + if rts.Metadata.Type != writev2.Metadata_METRIC_TYPE_UNSPECIFIED { + labels := rts.ToLabels(&b, v2Req.Symbols) + metadata := rts.ToMetadata(v2Req.Symbols) + + metricFamilyName := labels.String() + + req.Metadata = append(req.Metadata, prompb.MetricMetadata{ + MetricFamilyName: metricFamilyName, + Type: prompb.FromMetadataType(metadata.Type), + Help: metadata.Help, + Unit: metadata.Unit, + }) + } } return req, nil }