alternative

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-03-30 11:06:23 +01:00
parent 0f38319b92
commit 043d710282
2 changed files with 33 additions and 59 deletions

View file

@ -22,6 +22,7 @@ import (
"sync"
"time"
"github.com/cenkalti/backoff/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/promslog"
@ -95,7 +96,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.
targets: make(map[poolKey]map[string]*targetgroup.Group),
ctx: ctx,
updatert: 5 * time.Second,
triggerSend: make(chan struct{}, 1),
triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read.
registerer: registerer,
sdMetrics: sdMetrics,
}
@ -158,17 +159,6 @@ func FeatureRegistry(fr features.Collector) func(*Manager) {
}
}
// SkipStartupWait configures the manager to skip the initial wait on startup.
// This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
// which are sensitive to startup latencies.
func SkipStartupWait() func(*Manager) {
return func(m *Manager) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.skipStartupWait = true
}
}
// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
@ -206,11 +196,6 @@ type Manager struct {
// featureRegistry is used to track which service discovery providers are configured.
featureRegistry features.Collector
// skipStartupWait allows the discovery manager to skip the initial wait before sending updates
// to the channel. This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
// which are sensitive to startup latencies.
skipStartupWait bool
}
// Providers returns the currently configured SD providers.
@ -253,8 +238,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
var (
wg sync.WaitGroup
newProviders []*Provider
// triggerSync shows if we should trigger send to notify downstream of changes.
triggerSync bool
)
for _, prov := range m.providers {
// Cancel obsolete providers if it has no new subs and it has a cancel function.
@ -269,7 +252,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
prov.cancel()
prov.mu.RUnlock()
triggerSync = true // Trigger send to notify downstream of dropped targets
continue
}
prov.mu.RUnlock()
@ -281,7 +263,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
m.targetsMtx.Lock()
for s := range prov.subs {
triggerSync = true // Trigger send because this is an existing provider (reload)
refTargets = m.targets[poolKey{s, prov.name}]
// Remove obsolete subs' targets.
if _, ok := prov.newSubs[s]; !ok {
@ -314,7 +295,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
// See https://github.com/prometheus/prometheus/pull/8639 for details.
// This also helps making the downstream managers drop stale targets as soon as possible.
// See https://github.com/prometheus/prometheus/pull/13147 for details.
if triggerSync {
if len(m.providers) > 0 {
select {
case m.triggerSend <- struct{}{}:
default:
@ -402,59 +383,47 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
}
}
func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) {
m.metrics.SentUpdates.Inc()
select {
case m.syncCh <- m.allGroups():
case <-timeout:
m.metrics.DelayedUpdates.Inc()
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
case <-ctx.Done():
return
default:
}
}
}
func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer func() {
ticker.Stop()
close(m.syncCh)
}()
if m.skipStartupWait {
select {
case <-m.triggerSend:
m.flushUpdates(m.ctx, ticker.C)
case <-m.ctx.Done():
return
}
ticker.Reset(m.updatert)
// Some discoverers send updates too often, so we throttle these with a backoff interval that
// increased the interval up to m.updatert delay.
lastSent := time.Now().Add(-1 * m.updatert)
b := &backoff.ExponentialBackOff{
InitialInterval: 100 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: m.updatert,
}
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
case <-time.After(b.NextBackOff()):
select {
case <-m.triggerSend:
m.metrics.SentUpdates.Inc()
select {
case m.syncCh <- m.allGroups():
lastSent = time.Now()
default:
m.metrics.DelayedUpdates.Inc()
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
// Ensure we don't miss this update.
select {
case m.triggerSend <- struct{}{}:
default:
}
}
default:
}
if time.Since(lastSent) > m.updatert {
b.Reset() // Nothing happened for a while, start again from low interval for prompt updates.
}
}
}
}

View file

@ -799,14 +799,14 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
{
name: "startup wait with short interval succeeds",
updatert: 10 * time.Millisecond,
readTimeout: 100 * time.Millisecond,
readTimeout: 300 * time.Millisecond,
expectedTargets: 1,
},
{
name: "skip startup wait",
skipInitialWait: true,
updatert: 100 * time.Hour,
readTimeout: 100 * time.Millisecond,
readTimeout: 300 * time.Millisecond,
expectedTargets: 1,
},
}
@ -839,20 +839,25 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
synctest.Wait()
var syncedTargets map[string][]*targetgroup.Group
select {
case syncedTargets = <-discoveryManager.SyncCh():
case <-time.After(tc.readTimeout):
timeout := time.After(tc.readTimeout)
var lastSyncedTargets map[string][]*targetgroup.Group
testFor:
for {
select {
case <-timeout:
break testFor
case lastSyncedTargets = <-discoveryManager.SyncCh():
}
}
if tc.expectedTargets == 0 {
require.Nil(t, syncedTargets)
require.Nil(t, lastSyncedTargets)
return
}
require.Len(t, syncedTargets, 1)
require.Len(t, syncedTargets["prometheus"], tc.expectedTargets)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Len(t, lastSyncedTargets, 1)
require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets)
verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)