Merge pull request #133978 from aramase/aramase/i/fix_kms_test_133945

kmsv2: run `TestKMSv2ProviderKeyIDStaleness` tests in parallel
This commit is contained in:
Kubernetes Prow Robot 2025-09-10 15:05:55 -07:00 committed by GitHub
commit d433db0782
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 53 additions and 18 deletions

View file

@ -395,7 +395,7 @@ func (h *kmsv2PluginProbe) rotateDEKOnKeyIDChange(ctx context.Context, statusKey
// allow writes indefinitely as long as there is no error
// allow writes for only up to kmsv2PluginWriteDEKSourceMaxTTL from now when there are errors
// we start the timer before we make the network call because kmsv2PluginWriteDEKSourceMaxTTL is meant to be the upper bound
expirationTimestamp := envelopekmsv2.NowFunc().Add(kmsv2PluginWriteDEKSourceMaxTTL)
expirationTimestamp := envelopekmsv2.GetNowFunc(h.name)().Add(kmsv2PluginWriteDEKSourceMaxTTL)
// dynamically check if we want to use KDF seed to derive DEKs or just a single DEK
// this gate can only change during tests, but the check is cheap enough to always make
@ -431,6 +431,7 @@ func (h *kmsv2PluginProbe) rotateDEKOnKeyIDChange(ctx context.Context, statusKey
UID: uid,
ExpirationTimestamp: expirationTimestamp,
CacheKey: cacheKey,
KMSProviderName: h.name,
})
// it should be logically impossible for the new state to be invalid but check just in case
@ -792,7 +793,9 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserver.KMSConfiguratio
apiServerID: apiServerID,
}
// initialize state so that Load always works
probe.state.Store(&envelopekmsv2.State{})
probe.state.Store(&envelopekmsv2.State{
KMSProviderName: kmsName,
})
primeAndProbeKMSv2(ctx, probe, kmsName)
transformer := storagevalue.PrefixTransformer{

View file

@ -1853,10 +1853,8 @@ func TestComputeEncryptionConfigHash(t *testing.T) {
func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) {
defaultUseSeed := GetKDF("")
origNowFunc := envelopekmsv2.NowFunc
origNowFunc := envelopekmsv2.GetNowFunc("")
now := origNowFunc() // freeze time
t.Cleanup(func() { envelopekmsv2.NowFunc = origNowFunc })
envelopekmsv2.NowFunc = func() time.Time { return now }
klog.LogToStderr(false)
var level klog.Level
@ -2083,6 +2081,9 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) {
kmsName := fmt.Sprintf("panda-%d", i)
defer SetKDFForTests(kmsName, tt.useSeed)()
resetNowFunc := envelopekmsv2.SetNowFuncForTests(kmsName, func() time.Time { return now })
t.Cleanup(resetNowFunc)
var buf bytes.Buffer
klog.SetOutput(&buf)
@ -2092,6 +2093,7 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) {
name: kmsName,
service: tt.service,
}
tt.state.KMSProviderName = kmsName
h.state.Store(&tt.state)
err := h.rotateDEKOnKeyIDChange(ctx, tt.statusKeyID, "panda")
@ -2106,6 +2108,8 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) {
ignoredFields := sets.NewString("Transformer", "EncryptedObjectEncryptedDEKSource", "UID", "CacheKey")
gotState := *h.state.Load()
gotState.KMSProviderName = kmsName
tt.wantState.KMSProviderName = kmsName
if diff := cmp.Diff(tt.wantState, gotState,
cmp.FilterPath(func(path cmp.Path) bool { return ignoredFields.Has(path.String()) }, cmp.Ignore()),

View file

@ -24,6 +24,7 @@ import (
"crypto/sha256"
"fmt"
"sort"
"sync"
"time"
"unsafe"
@ -51,6 +52,29 @@ func init() {
metrics.RegisterMetrics()
}
// nowFuncPerKMS allows us to swap the NowFunc for KMS providers in tests
// Note: it cannot be set by an end user
var nowFuncPerKMS sync.Map // map[string]func() time.Time, KMS name -> NowFunc
// SetNowFuncForTests should only be called in tests to swap the NowFunc for KMS providers
// Caller must guarantee that all KMS providers have distinct names across all tests.
func SetNowFuncForTests(kmsName string, fn func() time.Time) func() {
if len(kmsName) == 0 { // guarantee that GetNowFunc("") returns the default value
panic("empty KMS name used in test")
}
nowFuncPerKMS.Store(kmsName, fn)
return func() { nowFuncPerKMS.Delete(kmsName) }
}
// GetNowFunc returns the time function for the given KMS provider name
func GetNowFunc(kmsName string) func() time.Time {
nowFunc, ok := nowFuncPerKMS.Load(kmsName)
if !ok {
return time.Now
}
return nowFunc.(func() time.Time)
}
const (
// KMSAPIVersionv2 is a version of the KMS API.
KMSAPIVersionv2 = "v2"
@ -81,9 +105,6 @@ const (
errKeyIDTooLongCode ErrCodeKeyID = "too_long"
)
// NowFunc is exported so tests can override it.
var NowFunc = time.Now
type StateFunc func() (State, error)
type ErrCodeKeyID string
@ -101,10 +122,14 @@ type State struct {
// CacheKey is the key used to cache the DEK/seed in envelopeTransformer.cache.
CacheKey []byte
// KMSProviderName is used to dynamically look up the time function for tests
KMSProviderName string
}
func (s *State) ValidateEncryptCapability() error {
if now := NowFunc(); now.After(s.ExpirationTimestamp) {
nowFunc := GetNowFunc(s.KMSProviderName)
if now := nowFunc(); now.After(s.ExpirationTimestamp) {
return fmt.Errorf("encryptedDEKSource with keyID hash %q expired at %s (current time is %s)",
GetHashIfNotEmpty(s.EncryptedObjectKeyID), s.ExpirationTimestamp.Format(time.RFC3339), now.Format(time.RFC3339))
}

View file

@ -145,9 +145,10 @@ func (g *gRPCService) Status(ctx context.Context) (*kmsservice.StatusResponse, e
func recordMetricsInterceptor(providerName string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := NowFunc()
nowFunc := GetNowFunc(providerName)
start := nowFunc()
respErr := invoker(ctx, method, req, reply, cc, opts...)
elapsed := NowFunc().Sub(start)
elapsed := nowFunc().Sub(start)
metrics.RecordKMSOperationLatency(providerName, method, elapsed, respErr)
return respErr
}

View file

@ -405,13 +405,14 @@ resources:
// 7. when kms-plugin is down, no-op update for a pod should succeed and not result in RV change even once the DEK/seed is valid
func TestKMSv2ProviderKeyIDStaleness(t *testing.T) {
t.Parallel()
// testKMSv2ProviderKeyIDStaleness modifies global state (kmsv2.NowFunc) and thus the following two tests
// have to run sequentially. No other test is allowed to change kmsv2.NowFunc.
t.Run("regular gcm", func(t *testing.T) {
t.Parallel()
kmsName := "kms-provider-key-id-stale-false"
testKMSv2ProviderKeyIDStaleness(t, kmsName, encryptionconfig.SetKDFForTests(kmsName, false))
})
t.Run("extended nonce gcm", func(t *testing.T) {
t.Parallel()
testKMSv2ProviderKeyIDStaleness(t, "kms-provider-key-id-stale-true", func() {})
})
}
@ -605,9 +606,10 @@ resources:
}
// Invalidate the DEK by moving the current time forward
origNowFunc := kmsv2.NowFunc
t.Cleanup(func() { kmsv2.NowFunc = origNowFunc })
kmsv2.NowFunc = func() time.Time { return origNowFunc().Add(5 * time.Minute) }
origNowFunc := kmsv2.GetNowFunc(kmsName)
t.Cleanup(kmsv2.SetNowFuncForTests(kmsName, func() time.Time {
return origNowFunc().Add(5 * time.Minute)
}))
// 6. when kms-plugin is down, expect creation of new pod and encryption to fail because the DEK is invalid
_, err = test.createPod(testNamespace, dynamicClient)
@ -630,7 +632,7 @@ resources:
)
// fix plugin and wait for new writes to start working again
kmsv2.NowFunc = origNowFunc
t.Cleanup(kmsv2.SetNowFuncForTests(kmsName, origNowFunc))
pluginMock.ExitFailedState()
err = wait.Poll(time.Second, 3*time.Minute,
func() (bool, error) {
@ -1441,7 +1443,7 @@ resources:
t.Fatal(err)
}
if !proto.Equal(expectedDEKSourceHKDFSHA256XNonceAESGCMSeedObject, legacyDEKSourceHKDFSHA256XNonceAESGCMSeedObject) {
t.Errorf("kms v2 legacy encrypted object diff, want: %+v; got: %+v", expectedDEKSourceAESGCMKeyObject, legacyDEKSourceAESGCMKeyObject)
t.Errorf("kms v2 legacy encrypted object diff, want: %+v; got: %+v", expectedDEKSourceHKDFSHA256XNonceAESGCMSeedObject, legacyDEKSourceHKDFSHA256XNonceAESGCMSeedObject)
}
ctx := testContext(t)