diff --git a/discovery/manager.go b/discovery/manager.go index 29fd825412..03616247d2 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v5" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/promslog" @@ -95,7 +96,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus. targets: make(map[poolKey]map[string]*targetgroup.Group), ctx: ctx, updatert: 5 * time.Second, - triggerSend: make(chan struct{}, 1), + triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read. registerer: registerer, sdMetrics: sdMetrics, } @@ -158,17 +159,6 @@ func FeatureRegistry(fr features.Collector) func(*Manager) { } } -// SkipStartupWait configures the manager to skip the initial wait on startup. -// This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver -// which are sensitive to startup latencies. -func SkipStartupWait() func(*Manager) { - return func(m *Manager) { - m.mtx.Lock() - defer m.mtx.Unlock() - m.skipStartupWait = true - } -} - // Manager maintains a set of discovery providers and sends each update to a map channel. // Targets are grouped by the target set name. type Manager struct { @@ -206,11 +196,6 @@ type Manager struct { // featureRegistry is used to track which service discovery providers are configured. featureRegistry features.Collector - - // skipStartupWait allows the discovery manager to skip the initial wait before sending updates - // to the channel. This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver - // which are sensitive to startup latencies. - skipStartupWait bool } // Providers returns the currently configured SD providers. 
@@ -253,8 +238,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { var ( wg sync.WaitGroup newProviders []*Provider - // triggerSync shows if we should trigger send to notify downstream of changes. - triggerSync bool ) for _, prov := range m.providers { // Cancel obsolete providers if it has no new subs and it has a cancel function. @@ -269,7 +252,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { prov.cancel() prov.mu.RUnlock() - triggerSync = true // Trigger send to notify downstream of dropped targets continue } prov.mu.RUnlock() @@ -281,7 +263,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { m.targetsMtx.Lock() for s := range prov.subs { - triggerSync = true // Trigger send because this is an existing provider (reload) refTargets = m.targets[poolKey{s, prov.name}] // Remove obsolete subs' targets. if _, ok := prov.newSubs[s]; !ok { @@ -314,7 +295,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { // See https://github.com/prometheus/prometheus/pull/8639 for details. // This also helps making the downstream managers drop stale targets as soon as possible. // See https://github.com/prometheus/prometheus/pull/13147 for details. 
- if triggerSync { + if len(m.providers) > 0 { select { case m.triggerSend <- struct{}{}: default: @@ -402,59 +383,47 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ } } -func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) { - m.metrics.SentUpdates.Inc() - select { - case m.syncCh <- m.allGroups(): - case <-timeout: - m.metrics.DelayedUpdates.Inc() - m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle") - select { - case m.triggerSend <- struct{}{}: - case <-ctx.Done(): - return - default: - } - } -} - func (m *Manager) sender() { - ticker := time.NewTicker(m.updatert) defer func() { - ticker.Stop() close(m.syncCh) }() - if m.skipStartupWait { - select { - case <-m.triggerSend: - m.flushUpdates(m.ctx, ticker.C) - case <-m.ctx.Done(): - return - } - ticker.Reset(m.updatert) + // Some discoverers send updates too often, so we throttle these with a backoff interval that + increases the interval up to the m.updatert delay. + lastSent := time.Now().Add(-1 * m.updatert) + b := &backoff.ExponentialBackOff{ + InitialInterval: 100 * time.Millisecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: m.updatert, } for { select { case <-m.ctx.Done(): return - case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. + case <-time.After(b.NextBackOff()): select { case <-m.triggerSend: m.metrics.SentUpdates.Inc() select { case m.syncCh <- m.allGroups(): + lastSent = time.Now() default: m.metrics.DelayedUpdates.Inc() m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle") + // Ensure we don't miss this update. select { case m.triggerSend <- struct{}{}: default: } + } default: } + if time.Since(lastSent) > m.updatert { + b.Reset() // Nothing happened for a while, start again from a low interval for prompt updates.
+ } } } } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 3fb3fae145..8cf9f50eef 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -799,14 +799,14 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { { name: "startup wait with short interval succeeds", updatert: 10 * time.Millisecond, - readTimeout: 100 * time.Millisecond, + readTimeout: 300 * time.Millisecond, expectedTargets: 1, }, { name: "skip startup wait", skipInitialWait: true, updatert: 100 * time.Hour, - readTimeout: 100 * time.Millisecond, + readTimeout: 300 * time.Millisecond, expectedTargets: 1, }, } @@ -839,20 +839,25 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { synctest.Wait() - var syncedTargets map[string][]*targetgroup.Group - select { - case syncedTargets = <-discoveryManager.SyncCh(): - case <-time.After(tc.readTimeout): + timeout := time.After(tc.readTimeout) + var lastSyncedTargets map[string][]*targetgroup.Group + testFor: + for { + select { + case <-timeout: + break testFor + case lastSyncedTargets = <-discoveryManager.SyncCh(): + } } if tc.expectedTargets == 0 { - require.Nil(t, syncedTargets) + require.Nil(t, lastSyncedTargets) return } - require.Len(t, syncedTargets, 1) - require.Len(t, syncedTargets["prometheus"], tc.expectedTargets) - verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Len(t, lastSyncedTargets, 1) + require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets) + verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) p := pk("static", "prometheus", 0) verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)