prometheus/notifier/manager.go

327 lines
8.6 KiB
Go
Raw Permalink Normal View History

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-03-01 06:37:22 -05:00
package notifier
import (
"context"
2015-12-10 10:31:50 -05:00
"fmt"
"log/slog"
"net/http"
"net/url"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/config"
Refactor SD configuration to remove `config` dependency (#3629) * refactor: move targetGroup struct and CheckOverflow() to their own package * refactor: move auth and security related structs to a utility package, fix import error in utility package * refactor: Azure SD, remove SD struct from config * refactor: DNS SD, remove SD struct from config into dns package * refactor: ec2 SD, move SD struct from config into the ec2 package * refactor: file SD, move SD struct from config to file discovery package * refactor: gce, move SD struct from config to gce discovery package * refactor: move HTTPClientConfig and URL into util/config, fix import error in httputil * refactor: consul, move SD struct from config into consul discovery package * refactor: marathon, move SD struct from config into marathon discovery package * refactor: triton, move SD struct from config to triton discovery package, fix test * refactor: zookeeper, move SD structs from config to zookeeper discovery package * refactor: openstack, remove SD struct from config, move into openstack discovery package * refactor: kubernetes, move SD struct from config into kubernetes discovery package * refactor: notifier, use targetgroup package instead of config * refactor: tests for file, marathon, triton SD - use targetgroup package instead of config.TargetGroup * refactor: retrieval, use targetgroup package instead of config.TargetGroup * refactor: storage, use config util package * refactor: discovery manager, use targetgroup package instead of config.TargetGroup * refactor: use HTTPClient and TLS config from configUtil instead of config * refactor: tests, use targetgroup package instead of config.TargetGroup * refactor: fix tagetgroup.Group pointers that were removed by mistake * refactor: openstack, kubernetes: drop prefixes * refactor: remove import aliases forced due to vscode bug * refactor: move main SD struct out of config into discovery/config * refactor: rename configUtil to config_util * refactor: 
rename yamlUtil to yaml_config * refactor: kubernetes, remove prefixes * refactor: move the TargetGroup package to discovery/ * refactor: fix order of imports
2017-12-29 15:01:34 -05:00
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
)
// DefaultMaxBatchSize is the default maximum number of alerts to send in a single request to the alertmanager.
const DefaultMaxBatchSize = 256

// contentTypeJSON is the JSON media type string.
const contentTypeJSON = "application/json"
// Fixed strings used by the notifier's instrumentation.
const (
	// alertmanagerLabel is the label name "alertmanager".
	alertmanagerLabel = "alertmanager"
	// namespace is the instrumentation namespace string.
	namespace = "prometheus"
	// subsystem is the instrumentation subsystem string.
	subsystem = "notifications"
)
// userAgent holds the string returned by version.PrometheusUserAgent(),
// evaluated once when the package is initialised.
// NOTE(review): presumably sent as the User-Agent header on requests to
// Alertmanager; the use site is not in this file view — confirm.
var userAgent = version.PrometheusUserAgent()
2018-01-30 12:45:37 -05:00
// Manager is responsible for dispatching alert notifications to an
2015-12-10 10:31:50 -05:00
// alert manager service.
2018-01-30 12:45:37 -05:00
type Manager struct {
opts *Options
metrics *alertMetrics
mtx sync.RWMutex
stopOnce *sync.Once
stopRequested chan struct{}
alertmanagers map[string]*alertmanagerSet
logger *slog.Logger
}
2016-05-11 08:20:36 -04:00
// Options are the configurable parameters of a Handler.
2016-03-01 06:37:22 -05:00
type Options struct {
QueueCapacity int
DrainOnShutdown bool
ExternalLabels labels.Labels
RelabelConfigs []*relabel.Config
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
2015-06-15 07:03:17 -04:00
Registerer prometheus.Registerer
// MaxBatchSize determines the maximum number of alerts to send in a single request to the alertmanager.
MaxBatchSize int
2015-06-15 07:03:17 -04:00
}
// do is the default request sender: it binds ctx to req and executes it with
// client, substituting http.DefaultClient when client is nil.
func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
	httpClient := http.DefaultClient
	if client != nil {
		httpClient = client
	}
	reqWithCtx := req.WithContext(ctx)
	return httpClient.Do(reqWithCtx)
}
2018-01-30 12:45:37 -05:00
// NewManager is the manager constructor.
func NewManager(o *Options, nameValidationScheme model.ValidationScheme, logger *slog.Logger) *Manager {
if o.Do == nil {
o.Do = do
}
// Set default MaxBatchSize if not provided.
if o.MaxBatchSize <= 0 {
o.MaxBatchSize = DefaultMaxBatchSize
}
2017-08-11 14:45:52 -04:00
if logger == nil {
logger = promslog.NewNopLogger()
2017-08-11 14:45:52 -04:00
}
for _, rc := range o.RelabelConfigs {
switch rc.NameValidationScheme {
case model.LegacyValidation, model.UTF8Validation:
default:
rc.NameValidationScheme = nameValidationScheme
}
}
2018-01-30 12:45:37 -05:00
n := &Manager{
stopRequested: make(chan struct{}),
stopOnce: &sync.Once{},
opts: o,
logger: logger,
}
alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) }
2017-08-11 14:45:52 -04:00
n.metrics = newAlertMetrics(o.Registerer, alertmanagersDiscoveredFunc)
n.metrics.queueCapacity.Set(float64(o.QueueCapacity))
2017-08-11 14:45:52 -04:00
return n
}
// ApplyConfig updates the status state as the new config requires.
2018-01-30 12:45:37 -05:00
func (n *Manager) ApplyConfig(conf *config.Config) error {
n.mtx.Lock()
defer n.mtx.Unlock()
2015-12-10 10:31:50 -05:00
n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
for i, rc := range n.opts.RelabelConfigs {
switch rc.NameValidationScheme {
case model.LegacyValidation, model.UTF8Validation:
default:
n.opts.RelabelConfigs[i].NameValidationScheme = conf.GlobalConfig.MetricNameValidationScheme
}
}
amSets := make(map[string]*alertmanagerSet)
// configToAlertmanagers maps alertmanager sets for each unique AlertmanagerConfig,
// helping to avoid dropping known alertmanagers and re-use them without waiting for SD updates when applying the config.
configToAlertmanagers := make(map[string]*alertmanagerSet, len(n.alertmanagers))
for _, oldAmSet := range n.alertmanagers {
hash, err := oldAmSet.configHash()
if err != nil {
return err
}
configToAlertmanagers[hash] = oldAmSet
}
for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() {
ams, err := newAlertmanagerSet(cfg, n.opts, n.logger, n.metrics)
if err != nil {
return err
}
hash, err := ams.configHash()
if err != nil {
return err
}
if oldAmSet, ok := configToAlertmanagers[hash]; ok {
ams.ams = oldAmSet.ams
ams.droppedAms = oldAmSet.droppedAms
// Only transfer sendLoops to the first new config with this hash.
// Subsequent configs with the same hash should not share the sendLoops
// map reference, as that would cause shared mutable state between
// alertmanagerSets (cleanup in one would affect the other).
oldAmSet.mtx.Lock()
if oldAmSet.sendLoops != nil {
ams.mtx.Lock()
ams.sendLoops = oldAmSet.sendLoops
oldAmSet.sendLoops = nil
ams.mtx.Unlock()
}
oldAmSet.mtx.Unlock()
}
amSets[k] = ams
}
// Clean up sendLoops that weren't transferred to new config.
// This happens when: (1) key was removed, or (2) key exists but hash changed.
// After the transfer loop above, any oldAmSet with non-nil sendLoops
// had its sendLoops NOT transferred (since we set it to nil on transfer).
for _, oldAmSet := range n.alertmanagers {
oldAmSet.mtx.Lock()
if oldAmSet.sendLoops != nil {
oldAmSet.cleanSendLoops(oldAmSet.ams...)
}
oldAmSet.mtx.Unlock()
}
n.alertmanagers = amSets
return nil
}
// Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled).
//
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
n.targetUpdateLoop(tsets)
2015-12-10 10:31:50 -05:00
n.mtx.Lock()
defer n.mtx.Unlock()
for _, ams := range n.alertmanagers {
ams.mtx.Lock()
ams.cleanSendLoops(ams.ams...)
ams.mtx.Unlock()
}
}
// targetUpdateLoop receives updates of target groups and triggers a reload.
//
// It returns only once Stop has been requested. If tsets is closed before
// that, the loop stops listening for updates and simply waits for the stop
// request.
func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) {
	for {
		// If we've been asked to stop, that takes priority over processing any further target group updates.
		select {
		case <-n.stopRequested:
			return
		default:
		}

		select {
		case <-n.stopRequested:
			return
		case ts, ok := <-tsets:
			if !ok {
				// A closed channel is always ready to receive, so leaving it
				// in the select would make this loop spin. A nil channel
				// never becomes ready, which leaves only stopRequested to
				// wait on.
				tsets = nil
				continue
			}
			n.reload(ts)
		}
	}
}
2018-01-30 12:45:37 -05:00
// reload hands each target group list in tgs to the alertmanager set
// registered under the same key. Keys without a matching set are logged as
// errors and skipped. The manager lock is held for the whole call.
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
	n.mtx.Lock()
	defer n.mtx.Unlock()

	for setID, groups := range tgs {
		if amSet, found := n.alertmanagers[setID]; found {
			amSet.sync(groups)
			continue
		}
		n.logger.Error("couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", setID))
	}
}
2016-02-29 16:58:32 -05:00
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
2018-01-30 12:45:37 -05:00
func (n *Manager) Send(alerts ...*Alert) {
// If we've been asked to stop, that takes priority over accepting new alerts.
select {
case <-n.stopRequested:
return
default:
}
n.mtx.RLock()
defer n.mtx.RUnlock()
2015-12-10 10:31:50 -05:00
alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
if len(alerts) == 0 {
return
2015-12-10 10:31:50 -05:00
}
for _, ams := range n.alertmanagers {
ams.send(alerts...)
2015-12-10 10:31:50 -05:00
}
}
// Alertmanagers returns a slice of Alertmanager URLs.
2018-01-30 12:45:37 -05:00
func (n *Manager) Alertmanagers() []*url.URL {
n.mtx.RLock()
amSets := n.alertmanagers
n.mtx.RUnlock()
var res []*url.URL
for _, ams := range amSets {
ams.mtx.RLock()
for _, am := range ams.ams {
res = append(res, am.url())
}
ams.mtx.RUnlock()
}
return res
}
// DroppedAlertmanagers returns the URLs of the dropped Alertmanagers of every
// configured alertmanager set. The result is nil when there are none.
func (n *Manager) DroppedAlertmanagers() []*url.URL {
	// Grab the current map reference under the manager lock, then walk it
	// with only the per-set locks held.
	n.mtx.RLock()
	sets := n.alertmanagers
	n.mtx.RUnlock()

	var urls []*url.URL
	collect := func(set *alertmanagerSet) {
		set.mtx.RLock()
		defer set.mtx.RUnlock()
		for _, dropped := range set.droppedAms {
			urls = append(urls, dropped.url())
		}
	}
	for _, set := range sets {
		collect(set)
	}
	return urls
}
// Stop signals the notification manager to shut down and immediately returns.
//
// Run will return once the notification manager has successfully shut down.
//
// The manager will optionally drain send loops before shutting down.
//
// Stop is safe to call multiple times.
2018-01-30 12:45:37 -05:00
func (n *Manager) Stop() {
n.logger.Info("Stopping notification manager...")
n.stopOnce.Do(func() {
close(n.stopRequested)
})
}