From 2992aaec8e3ab3882a99442536c81b4dcce2d0dd Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 27 Aug 2024 10:51:27 -0700 Subject: [PATCH] rip out interning, except if we have multiple remote writes Signed-off-by: Callum Styan --- go.mod | 2 +- storage/remote/intern-refs | 46 ++++++++++++++++ storage/remote/intern.go | 78 ++++++++++++++++------------ storage/remote/intern_test.go | 62 +++++++++++++--------- storage/remote/queue_manager.go | 64 ++++++++++++----------- storage/remote/queue_manager_test.go | 13 +++-- storage/remote/write.go | 6 ++- 7 files changed, 174 insertions(+), 97 deletions(-) create mode 100644 storage/remote/intern-refs diff --git a/go.mod b/go.mod index 50d560bc3a..8496227abf 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/prometheus/prometheus -go 1.21.0 +go 1.23.0 toolchain go1.22.5 diff --git a/storage/remote/intern-refs b/storage/remote/intern-refs new file mode 100644 index 0000000000..5f6134d621 --- /dev/null +++ b/storage/remote/intern-refs @@ -0,0 +1,46 @@ +goos: linux +goarch: amd64 +pkg: github.com/prometheus/prometheus/storage/remote +cpu: AMD Ryzen 9 5950X 16-Core Processor +BenchmarkStoreSeries/plain-32 4168 256049 ns/op 228660 B/op 1596 allocs/op +BenchmarkStoreSeries/plain-32 4500 254302 ns/op 228643 B/op 1596 allocs/op +BenchmarkStoreSeries/plain-32 4552 255622 ns/op 228618 B/op 1596 allocs/op +BenchmarkStoreSeries/plain-32 4460 254980 ns/op 228615 B/op 1596 allocs/op +BenchmarkStoreSeries/plain-32 4506 259763 ns/op 228626 B/op 1596 allocs/op +BenchmarkStoreSeries/externalLabels-32 1262 895862 ns/op 869235 B/op 2596 allocs/op +BenchmarkStoreSeries/externalLabels-32 1324 897306 ns/op 869256 B/op 2597 allocs/op +BenchmarkStoreSeries/externalLabels-32 1297 894041 ns/op 869225 B/op 2597 allocs/op +BenchmarkStoreSeries/externalLabels-32 1315 907147 ns/op 869157 B/op 2596 allocs/op +BenchmarkStoreSeries/externalLabels-32 1207 895866 ns/op 869218 B/op 2597 allocs/op +BenchmarkStoreSeries/relabel-32 1635 784739 ns/op 741725 B/op 5597 allocs/op +BenchmarkStoreSeries/relabel-32 1532 727039 ns/op 741748 B/op 5598 allocs/op +BenchmarkStoreSeries/relabel-32 1604 729110 ns/op 741750 B/op 5597 allocs/op +BenchmarkStoreSeries/relabel-32 1614 729609 ns/op 741696 B/op 5597 allocs/op +BenchmarkStoreSeries/relabel-32 1626 727394 ns/op 741669 B/op 5597 allocs/op +BenchmarkStoreSeries/externalLabels+relabel-32 987 1208797 ns/op 837849 B/op 5597 allocs/op +BenchmarkStoreSeries/externalLabels+relabel-32 985 1197194 ns/op 837867 B/op 5597 allocs/op +BenchmarkStoreSeries/externalLabels+relabel-32 992 1195505 ns/op 837853 B/op 5597 allocs/op +BenchmarkStoreSeries/externalLabels+relabel-32 999 1201181 ns/op 837831 B/op 5597 allocs/op +BenchmarkStoreSeries/externalLabels+relabel-32 1000 1195945 ns/op 837889 B/op 5597 allocs/op +BenchmarkStoreSeries_TwoEndpoints/plain-32 2469 475513 ns/op 436320 B/op 2731 allocs/op +BenchmarkStoreSeries_TwoEndpoints/plain-32 2523 478113 ns/op 436257 B/op 2731 allocs/op +BenchmarkStoreSeries_TwoEndpoints/plain-32 2458 475820 ns/op 436279 B/op 2731 allocs/op +BenchmarkStoreSeries_TwoEndpoints/plain-32 2492 472694 ns/op 436243 B/op 2731 allocs/op +BenchmarkStoreSeries_TwoEndpoints/plain-32 2437 476160 ns/op 436259 B/op 2731 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 679 1747548 ns/op 1717397 B/op 4731 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 676 1754213 ns/op 1717468 B/op 4732 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 674 1739374 ns/op 1717653 B/op 4732 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 679 1738507 ns/op 1717426 B/op 4732 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels-32 694 1739284 ns/op 1717384 B/op 4732 allocs/op +BenchmarkStoreSeries_TwoEndpoints/relabel-32 850 1399855 ns/op 1462383 B/op 10733 allocs/op +BenchmarkStoreSeries_TwoEndpoints/relabel-32 834 1405426 ns/op 1462345 B/op 10733 allocs/op +BenchmarkStoreSeries_TwoEndpoints/relabel-32 842 1424322 ns/op 1462449 B/op 10734 allocs/op +BenchmarkStoreSeries_TwoEndpoints/relabel-32 841 1404540 ns/op 1462356 B/op 10733 allocs/op +BenchmarkStoreSeries_TwoEndpoints/relabel-32 843 1414279 ns/op 1462380 B/op 10733 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 508 2351248 ns/op 1654492 B/op 10733 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 507 2368400 ns/op 1654660 B/op 10734 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 505 2347374 ns/op 1654649 B/op 10734 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 507 2349636 ns/op 1654516 B/op 10733 allocs/op +BenchmarkStoreSeries_TwoEndpoints/externalLabels+relabel-32 504 2349570 ns/op 1654583 B/op 10733 allocs/op +PASS +ok github.com/prometheus/prometheus/storage/remote 53.470s diff --git a/storage/remote/intern.go b/storage/remote/intern.go index 23047acd9b..af84d21bc9 100644 --- a/storage/remote/intern.go +++ b/storage/remote/intern.go @@ -23,6 +23,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunks" "go.uber.org/atomic" ) @@ -34,53 +36,33 @@ var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{ }) type pool struct { - mtx sync.RWMutex - pool map[string]*entry + mtx sync.RWMutex + pool map[chunks.HeadSeriesRef]*entry + shouldIntern bool } type entry struct { refs atomic.Int64 - - s string + lset labels.Labels } -func newEntry(s string) *entry { - return &entry{s: s} +func newEntry(lset labels.Labels) *entry { + return &entry{lset: lset} } -func newPool() *pool { +func newPool(shouldIntern bool) *pool { return &pool{ - pool: map[string]*entry{}, + pool: map[chunks.HeadSeriesRef]*entry{}, + shouldIntern: shouldIntern, } } -func (p *pool) intern(s string) string { - if s == "" { - return "" +func (p *pool) release(ref chunks.HeadSeriesRef) { + if !p.shouldIntern { + return } - p.mtx.RLock() - interned, ok := p.pool[s] - p.mtx.RUnlock() - if ok { - interned.refs.Inc() - return interned.s - } - p.mtx.Lock() - defer p.mtx.Unlock() - if interned, ok := p.pool[s]; ok { - interned.refs.Inc() - return interned.s - } - - p.pool[s] = newEntry(s) - p.pool[s].refs.Store(1) - return s -} - -func (p *pool) release(s string) { - p.mtx.RLock() - interned, ok := p.pool[s] + interned, ok := p.pool[ref] p.mtx.RUnlock() if !ok { @@ -98,5 +80,33 @@ func (p *pool) release(s string) { if interned.refs.Load() != 0 { return } - delete(p.pool, s) + delete(p.pool, ref) +} + +func (p *pool) intern(ref chunks.HeadSeriesRef, lset labels.Labels) labels.Labels { + if !p.shouldIntern { + return lset + } + + p.mtx.RLock() + interned, ok := p.pool[ref] + p.mtx.RUnlock() + if ok { + interned.refs.Inc() + return interned.lset + } + p.mtx.Lock() + defer p.mtx.Unlock() + if interned, ok := p.pool[ref]; ok { + interned.refs.Inc() + return interned.lset + } + + if len(lset) == 0 { + return nil + } + + p.pool[ref] = newEntry(lset) + p.pool[ref].refs.Store(1) + return p.pool[ref].lset } diff --git a/storage/remote/intern_test.go b/storage/remote/intern_test.go index 917c42e912..9fa6611c61 100644 --- a/storage/remote/intern_test.go +++ b/storage/remote/intern_test.go @@ -24,67 +24,79 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunks" ) func TestIntern(t *testing.T) { - interner := newPool() - testString := "TestIntern" - interner.intern(testString) - interned, ok := interner.pool[testString] + interner := newPool(true) + testString := "TestIntern_DeleteRef" + ref := chunks.HeadSeriesRef(1234) + + lset := labels.FromStrings("name", testString) + interner.intern(ref, lset) + interned, ok := interner.pool[ref] require.True(t, ok) + require.Equal(t, lset, interned.lset) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) } func TestIntern_MultiRef(t *testing.T) { - interner := newPool() - testString := "TestIntern_MultiRef" + interner := newPool(true) + testString := "TestIntern_DeleteRef" + ref := chunks.HeadSeriesRef(1234) - interner.intern(testString) - interned, ok := interner.pool[testString] + lset := labels.FromStrings("name", testString) + interner.intern(ref, lset) + interned, ok := interner.pool[ref] require.True(t, ok) + require.Equal(t, lset, interned.lset) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) - interner.intern(testString) - interned, ok = interner.pool[testString] + interner.intern(ref, lset) + interned, ok = interner.pool[ref] - require.True(t, ok) + require.NotNil(t, interned) require.Equal(t, int64(2), interned.refs.Load(), fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs.Load())) } func TestIntern_DeleteRef(t *testing.T) { - interner := newPool() + interner := newPool(true) testString := "TestIntern_DeleteRef" + ref := chunks.HeadSeriesRef(1234) + interner.intern(ref, labels.FromStrings("name", testString)) + interned, ok := interner.pool[ref] - interner.intern(testString) - interned, ok := interner.pool[testString] - - require.True(t, ok) + require.NotNil(t, interned) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) - interner.release(testString) - _, ok = interner.pool[testString] + interner.release(ref) + _, ok = interner.pool[ref] require.False(t, ok) } func TestIntern_MultiRef_Concurrent(t *testing.T) { - interner := newPool() + interner := newPool(true) testString := "TestIntern_MultiRef_Concurrent" + ref := chunks.HeadSeriesRef(1234) - interner.intern(testString) - interned, ok := interner.pool[testString] - require.True(t, ok) + interner.intern(ref, labels.FromStrings("name", testString)) + interned, ok := interner.pool[ref] + + require.NotNil(t, interned) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) - go interner.release(testString) + go interner.release(ref) - interner.intern(testString) + interner.intern(ref, labels.FromStrings("name", testString)) time.Sleep(time.Millisecond) interner.mtx.RLock() - interned, ok = interner.pool[testString] + interned, ok = interner.pool[ref] interner.mtx.RUnlock() require.True(t, ok) require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load())) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b1c8997268..a8f356575a 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -702,8 +702,13 @@ outer: continue } t.seriesMtx.Lock() - lbls, ok := t.seriesLabels[s.Ref] - if !ok { + var lbls labels.Labels + if t.interner.shouldIntern { + lbls = t.interner.intern(s.Ref, nil) + } else { + lbls = t.seriesLabels[s.Ref] + } + if len(lbls) == 0 { t.dataDropped.incr(1) if _, ok := t.droppedSeries[s.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) @@ -764,8 +769,13 @@ outer: continue } t.seriesMtx.Lock() - lbls, ok := t.seriesLabels[e.Ref] - if !ok { + var lbls labels.Labels + if t.interner.shouldIntern { + lbls = t.interner.intern(e.Ref, nil) + } else { + lbls = t.seriesLabels[e.Ref] + } + if len(lbls) == 0 { // Track dropped exemplars in the same EWMA for sharding calc. t.dataDropped.incr(1) if _, ok := t.droppedSeries[e.Ref]; !ok { @@ -821,8 +831,13 @@ outer: continue } t.seriesMtx.Lock() - lbls, ok := t.seriesLabels[h.Ref] - if !ok { + var lbls labels.Labels + if t.interner.shouldIntern { + lbls = t.interner.intern(h.Ref, nil) + } else { + lbls = t.seriesLabels[h.Ref] + } + if len(lbls) == 0 { t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) @@ -876,8 +891,13 @@ outer: continue } t.seriesMtx.Lock() - lbls, ok := t.seriesLabels[h.Ref] - if !ok { + var lbls labels.Labels + if t.interner.shouldIntern { + lbls = t.interner.intern(h.Ref, nil) + } else { + lbls = t.seriesLabels[h.Ref] + } + if len(lbls) == 0 { t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) @@ -960,9 +980,6 @@ func (t *QueueManager) Stop() { // On shutdown, release the strings in the labels from the intern pool. t.seriesMtx.Lock() - for _, labels := range t.seriesLabels { - t.releaseLabels(labels) - } t.seriesMtx.Unlock() t.metrics.unregister() } @@ -985,15 +1002,11 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { continue } lbls := t.builder.Labels() - t.internLabels(lbls) - - // We should not ever be replacing a series labels in the map, but just - // in case we do we need to ensure we do not leak the replaced interned - // strings. - if orig, ok := t.seriesLabels[s.Ref]; ok { - t.releaseLabels(orig) + if t.interner.shouldIntern { + t.interner.intern(s.Ref, lbls) + } else { + t.seriesLabels[s.Ref] = lbls } - t.seriesLabels[s.Ref] = lbls } } @@ -1037,8 +1050,9 @@ func (t *QueueManager) SeriesReset(index int) { for k, v := range t.seriesSegmentIndexes { if v < index { delete(t.seriesSegmentIndexes, k) - t.releaseLabels(t.seriesLabels[k]) - delete(t.seriesLabels, k) + t.interner.release(k) + //t.releaseLabels(t.seriesLabels[k]) + //delete(t.seriesLabels, k) delete(t.seriesMetadata, k) delete(t.droppedSeries, k) } @@ -1059,14 +1073,6 @@ func (t *QueueManager) client() WriteClient { return t.storeClient } -func (t *QueueManager) internLabels(lbls labels.Labels) { - lbls.InternStrings(t.interner.intern) -} - -func (t *QueueManager) releaseLabels(ls labels.Labels) { - ls.ReleaseStrings(t.interner.release) -} - // processExternalLabels merges externalLabels into b. If b contains // a label in externalLabels, the value in b wins. func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c57e8cdff5..49c97a4dcd 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -294,7 +294,6 @@ func TestSampleDelivery(t *testing.T) { qm.AppendHistograms(histograms[:len(histograms)/2]) qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2]) c.waitForExpectedData(t, 30*time.Second) - // Send second half of data. c.expectSamples(samples[len(samples)/2:], series) c.expectExemplars(exemplars[len(exemplars)/2:], series) @@ -319,7 +318,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(false), newHighestTimestampMetric(), nil, false, false, protoMsg) return m } @@ -772,7 +771,7 @@ func TestDisableReshardOnRetry(t *testing.T) { } ) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m.StoreSeries(fakeSeries, 0) // Attempt to samples while the manager is running. We immediately stop the @@ -1385,7 +1384,7 @@ func BenchmarkStoreSeries(b *testing.B) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m.externalLabels = tc.externalLabels m.relabelConfigs = tc.relabelConfigs @@ -1437,13 +1436,13 @@ func BenchmarkStoreSeries_TwoEndpoints(b *testing.B) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m.externalLabels = tc.externalLabels m.relabelConfigs = tc.relabelConfigs m.StoreSeries(series, 0) - m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + m2 := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m2.externalLabels = tc.externalLabels m2.relabelConfigs = tc.relabelConfigs @@ -1483,7 +1482,7 @@ func BenchmarkStartup(b *testing.B) { // todo: test with new proto type(s) m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) + cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(false), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() diff --git a/storage/remote/write.go b/storage/remote/write.go index 3d2f1fdfcd..0c784eacb2 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -91,7 +91,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f flushDeadline: flushDeadline, samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), dir: dir, - interner: newPool(), + interner: newPool(false), scraper: sm, quit: make(chan struct{}), metadataInWAL: metadataInWal, @@ -227,6 +227,10 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { q.Stop() } + if len(newHashes) > 1 { + rws.interner.shouldIntern = true + } + for _, hash := range newHashes { newQueues[hash].Start() }