diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go index 63ea4e2666e..478b03c9bdf 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go @@ -17,11 +17,11 @@ limitations under the License. package webhook import ( - "context" "encoding/json" "errors" "fmt" "net" + "net/http" "net/url" "strconv" "strings" @@ -194,21 +194,19 @@ func (cm *ClientManager) hookClientConfig(cc ClientConfig) (*rest.Config, error) cfg.TLSClientConfig.ServerName = serverName } - delegateDialer := cfg.Dial - if delegateDialer == nil { - var d net.Dialer - delegateDialer = d.DialContext - } - cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) { - if addr == host { - u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port) - if err != nil { - return nil, err - } - addr = u.Host + // Use a custom roundtripper since http transport caches + // the connections by the URL.Host. The service resolver + // provides the actual endpoint address; if we use a custom + // dialer then the cached connection may not be closed. + cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return &resolvingRoundTripper{ + delegate: rt, + serviceResolver: cm.serviceResolver, + namespace: cc.Service.Namespace, + serviceName: cc.Service.Name, + port: port, } - return delegateDialer(ctx, network, addr) - } + }) return complete(cfg) } @@ -255,3 +253,31 @@ func isLocalHost(u *url.URL) bool { } return false } + +// resolvingRoundTripper is a roundtripper that resolves the endpoint address +// for the given service and updates the request URL to use the resolved endpoint address. 
+type resolvingRoundTripper struct { + delegate http.RoundTripper + serviceResolver ServiceResolver + namespace string + serviceName string + port int32 +} + +func (r *resolvingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + u, err := r.serviceResolver.ResolveEndpoint(r.namespace, r.serviceName, r.port) + if err != nil { + return nil, err + } + newReq := req.Clone(req.Context()) + // Preserve the original Host header + if len(newReq.Host) == 0 { + newReq.Host = req.URL.Host + } + newReq.URL.Host = u.Host + return r.delegate.RoundTrip(newReq) +} + +func (r *resolvingRoundTripper) WrappedRoundTripper() http.RoundTripper { + return r.delegate +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go index dd00f238dc5..a61947120fb 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go @@ -17,10 +17,17 @@ limitations under the License. package webhook import ( + "context" + "encoding/pem" + "net/http" + "net/http/httptest" + "net/url" + "sync/atomic" "testing" "golang.org/x/net/http2" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" ) func TestWebhookClientConfig(t *testing.T) { @@ -89,3 +96,126 @@ func allowHTTP2(nextProtos []string) bool { // the transport explicitly set NextProtos and excluded http/2 return false } + +type fakeAuthInfoResolver struct{} + +func (f *fakeAuthInfoResolver) ClientConfigFor(server string) (*rest.Config, error) { + return &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + ServerName: "example.com", + }, + }, nil +} + +func (f *fakeAuthInfoResolver) ClientConfigForService(serviceName, namespace string, port int) (*rest.Config, error) { + return &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + ServerName: "example.com", + }, + }, nil +} + +// fakeDynamicServiceResolver returns the next endpoint in the list for each request. 
+type fakeDynamicServiceResolver struct { + endpoints []*url.URL + counter int32 +} + +func (f *fakeDynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + val := atomic.AddInt32(&f.counter, 1) - 1 + if val >= int32(len(f.endpoints)) { + val = int32(len(f.endpoints)) - 1 + } + return f.endpoints[val], nil +} + +// TestWebhookClientIdleConnectionIPReuse tests that the webhook client follows the resolver +// endpoint instead of reusing the previous endpoint when there are IP address changes. +func TestWebhookClientIdleConnectionIPReuse(t *testing.T) { + var serverACalls int32 + serverA := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverACalls, 1) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ServerA")) + })) + defer serverA.Close() + + var serverBCalls int32 + serverB := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverBCalls, 1) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ServerB")) + })) + defer serverB.Close() + + urlA, err := url.Parse(serverA.URL) + if err != nil { + t.Fatal(err) + } + urlB, err := url.Parse(serverB.URL) + if err != nil { + t.Fatal(err) + } + + // Combine CAs from both test servers + var caBundle []byte + for _, cert := range serverA.TLS.Certificates[0].Certificate { + caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...) + } + for _, cert := range serverB.TLS.Certificates[0].Certificate { + caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...) 
+ } + + resolver := &fakeDynamicServiceResolver{ + endpoints: []*url.URL{urlA, urlB}, + } + + cm, err := NewClientManager([]schema.GroupVersion{}) + if err != nil { + t.Fatal(err) + } + cm.SetAuthenticationInfoResolver(&fakeAuthInfoResolver{}) + cm.SetServiceResolver(resolver) + + cc := ClientConfig{ + Name: "test-webhook", + CABundle: caBundle, + Service: &ClientConfigService{ + Name: "test-service", + Namespace: "default", + Port: 443, + }, + } + + client, err := cm.HookClient(cc) + if err != nil { + t.Fatal(err) + } + + // Request 1: Resolves to Server A + req1 := client.Post().Body([]byte("test")) + res1, err := req1.DoRaw(context.Background()) + if err != nil { + t.Fatalf("First request failed: %v", err) + } + if string(res1) != "ServerA" { + t.Errorf("Expected Response ServerA, got %s", string(res1)) + } + + // Request 2: Resolves to Server B + req2 := client.Post().Body([]byte("test")) + res2, err := req2.DoRaw(context.Background()) + if err != nil { + t.Fatalf("Second request failed: %v", err) + } + if string(res2) != "ServerB" { + t.Errorf("Expected Response ServerB, got %s", string(res2)) + } + + if atomic.LoadInt32(&serverACalls) != 1 { + t.Errorf("Expected 1 call to Server A, got %d", serverACalls) + } + if atomic.LoadInt32(&serverBCalls) != 1 { + t.Errorf("Expected 1 call to Server B, got %d", serverBCalls) + } +} diff --git a/test/e2e/apimachinery/webhook.go b/test/e2e/apimachinery/webhook.go index ae3063ecbd8..d1edbd42d4d 100644 --- a/test/e2e/apimachinery/webhook.go +++ b/test/e2e/apimachinery/webhook.go @@ -392,6 +392,77 @@ var _ = SIGDescribe("AdmissionWebhook [Privileged:ClusterAdmin]", func() { slowWebhookCleanup(ctx) }) + /* + Release: v1.31 + Testname: Admission webhook, connection pool isolation on pod restart + Description: Verify that the admission webhook connection pool is isolated by resolved IP address. 
+ When the backend pod behind the service is recreated and its IP address changes, subsequent admission + requests must successfully route to the new pod IP address without hitting the old connection pool. + */ + ginkgo.It("should isolate connection pool by resolved backend IP address on pod restart", func(ctx context.Context) { + client := f.ClientSet + + ginkgo.By("Registering a validating webhook") + registerWebhook(ctx, f, markersNamespaceName, f.UniqueName, certCtx, servicePort) + + // Get the active webhook pod details + ginkgo.By("Getting the current webhook pod details") + pods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"}) + framework.ExpectNoError(err) + gomega.Expect(pods.Items).To(gomega.HaveLen(1)) + oldPod := pods.Items[0] + oldIP := oldPod.Status.PodIP + framework.Logf("Webhook currently running on pod %s with IP %s", oldPod.Name, oldIP) + + // Recreate/restart the webhook pod by rolling out a change to the deployment + ginkgo.By("Triggering a rolling update of the webhook deployment") + deployment, err := client.AppsV1().Deployments(f.Namespace.Name).Get(ctx, deploymentName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + // Set a pod template annotation to trigger a rolling update + if deployment.Spec.Template.Annotations == nil { + deployment.Spec.Template.Annotations = make(map[string]string) + } + deployment.Spec.Template.Annotations["restartedAt"] = time.Now().Format(time.RFC3339) + + deployment, err = client.AppsV1().Deployments(f.Namespace.Name).Update(ctx, deployment, metav1.UpdateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Wait for the deployment rolling update to complete") + err = e2edeployment.WaitForDeploymentComplete(client, deployment) + framework.ExpectNoError(err) + + // Get the new webhook pod details + ginkgo.By("Getting the new webhook pod details") + newPods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, 
metav1.ListOptions{LabelSelector: "app=sample-webhook"}) + framework.ExpectNoError(err) + gomega.Expect(newPods.Items).To(gomega.HaveLen(1)) + newPod := newPods.Items[0] + newIP := newPod.Status.PodIP + framework.Logf("Webhook now running on pod %s with IP %s", newPod.Name, newIP) + + // Make sure the IP actually changed (or at least the pod was recreated) + gomega.Expect(newPod.Name).ToNot(gomega.Equal(oldPod.Name), "expected webhook pod to be recreated") + + // Execute new admission request and verify it succeeds + ginkgo.By("Sending a new admission request to verify the webhook connection pool successfully routes to the new pod IP") + err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) { + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(uuid.NewUUID()), + }, + } + _, err = client.CoreV1().ConfigMaps(f.Namespace.Name).Create(ctx, cm, metav1.CreateOptions{}) + if err == nil { + _ = client.CoreV1().ConfigMaps(f.Namespace.Name).Delete(ctx, cm.Name, metav1.DeleteOptions{}) + return true, nil + } + framework.Logf("Retrying admission request, last error: %v", err) + return false, nil + }) + framework.ExpectNoError(err, "admission request failed after webhook pod restart") + }) + /* Release: v1.16 Testname: Admission webhook, update validating webhook diff --git a/test/integration/apiserver/admissionwebhook/ip_reuse_test.go b/test/integration/apiserver/admissionwebhook/ip_reuse_test.go new file mode 100644 index 00000000000..fa455aa2813 --- /dev/null +++ b/test/integration/apiserver/admissionwebhook/ip_reuse_test.go @@ -0,0 +1,288 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admissionwebhook + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "io" + "net" + "net/http" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" + + admissionv1 "k8s.io/api/admission/v1" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/kubernetes/cmd/kube-apiserver/app" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" +) + +type dynamicServiceResolver struct { + lock sync.Mutex + targetURL string +} + +func (r *dynamicServiceResolver) SetTarget(url string) { + r.lock.Lock() + defer r.lock.Unlock() + r.targetURL = url +} + +func (r *dynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + r.lock.Lock() + defer r.lock.Unlock() + return url.Parse(r.targetURL) +} + +// TestWebhookConnectionPoolIPReuse asserts that the API Server's webhook client +// isolates its HTTP connection pool by the resolved backend IP address. +// If a service's resolved endpoint changes to a new IP, requests must not +// be routed to the old connection in the pool, preventing IP-reuse security bypasses. 
+func TestWebhookConnectionPoolIPReuse(t *testing.T) { + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(localhostCert) { + t.Fatal("Failed to append Cert from PEM") + } + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("Failed to build cert: %v", err) + } + + // Start Server A + var serverACalls int32 + serverAListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverA := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverACalls, 1) + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admission.k8s.io/v1", + Kind: "AdmissionReview", + }, + Response: &admissionv1.AdmissionResponse{ + UID: types.UID(review.Request.UID), + Allowed: true, + }, + }) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverA.ServeTLS(serverAListener, "", "") + }() + defer func() { _ = serverA.Close() }() + + // Start Server B + var serverBCalls int32 + serverBListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverB := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverBCalls, 1) + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + 
return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admission.k8s.io/v1", + Kind: "AdmissionReview", + }, + Response: &admissionv1.AdmissionResponse{ + UID: types.UID(review.Request.UID), + Allowed: true, + }, + }) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverB.ServeTLS(serverBListener, "", "") + }() + defer func() { _ = serverB.Close() }() + + urlA := "https://" + serverAListener.Addr().String() + urlB := "https://" + serverBListener.Addr().String() + + // Set up dynamic service resolver starting with Server A + resolver := &dynamicServiceResolver{targetURL: urlA} + t.Cleanup(app.SetServiceResolverForTests(resolver)) + + // Start Test API Server + s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{ + "--disable-admission-plugins=ServiceAccount", + }, framework.SharedEtcd()) + defer s.TearDownFn() + + clientConfig := rest.CopyConfig(s.ClientConfig) + client, err := clientset.NewForConfig(clientConfig) + if err != nil { + t.Fatalf("Unexpected error creating client: %v", err) + } + + // Register Validating Webhook Configuration using a ServiceReference + fail := admissionregistrationv1.Fail + sideEffectsNone := admissionregistrationv1.SideEffectClassNone + webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "ip-reuse.integration.test"}, + Webhooks: []admissionregistrationv1.ValidatingWebhook{{ + Name: "ip-reuse.integration.test", + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + CABundle: localhostCert, + Service: &admissionregistrationv1.ServiceReference{ + Namespace: "test", + Name: "webhook", + Port: ptr.To[int32](443), + }, + }, + Rules: 
[]admissionregistrationv1.RuleWithOperations{{ + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }}, + FailurePolicy: &fail, + SideEffects: &sideEffectsNone, + AdmissionReviewVersions: []string{"v1"}, + }}, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create validating webhook config: %v", err) + } + defer func() { + _ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{}) + }() + + // Request 1: Resolves to Server A. Trigger hook. + podFixture := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod-1", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + + // Wait for webhook to become ready/active + err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + _, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{}) + if err == nil { + _ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{}) + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Webhook failed to become active: %v", err) + } + + // Reset call counts after warm-up + atomic.StoreInt32(&serverACalls, 0) + atomic.StoreInt32(&serverBCalls, 0) + + // Execute Request 1 (Must go to Server A) + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("First pod creation failed: %v", err) + } + _ = 
client.CoreV1().Pods("default").Delete(context.TODO(), pod1.Name, metav1.DeleteOptions{}) + + if atomic.LoadInt32(&serverACalls) != 1 { + t.Fatalf("Expected 1 call to Server A, got %d", serverACalls) + } + + // Update Endpoint Resolution to Server B (simulating Pod upgrade/IP change) + resolver.SetTarget(urlB) + + // Execute Request 2 (Must go to Server B) + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-b", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Second pod creation failed: %v", err) + } + _ = client.CoreV1().Pods("default").Delete(context.TODO(), pod2.Name, metav1.DeleteOptions{}) + + // Assertions + // If the connection pool was NOT isolated by IP, the second request would reuse + // the idle connection to Server A. + // With the fix, it must correctly connect to Server B. + if atomic.LoadInt32(&serverBCalls) != 1 { + t.Errorf("Expected 1 call to Server B (New Pod), got %d. The request was incorrectly routed/stuck!", serverBCalls) + } + if atomic.LoadInt32(&serverACalls) != 1 { + t.Errorf("Expected exactly 1 call to Server A (Old Pod), got %d. 
Stale connection reuse detected!", serverACalls) + } +} diff --git a/test/integration/apiserver/admissionwebhook/load_balance_test.go b/test/integration/apiserver/admissionwebhook/load_balance_test.go index 035d6fa8b4b..512ccb619fc 100644 --- a/test/integration/apiserver/admissionwebhook/load_balance_test.go +++ b/test/integration/apiserver/admissionwebhook/load_balance_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" ) const ( @@ -359,3 +360,209 @@ func (c *connectionTrackingListener) Close() error { func (c *connectionTrackingListener) Addr() net.Addr { return c.delegate.Addr() } + +// TestWebhookLoadBalanceAcrossEndpoints ensures that the webhook client distributes sequential requests +// across different endpoint IPs behind a service when using a ServiceReference. +// Prior to the host-rewrite pool isolation fix, all sequential requests would reuse the first established +// TCP connection from the single shared pool, completely starving other endpoints. +func TestWebhookLoadBalanceAcrossEndpoints(t *testing.T) { + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(localhostCert) { + t.Fatal("Failed to append Cert from PEM") + } + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("Failed to build cert with error: %+v", err) + } + + // 1. 
Start Server A + var serverACalls int64 + serverAListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverA := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&serverACalls, 1) + defer func() { _ = r.Body.Close() }() + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(fmt.Sprintf(`{"apiVersion":"admission.k8s.io/v1","kind":"AdmissionReview","response":{"uid":"%s","allowed":true}}`, review.Request.UID))) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverA.ServeTLS(serverAListener, "", "") + }() + defer func() { _ = serverA.Close() }() + + // 2. 
Start Server B + var serverBCalls int64 + serverBListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverB := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&serverBCalls, 1) + defer func() { _ = r.Body.Close() }() + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(fmt.Sprintf(`{"apiVersion":"admission.k8s.io/v1","kind":"AdmissionReview","response":{"uid":"%s","allowed":true}}`, review.Request.UID))) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverB.ServeTLS(serverBListener, "", "") + }() + defer func() { _ = serverB.Close() }() + + // Parse URLs to extract target hosts (IP:Port) + urlA, _ := url.Parse("https://" + serverAListener.Addr().String()) + urlB, _ := url.Parse("https://" + serverBListener.Addr().String()) + + // Set up a round-robin service resolver that returns alternate endpoints for each resolve call + var resolveIndex int64 + resolver := funcServiceResolver(func(namespace, name string, port int32) (*url.URL, error) { + idx := atomic.AddInt64(&resolveIndex, 1) + if idx%2 == 0 { + return urlA, nil + } + return urlB, nil + }) + + t.Cleanup(app.SetServiceResolverForTests(resolver)) + + // 3. 
Start Test API Server + s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{ + "--disable-admission-plugins=ServiceAccount", + }, framework.SharedEtcd()) + defer s.TearDownFn() + + clientConfig := rest.CopyConfig(s.ClientConfig) + client, err := clientset.NewForConfig(clientConfig) + if err != nil { + t.Fatalf("Unexpected error creating client: %v", err) + } + + // 4. Register a Validating Webhook with ServiceReference + fail := admissionregistrationv1.Fail + sideEffectsNone := admissionregistrationv1.SideEffectClassNone + webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "load-balance-endpoints.integration.test"}, + Webhooks: []admissionregistrationv1.ValidatingWebhook{{ + Name: "load-balance-endpoints.integration.test", + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + CABundle: localhostCert, + Service: &admissionregistrationv1.ServiceReference{ + Namespace: "test", + Name: "webhook", + Port: ptr.To[int32](443), + }, + }, + Rules: []admissionregistrationv1.RuleWithOperations{{ + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }}, + FailurePolicy: &fail, + SideEffects: &sideEffectsNone, + AdmissionReviewVersions: []string{"v1"}, + }}, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create validating webhook config: %v", err) + } + defer func() { + _ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{}) + }() + + // 5. 
Warm up the webhook until it is ready/active + podFixture := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "warmup-pod", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + _, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{}) + if err == nil { + _ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{}) + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Webhook failed to become active: %v", err) + } + + // Reset counts + atomic.StoreInt64(&serverACalls, 0) + atomic.StoreInt64(&serverBCalls, 0) + + // 6. Send 100 sequential requests to trigger validating webhook + for i := range 100 { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-pod-%d", i), Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Request %d failed: %v", i, err) + } + _ = client.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + } + + // 7. Assertions + callsA := atomic.LoadInt64(&serverACalls) + callsB := atomic.LoadInt64(&serverBCalls) + + t.Logf("Sequential requests distribution: ServerA = %d calls, ServerB = %d calls", callsA, callsB) + + // Both servers MUST receive a balanced portion of calls under the pool isolation fix. + // We assert a minimum 70-30 distribution split to verify dynamic load-balancing. + if callsA < 30 || callsB < 30 { + t.Errorf("Expected balanced load-balancing across both servers (minimum 70-30 split), but got ServerA = %d calls, ServerB = %d calls. 
Stale connection pool reuse or starvation detected!", callsA, callsB) + } +} + +type funcServiceResolver func(namespace, name string, port int32) (*url.URL, error) + +func (f funcServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + return f(namespace, name, port) +}