mirror of
https://github.com/prometheus/prometheus.git
synced 2026-04-22 14:52:43 -04:00
tests: test ST in a cheapest way possible
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
ba1b87f51f
commit
c2eac549d5
1 changed files with 33 additions and 17 deletions
|
|
@ -871,7 +871,8 @@ func generateRecords(c recCase) (ret records) {
|
|||
// test exact semantics.
|
||||
ret.samples[i*c.samplesPerSeries+j] = record.RefSample{
|
||||
Ref: chunks.HeadSeriesRef(i),
|
||||
T: st,
|
||||
ST: st,
|
||||
T: ts,
|
||||
V: float64(i),
|
||||
}
|
||||
}
|
||||
|
|
@ -952,8 +953,8 @@ func getSeriesIDFromRef(r record.RefSeries) string {
|
|||
// TestWriteClient represents write client which does not call remote storage,
|
||||
// but instead re-implements fake WriteHandler for test purposes.
|
||||
type TestWriteClient struct {
|
||||
receivedSamples map[string][]prompb.Sample
|
||||
expectedSamples map[string][]prompb.Sample
|
||||
receivedSamples map[string][]writev2.Sample
|
||||
expectedSamples map[string][]writev2.Sample
|
||||
receivedExemplars map[string][]prompb.Exemplar
|
||||
expectedExemplars map[string][]prompb.Exemplar
|
||||
receivedHistograms map[string][]prompb.Histogram
|
||||
|
|
@ -977,8 +978,8 @@ type TestWriteClient struct {
|
|||
// NewTestWriteClient creates a new testing write client.
|
||||
func NewTestWriteClient(protoMsg remoteapi.WriteMessageType) *TestWriteClient {
|
||||
return &TestWriteClient{
|
||||
receivedSamples: map[string][]prompb.Sample{},
|
||||
expectedSamples: map[string][]prompb.Sample{},
|
||||
receivedSamples: map[string][]writev2.Sample{},
|
||||
expectedSamples: map[string][]writev2.Sample{},
|
||||
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
||||
expectedMetadata: map[string][]prompb.MetricMetadata{},
|
||||
protoMsg: protoMsg,
|
||||
|
|
@ -993,18 +994,20 @@ func (c *TestWriteClient) injectErrors(injectedErrs []error) {
|
|||
c.retry = false
|
||||
}
|
||||
|
||||
// expectSamples injects samples that will be expected on waitForExpectedData.
|
||||
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedSamples = map[string][]prompb.Sample{}
|
||||
c.receivedSamples = map[string][]prompb.Sample{}
|
||||
c.expectedSamples = map[string][]writev2.Sample{}
|
||||
c.receivedSamples = map[string][]writev2.Sample{}
|
||||
|
||||
for _, s := range ss {
|
||||
tsID := getSeriesIDFromRef(series[s.Ref])
|
||||
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
|
||||
Timestamp: s.T,
|
||||
Value: s.V,
|
||||
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], writev2.Sample{
|
||||
StartTimestamp: s.ST,
|
||||
Timestamp: s.T,
|
||||
Value: s.V,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -1182,7 +1185,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp
|
|||
}
|
||||
}
|
||||
|
||||
var reqProto *prompb.WriteRequest
|
||||
var (
|
||||
reqProto *prompb.WriteRequest
|
||||
reqProtoV2 *writev2.Request
|
||||
)
|
||||
switch c.protoMsg {
|
||||
case remoteapi.WriteV1MessageType:
|
||||
reqProto = &prompb.WriteRequest{}
|
||||
|
|
@ -1190,10 +1196,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp
|
|||
case remoteapi.WriteV2MessageType:
|
||||
// NOTE(bwplotka): v1 msg can be unmarshaled to v2 sometimes, without
|
||||
// errors.
|
||||
var reqProtoV2 writev2.Request
|
||||
err = proto.Unmarshal(reqBuf, &reqProtoV2)
|
||||
reqProtoV2 = &writev2.Request{}
|
||||
err = proto.Unmarshal(reqBuf, reqProtoV2)
|
||||
if err == nil {
|
||||
reqProto, err = v2RequestToWriteRequest(&reqProtoV2)
|
||||
reqProto, err = v2RequestToWriteRequest(reqProtoV2)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
|
|
@ -1202,11 +1208,21 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp
|
|||
|
||||
rs := WriteResponseStats{}
|
||||
b := labels.NewScratchBuilder(0)
|
||||
for _, ts := range reqProto.Timeseries {
|
||||
for i, ts := range reqProto.Timeseries {
|
||||
labels := ts.ToLabels(&b, nil)
|
||||
tsID := labels.String()
|
||||
if len(ts.Samples) > 0 {
|
||||
c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...)
|
||||
for j, s := range ts.Samples {
|
||||
st := int64(0)
|
||||
if reqProtoV2 != nil {
|
||||
// TODO(bwplotka): Refactor queue manager TestWriteClient for tighter validation
|
||||
// and native support for new RW2 features. For now we inject STs in RW2 case to the existing test suite.
|
||||
st = reqProtoV2.Timeseries[i].Samples[j].StartTimestamp
|
||||
}
|
||||
c.receivedSamples[tsID] = append(c.receivedSamples[tsID], writev2.Sample{
|
||||
StartTimestamp: st,
|
||||
Timestamp: s.Timestamp,
|
||||
Value: s.Value,
|
||||
})
|
||||
}
|
||||
rs.Samples += len(ts.Samples)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue