diff --git a/pkg/kubelet/podcertificate/podcertificatemanager_test.go b/pkg/kubelet/podcertificate/podcertificatemanager_test.go index 331a2c4e8e7..ab3f1507873 100644 --- a/pkg/kubelet/podcertificate/podcertificatemanager_test.go +++ b/pkg/kubelet/podcertificate/podcertificatemanager_test.go @@ -21,7 +21,9 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "math/rand" "slices" + "sync/atomic" "testing" "time" @@ -30,12 +32,15 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" certlistersv1beta1 "k8s.io/client-go/listers/certificates/v1beta1" corelistersv1 "k8s.io/client-go/listers/core/v1" + k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/utils/hermeticpodcertificatesigner" "k8s.io/kubernetes/test/utils/ktesting" @@ -293,9 +298,51 @@ func TestFullFlow(t *testing.T) { ctx, cancel := context.WithCancel(ktesting.Init(t)) defer cancel() - kc := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactoryWithOptions(kc, 0) clock := testclock.NewFakeClock(mustRFC3339(t, "2010-01-01T00:00:00Z")) + kc := fake.NewSimpleClientset() + + // Assign PCR name and creationTimeStamp + kc.Fake.PrependReactor("create", "podcertificaterequests", + func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(k8stesting.CreateAction).GetObject().(*certsv1beta1.PodCertificateRequest) + // Simulate server-side GenerateName behavior + if obj.Name == "" { + obj.Name = fmt.Sprintf("%s-pcr-%d", obj.Spec.PodName, rand.Int63n(1_000_000)) + } + obj.CreationTimestamp = metav1.NewTime(clock.Now()) + + return false, obj, nil // allow normal processing + }) + + // fake.Clientset suffers from a race condition related to informers: + // it does not implement resource version support in its Watch + // implementation and instead assumes that watches are set up + // before further changes are made. + // + // If a test waits for caches to be synced and then immediately + // adds an object, that new object will never be seen by event handlers + // if the race goes wrong and the Watch call hadn't completed yet + // + // To work around this, we count all watches and only proceed when all of them are in place + var numWatches atomic.Int32 + kc.Fake.PrependWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + // Recreate the default fake-client watch behaviour, but count watches + var opts metav1.ListOptions + if wa, ok := action.(k8stesting.WatchActionImpl); ok { + opts = wa.ListOptions + } + + gvr := action.GetResource() + ns := action.GetNamespace() + w, err := kc.Tracker().Watch(gvr, ns, opts) + if err != nil { + return false, nil, err + } + + numWatches.Add(1) + return true, w, nil + }) + informerFactory := informers.NewSharedInformerFactoryWithOptions(kc, 0) // // Configure and boot up a fake podcertificaterequest signing controller. @@ -309,7 +356,6 @@ func TestFullFlow(t *testing.T) { } pcrSigner := hermeticpodcertificatesigner.New(clock, signerName, caKeys, caCerts, kc) go pcrSigner.Run(ctx) - // // Configure and boot up enough Kubelet subsystems to run an IssuingManager. // @@ -336,8 +382,25 @@ func TestFullFlow(t *testing.T) { types.NodeName(node1.ObjectMeta.Name), clock, ) - + // Start informers informerFactory.Start(ctx.Done()) + + // wait until the informers' watches are actually registered, + const expectedWatches = 3 // Pods, Nodes, PodCertificateRequests + if err := wait.PollUntilContextTimeout( + ctx, + 100*time.Millisecond, + 5*time.Second, + true, + func(ctx context.Context) (bool, error) { + return numWatches.Load() >= expectedWatches, nil + }, + ); err != nil { + t.Fatalf("timed out waiting for informer watches, got %d (want >= %d): %v", + numWatches.Load(), expectedWatches, err) + } + + // Now it's safe to run the manager – informers are watching. go node1PodCertificateManager.Run(ctx) // @@ -367,6 +430,7 @@ func TestFullFlow(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: workloadNS.ObjectMeta.Name, Name: "workload", + UID: "test-workload-uid", }, Spec: corev1.PodSpec{ ServiceAccountName: workloadSA.ObjectMeta.Name, @@ -408,7 +472,6 @@ func TestFullFlow(t *testing.T) { if err != nil { t.Fatalf("Unexpected error creating workload pod: %v", err) } - // Because our fake podManager is based on an informer, we need to poll // until workloadPod is reflected in the informer. err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {