From d9ccd70ac1950e201e8391073b50ae79b5265957 Mon Sep 17 00:00:00 2001 From: Siavash Safi Date: Thu, 22 Jan 2026 12:24:35 +0100 Subject: [PATCH] fix(notify): flaky tests (#17899) Add a helper function to set up AlertmanagerSets. Fix all flaky tests. Signed-off-by: Siavash Safi --- notifier/manager_test.go | 364 +++++++++------------------------------ 1 file changed, 82 insertions(+), 282 deletions(-) diff --git a/notifier/manager_test.go b/notifier/manager_test.go index f82a7ad511..39fc35a409 100644 --- a/notifier/manager_test.go +++ b/notifier/manager_test.go @@ -90,6 +90,33 @@ func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string })) } +func newTestAlertmanagerSet( + cfg *config.AlertmanagerConfig, + client *http.Client, + opts *Options, + metrics *alertMetrics, + alertmanagerURLs ...string, +) *alertmanagerSet { + ams := make([]alertmanager, len(alertmanagerURLs)) + for i, am := range alertmanagerURLs { + ams[i] = alertmanagerMock{urlf: func() string { return am }} + } + logger := slog.New(slog.DiscardHandler) + sendLoops := make(map[string]*sendLoop) + for _, am := range alertmanagerURLs { + sendLoops[am] = newSendLoop(am, client, cfg, opts, logger, metrics) + } + return &alertmanagerSet{ + ams: ams, + cfg: cfg, + client: client, + logger: logger, + metrics: metrics, + opts: opts, + sendLoops: sendLoops, + } +} + func TestHandlerSendAll(t *testing.T) { var ( errc = make(chan error, 1) @@ -107,7 +134,8 @@ func TestHandlerSendAll(t *testing.T) { defer server2.Close() defer server3.Close() - h := NewManager(&Options{}, model.UTF8Validation, nil) + reg := prometheus.NewRegistry() + h := NewManager(&Options{Registerer: reg}, model.UTF8Validation, nil) authClient, _ := config_util.NewClientFromConfig( config_util.HTTPClientConfig{ @@ -129,53 +157,10 @@ func TestHandlerSendAll(t *testing.T) { am3Cfg.Timeout = model.Duration(time.Second) opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: DefaultMaxBatchSize} - logger := slog.New(slog.DiscardHandler) - h.alertmanagers["1"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server1.URL }, - }, - }, - cfg: &am1Cfg, - client: authClient, - sendLoops: map[string]*sendLoop{ - server1.URL: newSendLoop(server1.URL, authClient, &am1Cfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - } - - h.alertmanagers["2"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server2.URL }, - }, - alertmanagerMock{ - urlf: func() string { return server3.URL }, - }, - }, - cfg: &am2Cfg, - sendLoops: map[string]*sendLoop{ - server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, opts, logger, h.metrics), - server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - } - - h.alertmanagers["3"] = &alertmanagerSet{ - ams: []alertmanager{}, // empty set - cfg: &am3Cfg, - sendLoops: map[string]*sendLoop{ - server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - } + h.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, authClient, opts, h.metrics, server1.URL) + h.alertmanagers["2"] = newTestAlertmanagerSet(&am2Cfg, nil, opts, h.metrics, server2.URL, server3.URL) + h.alertmanagers["3"] = newTestAlertmanagerSet(&am3Cfg, nil, opts, h.metrics) var alerts []*Alert for i := range DefaultMaxBatchSize { @@ -196,7 +181,7 @@ func TestHandlerSendAll(t *testing.T) { } } - // start send loops + // Start send loops. for _, ams := range h.alertmanagers { ams.startSendLoops(ams.ams) } @@ -212,32 +197,38 @@ func TestHandlerSendAll(t *testing.T) { }, time.Second*2, time.Millisecond*10) checkNoErr() - // the only am in set 1 is down + // The only am in set 1 is down. status1.Store(int32(http.StatusNotFound)) h.Send(alerts...) + // Wait for all send loops to process before changing any status. require.Eventually(t, func() bool { - return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize + return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize && + prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server2.URL)) == DefaultMaxBatchSize*2 && + prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server3.URL)) == DefaultMaxBatchSize*2 }, time.Second*2, time.Millisecond*10) checkNoErr() - // fix the am + // Fix the am. status1.Store(int32(http.StatusOK)) - // only one of the ams in set 2 is down + // Only one of the ams in set 2 is down. status2.Store(int32(http.StatusInternalServerError)) h.Send(alerts...) + // Wait for all send loops to either send or fail with errors depending on their status. require.Eventually(t, func() bool { - return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize + return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize && + prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server1.URL)) == DefaultMaxBatchSize*2 && + prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server3.URL)) == DefaultMaxBatchSize*3 }, time.Second*2, time.Millisecond*10) checkNoErr() - // both ams in set 2 are down + // Both ams in set 2 are down. status3.Store(int32(http.StatusInternalServerError)) h.Send(alerts...) require.Eventually(t, func() bool { return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize*2 && prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server3.URL)) == DefaultMaxBatchSize - }, time.Second*3, time.Millisecond*10) + }, time.Second*2, time.Millisecond*10) checkNoErr() } @@ -262,7 +253,8 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { defer server2.Close() defer server3.Close() - h := NewManager(&Options{}, model.UTF8Validation, nil) + reg := prometheus.NewRegistry() + h := NewManager(&Options{QueueCapacity: 10_000, Registerer: reg}, model.UTF8Validation, nil) h.alertmanagers = make(map[string]*alertmanagerSet) am1Cfg := config.DefaultAlertmanagerConfig @@ -290,65 +282,14 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { }, } - opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: DefaultMaxBatchSize} - logger := slog.New(slog.DiscardHandler) - - h.alertmanagers = map[string]*alertmanagerSet{ - // Drop no alerts. - "1": { - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server1.URL }, - }, - }, - cfg: &am1Cfg, - sendLoops: map[string]*sendLoop{ - server1.URL: newSendLoop(server1.URL, nil, &am1Cfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - }, - // Drop only alerts with the "alertnamedrop" label. - "2": { - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server2.URL }, - }, - }, - cfg: &am2Cfg, - sendLoops: map[string]*sendLoop{ - server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - }, - // Drop all alerts. - "3": { - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server3.URL }, - }, - }, - cfg: &am3Cfg, - sendLoops: map[string]*sendLoop{ - server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - }, - // Empty list of Alertmanager endpoints. - "4": { - ams: []alertmanager{}, - cfg: &config.DefaultAlertmanagerConfig, - sendLoops: make(map[string]*sendLoop), - opts: opts, - metrics: h.metrics, - logger: logger, - }, - } + // Drop no alerts. + h.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, h.opts, h.metrics, server1.URL) + // Drop only alerts with the "alertnamedrop" label. + h.alertmanagers["2"] = newTestAlertmanagerSet(&am2Cfg, nil, h.opts, h.metrics, server2.URL) + // Drop all alerts. + h.alertmanagers["3"] = newTestAlertmanagerSet(&am3Cfg, nil, h.opts, h.metrics, server3.URL) + // Empty list of Alertmanager endpoints. + h.alertmanagers["4"] = newTestAlertmanagerSet(&config.DefaultAlertmanagerConfig, nil, h.opts, h.metrics) var alerts []*Alert for i := range make([]struct{}, DefaultMaxBatchSize/2) { @@ -383,38 +324,38 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { } } - // start send loops + // Start send loops. for _, ams := range h.alertmanagers { ams.startSendLoops(ams.ams) } defer func() { - // stop send loops. + // Stop send loops. for _, ams := range h.alertmanagers { ams.cleanSendLoops(ams.ams...) } }() - // all ams are up + // All ams are up. h.Send(alerts...) require.Eventually(t, func() bool { return prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server1.URL)) == DefaultMaxBatchSize }, time.Second*2, time.Millisecond*10) checkNoErr() - // the only am in set 1 goes down + // The only am in set 1 goes down. status1.Store(int32(http.StatusInternalServerError)) h.Send(alerts...) - // wait for metrics to update + // Wait for metrics to update. require.Eventually(t, func() bool { return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize }, time.Second*2, time.Millisecond*10) checkNoErr() - // reset set 1 + // Reset set 1. status1.Store(int32(http.StatusOK)) - // set 3 loses its only am, but all alerts were dropped - // so there was nothing to send, keeping sendAll true + // Set 3 loses its only am, but all alerts were dropped + // so there was nothing to send, keeping sendAll true. status3.Store(int32(http.StatusInternalServerError)) h.Send(alerts...) checkNoErr() @@ -441,12 +382,7 @@ func TestExternalLabels(t *testing.T) { cfg := config.DefaultAlertmanagerConfig h.alertmanagers = map[string]*alertmanagerSet{ - "test": { - cfg: &cfg, - sendLoops: map[string]*sendLoop{ - "test": newSendLoop("test", nil, &cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics), - }, - }, + "test": newTestAlertmanagerSet(&cfg, nil, h.opts, h.metrics, "test"), } // This alert should get the external label attached. @@ -494,12 +430,7 @@ func TestHandlerRelabel(t *testing.T) { cfg := config.DefaultAlertmanagerConfig h.alertmanagers = map[string]*alertmanagerSet{ - "test": { - cfg: &cfg, - sendLoops: map[string]*sendLoop{ - "test": newSendLoop("test", nil, &cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics), - }, - }, + "test": newTestAlertmanagerSet(&cfg, nil, h.opts, h.metrics, "test"), } // This alert should be dropped due to the configuration @@ -576,23 +507,12 @@ func TestHandlerQueuing(t *testing.T) { am1Cfg := config.DefaultAlertmanagerConfig am1Cfg.Timeout = model.Duration(time.Second) - - h.alertmanagers["1"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server.URL }, - }, - }, - cfg: &am1Cfg, - sendLoops: map[string]*sendLoop{ - server.URL: newSendLoop(server.URL, nil, &am1Cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics), - }, - } + h.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, h.opts, h.metrics, server.URL) go h.Run(nil) defer h.Stop() - // start send loops + // Start send loops. for _, ams := range h.alertmanagers { ams.startSendLoops(ams.ams) } @@ -619,13 +539,6 @@ func TestHandlerQueuing(t *testing.T) { } } - // If the batch is larger than the queue capacity, it should be truncated - // from the front. - h.Send(alerts[:4*DefaultMaxBatchSize]...) - for i := 1; i < 4; i++ { - assertAlerts(alerts[i*DefaultMaxBatchSize : (i+1)*DefaultMaxBatchSize]) - } - // Send one batch, wait for it to arrive and block the server so the queue fills up. h.Send(alerts[:DefaultMaxBatchSize]...) <-called @@ -633,7 +546,7 @@ func TestHandlerQueuing(t *testing.T) { // Send several batches while the server is still blocked so the queue // fills up to its maximum capacity (3*DefaultMaxBatchSize). Then check that the // queue is truncated in the front. - h.Send(alerts[1*DefaultMaxBatchSize : 2*DefaultMaxBatchSize]...) // this batch should be dropped. + h.Send(alerts[1*DefaultMaxBatchSize : 2*DefaultMaxBatchSize]...) // This batch should be dropped. h.Send(alerts[2*DefaultMaxBatchSize : 3*DefaultMaxBatchSize]...) h.Send(alerts[3*DefaultMaxBatchSize : 4*DefaultMaxBatchSize]...) @@ -854,24 +767,7 @@ func TestHangingNotifier(t *testing.T) { notifier.alertmanagers = make(map[string]*alertmanagerSet) amCfg := config.DefaultAlertmanagerConfig amCfg.Timeout = model.Duration(sendTimeout) - notifier.alertmanagers["config-0"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return faultyURL.String() }, - }, - alertmanagerMock{ - urlf: func() string { return functionalURL.String() }, - }, - }, - cfg: &amCfg, - metrics: notifier.metrics, - sendLoops: map[string]*sendLoop{ - faultyURL.String(): newSendLoop(faultyURL.String(), nil, &amCfg, notifier.opts, slog.New(slog.DiscardHandler), notifier.metrics), - functionalURL.String(): newSendLoop(functionalURL.String(), nil, &amCfg, notifier.opts, slog.New(slog.DiscardHandler), notifier.metrics), - }, - opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize}, - logger: slog.New(slog.DiscardHandler), - } + notifier.alertmanagers["config-0"] = newTestAlertmanagerSet(&amCfg, nil, notifier.opts, notifier.metrics, faultyURL.String(), functionalURL.String()) for _, ams := range notifier.alertmanagers { ams.startSendLoops(ams.ams) @@ -932,7 +828,7 @@ loop2: // The faulty alertmanager was dropped. if len(notifier.Alertmanagers()) == 1 { // The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes. - require.NotEmpty(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queue) + require.NotZero(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queueLen()) break loop2 } } @@ -982,20 +878,7 @@ func TestStop_DrainingDisabled(t *testing.T) { am1Cfg := config.DefaultAlertmanagerConfig am1Cfg.Timeout = model.Duration(time.Second) - - m.alertmanagers["1"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server.URL }, - }, - }, - cfg: &am1Cfg, - sendLoops: map[string]*sendLoop{ - server.URL: newSendLoop(server.URL, nil, &am1Cfg, m.opts, slog.New(slog.DiscardHandler), m.metrics), - }, - opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize}, - logger: slog.New(slog.DiscardHandler), - } + m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL) for _, ams := range m.alertmanagers { ams.startSendLoops(ams.ams) @@ -1080,21 +963,7 @@ func TestStop_DrainingEnabled(t *testing.T) { am1Cfg := config.DefaultAlertmanagerConfig am1Cfg.Timeout = model.Duration(time.Second) - - m.alertmanagers["1"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server.URL }, - }, - }, - cfg: &am1Cfg, - sendLoops: map[string]*sendLoop{ - server.URL: newSendLoop(server.URL, nil, &am1Cfg, m.opts, slog.New(slog.DiscardHandler), m.metrics), - }, - opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize}, - metrics: m.metrics, - logger: slog.New(slog.DiscardHandler), - } + m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL) for _, ams := range m.alertmanagers { ams.startSendLoops(ams.ams) @@ -1145,29 +1014,12 @@ func TestQueuesDrainingOnApplyConfig(t *testing.T) { server := newImmediateAlertManager(alertSent) defer server.Close() - h := NewManager(&Options{}, model.UTF8Validation, nil) + h := NewManager(&Options{QueueCapacity: 10, DrainOnShutdown: drainOnShutDown}, model.UTF8Validation, nil) h.alertmanagers = make(map[string]*alertmanagerSet) amCfg := config.DefaultAlertmanagerConfig amCfg.Timeout = model.Duration(time.Second) - - opts := &Options{Do: do, QueueCapacity: 10, MaxBatchSize: DefaultMaxBatchSize, DrainOnShutdown: drainOnShutDown} - logger := slog.New(slog.DiscardHandler) - - h.alertmanagers["1"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server.URL }, - }, - }, - cfg: &amCfg, - sendLoops: map[string]*sendLoop{ - server.URL: newSendLoop(server.URL, nil, &amCfg, opts, logger, h.metrics), - }, - opts: opts, - metrics: h.metrics, - logger: logger, - } + h.alertmanagers["1"] = newTestAlertmanagerSet(&amCfg, nil, h.opts, h.metrics, server.URL) // The send loops were not started, nothing will be sent. h.Send([]*Alert{{Labels: labels.FromStrings("alertname", "foo")}}...) @@ -1313,7 +1165,7 @@ func TestAlerstRelabelingIsIsolated(t *testing.T) { defer server1.Close() defer server2.Close() - h := NewManager(&Options{}, model.UTF8Validation, nil) + h := NewManager(&Options{QueueCapacity: 10}, model.UTF8Validation, nil) h.alertmanagers = make(map[string]*alertmanagerSet) am1Cfg := config.DefaultAlertmanagerConfig @@ -1333,37 +1185,11 @@ func TestAlerstRelabelingIsIsolated(t *testing.T) { am2Cfg.Timeout = model.Duration(time.Second) h.alertmanagers = map[string]*alertmanagerSet{ - "am1": { - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server1.URL }, - }, - }, - cfg: &am1Cfg, - sendLoops: map[string]*sendLoop{ - server1.URL: newSendLoop(server1.URL, nil, &am1Cfg, &Options{}, h.logger, h.metrics), - }, - opts: &Options{}, - metrics: h.metrics, - logger: h.logger, - }, - "am2": { - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return server2.URL }, - }, - }, - cfg: &am2Cfg, - sendLoops: map[string]*sendLoop{ - server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, &Options{}, h.logger, h.metrics), - }, - opts: &Options{}, - metrics: h.metrics, - logger: h.logger, - }, + "am1": newTestAlertmanagerSet(&am1Cfg, nil, h.opts, h.metrics, server1.URL), + "am2": newTestAlertmanagerSet(&am2Cfg, nil, h.opts, h.metrics, server2.URL), } - // start send loops + // Start send loops. for _, ams := range h.alertmanagers { ams.startSendLoops(ams.ams) } @@ -1381,7 +1207,7 @@ func TestAlerstRelabelingIsIsolated(t *testing.T) { Labels: labels.FromStrings("alertname", "test", "parasite", "yes"), }) - // am2 shouldn't get the parasite label. + // Am2 shouldn't get the parasite label. expected2 = append(expected2, &Alert{ Labels: labels.FromStrings("alertname", "test"), }) @@ -1431,34 +1257,8 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { amCfg := config.DefaultAlertmanagerConfig amCfg.Timeout = model.Duration(time.Hour * 24 * 365) - - h.alertmanagers["1"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return blackHoleAM.URL }, - }, - }, - cfg: &amCfg, - opts: h.opts, - sendLoops: map[string]*sendLoop{ - blackHoleAM.URL: newSendLoop(blackHoleAM.URL, http.DefaultClient, &amCfg, h.opts, slog.New(slog.DiscardHandler), h.metrics), - }, - metrics: h.metrics, - } - - h.alertmanagers["2"] = &alertmanagerSet{ - ams: []alertmanager{ - alertmanagerMock{ - urlf: func() string { return immediateAM.URL }, - }, - }, - cfg: &amCfg, - opts: h.opts, - sendLoops: map[string]*sendLoop{ - immediateAM.URL: newSendLoop(immediateAM.URL, http.DefaultClient, &amCfg, h.opts, slog.New(slog.DiscardHandler), h.metrics), - }, - metrics: h.metrics, - } + h.alertmanagers["1"] = newTestAlertmanagerSet(&amCfg, http.DefaultClient, h.opts, h.metrics, blackHoleAM.URL) + h.alertmanagers["2"] = newTestAlertmanagerSet(&amCfg, http.DefaultClient, h.opts, h.metrics, immediateAM.URL) doneSendAll := make(chan struct{}) for _, ams := range h.alertmanagers {