diff --git a/changelog/_9165.txt b/changelog/_9165.txt new file mode 100644 index 0000000000..53548c31f4 --- /dev/null +++ b/changelog/_9165.txt @@ -0,0 +1,3 @@ +```release-note:improvement +proxy/cache (enterprise): Vault Proxy will now use vault_index on events to be able to update cached static secrets from performance secondaries without needing to be forwarded. This will take precedence over attempting to forward the request to the primary. +``` diff --git a/command/agentproxyshared/cache/static_secret_cache_updater.go b/command/agentproxyshared/cache/static_secret_cache_updater.go index 63f5507051..e60cc5be4e 100644 --- a/command/agentproxyshared/cache/static_secret_cache_updater.go +++ b/command/agentproxyshared/cache/static_secret_cache_updater.go @@ -30,6 +30,7 @@ import ( ) // Example write event (this does not contain all possible fields): +// Note that vault_index was added in 1.21. //{ // "id": "a3be9fb1-b514-519f-5b25-b6f144a8c1ce", // "source": "https://vaultproject.io/", @@ -43,6 +44,7 @@ import ( // "data_path": "secret/data/foo", // "modified": "true", // "oldest_version": "0", +// "vault_index":"djE6N2YzYTQ0NzAtN2UxMi03ODA3LTUxYmQtZmJiMmI5YzJhYjUxOjY4OjY4OjNlODcwOTdkNTc5YTgwNjVlNjNkMjUxMGYzYjcyNDExMmUwMmJlNjVmZDg2MjNlNzA0ZWZiYTg0NzM4Njk3N2U=" // "operation": "data-write", // "path": "secret/data/foo" // } @@ -72,6 +74,7 @@ import ( // "current_version": "3", // "destroyed_versions": "[2,3]", // "modified": "true", +// "vault_index":"djE6N2YzYTQ0NzAtN2UxMi03ODA3LTUxYmQtZmJiMmI5YzJhYjUxOjY4OjY4OjNlODcwOTdkNTc5YTgwNjVlNjNkMjUxMGYzYjcyNDExMmUwMmJlNjVmZDg2MjNlNzA0ZWZiYTg0NzM4Njk3N2U=" // "oldest_version": "0", // "operation": "destroy", // "path": "secret-v2/destroy/my-secret" @@ -102,6 +105,9 @@ type StaticSecretCacheUpdater struct { // allow_forwarding_via_header is disabled on the cluster we're talking to. // If we get an error back saying that it's disabled, we'll set this to true // and never try to forward again. + // This is only ever used on clusters with version below 1.21 (whose events do + // not have a vault_index), as we will prefer using client controlled consistency + // when possible. allowForwardingViaHeaderDisabled bool } @@ -225,7 +231,8 @@ func (updater *StaticSecretCacheUpdater) streamStaticSecretEvents(ctx context.Co // to update the secret will 404. This is consistent with other behaviour. For Proxy, this means // the secret may be evicted. That's okay. - err = updater.updateStaticSecret(ctx, path) + vaultIndex, _ := metadata["vault_index"].(string) + err = updater.updateStaticSecret(ctx, path, vaultIndex) if err != nil { // While we are kind of 'missing' an event this way, re-calling this function will // result in the secret remaining up to date. @@ -351,7 +358,7 @@ func (updater *StaticSecretCacheUpdater) preEventStreamUpdate(ctx context.Contex if index.Type != cacheboltdb.StaticSecretType { continue } - err = updater.updateStaticSecret(ctx, index.RequestPath) + err = updater.updateStaticSecret(ctx, index.RequestPath, "") if err != nil { errs = multierror.Append(errs, err) } @@ -401,7 +408,9 @@ func (updater *StaticSecretCacheUpdater) handleDeleteDestroyVersions(path string // updateStaticSecret checks for updates for a static secret on the path given, // and updates the cache if appropriate. For KVv2 secrets, we will also update // the version at index.Versions[currentVersion] with the same data. -func (updater *StaticSecretCacheUpdater) updateStaticSecret(ctx context.Context, path string) error { +// if requiredVaultIndex is provided, we will use api.RequireState to +// require that state in the request. If it isn't provided, we won't. +func (updater *StaticSecretCacheUpdater) updateStaticSecret(ctx context.Context, path string, requiredVaultIndex string) error { // We clone the client, as we won't be using the same token. client, err := updater.client.Clone() if err != nil { @@ -434,13 +443,11 @@ func (updater *StaticSecretCacheUpdater) updateStaticSecret(ctx context.Context, } request.Headers.Set("User-Agent", useragent.ProxyString()) - var resp *api.Response - var tokensToRemove []string - var successfulAttempt bool - for _, token := range maps.Keys(index.Tokens) { - client.SetToken(token) - request.Headers.Set(api.AuthHeaderName, token) - + // requiredVaultIndex will be present on any modified event from a Vault 1.21+ cluster. + if requiredVaultIndex != "" { + client = client.WithRequestCallbacks(api.RequireState(requiredVaultIndex)) + } else { + // Pre-1.21 event system fallback behaviour. Try and forward the request instead. if !updater.allowForwardingViaHeaderDisabled { // Set this to always forward to active, since events could come before // replication, and if we're connected to the standby, then we will be @@ -453,9 +460,26 @@ func (updater *StaticSecretCacheUpdater) updateStaticSecret(ctx context.Context, // we will never set this header again. request.Headers.Set(api.HeaderForward, "active-node") } + } + + minRetries := 10 + if client.MaxRetries() < minRetries { + // If for whatever reason we have less retries configured than 10, + // set the retries to 10. This is important for managing the retries for + // 412 (consistency) errors when retrying on performance secondaries. + client.SetMaxRetries(minRetries) + } + + var resp *api.Response + var tokensToRemove []string + var successfulAttempt bool + for _, token := range maps.Keys(index.Tokens) { + client.SetToken(token) + request.Headers.Set(api.AuthHeaderName, token) resp, err = client.RawRequestWithContext(ctx, request) - if err != nil { + // 1.21 fallback behaviour. Validate if we should keep trying to forward the event. + if requiredVaultIndex == "" && err != nil { if strings.Contains(err.Error(), "forwarding via header X-Vault-Forward disabled") { updater.logger.Info("allow_forwarding_via_header disabled, re-attempting update and no longer attempting to forward") updater.allowForwardingViaHeaderDisabled = true diff --git a/command/agentproxyshared/cache/static_secret_cache_updater_test.go b/command/agentproxyshared/cache/static_secret_cache_updater_test.go index 4a40b2546e..3d63f8b9ef 100644 --- a/command/agentproxyshared/cache/static_secret_cache_updater_test.go +++ b/command/agentproxyshared/cache/static_secret_cache_updater_test.go @@ -4,8 +4,11 @@ package cache import ( + "bufio" + "bytes" "context" "fmt" + "net/http" "os" "sync" syncatomic "sync/atomic" @@ -600,7 +603,7 @@ func TestUpdateStaticSecret(t *testing.T) { require.NoError(t, err) // attempt the update - err = updater.updateStaticSecret(context.Background(), path) + err = updater.updateStaticSecret(context.Background(), path, "") require.NoError(t, err) newIndex, err := leaseCache.db.Get(cachememdb.IndexNameID, indexId) @@ -611,6 +614,14 @@ func TestUpdateStaticSecret(t *testing.T) { require.Equal(t, index.RequestPath, newIndex.RequestPath) require.Equal(t, index.Tokens, newIndex.Tokens) require.Len(t, newIndex.Versions, 0) + + reader := bufio.NewReader(bytes.NewReader(newIndex.Response)) + resp, err := http.ReadResponse(reader, nil) + require.NoError(t, err) + + secret, err := api.ParseSecret(resp.Body) + require.NoError(t, err) + require.Equal(t, secretData, secret.Data) } // TestUpdateStaticSecret_KVv2 tests that updateStaticSecret works as expected, reaching out @@ -662,7 +673,7 @@ func TestUpdateStaticSecret_KVv2(t *testing.T) { require.NoError(t, err) // attempt the update - err = updater.updateStaticSecret(context.Background(), path) + err = updater.updateStaticSecret(context.Background(), path, "") require.NoError(t, err) newIndex, err := leaseCache.db.Get(cachememdb.IndexNameID, indexId) @@ -677,6 +688,16 @@ func TestUpdateStaticSecret_KVv2(t *testing.T) { require.Len(t, newIndex.Versions, 1) require.NotNil(t, newIndex.Versions[1]) require.Equal(t, newIndex.Versions[1], newIndex.Response) + + reader := bufio.NewReader(bytes.NewReader(newIndex.Response)) + resp, err := http.ReadResponse(reader, nil) + require.NoError(t, err) + + secret, err := api.ParseSecret(resp.Body) + require.NoError(t, err) + data, ok := secret.Data["data"] + require.True(t, ok) + require.Equal(t, secretData, data) } // TestUpdateStaticSecret_EvictsIfInvalidTokens tests that updateStaticSecret will @@ -717,7 +738,7 @@ func TestUpdateStaticSecret_EvictsIfInvalidTokens(t *testing.T) { require.NoError(t, err) // attempt the update - err = updater.updateStaticSecret(context.Background(), path) + err = updater.updateStaticSecret(context.Background(), path, "") require.NoError(t, err) newIndex, err := leaseCache.db.Get(cachememdb.IndexNameID, indexId) @@ -740,7 +761,7 @@ func TestUpdateStaticSecret_HandlesNonCachedPaths(t *testing.T) { path := "secret/foo" // Attempt the update - err := updater.updateStaticSecret(context.Background(), path) + err := updater.updateStaticSecret(context.Background(), path, "") require.NoError(t, err) require.Nil(t, err) }