optimization(tsdb/wlog): reuse Ref* buffers across WAL watchers' reads

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-03-06 13:14:31 +00:00
parent 3f680836cf
commit 87cc6d3bb9
7 changed files with 172 additions and 38 deletions

View file

@ -488,6 +488,7 @@ func NewQueueManager(
enableNativeHistogramRemoteWrite bool,
enableTypeAndUnitLabels bool,
protoMsg remoteapi.WriteMessageType,
recordBuf *record.BuffersPool,
) *QueueManager {
if logger == nil {
logger = promslog.NewNopLogger()
@ -537,7 +538,7 @@ func NewQueueManager(
walMetadata := t.protoMsg != remoteapi.WriteV1MessageType
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata, recordBuf)
// The current MetadataWatcher implementation is mutually exclusive
// with the new approach, which stores metadata as WAL records and

View file

@ -302,7 +302,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg remoteapi.WriteMessageType) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg, record.NewBuffersPool())
return m
}
@ -770,7 +770,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, nil)
m.StoreSeries(recs.Series, 0)
// Attempt to samples while the manager is running. We immediately stop the
@ -1346,7 +1346,7 @@ func BenchmarkStoreSeries(b *testing.B) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, record.NewBuffersPool())
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs

View file

@ -25,6 +25,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -72,6 +74,8 @@ type WriteStorage struct {
scraper ReadyScrapeManager
quit chan struct{}
recordBuf *record.BuffersPool
// For timestampTracker.
highestTimestamp *maxTimestamp
enableTypeAndUnitLabels bool
@ -102,6 +106,7 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet. Deprecated, check prometheus_remote_storage_queue_highest_timestamp_seconds which is more accurate.",
}),
},
recordBuf: record.NewBuffersPool(),
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
}
if reg != nil {
@ -215,6 +220,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rwConf.SendNativeHistograms,
rws.enableTypeAndUnitLabels,
rwConf.ProtobufMessage,
rws.recordBuf,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)

View file

@ -79,12 +79,13 @@ type Head struct {
// This should be typecasted to chunks.ChunkDiskMapperRef after loading.
minOOOMmapRef atomic.Uint64
metrics *headMetrics
opts *HeadOptions
wal, wbl *wlog.WL
exemplarMetrics *ExemplarMetrics
exemplars ExemplarStorage
logger *slog.Logger
metrics *headMetrics
opts *HeadOptions
wal, wbl *wlog.WL
exemplarMetrics *ExemplarMetrics
exemplars ExemplarStorage
logger *slog.Logger
// TODO(bwplotka): Consider using record.Pools that's reused with WAL watchers.
refSeriesPool zeropool.Pool[[]record.RefSeries]
floatsPool zeropool.Pool[[]record.RefSample]
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]

102
tsdb/record/buffers.go Normal file
View file

@ -0,0 +1,102 @@
package record
import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/util/zeropool"
)
// BuffersPool offers pool of zero-ed record buffers.
//
// Each field pools one record slice type. The matching Get* method hands
// out a length-0 slice (reusing a pooled backing array when available) and
// the matching Put* method returns it, zeroing reference-bearing elements
// first so pooled buffers do not pin label or histogram memory.
type BuffersPool struct {
	series          zeropool.Pool[[]RefSeries]
	samples         zeropool.Pool[[]RefSample]
	exemplars       zeropool.Pool[[]RefExemplar]
	histograms      zeropool.Pool[[]RefHistogramSample]
	floatHistograms zeropool.Pool[[]RefFloatHistogramSample]
	metadata        zeropool.Pool[[]RefMetadata]
}
// NewBuffersPool returns a new BuffersPool object.
// The zero value of every inner pool is ready to use, so no further
// initialization is required.
func NewBuffersPool() *BuffersPool {
	var p BuffersPool
	return &p
}
// GetRefSeries returns a []RefSeries buffer of length 0 from the pool, or a
// freshly allocated one with the given capacity when the pool is empty.
// Note: a pooled buffer keeps the capacity it had when returned via
// PutRefSeries, which may differ from the requested capacity.
func (p *BuffersPool) GetRefSeries(capacity int) []RefSeries {
	// Renamed parameter from cap: it shadowed the predeclared builtin.
	if b := p.series.Get(); b != nil {
		return b
	}
	return make([]RefSeries, 0, capacity)
}
// PutRefSeries returns b to the pool with its length reset to 0.
// Label references are dropped first so pooled entries do not retain
// label data.
func (p *BuffersPool) PutRefSeries(b []RefSeries) {
	for i := 0; i < len(b); i++ {
		b[i].Labels = labels.EmptyLabels()
	}
	p.series.Put(b[:0])
}
// GetSamples returns a []RefSample buffer of length 0 from the pool, or a
// freshly allocated one with the given capacity when the pool is empty.
// Note: a pooled buffer keeps the capacity it had when returned via
// PutSamples, which may differ from the requested capacity.
func (p *BuffersPool) GetSamples(capacity int) []RefSample {
	// Renamed parameter from cap: it shadowed the predeclared builtin.
	if b := p.samples.Get(); b != nil {
		return b
	}
	return make([]RefSample, 0, capacity)
}
// PutSamples returns b to the pool with its length reset to 0.
// Unlike PutRefSeries, elements are not zeroed here (presumably RefSample
// carries no reference fields — confirm against the record type).
func (p *BuffersPool) PutSamples(b []RefSample) {
	buf := b[:0]
	p.samples.Put(buf)
}
// GetExemplars returns a []RefExemplar buffer of length 0 from the pool, or
// a freshly allocated one with the given capacity when the pool is empty.
// Note: a pooled buffer keeps the capacity it had when returned via
// PutExemplars, which may differ from the requested capacity.
func (p *BuffersPool) GetExemplars(capacity int) []RefExemplar {
	// Renamed parameter from cap: it shadowed the predeclared builtin.
	if b := p.exemplars.Get(); b != nil {
		return b
	}
	return make([]RefExemplar, 0, capacity)
}
// PutExemplars returns b to the pool with its length reset to 0.
// Label references are dropped first so pooled entries do not retain
// label data.
func (p *BuffersPool) PutExemplars(b []RefExemplar) {
	for i := 0; i < len(b); i++ {
		b[i].Labels = labels.EmptyLabels()
	}
	p.exemplars.Put(b[:0])
}
// GetHistograms returns a []RefHistogramSample buffer of length 0 from the
// pool, or a freshly allocated one with the given capacity when the pool is
// empty. Note: a pooled buffer keeps the capacity it had when returned via
// PutHistograms, which may differ from the requested capacity.
func (p *BuffersPool) GetHistograms(capacity int) []RefHistogramSample {
	// Renamed parameter from cap: it shadowed the predeclared builtin.
	if b := p.histograms.Get(); b != nil {
		return b
	}
	return make([]RefHistogramSample, 0, capacity)
}
// PutHistograms zeroes every element of b (releasing any histogram
// references it holds) and returns the buffer to the pool with length 0.
func (p *BuffersPool) PutHistograms(b []RefHistogramSample) {
	for i := range b {
		b[i] = RefHistogramSample{}
	}
	p.histograms.Put(b[:0])
}
// GetFloatHistograms returns a []RefFloatHistogramSample buffer of length 0
// from the pool, or a freshly allocated one with the given capacity when the
// pool is empty. Note: a pooled buffer keeps the capacity it had when
// returned via PutFloatHistograms, which may differ from the requested
// capacity.
func (p *BuffersPool) GetFloatHistograms(capacity int) []RefFloatHistogramSample {
	// Renamed parameter from cap: it shadowed the predeclared builtin.
	if b := p.floatHistograms.Get(); b != nil {
		return b
	}
	return make([]RefFloatHistogramSample, 0, capacity)
}
// PutFloatHistograms zeroes every element of b (releasing any float
// histogram references it holds) and returns the buffer to the pool with
// length 0.
func (p *BuffersPool) PutFloatHistograms(b []RefFloatHistogramSample) {
	for i := range b {
		b[i] = RefFloatHistogramSample{}
	}
	p.floatHistograms.Put(b[:0])
}
// GetMetadata returns a []RefMetadata buffer of length 0 from the pool, or a
// freshly allocated one with the given capacity when the pool is empty.
// Note: a pooled buffer keeps the capacity it had when returned via
// PutMetadata, which may differ from the requested capacity.
func (p *BuffersPool) GetMetadata(capacity int) []RefMetadata {
	// Renamed parameter from cap: it shadowed the predeclared builtin.
	if b := p.metadata.Get(); b != nil {
		return b
	}
	return make([]RefMetadata, 0, capacity)
}
// PutMetadata zeroes every element of b (releasing any string/reference
// data it holds) and returns the buffer to the pool with length 0.
func (p *BuffersPool) PutMetadata(b []RefMetadata) {
	for i := range b {
		b[i] = RefMetadata{}
	}
	p.metadata.Put(b[:0])
}

View file

@ -89,6 +89,7 @@ type WatcherMetrics struct {
type Watcher struct {
name string
writer WriteTo
recordBuf *record.BuffersPool
logger *slog.Logger
walDir string
lastCheckpoint string
@ -193,12 +194,25 @@ func (m *WatcherMetrics) Unregister() {
}
// NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger *slog.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher {
func NewWatcher(
metrics *WatcherMetrics,
readerMetrics *LiveReaderMetrics,
logger *slog.Logger,
name string,
writer WriteTo,
dir string,
sendExemplars, sendHistograms, sendMetadata bool,
recordBuf *record.BuffersPool,
) *Watcher {
if logger == nil {
logger = promslog.NewNopLogger()
}
if recordBuf == nil {
recordBuf = record.NewBuffersPool()
}
return &Watcher{
logger: logger,
recordBuf: recordBuf,
writer: writer,
metrics: metrics,
readerMetrics: readerMetrics,
@ -511,18 +525,28 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) (err err
}
}()
var (
dec = record.NewDecoder(labels.NewSymbolTable(), w.logger) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
metadata []record.RefMetadata
)
series := w.recordBuf.GetRefSeries(512)
samples := w.recordBuf.GetSamples(512)
samplesToSend := w.recordBuf.GetSamples(512)
exemplars := w.recordBuf.GetExemplars(512)
histograms := w.recordBuf.GetHistograms(512)
histogramsToSend := w.recordBuf.GetHistograms(512)
floatHistograms := w.recordBuf.GetFloatHistograms(512)
floatHistogramsToSend := w.recordBuf.GetFloatHistograms(512)
metadata := w.recordBuf.GetMetadata(512)
defer func() {
w.recordBuf.PutRefSeries(series)
w.recordBuf.PutSamples(samples)
w.recordBuf.PutSamples(samplesToSend)
w.recordBuf.PutExemplars(exemplars)
w.recordBuf.PutHistograms(histograms)
w.recordBuf.PutHistograms(histogramsToSend)
w.recordBuf.PutFloatHistograms(floatHistograms)
w.recordBuf.PutFloatHistograms(floatHistogramsToSend)
w.recordBuf.PutMetadata(metadata)
}()
dec := record.NewDecoder(labels.NewSymbolTable(), w.logger) // One table per WAL segment means it won't grow indefinitely.
for r.Next() && !isClosed(w.quit) {
var err error
rec := r.Record()
@ -660,10 +684,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) (err err
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
w.reads.Inc()
var (
dec = record.NewDecoder(labels.NewSymbolTable(), w.logger) // Needed for decoding; labels do not outlive this function.
series []record.RefSeries
)
series := w.recordBuf.GetRefSeries(512)
defer w.recordBuf.PutRefSeries(series)
dec := record.NewDecoder(labels.NewSymbolTable(), w.logger) // Needed for decoding; labels do not outlive this function.
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
w.recordsRead.WithLabelValues(dec.Type(rec).String()).Inc()

View file

@ -341,7 +341,7 @@ func TestWatcher_Tail(t *testing.T) {
// Start watcher that reads into a mock.
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil)
// Update the time because we just created samples around "now" time and watcher
// only starts watching after that time.
watcher.SetStartTime(now)
@ -433,7 +433,7 @@ func TestInspect(t *testing.T) {
func inspectSeg(t testing.TB, segPath string) {
fmt.Println("-------", segPath, "-----")
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, t.TempDir(), true, true, true)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, t.TempDir(), true, true, true, nil)
watcher.SetMetrics()
watcher.SetStartTime(timestamp.Time(math.MinInt64))
@ -539,7 +539,7 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
b.Run("case=one-go", func(b *testing.B) {
// Start watcher to that reads into a bench mock that only records sampleAppends.
wt := newBenchWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true, nil)
watcher.SetMetrics()
// Update the time because by default, watchers starts with start time=="now" time and watcher
// only starts reading data after that time.
@ -580,7 +580,7 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
b.Run("case=per-scrape", func(b *testing.B) {
// Start watcher to that reads into a bench mock that only records sampleAppends.
wt := newBenchWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true, nil)
watcher.SetMetrics()
// Update the time because by default, watchers starts with start time=="now" time and watcher
// only starts reading data after that time.
@ -697,7 +697,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
expected := seriesCount
@ -786,7 +786,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
require.NoError(t, err)
overwriteReadTimeout(t, time.Second)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
expected := seriesCount * 2
@ -858,7 +858,7 @@ func TestReadCheckpoint(t *testing.T) {
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
expectedSeries := seriesCount
@ -927,7 +927,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
}
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
@ -1000,7 +1000,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
overwriteReadTimeout(t, time.Second)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = -1
go watcher.Start()
@ -1080,7 +1080,7 @@ func TestRun_StartupTime(t *testing.T) {
require.NoError(t, w.Close())
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = segments
watcher.SetMetrics()
@ -1151,7 +1151,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
// Set up the watcher and run it in the background.
wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.SetMetrics()
watcher.MaxSegment = segmentsToRead