Mirror of https://github.com/prometheus/prometheus.git
alternative

Signed-off-by: bwplotka <bwplotka@gmail.com>

commit 043d710282, parent 0f38319b92

2 changed files with 33 additions and 59 deletions
discovery/manager.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/cenkalti/backoff/v5"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/config"
 	"github.com/prometheus/common/promslog"
@@ -95,7 +96,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.
 		targets:     make(map[poolKey]map[string]*targetgroup.Group),
 		ctx:         ctx,
 		updatert:    5 * time.Second,
-		triggerSend: make(chan struct{}, 1),
+		triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read.
 		registerer:  registerer,
 		sdMetrics:   sdMetrics,
 	}
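Aside on the comment added above: the one-element buffer on triggerSend is what allows a "delayed read". A send never blocks, and any number of triggers raised while one is already pending coalesce into a single signal that can be consumed later. A self-contained sketch of the pattern (names here are illustrative, not part of the Prometheus API):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // One-slot buffer: a send never blocks and repeated triggers
        // coalesce into a single pending signal.
        trigger := make(chan struct{}, 1)

        notify := func() {
            select {
            case trigger <- struct{}{}: // Slot was empty: signal recorded.
            default: // A signal is already pending; dropping this one is safe.
            }
        }

        notify()
        notify() // Coalesces with the first trigger.

        // Delayed read: the single pending signal is still there.
        select {
        case <-trigger:
            fmt.Println("got one coalesced trigger")
        case <-time.After(time.Second):
            fmt.Println("no trigger pending")
        }
    }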
@@ -158,17 +159,6 @@ func FeatureRegistry(fr features.Collector) func(*Manager) {
 	}
 }
 
-// SkipStartupWait configures the manager to skip the initial wait on startup.
-// This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
-// which are sensitive to startup latencies.
-func SkipStartupWait() func(*Manager) {
-	return func(m *Manager) {
-		m.mtx.Lock()
-		defer m.mtx.Unlock()
-		m.skipStartupWait = true
-	}
-}
-
 // Manager maintains a set of discovery providers and sends each update to a map channel.
 // Targets are grouped by the target set name.
 type Manager struct {
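Both FeatureRegistry (whose tail is visible above) and the deleted SkipStartupWait are functional options: each returns a func(*Manager) that the constructor applies to mutate the manager. A minimal generic sketch of that pattern, with illustrative names that are not part of the Prometheus API:

    package main

    import "fmt"

    type manager struct {
        skipStartupWait bool
    }

    // An option is just a function that mutates the manager.
    type option func(*manager)

    func withSkipStartupWait() option {
        return func(m *manager) { m.skipStartupWait = true }
    }

    func newManager(opts ...option) *manager {
        m := &manager{}
        for _, o := range opts {
            o(m) // Apply each option in order.
        }
        return m
    }

    func main() {
        m := newManager(withSkipStartupWait())
        fmt.Println(m.skipStartupWait) // true
    }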
@@ -206,11 +196,6 @@ type Manager struct {
 
 	// featureRegistry is used to track which service discovery providers are configured.
 	featureRegistry features.Collector
-
-	// skipStartupWait allows the discovery manager to skip the initial wait before sending updates
-	// to the channel. This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
-	// which are sensitive to startup latencies.
-	skipStartupWait bool
 }
 
 // Providers returns the currently configured SD providers.
@@ -253,8 +238,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 	var (
 		wg           sync.WaitGroup
 		newProviders []*Provider
-		// triggerSync shows if we should trigger send to notify downstream of changes.
-		triggerSync bool
 	)
 	for _, prov := range m.providers {
 		// Cancel obsolete providers if it has no new subs and it has a cancel function.
@@ -269,7 +252,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 
 			prov.cancel()
 			prov.mu.RUnlock()
-			triggerSync = true // Trigger send to notify downstream of dropped targets
 			continue
 		}
 		prov.mu.RUnlock()
@@ -281,7 +263,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 
 		m.targetsMtx.Lock()
 		for s := range prov.subs {
-			triggerSync = true // Trigger send because this is an existing provider (reload)
 			refTargets = m.targets[poolKey{s, prov.name}]
 			// Remove obsolete subs' targets.
 			if _, ok := prov.newSubs[s]; !ok {
@@ -314,7 +295,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 	// See https://github.com/prometheus/prometheus/pull/8639 for details.
 	// This also helps making the downstream managers drop stale targets as soon as possible.
 	// See https://github.com/prometheus/prometheus/pull/13147 for details.
-	if triggerSync {
+	if len(m.providers) > 0 {
 		select {
 		case m.triggerSend <- struct{}{}:
 		default:
@@ -402,59 +383,47 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
 	}
 }
 
-func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) {
-	m.metrics.SentUpdates.Inc()
-	select {
-	case m.syncCh <- m.allGroups():
-	case <-timeout:
-		m.metrics.DelayedUpdates.Inc()
-		m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
-		select {
-		case m.triggerSend <- struct{}{}:
-		case <-ctx.Done():
-			return
-		default:
-		}
-	}
-}
-
 func (m *Manager) sender() {
 	ticker := time.NewTicker(m.updatert)
 	defer func() {
 		ticker.Stop()
 		close(m.syncCh)
 	}()
 
-	if m.skipStartupWait {
-		select {
-		case <-m.triggerSend:
-			m.flushUpdates(m.ctx, ticker.C)
-		case <-m.ctx.Done():
-			return
-		}
-		ticker.Reset(m.updatert)
-	}
+	// Some discoverers send updates too often, so we throttle these with a backoff interval that
+	// increased the interval up to m.updatert delay.
+	lastSent := time.Now().Add(-1 * m.updatert)
+	b := &backoff.ExponentialBackOff{
+		InitialInterval:     100 * time.Millisecond,
+		RandomizationFactor: backoff.DefaultRandomizationFactor,
+		Multiplier:          backoff.DefaultMultiplier,
+		MaxInterval:         m.updatert,
+	}
 
 	for {
 		select {
 		case <-m.ctx.Done():
 			return
-		case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
+		case <-time.After(b.NextBackOff()):
			select {
			case <-m.triggerSend:
				m.metrics.SentUpdates.Inc()
				select {
				case m.syncCh <- m.allGroups():
+					lastSent = time.Now()
				default:
					m.metrics.DelayedUpdates.Inc()
					m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
					// Ensure we don't miss this update.
					select {
					case m.triggerSend <- struct{}{}:
					default:
					}
-
				}
			default:
			}
+			if time.Since(lastSent) > m.updatert {
+				b.Reset() // Nothing happened for a while, start again from low interval for prompt updates.
+			}
 		}
 	}
 }
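The rewritten sender replaces the fixed-period wakeup with an exponential backoff from github.com/cenkalti/backoff/v5: each NextBackOff call returns a randomized interval that grows by Multiplier up to MaxInterval, and Reset drops it back to InitialInterval so the next update after a quiet period is picked up promptly. A rough standalone sketch of that behavior, mirroring the configuration above (it assumes, as the patch itself does, that a v5 ExponentialBackOff struct literal is usable without an explicit Reset):

    package main

    import (
        "fmt"
        "time"

        "github.com/cenkalti/backoff/v5"
    )

    func main() {
        updatert := 5 * time.Second
        b := &backoff.ExponentialBackOff{
            InitialInterval:     100 * time.Millisecond,
            RandomizationFactor: backoff.DefaultRandomizationFactor,
            Multiplier:          backoff.DefaultMultiplier,
            MaxInterval:         updatert, // Never wait longer than the old fixed tick.
        }

        // Successive intervals grow multiplicatively until capped at MaxInterval.
        // Exact values vary because of randomization.
        for i := 0; i < 10; i++ {
            fmt.Println(b.NextBackOff())
        }

        // After a quiet period the sender resets, so the next trigger is
        // served at InitialInterval again.
        b.Reset()
        fmt.Println(b.NextBackOff())
    }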
discovery/manager_test.go
@@ -799,14 +799,14 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
 		{
 			name:            "startup wait with short interval succeeds",
 			updatert:        10 * time.Millisecond,
-			readTimeout:     100 * time.Millisecond,
+			readTimeout:     300 * time.Millisecond,
 			expectedTargets: 1,
 		},
 		{
 			name:            "skip startup wait",
 			skipInitialWait: true,
 			updatert:        100 * time.Hour,
-			readTimeout:     100 * time.Millisecond,
+			readTimeout:     300 * time.Millisecond,
 			expectedTargets: 1,
 		},
 	}
@@ -839,20 +839,25 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
 
 			synctest.Wait()
 
-			var syncedTargets map[string][]*targetgroup.Group
-			select {
-			case syncedTargets = <-discoveryManager.SyncCh():
-			case <-time.After(tc.readTimeout):
+			timeout := time.After(tc.readTimeout)
+			var lastSyncedTargets map[string][]*targetgroup.Group
+		testFor:
+			for {
+				select {
+				case <-timeout:
+					break testFor
+				case lastSyncedTargets = <-discoveryManager.SyncCh():
+				}
 			}
 
 			if tc.expectedTargets == 0 {
-				require.Nil(t, syncedTargets)
+				require.Nil(t, lastSyncedTargets)
 				return
 			}
 
-			require.Len(t, syncedTargets, 1)
-			require.Len(t, syncedTargets["prometheus"], tc.expectedTargets)
-			verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
+			require.Len(t, lastSyncedTargets, 1)
+			require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets)
+			verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
 
 			p := pk("static", "prometheus", 0)
 			verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
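The test now drains SyncCh until the deadline fires and asserts on the last snapshot instead of taking a single read, so extra intermediate syncs no longer fail it. The labeled break is needed because a plain break would only exit the select, not the loop. A standalone sketch of the same drain-until-deadline pattern, with an illustrative channel and values:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        updates := make(chan int)

        // A producer that syncs several times; only the last state matters.
        go func() {
            for i := 1; i <= 3; i++ {
                updates <- i
                time.Sleep(10 * time.Millisecond)
            }
        }()

        timeout := time.After(100 * time.Millisecond)
        var last int

    drain:
        for {
            select {
            case <-timeout:
                break drain // A plain break would only leave the select.
            case last = <-updates:
            }
        }

        fmt.Println("last synced value:", last) // 3
    }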