fix(notify): flaky tests (#17899)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

Add a helper function to set up AlertmanagerSets.
Fix all flaky tests.

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
This commit is contained in:
Siavash Safi 2026-01-22 12:24:35 +01:00 committed by GitHub
parent 8e6b097560
commit d9ccd70ac1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -90,6 +90,33 @@ func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string
}))
}
// newTestAlertmanagerSet assembles an alertmanagerSet for use in tests from
// the given config, HTTP client, options and metrics.
//
// For every entry in alertmanagerURLs the set receives an alertmanagerMock
// whose urlf reports that URL, plus a sendLoop stored under the same URL.
// When no URLs are passed, the set has an empty ams slice and an empty
// sendLoops map. The send loops are only constructed here; callers start
// them separately (e.g. via startSendLoops).
func newTestAlertmanagerSet(
	cfg *config.AlertmanagerConfig,
	client *http.Client,
	opts *Options,
	metrics *alertMetrics,
	alertmanagerURLs ...string,
) *alertmanagerSet {
	// Tests do not inspect log output, so everything is discarded.
	discard := slog.New(slog.DiscardHandler)

	set := &alertmanagerSet{
		ams:       make([]alertmanager, len(alertmanagerURLs)),
		cfg:       cfg,
		client:    client,
		logger:    discard,
		metrics:   metrics,
		opts:      opts,
		sendLoops: make(map[string]*sendLoop),
	}

	for idx, target := range alertmanagerURLs {
		// The closure must report this iteration's URL; this relies on
		// per-iteration loop variables (Go 1.22+).
		set.ams[idx] = alertmanagerMock{urlf: func() string { return target }}
		set.sendLoops[target] = newSendLoop(target, client, cfg, opts, discard, metrics)
	}

	return set
}
func TestHandlerSendAll(t *testing.T) {
var (
errc = make(chan error, 1)
@ -107,7 +134,8 @@ func TestHandlerSendAll(t *testing.T) {
defer server2.Close()
defer server3.Close()
h := NewManager(&Options{}, model.UTF8Validation, nil)
reg := prometheus.NewRegistry()
h := NewManager(&Options{Registerer: reg}, model.UTF8Validation, nil)
authClient, _ := config_util.NewClientFromConfig(
config_util.HTTPClientConfig{
@ -129,53 +157,10 @@ func TestHandlerSendAll(t *testing.T) {
am3Cfg.Timeout = model.Duration(time.Second)
opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: DefaultMaxBatchSize}
logger := slog.New(slog.DiscardHandler)
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
},
cfg: &am1Cfg,
client: authClient,
sendLoops: map[string]*sendLoop{
server1.URL: newSendLoop(server1.URL, authClient, &am1Cfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
}
h.alertmanagers["2"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
},
alertmanagerMock{
urlf: func() string { return server3.URL },
},
},
cfg: &am2Cfg,
sendLoops: map[string]*sendLoop{
server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, opts, logger, h.metrics),
server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
}
h.alertmanagers["3"] = &alertmanagerSet{
ams: []alertmanager{}, // empty set
cfg: &am3Cfg,
sendLoops: map[string]*sendLoop{
server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
}
h.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, authClient, opts, h.metrics, server1.URL)
h.alertmanagers["2"] = newTestAlertmanagerSet(&am2Cfg, nil, opts, h.metrics, server2.URL, server3.URL)
h.alertmanagers["3"] = newTestAlertmanagerSet(&am3Cfg, nil, opts, h.metrics)
var alerts []*Alert
for i := range DefaultMaxBatchSize {
@ -196,7 +181,7 @@ func TestHandlerSendAll(t *testing.T) {
}
}
// start send loops
// Start send loops.
for _, ams := range h.alertmanagers {
ams.startSendLoops(ams.ams)
}
@ -212,32 +197,38 @@ func TestHandlerSendAll(t *testing.T) {
}, time.Second*2, time.Millisecond*10)
checkNoErr()
// the only am in set 1 is down
// The only am in set 1 is down.
status1.Store(int32(http.StatusNotFound))
h.Send(alerts...)
// Wait for all send loops to process before changing any status.
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize &&
prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server2.URL)) == DefaultMaxBatchSize*2 &&
prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server3.URL)) == DefaultMaxBatchSize*2
}, time.Second*2, time.Millisecond*10)
checkNoErr()
// fix the am
// Fix the am.
status1.Store(int32(http.StatusOK))
// only one of the ams in set 2 is down
// Only one of the ams in set 2 is down.
status2.Store(int32(http.StatusInternalServerError))
h.Send(alerts...)
// Wait for all send loops to either send or fail with errors depending on their status.
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize &&
prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server1.URL)) == DefaultMaxBatchSize*2 &&
prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server3.URL)) == DefaultMaxBatchSize*3
}, time.Second*2, time.Millisecond*10)
checkNoErr()
// both ams in set 2 are down
// Both ams in set 2 are down.
status3.Store(int32(http.StatusInternalServerError))
h.Send(alerts...)
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize*2 &&
prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server3.URL)) == DefaultMaxBatchSize
}, time.Second*3, time.Millisecond*10)
}, time.Second*2, time.Millisecond*10)
checkNoErr()
}
@ -262,7 +253,8 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
defer server2.Close()
defer server3.Close()
h := NewManager(&Options{}, model.UTF8Validation, nil)
reg := prometheus.NewRegistry()
h := NewManager(&Options{QueueCapacity: 10_000, Registerer: reg}, model.UTF8Validation, nil)
h.alertmanagers = make(map[string]*alertmanagerSet)
am1Cfg := config.DefaultAlertmanagerConfig
@ -290,65 +282,14 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
},
}
opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: DefaultMaxBatchSize}
logger := slog.New(slog.DiscardHandler)
h.alertmanagers = map[string]*alertmanagerSet{
// Drop no alerts.
"1": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
},
cfg: &am1Cfg,
sendLoops: map[string]*sendLoop{
server1.URL: newSendLoop(server1.URL, nil, &am1Cfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
},
// Drop only alerts with the "alertnamedrop" label.
"2": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
},
},
cfg: &am2Cfg,
sendLoops: map[string]*sendLoop{
server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
},
// Drop all alerts.
"3": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server3.URL },
},
},
cfg: &am3Cfg,
sendLoops: map[string]*sendLoop{
server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
},
// Empty list of Alertmanager endpoints.
"4": {
ams: []alertmanager{},
cfg: &config.DefaultAlertmanagerConfig,
sendLoops: make(map[string]*sendLoop),
opts: opts,
metrics: h.metrics,
logger: logger,
},
}
// Drop no alerts.
h.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, h.opts, h.metrics, server1.URL)
// Drop only alerts with the "alertnamedrop" label.
h.alertmanagers["2"] = newTestAlertmanagerSet(&am2Cfg, nil, h.opts, h.metrics, server2.URL)
// Drop all alerts.
h.alertmanagers["3"] = newTestAlertmanagerSet(&am3Cfg, nil, h.opts, h.metrics, server3.URL)
// Empty list of Alertmanager endpoints.
h.alertmanagers["4"] = newTestAlertmanagerSet(&config.DefaultAlertmanagerConfig, nil, h.opts, h.metrics)
var alerts []*Alert
for i := range make([]struct{}, DefaultMaxBatchSize/2) {
@ -383,38 +324,38 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
}
}
// start send loops
// Start send loops.
for _, ams := range h.alertmanagers {
ams.startSendLoops(ams.ams)
}
defer func() {
// stop send loops.
// Stop send loops.
for _, ams := range h.alertmanagers {
ams.cleanSendLoops(ams.ams...)
}
}()
// all ams are up
// All ams are up.
h.Send(alerts...)
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
}, time.Second*2, time.Millisecond*10)
checkNoErr()
// the only am in set 1 goes down
// The only am in set 1 goes down.
status1.Store(int32(http.StatusInternalServerError))
h.Send(alerts...)
// wait for metrics to update
// Wait for metrics to update.
require.Eventually(t, func() bool {
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
}, time.Second*2, time.Millisecond*10)
checkNoErr()
// reset set 1
// Reset set 1.
status1.Store(int32(http.StatusOK))
// set 3 loses its only am, but all alerts were dropped
// so there was nothing to send, keeping sendAll true
// Set 3 loses its only am, but all alerts were dropped
// so there was nothing to send, keeping sendAll true.
status3.Store(int32(http.StatusInternalServerError))
h.Send(alerts...)
checkNoErr()
@ -441,12 +382,7 @@ func TestExternalLabels(t *testing.T) {
cfg := config.DefaultAlertmanagerConfig
h.alertmanagers = map[string]*alertmanagerSet{
"test": {
cfg: &cfg,
sendLoops: map[string]*sendLoop{
"test": newSendLoop("test", nil, &cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
},
},
"test": newTestAlertmanagerSet(&cfg, nil, h.opts, h.metrics, "test"),
}
// This alert should get the external label attached.
@ -494,12 +430,7 @@ func TestHandlerRelabel(t *testing.T) {
cfg := config.DefaultAlertmanagerConfig
h.alertmanagers = map[string]*alertmanagerSet{
"test": {
cfg: &cfg,
sendLoops: map[string]*sendLoop{
"test": newSendLoop("test", nil, &cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
},
},
"test": newTestAlertmanagerSet(&cfg, nil, h.opts, h.metrics, "test"),
}
// This alert should be dropped due to the configuration
@ -576,23 +507,12 @@ func TestHandlerQueuing(t *testing.T) {
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(time.Second)
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
},
},
cfg: &am1Cfg,
sendLoops: map[string]*sendLoop{
server.URL: newSendLoop(server.URL, nil, &am1Cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
},
}
h.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, h.opts, h.metrics, server.URL)
go h.Run(nil)
defer h.Stop()
// start send loops
// Start send loops.
for _, ams := range h.alertmanagers {
ams.startSendLoops(ams.ams)
}
@ -619,13 +539,6 @@ func TestHandlerQueuing(t *testing.T) {
}
}
// If the batch is larger than the queue capacity, it should be truncated
// from the front.
h.Send(alerts[:4*DefaultMaxBatchSize]...)
for i := 1; i < 4; i++ {
assertAlerts(alerts[i*DefaultMaxBatchSize : (i+1)*DefaultMaxBatchSize])
}
// Send one batch, wait for it to arrive and block the server so the queue fills up.
h.Send(alerts[:DefaultMaxBatchSize]...)
<-called
@ -633,7 +546,7 @@ func TestHandlerQueuing(t *testing.T) {
// Send several batches while the server is still blocked so the queue
// fills up to its maximum capacity (3*DefaultMaxBatchSize). Then check that the
// queue is truncated in the front.
h.Send(alerts[1*DefaultMaxBatchSize : 2*DefaultMaxBatchSize]...) // this batch should be dropped.
h.Send(alerts[1*DefaultMaxBatchSize : 2*DefaultMaxBatchSize]...) // This batch should be dropped.
h.Send(alerts[2*DefaultMaxBatchSize : 3*DefaultMaxBatchSize]...)
h.Send(alerts[3*DefaultMaxBatchSize : 4*DefaultMaxBatchSize]...)
@ -854,24 +767,7 @@ func TestHangingNotifier(t *testing.T) {
notifier.alertmanagers = make(map[string]*alertmanagerSet)
amCfg := config.DefaultAlertmanagerConfig
amCfg.Timeout = model.Duration(sendTimeout)
notifier.alertmanagers["config-0"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return faultyURL.String() },
},
alertmanagerMock{
urlf: func() string { return functionalURL.String() },
},
},
cfg: &amCfg,
metrics: notifier.metrics,
sendLoops: map[string]*sendLoop{
faultyURL.String(): newSendLoop(faultyURL.String(), nil, &amCfg, notifier.opts, slog.New(slog.DiscardHandler), notifier.metrics),
functionalURL.String(): newSendLoop(functionalURL.String(), nil, &amCfg, notifier.opts, slog.New(slog.DiscardHandler), notifier.metrics),
},
opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize},
logger: slog.New(slog.DiscardHandler),
}
notifier.alertmanagers["config-0"] = newTestAlertmanagerSet(&amCfg, nil, notifier.opts, notifier.metrics, faultyURL.String(), functionalURL.String())
for _, ams := range notifier.alertmanagers {
ams.startSendLoops(ams.ams)
@ -932,7 +828,7 @@ loop2:
// The faulty alertmanager was dropped.
if len(notifier.Alertmanagers()) == 1 {
// The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes.
require.NotEmpty(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queue)
require.NotZero(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queueLen())
break loop2
}
}
@ -982,20 +878,7 @@ func TestStop_DrainingDisabled(t *testing.T) {
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(time.Second)
m.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
},
},
cfg: &am1Cfg,
sendLoops: map[string]*sendLoop{
server.URL: newSendLoop(server.URL, nil, &am1Cfg, m.opts, slog.New(slog.DiscardHandler), m.metrics),
},
opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize},
logger: slog.New(slog.DiscardHandler),
}
m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL)
for _, ams := range m.alertmanagers {
ams.startSendLoops(ams.ams)
@ -1080,21 +963,7 @@ func TestStop_DrainingEnabled(t *testing.T) {
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(time.Second)
m.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
},
},
cfg: &am1Cfg,
sendLoops: map[string]*sendLoop{
server.URL: newSendLoop(server.URL, nil, &am1Cfg, m.opts, slog.New(slog.DiscardHandler), m.metrics),
},
opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize},
metrics: m.metrics,
logger: slog.New(slog.DiscardHandler),
}
m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL)
for _, ams := range m.alertmanagers {
ams.startSendLoops(ams.ams)
@ -1145,29 +1014,12 @@ func TestQueuesDrainingOnApplyConfig(t *testing.T) {
server := newImmediateAlertManager(alertSent)
defer server.Close()
h := NewManager(&Options{}, model.UTF8Validation, nil)
h := NewManager(&Options{QueueCapacity: 10, DrainOnShutdown: drainOnShutDown}, model.UTF8Validation, nil)
h.alertmanagers = make(map[string]*alertmanagerSet)
amCfg := config.DefaultAlertmanagerConfig
amCfg.Timeout = model.Duration(time.Second)
opts := &Options{Do: do, QueueCapacity: 10, MaxBatchSize: DefaultMaxBatchSize, DrainOnShutdown: drainOnShutDown}
logger := slog.New(slog.DiscardHandler)
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
},
},
cfg: &amCfg,
sendLoops: map[string]*sendLoop{
server.URL: newSendLoop(server.URL, nil, &amCfg, opts, logger, h.metrics),
},
opts: opts,
metrics: h.metrics,
logger: logger,
}
h.alertmanagers["1"] = newTestAlertmanagerSet(&amCfg, nil, h.opts, h.metrics, server.URL)
// The send loops were not started, nothing will be sent.
h.Send([]*Alert{{Labels: labels.FromStrings("alertname", "foo")}}...)
@ -1313,7 +1165,7 @@ func TestAlerstRelabelingIsIsolated(t *testing.T) {
defer server1.Close()
defer server2.Close()
h := NewManager(&Options{}, model.UTF8Validation, nil)
h := NewManager(&Options{QueueCapacity: 10}, model.UTF8Validation, nil)
h.alertmanagers = make(map[string]*alertmanagerSet)
am1Cfg := config.DefaultAlertmanagerConfig
@ -1333,37 +1185,11 @@ func TestAlerstRelabelingIsIsolated(t *testing.T) {
am2Cfg.Timeout = model.Duration(time.Second)
h.alertmanagers = map[string]*alertmanagerSet{
"am1": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
},
cfg: &am1Cfg,
sendLoops: map[string]*sendLoop{
server1.URL: newSendLoop(server1.URL, nil, &am1Cfg, &Options{}, h.logger, h.metrics),
},
opts: &Options{},
metrics: h.metrics,
logger: h.logger,
},
"am2": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
},
},
cfg: &am2Cfg,
sendLoops: map[string]*sendLoop{
server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, &Options{}, h.logger, h.metrics),
},
opts: &Options{},
metrics: h.metrics,
logger: h.logger,
},
"am1": newTestAlertmanagerSet(&am1Cfg, nil, h.opts, h.metrics, server1.URL),
"am2": newTestAlertmanagerSet(&am2Cfg, nil, h.opts, h.metrics, server2.URL),
}
// start send loops
// Start send loops.
for _, ams := range h.alertmanagers {
ams.startSendLoops(ams.ams)
}
@ -1381,7 +1207,7 @@ func TestAlerstRelabelingIsIsolated(t *testing.T) {
Labels: labels.FromStrings("alertname", "test", "parasite", "yes"),
})
// am2 shouldn't get the parasite label.
// The am2 set shouldn't get the parasite label.
expected2 = append(expected2, &Alert{
Labels: labels.FromStrings("alertname", "test"),
})
@ -1431,34 +1257,8 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) {
amCfg := config.DefaultAlertmanagerConfig
amCfg.Timeout = model.Duration(time.Hour * 24 * 365)
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return blackHoleAM.URL },
},
},
cfg: &amCfg,
opts: h.opts,
sendLoops: map[string]*sendLoop{
blackHoleAM.URL: newSendLoop(blackHoleAM.URL, http.DefaultClient, &amCfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
},
metrics: h.metrics,
}
h.alertmanagers["2"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return immediateAM.URL },
},
},
cfg: &amCfg,
opts: h.opts,
sendLoops: map[string]*sendLoop{
immediateAM.URL: newSendLoop(immediateAM.URL, http.DefaultClient, &amCfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
},
metrics: h.metrics,
}
h.alertmanagers["1"] = newTestAlertmanagerSet(&amCfg, http.DefaultClient, h.opts, h.metrics, blackHoleAM.URL)
h.alertmanagers["2"] = newTestAlertmanagerSet(&amCfg, http.DefaultClient, h.opts, h.metrics, immediateAM.URL)
doneSendAll := make(chan struct{})
for _, ams := range h.alertmanagers {