Mirror of https://github.com/hashicorp/vault.git, synced 2026-02-20 00:13:53 -05:00
Add database plugin metrics around connections (#16048)
This is a replacement for #15923 that takes into account recent lock cleanup. I went ahead and added back in the hanging plugin test, which I meant to add in #15944 but forgot.

I tested this by spinning up a statsd sink in the tests and verifying I got a stream of metrics:

```
$ nc -u -l 8125 | grep backend
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:1.000000|g
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:0.000000|g
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:1.000000|g
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:0.000000|g
```

We have to rework the shared gauge code to work without a full `ClusterMetricSink`, since we don't have access to the core metrics from within a plugin. This only reports metrics every 10 minutes by default, but it solves some problems we would have had with the gauge values becoming stale and needing to be re-sent.

Co-authored-by: Tom Proctor <tomhjp@users.noreply.github.com>
Parent: 3f9dbabfc1
Commit: 53bfb72eca
6 changed files with 248 additions and 28 deletions
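To reproduce that kind of check locally, a minimal sketch (not part of this change) is to point the go-metrics global, which the backend reaches through `metrics.Default()`, at a statsd sink and watch the UDP stream with `nc -u -l 8125`. The address, service name, loop, and hand-emitted gauge below are illustrative assumptions; in the real backend the gauge is emitted by the `GaugeCollectionProcess` wired up in the diff that follows.

```go
package main

import (
    "time"

    "github.com/armon/go-metrics"
)

func main() {
    // Route the go-metrics global at a local statsd listener, e.g. `nc -u -l 8125`.
    sink, err := metrics.NewStatsdSink("127.0.0.1:8125")
    if err != nil {
        panic(err)
    }
    if _, err := metrics.NewGlobal(metrics.DefaultConfig("test"), sink); err != nil {
        panic(err)
    }

    // Stand-in for the backend's collection callback: emit a labeled gauge a few
    // times so the statsd stream is visible. Key and label values are illustrative.
    for i := 0; i < 3; i++ {
        metrics.SetGaugeWithLabels(
            []string{"secrets", "database", "backend", "connections", "count"},
            float32(i%2),
            []metrics.Label{{Name: "dbType", Value: "pgx"}},
        )
        time.Sleep(2 * time.Second)
    }
}
```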
```diff
@@ -8,9 +8,12 @@ import (
 	"sync"
 	"time"

+	"github.com/armon/go-metrics"
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-secure-stdlib/strutil"
 	"github.com/hashicorp/go-uuid"
+	"github.com/hashicorp/vault/helper/metricsutil"
+	"github.com/hashicorp/vault/internalshared/configutil"
 	v4 "github.com/hashicorp/vault/sdk/database/dbplugin"
 	v5 "github.com/hashicorp/vault/sdk/database/dbplugin/v5"
 	"github.com/hashicorp/vault/sdk/database/helper/dbutil"
```
```diff
@@ -57,6 +60,21 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend,
 	b.credRotationQueue = queue.New()
 	// Load queue and kickoff new periodic ticker
 	go b.initQueue(b.queueCtx, conf, conf.System.ReplicationState())
+
+	// collect metrics on number of plugin instances
+	var err error
+	b.gaugeCollectionProcess, err = metricsutil.NewGaugeCollectionProcess(
+		[]string{"secrets", "database", "backend", "pluginInstances", "count"},
+		[]metricsutil.Label{},
+		b.collectPluginInstanceGaugeValues,
+		metrics.Default(),
+		configutil.UsageGaugeDefaultPeriod, // TODO: add config settings for these, or add plumbing to the main config settings
+		configutil.MaximumGaugeCardinalityDefault,
+		b.logger)
+	if err != nil {
+		return nil, err
+	}
+	go b.gaugeCollectionProcess.Run()
 	return b, nil
 }

```
```diff
@@ -103,6 +121,36 @@ func Backend(conf *logical.BackendConfig) *databaseBackend {
 	return &b
 }

+func (b *databaseBackend) collectPluginInstanceGaugeValues(context.Context) ([]metricsutil.GaugeLabelValues, error) {
+	// copy the map so we can release the lock
+	connMapCopy := func() map[string]*dbPluginInstance {
+		b.connLock.RLock()
+		defer b.connLock.RUnlock()
+		mapCopy := map[string]*dbPluginInstance{}
+		for k, v := range b.connections {
+			mapCopy[k] = v
+		}
+		return mapCopy
+	}()
+	counts := map[string]int{}
+	for _, v := range connMapCopy {
+		dbType, err := v.database.Type()
+		if err != nil {
+			// there's a chance this will already be closed since we don't hold the lock
+			continue
+		}
+		if _, ok := counts[dbType]; !ok {
+			counts[dbType] = 0
+		}
+		counts[dbType] += 1
+	}
+	var gauges []metricsutil.GaugeLabelValues
+	for k, v := range counts {
+		gauges = append(gauges, metricsutil.GaugeLabelValues{Labels: []metricsutil.Label{{Name: "dbType", Value: k}}, Value: float32(v)})
+	}
+	return gauges, nil
+}
+
 type databaseBackend struct {
 	// connLock is used to synchronize access to the connections map
 	connLock sync.RWMutex
```
```diff
@@ -125,6 +173,9 @@ type databaseBackend struct {
 	// concurrent requests are not modifying the same role and possibly causing
 	// issues with the priority queue.
 	roleLocks []*locksutil.LockEntry
+
+	// the running gauge collection process
+	gaugeCollectionProcess *metricsutil.GaugeCollectionProcess
 }

 func (b *databaseBackend) connGet(name string) *dbPluginInstance {
```
```diff
@@ -136,8 +187,10 @@ func (b *databaseBackend) connGet(name string) *dbPluginInstance {
 func (b *databaseBackend) connPop(name string) *dbPluginInstance {
 	b.connLock.Lock()
 	defer b.connLock.Unlock()
-	dbi := b.connections[name]
-	delete(b.connections, name)
+	dbi, ok := b.connections[name]
+	if ok {
+		delete(b.connections, name)
+	}
 	return dbi
 }

```
```diff
@@ -362,6 +415,9 @@ func (b *databaseBackend) clean(_ context.Context) {
 	for _, db := range connections {
 		go db.Close()
 	}
+	if b.gaugeCollectionProcess != nil {
+		b.gaugeCollectionProcess.Stop()
+	}
 }

 const backendHelp = `
```
```diff
@@ -1461,6 +1461,89 @@ func TestBackend_ConnectionURL_redacted(t *testing.T) {
 	}
 }

+type hangingPlugin struct{}
+
+func (h hangingPlugin) Initialize(_ context.Context, req v5.InitializeRequest) (v5.InitializeResponse, error) {
+	return v5.InitializeResponse{
+		Config: req.Config,
+	}, nil
+}
+
+func (h hangingPlugin) NewUser(_ context.Context, _ v5.NewUserRequest) (v5.NewUserResponse, error) {
+	return v5.NewUserResponse{}, nil
+}
+
+func (h hangingPlugin) UpdateUser(_ context.Context, _ v5.UpdateUserRequest) (v5.UpdateUserResponse, error) {
+	return v5.UpdateUserResponse{}, nil
+}
+
+func (h hangingPlugin) DeleteUser(_ context.Context, _ v5.DeleteUserRequest) (v5.DeleteUserResponse, error) {
+	return v5.DeleteUserResponse{}, nil
+}
+
+func (h hangingPlugin) Type() (string, error) {
+	return "hanging", nil
+}
+
+func (h hangingPlugin) Close() error {
+	time.Sleep(1000 * time.Second)
+	return nil
+}
+
+var _ v5.Database = (*hangingPlugin)(nil)
+
+func TestBackend_PluginMain_Hanging(t *testing.T) {
+	if os.Getenv(pluginutil.PluginVaultVersionEnv) == "" {
+		return
+	}
+	v5.Serve(&hangingPlugin{})
+}
+
+func TestBackend_AsyncClose(t *testing.T) {
+	// Test that having a plugin that takes a LONG time to close will not cause the cleanup function to take
+	// longer than 750ms.
+	cluster, sys := getCluster(t)
+	vault.TestAddTestPlugin(t, cluster.Cores[0].Core, "hanging-plugin", consts.PluginTypeDatabase, "TestBackend_PluginMain_Hanging", []string{}, "")
+	t.Cleanup(cluster.Cleanup)
+
+	config := logical.TestBackendConfig()
+	config.StorageView = &logical.InmemStorage{}
+	config.System = sys
+
+	b, err := Factory(context.Background(), config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Configure a connection
+	data := map[string]interface{}{
+		"connection_url": "doesn't matter",
+		"plugin_name":    "hanging-plugin",
+		"allowed_roles":  []string{"plugin-role-test"},
+	}
+	req := &logical.Request{
+		Operation: logical.UpdateOperation,
+		Path:      "config/hang",
+		Storage:   config.StorageView,
+		Data:      data,
+	}
+	_, err = b.HandleRequest(namespace.RootContext(nil), req)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	timeout := time.NewTimer(750 * time.Millisecond)
+	done := make(chan bool)
+	go func() {
+		b.Cleanup(context.Background())
+		done <- true
+	}()
+	select {
+	case <-timeout.C:
+		t.Error("Hanging plugin caused Close() to take longer than 750ms")
+	case <-done:
+	}
+}
+
 func testCredsExist(t *testing.T, resp *logical.Response, connURL string) bool {
 	t.Helper()
 	var d struct {
```
```diff
@@ -1379,6 +1379,7 @@ func setupMockDB(b *databaseBackend) *mockNewDatabase {
 	mockDB := &mockNewDatabase{}
 	mockDB.On("Initialize", mock.Anything, mock.Anything).Return(v5.InitializeResponse{}, nil)
 	mockDB.On("Close").Return(nil)
+	mockDB.On("Type").Return("mock", nil)
 	dbw := databaseVersionWrapper{
 		v5: mockDB,
 	}
```
```diff
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"time"

+	"github.com/armon/go-metrics"
 	log "github.com/hashicorp/go-hclog"
 )

```
```diff
@@ -60,7 +61,7 @@ type GaugeCollectionProcess struct {
 	collector GaugeCollector

 	// destination for metrics
-	sink   *ClusterMetricSink
+	sink   Metrics
 	logger log.Logger

 	// time between collections
```
```diff
@@ -68,10 +69,39 @@ type GaugeCollectionProcess struct {
 	currentInterval time.Duration
 	ticker          *time.Ticker

+	// used to help limit cardinality
+	maxGaugeCardinality int
+
 	// time source
 	clock clock
 }

+// NewGaugeCollectionProcess creates a new collection process for the callback
+// function given as an argument, and starts it running.
+// A label should be provided for metrics *about* this collection process.
+//
+// The Run() method must be called to start the process.
+func NewGaugeCollectionProcess(
+	key []string,
+	id []Label,
+	collector GaugeCollector,
+	m metrics.MetricSink,
+	gaugeInterval time.Duration,
+	maxGaugeCardinality int,
+	logger log.Logger,
+) (*GaugeCollectionProcess, error) {
+	return newGaugeCollectionProcessWithClock(
+		key,
+		id,
+		collector,
+		SinkWrapper{MetricSink: m},
+		gaugeInterval,
+		maxGaugeCardinality,
+		logger,
+		defaultClock{},
+	)
+}
+
 // NewGaugeCollectionProcess creates a new collection process for the callback
 // function given as an argument, and starts it running.
 // A label should be provided for metrics *about* this collection process.
```
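For orientation, here is a minimal, hypothetical sketch of how a caller without a `ClusterMetricSink` (such as the database backend's `Factory` above) can use the new package-level constructor. The metric key, the ten-minute interval, and the cardinality cap of 500 are illustrative stand-ins, not defaults taken from this change.

```go
package main

import (
    "context"
    "time"

    "github.com/armon/go-metrics"
    log "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/vault/helper/metricsutil"
)

func main() {
    // Collector callback: return one gauge value per label set each time the
    // process wakes up, analogous to collectPluginInstanceGaugeValues above.
    collect := func(context.Context) ([]metricsutil.GaugeLabelValues, error) {
        return []metricsutil.GaugeLabelValues{
            {Labels: []metricsutil.Label{{Name: "dbType", Value: "pgx"}}, Value: 1},
        }, nil
    }

    p, err := metricsutil.NewGaugeCollectionProcess(
        []string{"example", "instances", "count"}, // illustrative key
        []metricsutil.Label{},
        collect,
        metrics.Default(), // any go-metrics sink; wrapped in SinkWrapper internally
        10*time.Minute,    // illustrative collection interval
        500,               // illustrative cardinality cap
        log.Default(),
    )
    if err != nil {
        panic(err)
    }

    // Run blocks until Stop is called, so it runs on its own goroutine and is
    // stopped during cleanup, mirroring Factory and clean() in the diff above.
    go p.Run()
    defer p.Stop()
}
```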
```diff
@@ -83,41 +113,48 @@ func (m *ClusterMetricSink) NewGaugeCollectionProcess(
 	collector GaugeCollector,
 	logger log.Logger,
 ) (*GaugeCollectionProcess, error) {
-	return m.newGaugeCollectionProcessWithClock(
+	return newGaugeCollectionProcessWithClock(
 		key,
 		id,
 		collector,
+		m,
+		m.GaugeInterval,
+		m.MaxGaugeCardinality,
 		logger,
 		defaultClock{},
 	)
 }

 // test version allows an alternative clock implementation
-func (m *ClusterMetricSink) newGaugeCollectionProcessWithClock(
+func newGaugeCollectionProcessWithClock(
 	key []string,
 	id []Label,
 	collector GaugeCollector,
+	sink Metrics,
+	gaugeInterval time.Duration,
+	maxGaugeCardinality int,
 	logger log.Logger,
 	clock clock,
 ) (*GaugeCollectionProcess, error) {
 	process := &GaugeCollectionProcess{
-		stop:             make(chan struct{}, 1),
-		stopped:          make(chan struct{}, 1),
-		key:              key,
-		labels:           id,
-		collector:        collector,
-		sink:             m,
-		originalInterval: m.GaugeInterval,
-		currentInterval:  m.GaugeInterval,
-		logger:           logger,
-		clock:            clock,
+		stop:                make(chan struct{}, 1),
+		stopped:             make(chan struct{}, 1),
+		key:                 key,
+		labels:              id,
+		collector:           collector,
+		sink:                sink,
+		originalInterval:    gaugeInterval,
+		currentInterval:     gaugeInterval,
+		maxGaugeCardinality: maxGaugeCardinality,
+		logger:              logger,
+		clock:               clock,
 	}
 	return process, nil
 }

 // delayStart randomly delays by up to one extra interval
-// so that collection processes do not all run at the time time.
-// If we knew all the procsses in advance, we could just schedule them
+// so that collection processes do not all run at the time.
+// If we knew all the processes in advance, we could just schedule them
 // evenly, but a new one could be added per secret engine.
 func (p *GaugeCollectionProcess) delayStart() bool {
 	randomDelay := time.Duration(rand.Int63n(int64(p.currentInterval)))
```
```diff
@@ -187,11 +224,11 @@ func (p *GaugeCollectionProcess) collectAndFilterGauges() {
 	// Filter to top N.
 	// This does not guarantee total cardinality is <= N, but it does slow things down
 	// a little if the cardinality *is* too high and the gauge needs to be disabled.
-	if len(values) > p.sink.MaxGaugeCardinality {
+	if len(values) > p.maxGaugeCardinality {
 		sort.Slice(values, func(a, b int) bool {
 			return values[a].Value > values[b].Value
 		})
-		values = values[:p.sink.MaxGaugeCardinality]
+		values = values[:p.maxGaugeCardinality]
 	}

 	p.streamGaugesToSink(values)
```
```diff
@@ -147,10 +147,13 @@ func TestGauge_StartDelay(t *testing.T) {
 	sink := BlackholeSink()
 	sink.GaugeInterval = 2 * time.Hour

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		c.EmptyCollectionFunction,
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -209,10 +212,13 @@ func TestGauge_StoppedDuringInitialDelay(t *testing.T) {
 	sink := BlackholeSink()
 	sink.GaugeInterval = 2 * time.Hour

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		c.EmptyCollectionFunction,
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -235,10 +241,13 @@ func TestGauge_StoppedAfterInitialDelay(t *testing.T) {
 	sink := BlackholeSink()
 	sink.GaugeInterval = 2 * time.Hour

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		c.EmptyCollectionFunction,
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -274,10 +283,13 @@ func TestGauge_Backoff(t *testing.T) {
 		return []GaugeLabelValues{}, nil
 	}

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		f,
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -300,10 +312,13 @@ func TestGauge_RestartTimer(t *testing.T) {
 	sink := BlackholeSink()
 	sink.GaugeInterval = 2 * time.Hour

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		c.EmptyCollectionFunction,
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -370,10 +385,13 @@ func TestGauge_InterruptedStreaming(t *testing.T) {
 	sink.MaxGaugeCardinality = 500
 	sink.GaugeInterval = 2 * time.Hour

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		nil, // shouldn't be called
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -445,10 +463,13 @@ func TestGauge_MaximumMeasurements(t *testing.T) {

 	// Advance time by 0.5% of duration
 	advance := time.Duration(int(0.005 * float32(sink.GaugeInterval)))
-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		c.makeFunctionForValues(values, s, advance),
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -524,10 +545,13 @@ func TestGauge_MeasurementError(t *testing.T) {
 		return values, errors.New("test error")
 	}

-	p, err := sink.newGaugeCollectionProcessWithClock(
+	p, err := newGaugeCollectionProcessWithClock(
 		[]string{"example", "count"},
 		[]Label{{"gauge", "test"}},
 		f,
+		sink,
+		sink.GaugeInterval,
+		sink.MaxGaugeCardinality,
 		log.Default(),
 		s,
 	)
```
```diff
@@ -5,7 +5,7 @@ import (
 	"sync/atomic"
 	"time"

-	metrics "github.com/armon/go-metrics"
+	"github.com/armon/go-metrics"
 	"github.com/hashicorp/vault/helper/namespace"
 )

```
```diff
@@ -49,6 +49,25 @@ type Metrics interface {

 var _ Metrics = &ClusterMetricSink{}

+// SinkWrapper implements `metricsutil.Metrics` using an instance of
+// armon/go-metrics `MetricSink` as the underlying implementation.
+type SinkWrapper struct {
+	metrics.MetricSink
+}
+
+func (s SinkWrapper) AddDurationWithLabels(key []string, d time.Duration, labels []Label) {
+	val := float32(d) / float32(time.Millisecond)
+	s.MetricSink.AddSampleWithLabels(key, val, labels)
+}
+
+func (s SinkWrapper) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
+	elapsed := time.Now().Sub(start)
+	val := float32(elapsed) / float32(time.Millisecond)
+	s.MetricSink.AddSampleWithLabels(key, val, labels)
+}
+
+var _ Metrics = SinkWrapper{}
+
 // Convenience alias
 type Label = metrics.Label

```
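As a rough illustration of the adapter above (a sketch only, with an in-memory sink and sample values chosen arbitrarily), `SinkWrapper` lets any plain go-metrics `MetricSink` stand in wherever the `Metrics` interface is expected:

```go
package main

import (
    "fmt"
    "time"

    "github.com/armon/go-metrics"
    "github.com/hashicorp/vault/helper/metricsutil"
)

func main() {
    // Wrap an ordinary go-metrics sink so it satisfies metricsutil.Metrics.
    inmem := metrics.NewInmemSink(10*time.Second, time.Minute)
    var m metricsutil.Metrics = metricsutil.SinkWrapper{MetricSink: inmem}

    // Gauges pass straight through to the embedded sink; durations are
    // converted to milliseconds and recorded as samples, as defined above.
    m.SetGaugeWithLabels([]string{"example", "count"}, 3,
        []metricsutil.Label{{Name: "dbType", Value: "pgx"}})
    m.AddDurationWithLabels([]string{"example", "latency"}, 25*time.Millisecond, nil)

    fmt.Printf("%+v\n", inmem.Data()[0].Gauges)
}
```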