diff --git a/builtin/logical/database/backend.go b/builtin/logical/database/backend.go index e366cb2664..06d92098b3 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -8,9 +8,12 @@ import ( "sync" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/go-uuid" + "github.com/hashicorp/vault/helper/metricsutil" + "github.com/hashicorp/vault/internalshared/configutil" v4 "github.com/hashicorp/vault/sdk/database/dbplugin" v5 "github.com/hashicorp/vault/sdk/database/dbplugin/v5" "github.com/hashicorp/vault/sdk/database/helper/dbutil" @@ -57,6 +60,21 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend, b.credRotationQueue = queue.New() // Load queue and kickoff new periodic ticker go b.initQueue(b.queueCtx, conf, conf.System.ReplicationState()) + + // collect metrics on number of plugin instances + var err error + b.gaugeCollectionProcess, err = metricsutil.NewGaugeCollectionProcess( + []string{"secrets", "database", "backend", "pluginInstances", "count"}, + []metricsutil.Label{}, + b.collectPluginInstanceGaugeValues, + metrics.Default(), + configutil.UsageGaugeDefaultPeriod, // TODO: add config settings for these, or add plumbing to the main config settings + configutil.MaximumGaugeCardinalityDefault, + b.logger) + if err != nil { + return nil, err + } + go b.gaugeCollectionProcess.Run() return b, nil } @@ -103,6 +121,36 @@ func Backend(conf *logical.BackendConfig) *databaseBackend { return &b } +func (b *databaseBackend) collectPluginInstanceGaugeValues(context.Context) ([]metricsutil.GaugeLabelValues, error) { + // copy the map so we can release the lock + connMapCopy := func() map[string]*dbPluginInstance { + b.connLock.RLock() + defer b.connLock.RUnlock() + mapCopy := map[string]*dbPluginInstance{} + for k, v := range b.connections { + mapCopy[k] = v + } + return mapCopy + }() + counts := map[string]int{} 
+ for _, v := range connMapCopy { + dbType, err := v.database.Type() + if err != nil { + // there's a chance this will already be closed since we don't hold the lock + continue + } + if _, ok := counts[dbType]; !ok { + counts[dbType] = 0 + } + counts[dbType] += 1 + } + var gauges []metricsutil.GaugeLabelValues + for k, v := range counts { + gauges = append(gauges, metricsutil.GaugeLabelValues{Labels: []metricsutil.Label{{Name: "dbType", Value: k}}, Value: float32(v)}) + } + return gauges, nil +} + type databaseBackend struct { // connLock is used to synchronize access to the connections map connLock sync.RWMutex @@ -125,6 +173,9 @@ type databaseBackend struct { // concurrent requests are not modifying the same role and possibly causing // issues with the priority queue. roleLocks []*locksutil.LockEntry + + // the running gauge collection process + gaugeCollectionProcess *metricsutil.GaugeCollectionProcess } func (b *databaseBackend) connGet(name string) *dbPluginInstance { @@ -136,8 +187,10 @@ func (b *databaseBackend) connGet(name string) *dbPluginInstance { func (b *databaseBackend) connPop(name string) *dbPluginInstance { b.connLock.Lock() defer b.connLock.Unlock() - dbi := b.connections[name] - delete(b.connections, name) + dbi, ok := b.connections[name] + if ok { + delete(b.connections, name) + } return dbi } @@ -362,6 +415,9 @@ func (b *databaseBackend) clean(_ context.Context) { for _, db := range connections { go db.Close() } + if b.gaugeCollectionProcess != nil { + b.gaugeCollectionProcess.Stop() + } } const backendHelp = ` diff --git a/builtin/logical/database/backend_test.go b/builtin/logical/database/backend_test.go index cf700c6d71..9cdf32ecaa 100644 --- a/builtin/logical/database/backend_test.go +++ b/builtin/logical/database/backend_test.go @@ -1461,6 +1461,89 @@ func TestBackend_ConnectionURL_redacted(t *testing.T) { } } +type hangingPlugin struct{} + +func (h hangingPlugin) Initialize(_ context.Context, req v5.InitializeRequest) 
(v5.InitializeResponse, error) { + return v5.InitializeResponse{ + Config: req.Config, + }, nil +} + +func (h hangingPlugin) NewUser(_ context.Context, _ v5.NewUserRequest) (v5.NewUserResponse, error) { + return v5.NewUserResponse{}, nil +} + +func (h hangingPlugin) UpdateUser(_ context.Context, _ v5.UpdateUserRequest) (v5.UpdateUserResponse, error) { + return v5.UpdateUserResponse{}, nil +} + +func (h hangingPlugin) DeleteUser(_ context.Context, _ v5.DeleteUserRequest) (v5.DeleteUserResponse, error) { + return v5.DeleteUserResponse{}, nil +} + +func (h hangingPlugin) Type() (string, error) { + return "hanging", nil +} + +func (h hangingPlugin) Close() error { + time.Sleep(1000 * time.Second) + return nil +} + +var _ v5.Database = (*hangingPlugin)(nil) + +func TestBackend_PluginMain_Hanging(t *testing.T) { + if os.Getenv(pluginutil.PluginVaultVersionEnv) == "" { + return + } + v5.Serve(&hangingPlugin{}) +} + +func TestBackend_AsyncClose(t *testing.T) { + // Test that having a plugin that takes a LONG time to close will not cause the cleanup function to take + // longer than 750ms. 
+ cluster, sys := getCluster(t) + vault.TestAddTestPlugin(t, cluster.Cores[0].Core, "hanging-plugin", consts.PluginTypeDatabase, "TestBackend_PluginMain_Hanging", []string{}, "") + t.Cleanup(cluster.Cleanup) + + config := logical.TestBackendConfig() + config.StorageView = &logical.InmemStorage{} + config.System = sys + + b, err := Factory(context.Background(), config) + if err != nil { + t.Fatal(err) + } + + // Configure a connection + data := map[string]interface{}{ + "connection_url": "doesn't matter", + "plugin_name": "hanging-plugin", + "allowed_roles": []string{"plugin-role-test"}, + } + req := &logical.Request{ + Operation: logical.UpdateOperation, + Path: "config/hang", + Storage: config.StorageView, + Data: data, + } + _, err = b.HandleRequest(namespace.RootContext(nil), req) + if err != nil { + t.Fatalf("err: %v", err) + } + timeout := time.NewTimer(750 * time.Millisecond) + done := make(chan bool) + go func() { + b.Cleanup(context.Background()) + done <- true + }() + select { + case <-timeout.C: + t.Error("Hanging plugin caused Close() to take longer than 750ms") + case <-done: + } +} + func testCredsExist(t *testing.T, resp *logical.Response, connURL string) bool { t.Helper() var d struct { diff --git a/builtin/logical/database/rotation_test.go b/builtin/logical/database/rotation_test.go index 863f1a01e3..9f6e65f9b7 100644 --- a/builtin/logical/database/rotation_test.go +++ b/builtin/logical/database/rotation_test.go @@ -1379,6 +1379,7 @@ func setupMockDB(b *databaseBackend) *mockNewDatabase { mockDB := &mockNewDatabase{} mockDB.On("Initialize", mock.Anything, mock.Anything).Return(v5.InitializeResponse{}, nil) mockDB.On("Close").Return(nil) + mockDB.On("Type").Return("mock", nil) dbw := databaseVersionWrapper{ v5: mockDB, } diff --git a/helper/metricsutil/gauge_process.go b/helper/metricsutil/gauge_process.go index fd327bc687..0ad0e9d876 100644 --- a/helper/metricsutil/gauge_process.go +++ b/helper/metricsutil/gauge_process.go @@ -6,6 +6,7 @@ import ( 
"sort" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" ) @@ -60,7 +61,7 @@ type GaugeCollectionProcess struct { collector GaugeCollector // destination for metrics - sink *ClusterMetricSink + sink Metrics logger log.Logger // time between collections @@ -68,10 +69,39 @@ type GaugeCollectionProcess struct { currentInterval time.Duration ticker *time.Ticker + // used to help limit cardinality + maxGaugeCardinality int + // time source clock clock } +// NewGaugeCollectionProcess creates a new collection process for the callback +// function given as an argument, and starts it running. +// A label should be provided for metrics *about* this collection process. +// +// The Run() method must be called to start the process. +func NewGaugeCollectionProcess( + key []string, + id []Label, + collector GaugeCollector, + m metrics.MetricSink, + gaugeInterval time.Duration, + maxGaugeCardinality int, + logger log.Logger, +) (*GaugeCollectionProcess, error) { + return newGaugeCollectionProcessWithClock( + key, + id, + collector, + SinkWrapper{MetricSink: m}, + gaugeInterval, + maxGaugeCardinality, + logger, + defaultClock{}, + ) +} + // NewGaugeCollectionProcess creates a new collection process for the callback // function given as an argument, and starts it running. // A label should be provided for metrics *about* this collection process. 
@@ -83,41 +113,48 @@ func (m *ClusterMetricSink) NewGaugeCollectionProcess( collector GaugeCollector, logger log.Logger, ) (*GaugeCollectionProcess, error) { - return m.newGaugeCollectionProcessWithClock( + return newGaugeCollectionProcessWithClock( key, id, collector, + m, + m.GaugeInterval, + m.MaxGaugeCardinality, logger, defaultClock{}, ) } // test version allows an alternative clock implementation -func (m *ClusterMetricSink) newGaugeCollectionProcessWithClock( +func newGaugeCollectionProcessWithClock( key []string, id []Label, collector GaugeCollector, + sink Metrics, + gaugeInterval time.Duration, + maxGaugeCardinality int, logger log.Logger, clock clock, ) (*GaugeCollectionProcess, error) { process := &GaugeCollectionProcess{ - stop: make(chan struct{}, 1), - stopped: make(chan struct{}, 1), - key: key, - labels: id, - collector: collector, - sink: m, - originalInterval: m.GaugeInterval, - currentInterval: m.GaugeInterval, - logger: logger, - clock: clock, + stop: make(chan struct{}, 1), + stopped: make(chan struct{}, 1), + key: key, + labels: id, + collector: collector, + sink: sink, + originalInterval: gaugeInterval, + currentInterval: gaugeInterval, + maxGaugeCardinality: maxGaugeCardinality, + logger: logger, + clock: clock, } return process, nil } // delayStart randomly delays by up to one extra interval -// so that collection processes do not all run at the time time. -// If we knew all the procsses in advance, we could just schedule them +// so that collection processes do not all run at the same time. +// If we knew all the processes in advance, we could just schedule them // evenly, but a new one could be added per secret engine. func (p *GaugeCollectionProcess) delayStart() bool { randomDelay := time.Duration(rand.Int63n(int64(p.currentInterval))) @@ -187,11 +224,11 @@ func (p *GaugeCollectionProcess) collectAndFilterGauges() { // Filter to top N. 
// This does not guarantee total cardinality is <= N, but it does slow things down // a little if the cardinality *is* too high and the gauge needs to be disabled. - if len(values) > p.sink.MaxGaugeCardinality { + if len(values) > p.maxGaugeCardinality { sort.Slice(values, func(a, b int) bool { return values[a].Value > values[b].Value }) - values = values[:p.sink.MaxGaugeCardinality] + values = values[:p.maxGaugeCardinality] } p.streamGaugesToSink(values) diff --git a/helper/metricsutil/gauge_process_test.go b/helper/metricsutil/gauge_process_test.go index 89ef813a85..9971714e04 100644 --- a/helper/metricsutil/gauge_process_test.go +++ b/helper/metricsutil/gauge_process_test.go @@ -147,10 +147,13 @@ func TestGauge_StartDelay(t *testing.T) { sink := BlackholeSink() sink.GaugeInterval = 2 * time.Hour - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, c.EmptyCollectionFunction, + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -209,10 +212,13 @@ func TestGauge_StoppedDuringInitialDelay(t *testing.T) { sink := BlackholeSink() sink.GaugeInterval = 2 * time.Hour - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, c.EmptyCollectionFunction, + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -235,10 +241,13 @@ func TestGauge_StoppedAfterInitialDelay(t *testing.T) { sink := BlackholeSink() sink.GaugeInterval = 2 * time.Hour - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, c.EmptyCollectionFunction, + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -274,10 +283,13 @@ func TestGauge_Backoff(t *testing.T) { return []GaugeLabelValues{}, nil } - p, err := 
sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, f, + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -300,10 +312,13 @@ func TestGauge_RestartTimer(t *testing.T) { sink := BlackholeSink() sink.GaugeInterval = 2 * time.Hour - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, c.EmptyCollectionFunction, + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -370,10 +385,13 @@ func TestGauge_InterruptedStreaming(t *testing.T) { sink.MaxGaugeCardinality = 500 sink.GaugeInterval = 2 * time.Hour - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, nil, // shouldn't be called + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -445,10 +463,13 @@ func TestGauge_MaximumMeasurements(t *testing.T) { // Advance time by 0.5% of duration advance := time.Duration(int(0.005 * float32(sink.GaugeInterval))) - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, c.makeFunctionForValues(values, s, advance), + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) @@ -524,10 +545,13 @@ func TestGauge_MeasurementError(t *testing.T) { return values, errors.New("test error") } - p, err := sink.newGaugeCollectionProcessWithClock( + p, err := newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, f, + sink, + sink.GaugeInterval, + sink.MaxGaugeCardinality, log.Default(), s, ) diff --git a/helper/metricsutil/wrapped_metrics.go b/helper/metricsutil/wrapped_metrics.go index dcbd42aad3..67deb3bee1 100644 --- 
a/helper/metricsutil/wrapped_metrics.go +++ b/helper/metricsutil/wrapped_metrics.go @@ -5,7 +5,7 @@ import ( "sync/atomic" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/hashicorp/vault/helper/namespace" ) @@ -49,6 +49,25 @@ type Metrics interface { var _ Metrics = &ClusterMetricSink{} +// SinkWrapper implements `metricsutil.Metrics` using an instance of +// armon/go-metrics `MetricSink` as the underlying implementation. +type SinkWrapper struct { + metrics.MetricSink +} + +func (s SinkWrapper) AddDurationWithLabels(key []string, d time.Duration, labels []Label) { + val := float32(d) / float32(time.Millisecond) + s.MetricSink.AddSampleWithLabels(key, val, labels) +} + +func (s SinkWrapper) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) { + elapsed := time.Now().Sub(start) + val := float32(elapsed) / float32(time.Millisecond) + s.MetricSink.AddSampleWithLabels(key, val, labels) +} + +var _ Metrics = SinkWrapper{} + // Convenience alias type Label = metrics.Label