diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 9631de1c9c..17e57ea85c 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1037,24 +1037,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline anyways. var ( - max = s.qm.cfg.MaxSamplesPerSend - // Rough estimate, 1% of active series will contain an exemplar on each scrape. - // TODO(cstyan): Casting this many times smells, also we could get index out of bounds issues here. - maxExemplars = int(math.Max(1, float64(max/10))) + max = s.qm.cfg.MaxSamplesPerSend nPending, nPendingSamples, nPendingExemplars = 0, 0, 0 - sampleBuffer = allocateSampleBuffer(max) - buf []byte - pendingData []prompb.TimeSeries - exemplarBuffer [][]prompb.Exemplar + buf []byte ) - totalPending := max if s.qm.sendExemplars { - exemplarBuffer = allocateExemplarBuffer(maxExemplars) - totalPending += maxExemplars + max += int(float64(max) * 0.1) } - pendingData = make([]prompb.TimeSeries, totalPending) + var pendingData = make([]prompb.TimeSeries, max) + for i := range pendingData { + pendingData[i].Samples = []prompb.Sample{{}} + if s.qm.sendExemplars { + pendingData[i].Exemplars = []prompb.Exemplar{{}} + } + } timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { @@ -1094,28 +1092,28 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface return } + pendingData[nPending].Samples = pendingData[nPending].Samples[:0] + if s.qm.sendExemplars { + pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] + } // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. switch d := sample.(type) { case writeSample: - sampleBuffer[nPendingSamples][0] = d.sample pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) - pendingData[nPending].Samples = sampleBuffer[nPendingSamples] - pendingData[nPending].Exemplars = nil + pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample) nPendingSamples++ nPending++ case writeExemplar: - exemplarBuffer[nPendingExemplars][0] = d.exemplar pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) - pendingData[nPending].Samples = nil - pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars] + pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar) nPendingExemplars++ nPending++ } - if nPendingSamples >= max || nPendingExemplars >= maxExemplars { + if nPending >= max { s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) @@ -1298,19 +1296,3 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta compressed := snappy.Encode(buf, data) return compressed, highest, nil } - -func allocateSampleBuffer(capacity int) [][]prompb.Sample { - buf := make([][]prompb.Sample, capacity) - for i := range buf { - buf[i] = []prompb.Sample{{}} - } - return buf -} - -func allocateExemplarBuffer(capacity int) [][]prompb.Exemplar { - buf := make([][]prompb.Exemplar, capacity) - for i := range buf { - buf[i] = []prompb.Exemplar{{}} - } - return buf -}