mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
* notifier: unit test for dropping throughput on stuck AM Ref: https://github.com/prometheus/prometheus/issues/7676 Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Signed-off-by: Siavash Safi <siavash@cloudflare.com> * chore(notifier): remove year from copyrights Signed-off-by: Siavash Safi <siavash@cloudflare.com> * feat(notifier): independent alertmanager sendloops Independent Alertmanager sendloops avoid issues with queue overflowing when one or more Alertmanager instances are unavailable which could result in lost alert notifications. The sendloops are managed per AlertmanagerSet which are dynamically added/removed with service discovery or configuration reload. The following metrics now include an extra dimention for alertmanager label: - prometheus_notifications_dropped_total - prometheus_notifications_queue_capacity - prometheus_notifications_queue_length This change also includes the test from #14099 Closes #7676 Signed-off-by: machine424 <ayoubmrini424@gmail.com> Signed-off-by: Siavash Safi <siavash@cloudflare.com> --------- Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Signed-off-by: Siavash Safi <siavash@cloudflare.com> Signed-off-by: machine424 <ayoubmrini424@gmail.com> Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
1508 lines
40 KiB
Go
1508 lines
40 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package notifier
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
|
config_util "github.com/prometheus/common/config"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/promslog"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/atomic"
|
|
"go.yaml.in/yaml/v2"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/discovery"
|
|
_ "github.com/prometheus/prometheus/discovery/file"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/relabel"
|
|
)
|
|
|
|
func alertsEqual(a, b []*Alert) error {
|
|
if len(a) != len(b) {
|
|
return fmt.Errorf("length mismatch: %v != %v", a, b)
|
|
}
|
|
for i, alert := range a {
|
|
if !labels.Equal(alert.Labels, b[i].Labels) {
|
|
return fmt.Errorf("label mismatch at index %d: %s != %s", i, alert.Labels, b[i].Labels)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string, status *atomic.Int32) *httptest.Server {
|
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
var err error
|
|
defer func() {
|
|
if err == nil {
|
|
return
|
|
}
|
|
select {
|
|
case errc <- err:
|
|
default:
|
|
}
|
|
}()
|
|
user, pass, _ := r.BasicAuth()
|
|
if user != u || pass != p {
|
|
err = fmt.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
b, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
err = fmt.Errorf("error reading body: %w", err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
var alerts []*Alert
|
|
err = json.Unmarshal(b, &alerts)
|
|
if err == nil {
|
|
err = alertsEqual(*expected, alerts)
|
|
}
|
|
w.WriteHeader(int(status.Load()))
|
|
}))
|
|
}
|
|
|
|
func TestHandlerSendAll(t *testing.T) {
|
|
var (
|
|
errc = make(chan error, 1)
|
|
expected = make([]*Alert, 0)
|
|
status1, status2, status3 atomic.Int32
|
|
)
|
|
status1.Store(int32(http.StatusOK))
|
|
status2.Store(int32(http.StatusOK))
|
|
status3.Store(int32(http.StatusOK))
|
|
|
|
server1 := newTestHTTPServerBuilder(&expected, errc, "prometheus", "testing_password", &status1)
|
|
server2 := newTestHTTPServerBuilder(&expected, errc, "", "", &status2)
|
|
server3 := newTestHTTPServerBuilder(&expected, errc, "", "", &status3)
|
|
defer server1.Close()
|
|
defer server2.Close()
|
|
defer server3.Close()
|
|
|
|
h := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
|
|
authClient, _ := config_util.NewClientFromConfig(
|
|
config_util.HTTPClientConfig{
|
|
BasicAuth: &config_util.BasicAuth{
|
|
Username: "prometheus",
|
|
Password: "testing_password",
|
|
},
|
|
}, "auth_alertmanager")
|
|
|
|
h.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
am1Cfg := config.DefaultAlertmanagerConfig
|
|
am1Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
am2Cfg := config.DefaultAlertmanagerConfig
|
|
am2Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
am3Cfg := config.DefaultAlertmanagerConfig
|
|
am3Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: DefaultMaxBatchSize}
|
|
logger := slog.New(slog.DiscardHandler)
|
|
|
|
h.alertmanagers["1"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server1.URL },
|
|
},
|
|
},
|
|
cfg: &am1Cfg,
|
|
client: authClient,
|
|
sendLoops: map[string]*sendLoop{
|
|
server1.URL: newSendLoop(server1.URL, authClient, &am1Cfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
}
|
|
|
|
h.alertmanagers["2"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server2.URL },
|
|
},
|
|
alertmanagerMock{
|
|
urlf: func() string { return server3.URL },
|
|
},
|
|
},
|
|
cfg: &am2Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, opts, logger, h.metrics),
|
|
server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
}
|
|
|
|
h.alertmanagers["3"] = &alertmanagerSet{
|
|
ams: []alertmanager{}, // empty set
|
|
cfg: &am3Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
}
|
|
|
|
var alerts []*Alert
|
|
for i := range DefaultMaxBatchSize {
|
|
alerts = append(alerts, &Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
})
|
|
expected = append(expected, &Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
})
|
|
}
|
|
|
|
checkNoErr := func() {
|
|
t.Helper()
|
|
select {
|
|
case err := <-errc:
|
|
require.NoError(t, err)
|
|
default:
|
|
}
|
|
}
|
|
|
|
// start send loops
|
|
for _, ams := range h.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
defer func() {
|
|
for _, ams := range h.alertmanagers {
|
|
ams.cleanSendLoops(ams.ams...)
|
|
}
|
|
}()
|
|
|
|
h.Send(alerts...)
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
|
|
}, time.Second*2, time.Millisecond*10)
|
|
checkNoErr()
|
|
|
|
// the only am in set 1 is down
|
|
status1.Store(int32(http.StatusNotFound))
|
|
h.Send(alerts...)
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
|
|
}, time.Second*2, time.Millisecond*10)
|
|
checkNoErr()
|
|
|
|
// fix the am
|
|
status1.Store(int32(http.StatusOK))
|
|
|
|
// only one of the ams in set 2 is down
|
|
status2.Store(int32(http.StatusInternalServerError))
|
|
h.Send(alerts...)
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize
|
|
}, time.Second*2, time.Millisecond*10)
|
|
checkNoErr()
|
|
|
|
// both ams in set 2 are down
|
|
status3.Store(int32(http.StatusInternalServerError))
|
|
h.Send(alerts...)
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server2.URL)) == DefaultMaxBatchSize*2 &&
|
|
prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server3.URL)) == DefaultMaxBatchSize
|
|
}, time.Second*3, time.Millisecond*10)
|
|
checkNoErr()
|
|
}
|
|
|
|
func TestHandlerSendAllRemapPerAm(t *testing.T) {
|
|
var (
|
|
errc = make(chan error, 1)
|
|
expected1 = make([]*Alert, 0)
|
|
expected2 = make([]*Alert, 0)
|
|
expected3 = make([]*Alert, 0)
|
|
|
|
status1, status2, status3 atomic.Int32
|
|
)
|
|
status1.Store(int32(http.StatusOK))
|
|
status2.Store(int32(http.StatusOK))
|
|
status3.Store(int32(http.StatusOK))
|
|
|
|
server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1)
|
|
server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2)
|
|
server3 := newTestHTTPServerBuilder(&expected3, errc, "", "", &status3)
|
|
|
|
defer server1.Close()
|
|
defer server2.Close()
|
|
defer server3.Close()
|
|
|
|
h := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
h.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
am1Cfg := config.DefaultAlertmanagerConfig
|
|
am1Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
am2Cfg := config.DefaultAlertmanagerConfig
|
|
am2Cfg.Timeout = model.Duration(time.Second)
|
|
am2Cfg.AlertRelabelConfigs = []*relabel.Config{
|
|
{
|
|
SourceLabels: model.LabelNames{"alertnamedrop"},
|
|
Action: "drop",
|
|
Regex: relabel.MustNewRegexp(".+"),
|
|
NameValidationScheme: model.UTF8Validation,
|
|
},
|
|
}
|
|
|
|
am3Cfg := config.DefaultAlertmanagerConfig
|
|
am3Cfg.Timeout = model.Duration(time.Second)
|
|
am3Cfg.AlertRelabelConfigs = []*relabel.Config{
|
|
{
|
|
SourceLabels: model.LabelNames{"alertname"},
|
|
Action: "drop",
|
|
Regex: relabel.MustNewRegexp(".+"),
|
|
NameValidationScheme: model.UTF8Validation,
|
|
},
|
|
}
|
|
|
|
opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: DefaultMaxBatchSize}
|
|
logger := slog.New(slog.DiscardHandler)
|
|
|
|
h.alertmanagers = map[string]*alertmanagerSet{
|
|
// Drop no alerts.
|
|
"1": {
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server1.URL },
|
|
},
|
|
},
|
|
cfg: &am1Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server1.URL: newSendLoop(server1.URL, nil, &am1Cfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
},
|
|
// Drop only alerts with the "alertnamedrop" label.
|
|
"2": {
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server2.URL },
|
|
},
|
|
},
|
|
cfg: &am2Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
},
|
|
// Drop all alerts.
|
|
"3": {
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server3.URL },
|
|
},
|
|
},
|
|
cfg: &am3Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server3.URL: newSendLoop(server3.URL, nil, &am3Cfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
},
|
|
// Empty list of Alertmanager endpoints.
|
|
"4": {
|
|
ams: []alertmanager{},
|
|
cfg: &config.DefaultAlertmanagerConfig,
|
|
sendLoops: make(map[string]*sendLoop),
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
},
|
|
}
|
|
|
|
var alerts []*Alert
|
|
for i := range make([]struct{}, DefaultMaxBatchSize/2) {
|
|
alerts = append(alerts,
|
|
&Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
},
|
|
&Alert{
|
|
Labels: labels.FromStrings("alertname", "test", "alertnamedrop", strconv.Itoa(i)),
|
|
},
|
|
)
|
|
|
|
expected1 = append(expected1,
|
|
&Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
}, &Alert{
|
|
Labels: labels.FromStrings("alertname", "test", "alertnamedrop", strconv.Itoa(i)),
|
|
},
|
|
)
|
|
|
|
expected2 = append(expected2, &Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
})
|
|
}
|
|
|
|
checkNoErr := func() {
|
|
t.Helper()
|
|
select {
|
|
case err := <-errc:
|
|
require.NoError(t, err)
|
|
default:
|
|
}
|
|
}
|
|
|
|
// start send loops
|
|
for _, ams := range h.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
defer func() {
|
|
// stop send loops.
|
|
for _, ams := range h.alertmanagers {
|
|
ams.cleanSendLoops(ams.ams...)
|
|
}
|
|
}()
|
|
|
|
// all ams are up
|
|
h.Send(alerts...)
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(h.metrics.sent.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
|
|
}, time.Second*2, time.Millisecond*10)
|
|
checkNoErr()
|
|
|
|
// the only am in set 1 goes down
|
|
status1.Store(int32(http.StatusInternalServerError))
|
|
h.Send(alerts...)
|
|
// wait for metrics to update
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(h.metrics.errors.WithLabelValues(server1.URL)) == DefaultMaxBatchSize
|
|
}, time.Second*2, time.Millisecond*10)
|
|
checkNoErr()
|
|
|
|
// reset set 1
|
|
status1.Store(int32(http.StatusOK))
|
|
|
|
// set 3 loses its only am, but all alerts were dropped
|
|
// so there was nothing to send, keeping sendAll true
|
|
status3.Store(int32(http.StatusInternalServerError))
|
|
h.Send(alerts...)
|
|
checkNoErr()
|
|
}
|
|
|
|
func TestExternalLabels(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
h := NewManager(&Options{
|
|
QueueCapacity: 3 * DefaultMaxBatchSize,
|
|
MaxBatchSize: DefaultMaxBatchSize,
|
|
ExternalLabels: labels.FromStrings("a", "b"),
|
|
RelabelConfigs: []*relabel.Config{
|
|
{
|
|
SourceLabels: model.LabelNames{"alertname"},
|
|
TargetLabel: "a",
|
|
Action: "replace",
|
|
Regex: relabel.MustNewRegexp("externalrelabelthis"),
|
|
Replacement: "c",
|
|
NameValidationScheme: model.UTF8Validation,
|
|
},
|
|
},
|
|
Registerer: reg,
|
|
}, model.UTF8Validation, nil)
|
|
|
|
cfg := config.DefaultAlertmanagerConfig
|
|
h.alertmanagers = map[string]*alertmanagerSet{
|
|
"test": {
|
|
cfg: &cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
"test": newSendLoop("test", nil, &cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
|
|
},
|
|
},
|
|
}
|
|
|
|
// This alert should get the external label attached.
|
|
h.Send(&Alert{
|
|
Labels: labels.FromStrings("alertname", "test"),
|
|
})
|
|
|
|
// This alert should get the external label attached, but then set to "c"
|
|
// through relabelling.
|
|
h.Send(&Alert{
|
|
Labels: labels.FromStrings("alertname", "externalrelabelthis"),
|
|
})
|
|
|
|
expected := []*Alert{
|
|
{Labels: labels.FromStrings("alertname", "test", "a", "b")},
|
|
{Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")},
|
|
}
|
|
|
|
require.NoError(t, alertsEqual(expected, h.alertmanagers["test"].sendLoops["test"].queue))
|
|
}
|
|
|
|
func TestHandlerRelabel(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
h := NewManager(&Options{
|
|
QueueCapacity: 3 * DefaultMaxBatchSize,
|
|
MaxBatchSize: DefaultMaxBatchSize,
|
|
RelabelConfigs: []*relabel.Config{
|
|
{
|
|
SourceLabels: model.LabelNames{"alertname"},
|
|
Action: "drop",
|
|
Regex: relabel.MustNewRegexp("drop"),
|
|
NameValidationScheme: model.UTF8Validation,
|
|
},
|
|
{
|
|
SourceLabels: model.LabelNames{"alertname"},
|
|
TargetLabel: "alertname",
|
|
Action: "replace",
|
|
Regex: relabel.MustNewRegexp("rename"),
|
|
Replacement: "renamed",
|
|
NameValidationScheme: model.UTF8Validation,
|
|
},
|
|
},
|
|
Registerer: reg,
|
|
}, model.UTF8Validation, nil)
|
|
|
|
cfg := config.DefaultAlertmanagerConfig
|
|
h.alertmanagers = map[string]*alertmanagerSet{
|
|
"test": {
|
|
cfg: &cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
"test": newSendLoop("test", nil, &cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
|
|
},
|
|
},
|
|
}
|
|
|
|
// This alert should be dropped due to the configuration
|
|
h.Send(&Alert{
|
|
Labels: labels.FromStrings("alertname", "drop"),
|
|
})
|
|
|
|
// This alert should be replaced due to the configuration
|
|
h.Send(&Alert{
|
|
Labels: labels.FromStrings("alertname", "rename"),
|
|
})
|
|
|
|
expected := []*Alert{
|
|
{Labels: labels.FromStrings("alertname", "renamed")},
|
|
}
|
|
|
|
require.NoError(t, alertsEqual(expected, h.alertmanagers["test"].sendLoops["test"].queue))
|
|
}
|
|
|
|
func TestHandlerQueuing(t *testing.T) {
|
|
var (
|
|
expectedc = make(chan []*Alert)
|
|
called = make(chan struct{})
|
|
done = make(chan struct{})
|
|
errc = make(chan error, 1)
|
|
)
|
|
|
|
server := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
|
|
// Notify the test function that we have received something.
|
|
select {
|
|
case called <- struct{}{}:
|
|
case <-done:
|
|
return
|
|
}
|
|
|
|
// Wait for the test function to unblock us.
|
|
select {
|
|
case expected := <-expectedc:
|
|
var alerts []*Alert
|
|
|
|
b, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
err = json.Unmarshal(b, &alerts)
|
|
if err == nil {
|
|
err = alertsEqual(expected, alerts)
|
|
}
|
|
select {
|
|
case errc <- err:
|
|
default:
|
|
}
|
|
case <-done:
|
|
}
|
|
}))
|
|
defer func() {
|
|
close(done)
|
|
server.Close()
|
|
}()
|
|
|
|
reg := prometheus.NewRegistry()
|
|
h := NewManager(
|
|
&Options{
|
|
QueueCapacity: 3 * DefaultMaxBatchSize,
|
|
MaxBatchSize: DefaultMaxBatchSize,
|
|
Registerer: reg,
|
|
},
|
|
model.UTF8Validation,
|
|
nil,
|
|
)
|
|
|
|
h.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
am1Cfg := config.DefaultAlertmanagerConfig
|
|
am1Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
h.alertmanagers["1"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server.URL },
|
|
},
|
|
},
|
|
cfg: &am1Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server.URL: newSendLoop(server.URL, nil, &am1Cfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
|
|
},
|
|
}
|
|
|
|
go h.Run(nil)
|
|
defer h.Stop()
|
|
|
|
// start send loops
|
|
for _, ams := range h.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
|
|
var alerts []*Alert
|
|
for i := range make([]struct{}, 20*DefaultMaxBatchSize) {
|
|
alerts = append(alerts, &Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
})
|
|
}
|
|
|
|
assertAlerts := func(expected []*Alert) {
|
|
t.Helper()
|
|
for {
|
|
select {
|
|
case <-called:
|
|
expectedc <- expected
|
|
case err := <-errc:
|
|
require.NoError(t, err)
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
require.FailNow(t, "Alerts were not pushed.")
|
|
}
|
|
}
|
|
}
|
|
|
|
// If the batch is larger than the queue capacity, it should be truncated
|
|
// from the front.
|
|
h.Send(alerts[:4*DefaultMaxBatchSize]...)
|
|
for i := 1; i < 4; i++ {
|
|
assertAlerts(alerts[i*DefaultMaxBatchSize : (i+1)*DefaultMaxBatchSize])
|
|
}
|
|
|
|
// Send one batch, wait for it to arrive and block the server so the queue fills up.
|
|
h.Send(alerts[:DefaultMaxBatchSize]...)
|
|
<-called
|
|
|
|
// Send several batches while the server is still blocked so the queue
|
|
// fills up to its maximum capacity (3*DefaultMaxBatchSize). Then check that the
|
|
// queue is truncated in the front.
|
|
h.Send(alerts[1*DefaultMaxBatchSize : 2*DefaultMaxBatchSize]...) // this batch should be dropped.
|
|
h.Send(alerts[2*DefaultMaxBatchSize : 3*DefaultMaxBatchSize]...)
|
|
h.Send(alerts[3*DefaultMaxBatchSize : 4*DefaultMaxBatchSize]...)
|
|
|
|
// Send the batch that drops the first one.
|
|
h.Send(alerts[4*DefaultMaxBatchSize : 5*DefaultMaxBatchSize]...)
|
|
|
|
// Unblock the server.
|
|
expectedc <- alerts[:DefaultMaxBatchSize]
|
|
select {
|
|
case err := <-errc:
|
|
require.NoError(t, err)
|
|
case <-time.After(5 * time.Second):
|
|
require.FailNow(t, "Alerts were not pushed.")
|
|
}
|
|
|
|
// Verify that we receive the last 3 batches.
|
|
for i := 2; i < 5; i++ {
|
|
assertAlerts(alerts[i*DefaultMaxBatchSize : (i+1)*DefaultMaxBatchSize])
|
|
}
|
|
}
|
|
|
|
type alertmanagerMock struct {
|
|
urlf func() string
|
|
}
|
|
|
|
func (a alertmanagerMock) url() *url.URL {
|
|
u, err := url.Parse(a.urlf())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return u
|
|
}
|
|
|
|
func TestReload(t *testing.T) {
|
|
tests := []struct {
|
|
in *targetgroup.Group
|
|
out string
|
|
}{
|
|
{
|
|
in: &targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
"__address__": "alertmanager:9093",
|
|
},
|
|
},
|
|
},
|
|
out: "http://alertmanager:9093/api/v2/alerts",
|
|
},
|
|
}
|
|
|
|
n := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
|
|
cfg := &config.Config{}
|
|
s := `
|
|
alerting:
|
|
alertmanagers:
|
|
- static_configs:
|
|
`
|
|
err := yaml.UnmarshalStrict([]byte(s), cfg)
|
|
require.NoError(t, err, "Unable to load YAML config.")
|
|
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1)
|
|
|
|
err = n.ApplyConfig(cfg)
|
|
require.NoError(t, err, "Error applying the config.")
|
|
|
|
tgs := make(map[string][]*targetgroup.Group)
|
|
for _, tt := range tests {
|
|
for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
|
|
tgs[k] = []*targetgroup.Group{
|
|
tt.in,
|
|
}
|
|
break
|
|
}
|
|
n.reload(tgs)
|
|
res := n.Alertmanagers()[0].String()
|
|
|
|
require.Equal(t, tt.out, res)
|
|
}
|
|
}
|
|
|
|
func TestDroppedAlertmanagers(t *testing.T) {
|
|
tests := []struct {
|
|
in *targetgroup.Group
|
|
out string
|
|
}{
|
|
{
|
|
in: &targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
"__address__": "alertmanager:9093",
|
|
},
|
|
},
|
|
},
|
|
out: "http://alertmanager:9093/api/v2/alerts",
|
|
},
|
|
}
|
|
|
|
n := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
|
|
cfg := &config.Config{}
|
|
s := `
|
|
alerting:
|
|
alertmanagers:
|
|
- static_configs:
|
|
relabel_configs:
|
|
- source_labels: ['__address__']
|
|
regex: 'alertmanager:9093'
|
|
action: drop
|
|
`
|
|
err := yaml.UnmarshalStrict([]byte(s), cfg)
|
|
require.NoError(t, err, "Unable to load YAML config.")
|
|
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1)
|
|
|
|
err = n.ApplyConfig(cfg)
|
|
require.NoError(t, err, "Error applying the config.")
|
|
|
|
tgs := make(map[string][]*targetgroup.Group)
|
|
for _, tt := range tests {
|
|
for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
|
|
tgs[k] = []*targetgroup.Group{
|
|
tt.in,
|
|
}
|
|
break
|
|
}
|
|
|
|
n.reload(tgs)
|
|
res := n.DroppedAlertmanagers()[0].String()
|
|
|
|
require.Equal(t, res, tt.out)
|
|
}
|
|
}
|
|
|
|
func makeInputTargetGroup() *targetgroup.Group {
|
|
return &targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue("1.1.1.1:9090"),
|
|
model.LabelName("notcommon1"): model.LabelValue("label"),
|
|
},
|
|
},
|
|
Labels: model.LabelSet{
|
|
model.LabelName("common"): model.LabelValue("label"),
|
|
},
|
|
Source: "testsource",
|
|
}
|
|
}
|
|
|
|
// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are
|
|
// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
|
|
// and https://github.com/prometheus/prometheus/issues/8768.
|
|
func TestHangingNotifier(t *testing.T) {
|
|
const (
|
|
batches = 100
|
|
alertsCount = DefaultMaxBatchSize * batches
|
|
)
|
|
|
|
var (
|
|
sendTimeout = 100 * time.Millisecond
|
|
sdUpdatert = sendTimeout / 2
|
|
|
|
done = make(chan struct{})
|
|
)
|
|
|
|
// Set up a faulty Alertmanager.
|
|
var faultyCalled atomic.Bool
|
|
faultyServer := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
|
faultyCalled.Store(true)
|
|
select {
|
|
case <-done:
|
|
case <-time.After(time.Hour):
|
|
}
|
|
}))
|
|
defer func() {
|
|
close(done)
|
|
}()
|
|
|
|
faultyURL, err := url.Parse(faultyServer.URL)
|
|
require.NoError(t, err)
|
|
faultyURL.Path = "/api/v2/alerts"
|
|
|
|
// Set up a functional Alertmanager.
|
|
var functionalCalled atomic.Bool
|
|
functionalServer := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
|
functionalCalled.Store(true)
|
|
}))
|
|
defer functionalServer.Close()
|
|
functionalURL, err := url.Parse(functionalServer.URL)
|
|
require.NoError(t, err)
|
|
functionalURL.Path = "/api/v2/alerts"
|
|
|
|
// Initialize the discovery manager
|
|
// This is relevant as the updates aren't sent continually in real life, but only each updatert.
|
|
// The old implementation of TestHangingNotifier didn't take that into account.
|
|
ctx, cancelSdManager := context.WithCancel(t.Context())
|
|
defer cancelSdManager()
|
|
reg := prometheus.NewRegistry()
|
|
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
|
require.NoError(t, err)
|
|
sdManager := discovery.NewManager(
|
|
ctx,
|
|
promslog.NewNopLogger(),
|
|
reg,
|
|
sdMetrics,
|
|
discovery.Name("sd-manager"),
|
|
discovery.Updatert(sdUpdatert),
|
|
)
|
|
go sdManager.Run()
|
|
|
|
// Set up the notifier with both faulty and functional Alertmanagers.
|
|
notifier := NewManager(
|
|
&Options{
|
|
QueueCapacity: alertsCount,
|
|
Registerer: reg,
|
|
},
|
|
model.UTF8Validation,
|
|
nil,
|
|
)
|
|
notifier.alertmanagers = make(map[string]*alertmanagerSet)
|
|
amCfg := config.DefaultAlertmanagerConfig
|
|
amCfg.Timeout = model.Duration(sendTimeout)
|
|
notifier.alertmanagers["config-0"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return faultyURL.String() },
|
|
},
|
|
alertmanagerMock{
|
|
urlf: func() string { return functionalURL.String() },
|
|
},
|
|
},
|
|
cfg: &amCfg,
|
|
metrics: notifier.metrics,
|
|
sendLoops: map[string]*sendLoop{
|
|
faultyURL.String(): newSendLoop(faultyURL.String(), nil, &amCfg, notifier.opts, slog.New(slog.DiscardHandler), notifier.metrics),
|
|
functionalURL.String(): newSendLoop(functionalURL.String(), nil, &amCfg, notifier.opts, slog.New(slog.DiscardHandler), notifier.metrics),
|
|
},
|
|
opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize},
|
|
logger: slog.New(slog.DiscardHandler),
|
|
}
|
|
|
|
for _, ams := range notifier.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
|
|
go notifier.Run(sdManager.SyncCh())
|
|
defer notifier.Stop()
|
|
|
|
require.Len(t, notifier.Alertmanagers(), 2)
|
|
|
|
// Enqueue the alerts.
|
|
var alerts []*Alert
|
|
for i := range make([]struct{}, alertsCount) {
|
|
alerts = append(alerts, &Alert{
|
|
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
|
})
|
|
}
|
|
notifier.Send(alerts...)
|
|
|
|
// Wait for the Alertmanagers to start receiving alerts.
|
|
// 10*sdUpdatert is used as an arbitrary timeout here.
|
|
timeout := time.After(10 * sdUpdatert)
|
|
loop1:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.")
|
|
default:
|
|
if faultyCalled.Load() && functionalCalled.Load() {
|
|
break loop1
|
|
}
|
|
}
|
|
}
|
|
|
|
// Request to remove the faulty Alertmanager.
|
|
c := map[string]discovery.Configs{
|
|
"config-0": {
|
|
discovery.StaticConfig{
|
|
&targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue(functionalURL.Host),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
require.NoError(t, sdManager.ApplyConfig(c))
|
|
|
|
timeout = time.After(batches * sendTimeout)
|
|
loop2:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatalf("Timeout, the faulty alertmanager not removed on time.")
|
|
default:
|
|
// The faulty alertmanager was dropped.
|
|
if len(notifier.Alertmanagers()) == 1 {
|
|
// The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes.
|
|
require.NotEmpty(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queue)
|
|
break loop2
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestStop_DrainingDisabled(t *testing.T) {
|
|
releaseReceiver := make(chan struct{})
|
|
receiverReceivedRequest := make(chan struct{}, 2)
|
|
alertsReceived := atomic.NewInt64(0)
|
|
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
// Let the test know we've received a request.
|
|
receiverReceivedRequest <- struct{}{}
|
|
|
|
var alerts []*Alert
|
|
|
|
b, err := io.ReadAll(r.Body)
|
|
require.NoError(t, err)
|
|
|
|
err = json.Unmarshal(b, &alerts)
|
|
require.NoError(t, err)
|
|
|
|
alertsReceived.Add(int64(len(alerts)))
|
|
|
|
// Wait for the test to release us.
|
|
<-releaseReceiver
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer func() {
|
|
server.Close()
|
|
}()
|
|
|
|
reg := prometheus.NewRegistry()
|
|
m := NewManager(
|
|
&Options{
|
|
QueueCapacity: 10,
|
|
DrainOnShutdown: false,
|
|
Registerer: reg,
|
|
},
|
|
model.UTF8Validation,
|
|
nil,
|
|
)
|
|
|
|
m.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
am1Cfg := config.DefaultAlertmanagerConfig
|
|
am1Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
m.alertmanagers["1"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server.URL },
|
|
},
|
|
},
|
|
cfg: &am1Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server.URL: newSendLoop(server.URL, nil, &am1Cfg, m.opts, slog.New(slog.DiscardHandler), m.metrics),
|
|
},
|
|
opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize},
|
|
logger: slog.New(slog.DiscardHandler),
|
|
}
|
|
|
|
for _, ams := range m.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
|
|
notificationManagerStopped := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(notificationManagerStopped)
|
|
m.Run(nil)
|
|
}()
|
|
|
|
// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
|
|
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
|
|
|
|
select {
|
|
case <-receiverReceivedRequest:
|
|
// Nothing more to do.
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
|
|
}
|
|
|
|
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
|
|
|
|
// Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed.
|
|
m.Stop()
|
|
time.Sleep(time.Second)
|
|
close(releaseReceiver)
|
|
|
|
// Wait for the notification manager to stop and confirm only the first notification was sent.
|
|
// The second notification should be dropped.
|
|
select {
|
|
case <-notificationManagerStopped:
|
|
// Nothing more to do.
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "gave up waiting for notification manager to stop")
|
|
}
|
|
|
|
require.Equal(t, int64(1), alertsReceived.Load())
|
|
}
|
|
|
|
func TestStop_DrainingEnabled(t *testing.T) {
|
|
releaseReceiver := make(chan struct{})
|
|
receiverReceivedRequest := make(chan struct{}, 2)
|
|
alertsReceived := atomic.NewInt64(0)
|
|
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
var alerts []*Alert
|
|
|
|
// Let the test know we've received a request.
|
|
receiverReceivedRequest <- struct{}{}
|
|
|
|
b, err := io.ReadAll(r.Body)
|
|
require.NoError(t, err)
|
|
|
|
err = json.Unmarshal(b, &alerts)
|
|
require.NoError(t, err)
|
|
|
|
alertsReceived.Add(int64(len(alerts)))
|
|
|
|
// Wait for the test to release us.
|
|
<-releaseReceiver
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer func() {
|
|
server.Close()
|
|
}()
|
|
|
|
reg := prometheus.NewRegistry()
|
|
m := NewManager(
|
|
&Options{
|
|
QueueCapacity: 10,
|
|
DrainOnShutdown: true,
|
|
Registerer: reg,
|
|
},
|
|
model.UTF8Validation,
|
|
nil,
|
|
)
|
|
|
|
m.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
am1Cfg := config.DefaultAlertmanagerConfig
|
|
am1Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
m.alertmanagers["1"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server.URL },
|
|
},
|
|
},
|
|
cfg: &am1Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server.URL: newSendLoop(server.URL, nil, &am1Cfg, m.opts, slog.New(slog.DiscardHandler), m.metrics),
|
|
},
|
|
opts: &Options{Do: do, MaxBatchSize: DefaultMaxBatchSize},
|
|
metrics: m.metrics,
|
|
logger: slog.New(slog.DiscardHandler),
|
|
}
|
|
|
|
for _, ams := range m.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
|
|
notificationManagerStopped := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(notificationManagerStopped)
|
|
m.Run(nil)
|
|
}()
|
|
|
|
// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
|
|
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
|
|
|
|
select {
|
|
case <-receiverReceivedRequest:
|
|
// Nothing more to do.
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
|
|
}
|
|
|
|
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
|
|
|
|
// Stop the notification manager and allow the receiver to proceed.
|
|
m.Stop()
|
|
close(releaseReceiver)
|
|
|
|
// Wait for the notification manager to stop and confirm both notifications were sent.
|
|
select {
|
|
case <-notificationManagerStopped:
|
|
// Nothing more to do.
|
|
case <-time.After(200 * time.Millisecond):
|
|
require.FailNow(t, "gave up waiting for notification manager to stop")
|
|
}
|
|
|
|
require.Equal(t, int64(2), alertsReceived.Load())
|
|
}
|
|
|
|
// TestQueuesDrainingOnApplyConfig ensures that when an alertmanagerSet disappears after an ApplyConfig(), its
|
|
// sendLoops queues are drained only when DrainOnShutdown is set.
|
|
func TestQueuesDrainingOnApplyConfig(t *testing.T) {
|
|
for _, drainOnShutDown := range []bool{false, true} {
|
|
t.Run(strconv.FormatBool(drainOnShutDown), func(t *testing.T) {
|
|
t.Parallel()
|
|
alertSent := make(chan struct{})
|
|
|
|
server := newImmediateAlertManager(alertSent)
|
|
defer server.Close()
|
|
|
|
h := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
h.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
amCfg := config.DefaultAlertmanagerConfig
|
|
amCfg.Timeout = model.Duration(time.Second)
|
|
|
|
opts := &Options{Do: do, QueueCapacity: 10, MaxBatchSize: DefaultMaxBatchSize, DrainOnShutdown: drainOnShutDown}
|
|
logger := slog.New(slog.DiscardHandler)
|
|
|
|
h.alertmanagers["1"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server.URL },
|
|
},
|
|
},
|
|
cfg: &amCfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server.URL: newSendLoop(server.URL, nil, &amCfg, opts, logger, h.metrics),
|
|
},
|
|
opts: opts,
|
|
metrics: h.metrics,
|
|
logger: logger,
|
|
}
|
|
|
|
// The send loops were not started, nothing will be sent.
|
|
h.Send([]*Alert{{Labels: labels.FromStrings("alertname", "foo")}}...)
|
|
|
|
// Remove the alertmanagerSet.
|
|
h.ApplyConfig(&config.Config{})
|
|
|
|
select {
|
|
case <-alertSent:
|
|
if !drainOnShutDown {
|
|
require.FailNow(t, "no alert should be sent")
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
if drainOnShutDown {
|
|
require.FailNow(t, "alert wasn't received")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestApplyConfig(t *testing.T) {
|
|
targetURL := "alertmanager:9093"
|
|
targetGroup := &targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
"__address__": model.LabelValue(targetURL),
|
|
},
|
|
},
|
|
}
|
|
alertmanagerURL := fmt.Sprintf("http://%s/api/v2/alerts", targetURL)
|
|
|
|
n := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
cfg := &config.Config{}
|
|
s := `
|
|
alerting:
|
|
alertmanagers:
|
|
- file_sd_configs:
|
|
- files:
|
|
- foo.json
|
|
`
|
|
// 1. Ensure known alertmanagers are not dropped during ApplyConfig.
|
|
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
|
|
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1)
|
|
|
|
// First, apply the config and reload.
|
|
require.NoError(t, n.ApplyConfig(cfg))
|
|
tgs := map[string][]*targetgroup.Group{"config-0": {targetGroup}}
|
|
n.reload(tgs)
|
|
require.Len(t, n.Alertmanagers(), 1)
|
|
require.Equal(t, alertmanagerURL, n.Alertmanagers()[0].String())
|
|
|
|
// Reapply the config.
|
|
require.NoError(t, n.ApplyConfig(cfg))
|
|
// Ensure the known alertmanagers are not dropped.
|
|
require.Len(t, n.Alertmanagers(), 1)
|
|
require.Equal(t, alertmanagerURL, n.Alertmanagers()[0].String())
|
|
|
|
// 2. Ensure known alertmanagers are not dropped during ApplyConfig even when
|
|
// the config order changes.
|
|
s = `
|
|
alerting:
|
|
alertmanagers:
|
|
- static_configs:
|
|
- file_sd_configs:
|
|
- files:
|
|
- foo.json
|
|
`
|
|
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
|
|
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2)
|
|
|
|
require.NoError(t, n.ApplyConfig(cfg))
|
|
require.Len(t, n.Alertmanagers(), 1)
|
|
// Ensure no unnecessary alertmanagers are injected.
|
|
require.Empty(t, n.alertmanagers["config-0"].ams)
|
|
// Ensure the config order is taken into account.
|
|
ams := n.alertmanagers["config-1"].ams
|
|
require.Len(t, ams, 1)
|
|
require.Equal(t, alertmanagerURL, ams[0].url().String())
|
|
|
|
// 3. Ensure known alertmanagers are reused for new config with identical AlertmanagerConfig.
|
|
s = `
|
|
alerting:
|
|
alertmanagers:
|
|
- file_sd_configs:
|
|
- files:
|
|
- foo.json
|
|
- file_sd_configs:
|
|
- files:
|
|
- foo.json
|
|
`
|
|
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
|
|
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2)
|
|
|
|
require.NoError(t, n.ApplyConfig(cfg))
|
|
require.Len(t, n.Alertmanagers(), 2)
|
|
for cfgIdx := range 2 {
|
|
ams := n.alertmanagers[fmt.Sprintf("config-%d", cfgIdx)].ams
|
|
require.Len(t, ams, 1)
|
|
require.Equal(t, alertmanagerURL, ams[0].url().String())
|
|
}
|
|
|
|
// 4. Ensure known alertmanagers are reused only for identical AlertmanagerConfig.
|
|
s = `
|
|
alerting:
|
|
alertmanagers:
|
|
- file_sd_configs:
|
|
- files:
|
|
- foo.json
|
|
path_prefix: /bar
|
|
- file_sd_configs:
|
|
- files:
|
|
- foo.json
|
|
relabel_configs:
|
|
- source_labels: ['__address__']
|
|
regex: 'doesntmatter:1234'
|
|
action: drop
|
|
`
|
|
require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg))
|
|
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2)
|
|
|
|
require.NoError(t, n.ApplyConfig(cfg))
|
|
require.Empty(t, n.Alertmanagers())
|
|
}
|
|
|
|
// TestAlerstRelabelingIsIsolated ensures that a mutation alerts relabeling in an
|
|
// alertmanagerSet doesn't affect others.
|
|
// See https://github.com/prometheus/prometheus/pull/17063.
|
|
func TestAlerstRelabelingIsIsolated(t *testing.T) {
|
|
var (
|
|
errc = make(chan error, 1)
|
|
expected1 = make([]*Alert, 0)
|
|
expected2 = make([]*Alert, 0)
|
|
|
|
status1, status2 atomic.Int32
|
|
)
|
|
status1.Store(int32(http.StatusOK))
|
|
status2.Store(int32(http.StatusOK))
|
|
|
|
server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1)
|
|
server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2)
|
|
|
|
defer server1.Close()
|
|
defer server2.Close()
|
|
|
|
h := NewManager(&Options{}, model.UTF8Validation, nil)
|
|
h.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
am1Cfg := config.DefaultAlertmanagerConfig
|
|
am1Cfg.Timeout = model.Duration(time.Second)
|
|
am1Cfg.AlertRelabelConfigs = []*relabel.Config{
|
|
{
|
|
SourceLabels: model.LabelNames{"alertname"},
|
|
Regex: relabel.MustNewRegexp("(.*)"),
|
|
TargetLabel: "parasite",
|
|
Action: relabel.Replace,
|
|
Replacement: "yes",
|
|
NameValidationScheme: model.UTF8Validation,
|
|
},
|
|
}
|
|
|
|
am2Cfg := config.DefaultAlertmanagerConfig
|
|
am2Cfg.Timeout = model.Duration(time.Second)
|
|
|
|
h.alertmanagers = map[string]*alertmanagerSet{
|
|
"am1": {
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server1.URL },
|
|
},
|
|
},
|
|
cfg: &am1Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server1.URL: newSendLoop(server1.URL, nil, &am1Cfg, &Options{}, h.logger, h.metrics),
|
|
},
|
|
opts: &Options{},
|
|
metrics: h.metrics,
|
|
logger: h.logger,
|
|
},
|
|
"am2": {
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return server2.URL },
|
|
},
|
|
},
|
|
cfg: &am2Cfg,
|
|
sendLoops: map[string]*sendLoop{
|
|
server2.URL: newSendLoop(server2.URL, nil, &am2Cfg, &Options{}, h.logger, h.metrics),
|
|
},
|
|
opts: &Options{},
|
|
metrics: h.metrics,
|
|
logger: h.logger,
|
|
},
|
|
}
|
|
|
|
// start send loops
|
|
for _, ams := range h.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
defer func() {
|
|
for _, ams := range h.alertmanagers {
|
|
ams.cleanSendLoops(ams.ams...)
|
|
}
|
|
}()
|
|
|
|
testAlert := &Alert{
|
|
Labels: labels.FromStrings("alertname", "test"),
|
|
}
|
|
|
|
expected1 = append(expected1, &Alert{
|
|
Labels: labels.FromStrings("alertname", "test", "parasite", "yes"),
|
|
})
|
|
|
|
// am2 shouldn't get the parasite label.
|
|
expected2 = append(expected2, &Alert{
|
|
Labels: labels.FromStrings("alertname", "test"),
|
|
})
|
|
|
|
checkNoErr := func() {
|
|
t.Helper()
|
|
select {
|
|
case err := <-errc:
|
|
require.NoError(t, err)
|
|
default:
|
|
}
|
|
}
|
|
|
|
h.Send(testAlert)
|
|
checkNoErr()
|
|
}
|
|
|
|
// Regression test for https://github.com/prometheus/prometheus/issues/7676
|
|
// The test creates a black hole alertmanager that never responds to any requests.
|
|
// The alertmanager_config.timeout is set to infinite (1 year).
|
|
// We check that the notifier does not hang and throughput is not affected.
|
|
func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) {
|
|
stopBlackHole := make(chan struct{})
|
|
blackHoleAM := newBlackHoleAlertmanager(stopBlackHole)
|
|
defer func() {
|
|
close(stopBlackHole)
|
|
blackHoleAM.Close()
|
|
}()
|
|
|
|
doneAlertReceive := make(chan struct{})
|
|
immediateAM := newImmediateAlertManager(doneAlertReceive)
|
|
defer immediateAM.Close()
|
|
|
|
do := func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
|
|
return client.Do(req.WithContext(ctx))
|
|
}
|
|
|
|
reg := prometheus.NewRegistry()
|
|
h := NewManager(&Options{
|
|
Do: do,
|
|
QueueCapacity: 10,
|
|
MaxBatchSize: DefaultMaxBatchSize,
|
|
Registerer: reg,
|
|
}, model.UTF8Validation, nil)
|
|
|
|
h.alertmanagers = make(map[string]*alertmanagerSet)
|
|
|
|
amCfg := config.DefaultAlertmanagerConfig
|
|
amCfg.Timeout = model.Duration(time.Hour * 24 * 365)
|
|
|
|
h.alertmanagers["1"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return blackHoleAM.URL },
|
|
},
|
|
},
|
|
cfg: &amCfg,
|
|
opts: h.opts,
|
|
sendLoops: map[string]*sendLoop{
|
|
blackHoleAM.URL: newSendLoop(blackHoleAM.URL, http.DefaultClient, &amCfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
|
|
},
|
|
metrics: h.metrics,
|
|
}
|
|
|
|
h.alertmanagers["2"] = &alertmanagerSet{
|
|
ams: []alertmanager{
|
|
alertmanagerMock{
|
|
urlf: func() string { return immediateAM.URL },
|
|
},
|
|
},
|
|
cfg: &amCfg,
|
|
opts: h.opts,
|
|
sendLoops: map[string]*sendLoop{
|
|
immediateAM.URL: newSendLoop(immediateAM.URL, http.DefaultClient, &amCfg, h.opts, slog.New(slog.DiscardHandler), h.metrics),
|
|
},
|
|
metrics: h.metrics,
|
|
}
|
|
|
|
doneSendAll := make(chan struct{})
|
|
for _, ams := range h.alertmanagers {
|
|
ams.startSendLoops(ams.ams)
|
|
}
|
|
defer func() {
|
|
for _, ams := range h.alertmanagers {
|
|
ams.cleanSendLoops(ams.ams...)
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
h.Send(&Alert{
|
|
Labels: labels.FromStrings("alertname", "test"),
|
|
})
|
|
close(doneSendAll)
|
|
}()
|
|
|
|
select {
|
|
case <-doneAlertReceive:
|
|
// This is the happy case, the alert was received by the immediate alertmanager.
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Timeout waiting for alert to be received by immediate alertmanager")
|
|
}
|
|
|
|
select {
|
|
case <-doneSendAll:
|
|
// This is the happy case, the sendAll function returned.
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Timeout waiting for sendAll to return")
|
|
}
|
|
}
|
|
|
|
func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server {
|
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
// Do nothing, wait to be canceled.
|
|
<-stop
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
}
|
|
|
|
func newImmediateAlertManager(done chan<- struct{}) *httptest.Server {
|
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
close(done)
|
|
}))
|
|
}
|