From ede2fddcfd44db4a50f46b3bc7ca719e55a6408d Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 21 May 2026 21:30:25 +0000 Subject: [PATCH] webhook use resolved endpoint IP instead of cached Previously, the webhook transport was switched from HTTP/2 to HTTP/1.1 to work around HTTP/2's single-connection multiplexing, which prevented concurrent requests from load-balancing across multiple backend pods. However, under HTTP/1.1, connections are kept alive and cached as idle in the transport's pool. Because Go's http.Transport keys its connection cache by the request's URL Host (in this case the service name) and we overrode the DialContext to perform dynamic endpoint resolution, when a new request is sent, if there is an idle connection in the pool matching the service hostname, the connection is reused and the dialer is skipped. --- pkg/features/kube_features.go | 6 + .../apiserver/pkg/features/kube_features.go | 12 + .../apiserver/pkg/util/webhook/client.go | 78 ++++- .../apiserver/pkg/util/webhook/client_test.go | 162 ++++++++++ .../reference/feature_list.md | 1 + .../reference/versioned_feature_list.yaml | 6 + test/e2e/apimachinery/webhook.go | 71 +++++ .../admissionwebhook/ip_reuse_test.go | 288 ++++++++++++++++++ .../admissionwebhook/load_balance_test.go | 226 ++++++++++++++ 9 files changed, 837 insertions(+), 13 deletions(-) create mode 100644 test/integration/apiserver/admissionwebhook/ip_reuse_test.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index a21bf20a8aa..6121221f239 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -2204,6 +2204,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, + genericfeatures.WebhookRoundTripLoadBalancing: { + {Version: version.MustParse("1.37"), Default: true, PreRelease: featuregate.Beta}, + }, + kcmfeatures.CloudControllerManagerWatchBasedRoutesReconciliation: { {Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha}, }, @@ -2641,6 +2645,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature genericfeatures.WatchList: {}, + genericfeatures.WebhookRoundTripLoadBalancing: {}, + kcmfeatures.CloudControllerManagerWatchBasedRoutesReconciliation: {}, kcmfeatures.CloudControllerManagerWebhook: {}, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index a2f3f8a53e8..c3a362a829d 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -271,6 +271,14 @@ const ( // // Allow the API server to stream individual items instead of chunking WatchList featuregate.Feature = "WatchList" + + // owner: @aojea + // beta: v1.37 + // + // Enables using a custom resolving RoundTripper to load-balance admission + // webhook requests across service endpoints instead of caching connections + // by service name. + WebhookRoundTripLoadBalancing featuregate.Feature = "WebhookRoundTripLoadBalancing" ) func init() { @@ -442,4 +450,8 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Beta}, {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, + + WebhookRoundTripLoadBalancing: { + {Version: version.MustParse("1.37"), Default: true, PreRelease: featuregate.Beta}, + }, } diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go index 63ea4e2666e..078e4377cb9 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/client.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "net" + "net/http" "net/url" "strconv" "strings" @@ -30,6 +31,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/x509metrics" "k8s.io/client-go/rest" "k8s.io/utils/lru" @@ -194,20 +197,36 @@ func (cm *ClientManager) hookClientConfig(cc ClientConfig) (*rest.Config, error) cfg.TLSClientConfig.ServerName = serverName } - delegateDialer := cfg.Dial - if delegateDialer == nil { - var d net.Dialer - delegateDialer = d.DialContext - } - cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) { - if addr == host { - u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port) - if err != nil { - return nil, err - } - addr = u.Host + if !utilfeature.DefaultFeatureGate.Enabled(features.WebhookRoundTripLoadBalancing) { + delegateDialer := cfg.Dial + if delegateDialer == nil { + var d net.Dialer + delegateDialer = d.DialContext } - return delegateDialer(ctx, network, addr) + cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) { + if addr == host { + u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port) + if err != nil { + return nil, err + } + addr = u.Host + } + return delegateDialer(ctx, network, addr) + } + } else { + // Use a custom roundtripper since http transport caches + // the connections by the URL. Host. The service resolver + // provides the actual endpoint address, if we use a custom + // dialer then the cached connection may not be closed. + cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return &resolvingRoundTripper{ + delegate: rt, + serviceResolver: cm.serviceResolver, + namespace: cc.Service.Namespace, + serviceName: cc.Service.Name, + port: port, + } + }) } return complete(cfg) @@ -255,3 +274,36 @@ func isLocalHost(u *url.URL) bool { } return false } + +// resolvingRoundTripper is a roundtripper that resolves the endpoint address +// for the given service and updates the request URL to use the resolved endpoint address. +type resolvingRoundTripper struct { + delegate http.RoundTripper + serviceResolver ServiceResolver + namespace string + serviceName string + port int32 +} + +func (r *resolvingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + serverName := r.serviceName + "." + r.namespace + ".svc" + host := net.JoinHostPort(serverName, strconv.Itoa(int(r.port))) + if req.URL.Host != host { + return r.delegate.RoundTrip(req) + } + u, err := r.serviceResolver.ResolveEndpoint(r.namespace, r.serviceName, r.port) + if err != nil { + return nil, err + } + newReq := req.Clone(req.Context()) + // Preserve the original Host header + if len(newReq.Host) == 0 { + newReq.Host = req.URL.Host + } + newReq.URL.Host = u.Host + return r.delegate.RoundTrip(newReq) +} + +func (r *resolvingRoundTripper) WrappedRoundTripper() http.RoundTripper { + return r.delegate +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go index dd00f238dc5..26e9c1fce68 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/client_test.go @@ -17,10 +17,20 @@ limitations under the License. package webhook import ( + "context" + "encoding/pem" + "net/http" + "net/http/httptest" + "net/url" + "sync/atomic" "testing" "golang.org/x/net/http2" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" ) func TestWebhookClientConfig(t *testing.T) { @@ -89,3 +99,155 @@ func allowHTTP2(nextProtos []string) bool { // the transport explicitly set NextProtos and excluded http/2 return false } + +type fakeAuthInfoResolver struct{} + +func (f *fakeAuthInfoResolver) ClientConfigFor(server string) (*rest.Config, error) { + return &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + ServerName: "example.com", + }, + }, nil +} + +func (f *fakeAuthInfoResolver) ClientConfigForService(serviceName, namespace string, port int) (*rest.Config, error) { + return &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + ServerName: "example.com", + }, + }, nil +} + +// fakeDynamicServiceResolver returns the next endpoint in the list for each request. +type fakeDynamicServiceResolver struct { + endpoints []*url.URL + counter int32 +} + +func (f *fakeDynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + val := atomic.AddInt32(&f.counter, 1) - 1 + if val >= int32(len(f.endpoints)) { + val = int32(len(f.endpoints)) - 1 + } + return f.endpoints[val], nil +} + +// TestWebhookClientIdleConnectionIPReuse tests that the webhook client follow the resolver +// endpoint instead of reusing the previous endpoint when there are IP address changes. +func TestWebhookClientIdleConnectionIPReuse(t *testing.T) { + tests := []struct { + name string + enableFeatureGate bool + expectedServerACalls int32 + expectedServerBCalls int32 + expectedSecondResponse string + }{ + { + name: "feature gate enabled - round-trip load balancing routes to Server B", + enableFeatureGate: true, + expectedServerACalls: 1, + expectedServerBCalls: 1, + expectedSecondResponse: "ServerB", + }, + { + name: "feature gate disabled - dialer resolution caches to Server A", + enableFeatureGate: false, + expectedServerACalls: 2, + expectedServerBCalls: 0, + expectedSecondResponse: "ServerA", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WebhookRoundTripLoadBalancing, tc.enableFeatureGate) + + var serverACalls int32 + serverA := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverACalls, 1) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ServerA")) + })) + defer serverA.Close() + + var serverBCalls int32 + serverB := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverBCalls, 1) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ServerB")) + })) + defer serverB.Close() + + urlA, err := url.Parse(serverA.URL) + if err != nil { + t.Fatal(err) + } + urlB, err := url.Parse(serverB.URL) + if err != nil { + t.Fatal(err) + } + + // Combine CAs from both test servers + var caBundle []byte + for _, cert := range serverA.TLS.Certificates[0].Certificate { + caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...) + } + for _, cert := range serverB.TLS.Certificates[0].Certificate { + caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...) + } + + resolver := &fakeDynamicServiceResolver{ + endpoints: []*url.URL{urlA, urlB}, + } + + cm, err := NewClientManager([]schema.GroupVersion{}) + if err != nil { + t.Fatal(err) + } + cm.SetAuthenticationInfoResolver(&fakeAuthInfoResolver{}) + cm.SetServiceResolver(resolver) + + cc := ClientConfig{ + Name: "test-webhook", + CABundle: caBundle, + Service: &ClientConfigService{ + Name: "test-service", + Namespace: "default", + Port: 443, + }, + } + + client, err := cm.HookClient(cc) + if err != nil { + t.Fatal(err) + } + + // Request 1: Resolves to Server A + req1 := client.Post().Body([]byte("test")) + res1, err := req1.DoRaw(context.Background()) + if err != nil { + t.Fatalf("First request failed: %v", err) + } + if string(res1) != "ServerA" { + t.Errorf("Expected Response ServerA, got %s", string(res1)) + } + + // Request 2: Resolves to Server B if feature gate enabled, Server A if disabled + req2 := client.Post().Body([]byte("test")) + res2, err := req2.DoRaw(context.Background()) + if err != nil { + t.Fatalf("Second request failed: %v", err) + } + if string(res2) != tc.expectedSecondResponse { + t.Errorf("Expected Response %s, got %s", tc.expectedSecondResponse, string(res2)) + } + + if callsA := atomic.LoadInt32(&serverACalls); callsA != tc.expectedServerACalls { + t.Errorf("Expected %d calls to Server A, got %d", tc.expectedServerACalls, callsA) + } + if callsB := atomic.LoadInt32(&serverBCalls); callsB != tc.expectedServerBCalls { + t.Errorf("Expected %d calls to Server B, got %d", tc.expectedServerBCalls, callsB) + } + }) + } +} diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index 536ef86a909..43f3f9f4f76 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -217,6 +217,7 @@ | WatchCacheInitializationPostStartHook | :ballot_box_with_check: 1.36+ | | | 1.31– | | | | [code](https://cs.k8s.io/?q=%5CbWatchCacheInitializationPostStartHook%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWatchCacheInitializationPostStartHook%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | WatchList | :ballot_box_with_check: 1.32+ | | 1.27–1.31 | 1.32– | | | | [code](https://cs.k8s.io/?q=%5CbWatchList%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWatchList%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | WatchListClient | :ballot_box_with_check: 1.35+ | | | 1.30– | | | | [code](https://cs.k8s.io/?q=%5CbWatchListClient%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWatchListClient%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| WebhookRoundTripLoadBalancing | :ballot_box_with_check: 1.37+ | | | 1.37– | | | | [code](https://cs.k8s.io/?q=%5CbWebhookRoundTripLoadBalancing%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWebhookRoundTripLoadBalancing%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | WinDSR | :ballot_box_with_check: 1.33+ | :closed_lock_with_key: 1.34+ | 1.14–1.32 | 1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbWinDSR%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWinDSR%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | WinOverlay | :ballot_box_with_check: 1.20+ | :closed_lock_with_key: 1.34+ | 1.14–1.19 | 1.20–1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbWinOverlay%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWinOverlay%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | WindowsCPUAndMemoryAffinity | | | 1.32– | | | | | [code](https://cs.k8s.io/?q=%5CbWindowsCPUAndMemoryAffinity%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWindowsCPUAndMemoryAffinity%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index a4c03665dab..58a34ccf347 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -2019,6 +2019,12 @@ lockToDefault: false preRelease: Beta version: "1.34" +- name: WebhookRoundTripLoadBalancing + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.37" - name: WindowsCPUAndMemoryAffinity versionedSpecs: - default: false diff --git a/test/e2e/apimachinery/webhook.go b/test/e2e/apimachinery/webhook.go index ae3063ecbd8..d1edbd42d4d 100644 --- a/test/e2e/apimachinery/webhook.go +++ b/test/e2e/apimachinery/webhook.go @@ -392,6 +392,77 @@ var _ = SIGDescribe("AdmissionWebhook [Privileged:ClusterAdmin]", func() { slowWebhookCleanup(ctx) }) + /* + Release: v1.31 + Testname: Admission webhook, connection pool isolation on pod restart + Description: Verify that the admission webhook connection pool is isolated by resolved IP address. + When the backend pod behind the service is recreated and its IP address changes, subsequent admission + requests must successfully route to the new pod IP address without hitting the old connection pool. + */ + ginkgo.It("should isolate connection pool by resolved backend IP address on pod restart", func(ctx context.Context) { + client := f.ClientSet + + ginkgo.By("Registering a validating webhook") + registerWebhook(ctx, f, markersNamespaceName, f.UniqueName, certCtx, servicePort) + + // Get the active webhook pod details + ginkgo.By("Getting the current webhook pod details") + pods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"}) + framework.ExpectNoError(err) + gomega.Expect(pods.Items).To(gomega.HaveLen(1)) + oldPod := pods.Items[0] + oldIP := oldPod.Status.PodIP + framework.Logf("Webhook currently running on pod %s with IP %s", oldPod.Name, oldIP) + + // Recreate/restart the webhook pod by rolling out a change to the deployment + ginkgo.By("Triggering a rolling update of the webhook deployment") + deployment, err := client.AppsV1().Deployments(f.Namespace.Name).Get(ctx, deploymentName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + // Update the deployment template labels or env to trigger a rolling update + if deployment.Spec.Template.Annotations == nil { + deployment.Spec.Template.Annotations = make(map[string]string) + } + deployment.Spec.Template.Annotations["restartedAt"] = time.Now().Format(time.RFC3339) + + deployment, err = client.AppsV1().Deployments(f.Namespace.Name).Update(ctx, deployment, metav1.UpdateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Wait for the deployment rolling update to complete") + err = e2edeployment.WaitForDeploymentComplete(client, deployment) + framework.ExpectNoError(err) + + // Get the new webhook pod details + ginkgo.By("Getting the new webhook pod details") + newPods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"}) + framework.ExpectNoError(err) + gomega.Expect(newPods.Items).To(gomega.HaveLen(1)) + newPod := newPods.Items[0] + newIP := newPod.Status.PodIP + framework.Logf("Webhook now running on pod %s with IP %s", newPod.Name, newIP) + + // Make sure the IP actually changed (or at least the pod was recreated) + gomega.Expect(newPod.Name).ToNot(gomega.Equal(oldPod.Name), "expected webhook pod to be recreated") + + // Execute new admission request and verify it succeeds + ginkgo.By("Sending a new admission request to verify the webhook connection pool successfully routes to the new pod IP") + err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) { + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(uuid.NewUUID()), + }, + } + _, err = client.CoreV1().ConfigMaps(f.Namespace.Name).Create(ctx, cm, metav1.CreateOptions{}) + if err == nil { + _ = client.CoreV1().ConfigMaps(f.Namespace.Name).Delete(ctx, cm.Name, metav1.DeleteOptions{}) + return true, nil + } + framework.Logf("Retrying admission request, last error: %v", err) + return false, nil + }) + framework.ExpectNoError(err, "admission request failed after webhook pod restart") + }) + /* Release: v1.16 Testname: Admission webhook, update validating webhook diff --git a/test/integration/apiserver/admissionwebhook/ip_reuse_test.go b/test/integration/apiserver/admissionwebhook/ip_reuse_test.go new file mode 100644 index 00000000000..fa455aa2813 --- /dev/null +++ b/test/integration/apiserver/admissionwebhook/ip_reuse_test.go @@ -0,0 +1,288 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admissionwebhook + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "io" + "net" + "net/http" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" + + admissionv1 "k8s.io/api/admission/v1" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/kubernetes/cmd/kube-apiserver/app" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" +) + +type dynamicServiceResolver struct { + lock sync.Mutex + targetURL string +} + +func (r *dynamicServiceResolver) SetTarget(url string) { + r.lock.Lock() + defer r.lock.Unlock() + r.targetURL = url +} + +func (r *dynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + r.lock.Lock() + defer r.lock.Unlock() + return url.Parse(r.targetURL) +} + +// TestWebhookConnectionPoolIPReuse asserts that the API Server's webhook client +// isolates its HTTP connection pool by the resolved backend IP address. +// If a service's resolved endpoint changes to a new IP, requests must not +// be routed to the old connection in the pool, preventing IP-reuse security bypasses. +func TestWebhookConnectionPoolIPReuse(t *testing.T) { + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(localhostCert) { + t.Fatal("Failed to append Cert from PEM") + } + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("Failed to build cert: %v", err) + } + + // Start Server A + var serverACalls int32 + serverAListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverA := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverACalls, 1) + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admission.k8s.io/v1", + Kind: "AdmissionReview", + }, + Response: &admissionv1.AdmissionResponse{ + UID: types.UID(review.Request.UID), + Allowed: true, + }, + }) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverA.ServeTLS(serverAListener, "", "") + }() + defer func() { _ = serverA.Close() }() + + // Start Server B + var serverBCalls int32 + serverBListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverB := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&serverBCalls, 1) + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admission.k8s.io/v1", + Kind: "AdmissionReview", + }, + Response: &admissionv1.AdmissionResponse{ + UID: types.UID(review.Request.UID), + Allowed: true, + }, + }) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverB.ServeTLS(serverBListener, "", "") + }() + defer func() { _ = serverB.Close() }() + + urlA := "https://" + serverAListener.Addr().String() + urlB := "https://" + serverBListener.Addr().String() + + // Set up dynamic service resolver starting with Server A + resolver := &dynamicServiceResolver{targetURL: urlA} + t.Cleanup(app.SetServiceResolverForTests(resolver)) + + // Start Test API Server + s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{ + "--disable-admission-plugins=ServiceAccount", + }, framework.SharedEtcd()) + defer s.TearDownFn() + + clientConfig := rest.CopyConfig(s.ClientConfig) + client, err := clientset.NewForConfig(clientConfig) + if err != nil { + t.Fatalf("Unexpected error creating client: %v", err) + } + + // Register Validating Webhook Configuration using a ServiceReference + fail := admissionregistrationv1.Fail + sideEffectsNone := admissionregistrationv1.SideEffectClassNone + webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "ip-reuse.integration.test"}, + Webhooks: []admissionregistrationv1.ValidatingWebhook{{ + Name: "ip-reuse.integration.test", + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + CABundle: localhostCert, + Service: &admissionregistrationv1.ServiceReference{ + Namespace: "test", + Name: "webhook", + Port: ptr.To[int32](443), + }, + }, + Rules: []admissionregistrationv1.RuleWithOperations{{ + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }}, + FailurePolicy: &fail, + SideEffects: &sideEffectsNone, + AdmissionReviewVersions: []string{"v1"}, + }}, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create validating webhook config: %v", err) + } + defer func() { + _ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{}) + }() + + // Request 1: Resolves to Server A. Trigger hook. + podFixture := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod-1", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + + // Wait for webhook to become ready/active + err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + _, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{}) + if err == nil { + _ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{}) + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Webhook failed to become active: %v", err) + } + + // Reset call counts after warm-up + atomic.StoreInt32(&serverACalls, 0) + atomic.StoreInt32(&serverBCalls, 0) + + // Execute Request 1 (Must go to Server A) + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("First pod creation failed: %v", err) + } + _ = client.CoreV1().Pods("default").Delete(context.TODO(), pod1.Name, metav1.DeleteOptions{}) + + if atomic.LoadInt32(&serverACalls) != 1 { + t.Fatalf("Expected 1 call to Server A, got %d", serverACalls) + } + + // Update Endpoint Resolution to Server B (simulating Pod upgrade/IP change) + resolver.SetTarget(urlB) + + // Execute Request 2 (Must go to Server B) + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-b", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Second pod creation failed: %v", err) + } + _ = client.CoreV1().Pods("default").Delete(context.TODO(), pod2.Name, metav1.DeleteOptions{}) + + // Assertions + // If the connection pool was NOT isolated by IP, the second request would reuse + // the idle connection to Server A. + // With the fix, it must correctly connect to Server B. + if atomic.LoadInt32(&serverBCalls) != 1 { + t.Errorf("Expected 1 call to Server B (New Pod), got %d. The request was incorrectly routed/stuck!", serverBCalls) + } + if atomic.LoadInt32(&serverACalls) != 1 { + t.Errorf("Expected exactly 1 call to Server A (Old Pod), got %d. Stale connection reuse detected!", serverACalls) + } +} diff --git a/test/integration/apiserver/admissionwebhook/load_balance_test.go b/test/integration/apiserver/admissionwebhook/load_balance_test.go index 035d6fa8b4b..9fb947a526b 100644 --- a/test/integration/apiserver/admissionwebhook/load_balance_test.go +++ b/test/integration/apiserver/admissionwebhook/load_balance_test.go @@ -31,6 +31,7 @@ import ( "testing" "time" + admissionv1 "k8s.io/api/admission/v1" "k8s.io/api/admission/v1beta1" admissionregistrationv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" @@ -42,6 +43,7 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" ) const ( @@ -359,3 +361,227 @@ func (c *connectionTrackingListener) Close() error { func (c *connectionTrackingListener) Addr() net.Addr { return c.delegate.Addr() } + +// TestWebhookLoadBalanceAcrossEndpoints ensures that the webhook client distributes sequential requests +// across different endpoint IPs behind a service when using a ServiceReference. +// Prior to the host-rewrite pool isolation fix, all sequential requests would reuse the first established +// TCP connection from the single shared pool, completely starving other endpoints. +func TestWebhookLoadBalanceAcrossEndpoints(t *testing.T) { + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(localhostCert) { + t.Fatal("Failed to append Cert from PEM") + } + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("Failed to build cert with error: %+v", err) + } + + // 1. Start Server A + var serverACalls int64 + serverAListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverA := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&serverACalls, 1) + defer func() { _ = r.Body.Close() }() + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admission.k8s.io/v1", + Kind: "AdmissionReview", + }, + Response: &admissionv1.AdmissionResponse{ + UID: types.UID(review.Request.UID), + Allowed: true, + }, + }) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverA.ServeTLS(serverAListener, "", "") + }() + defer func() { _ = serverA.Close() }() + + // 2. Start Server B + var serverBCalls int64 + serverBListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + serverB := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&serverBCalls, 1) + defer func() { _ = r.Body.Close() }() + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var review struct { + Request struct { + UID string `json:"uid"` + } `json:"request"` + } + if err := json.Unmarshal(body, &review); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admission.k8s.io/v1", + Kind: "AdmissionReview", + }, + Response: &admissionv1.AdmissionResponse{ + UID: types.UID(review.Request.UID), + Allowed: true, + }, + }) + }), + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + go func() { + _ = serverB.ServeTLS(serverBListener, "", "") + }() + defer func() { _ = serverB.Close() }() + + // Parse URLs to extract target hosts (IP:Port) + urlA, _ := url.Parse("https://" + serverAListener.Addr().String()) + urlB, _ := url.Parse("https://" + serverBListener.Addr().String()) + + // Set up a round-robin service resolver that returns alternate endpoints for each resolve call + var resolveIndex int64 + resolver := funcServiceResolver(func(namespace, name string, port int32) (*url.URL, error) { + idx := atomic.AddInt64(&resolveIndex, 1) + if idx%2 == 0 { + return urlA, nil + } + return urlB, nil + }) + + t.Cleanup(app.SetServiceResolverForTests(resolver)) + + // 3. Start Test API Server + s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{ + "--disable-admission-plugins=ServiceAccount", + }, framework.SharedEtcd()) + defer s.TearDownFn() + + clientConfig := rest.CopyConfig(s.ClientConfig) + client, err := clientset.NewForConfig(clientConfig) + if err != nil { + t.Fatalf("Unexpected error creating client: %v", err) + } + + // 4. Register a Validating Webhook with ServiceReference + fail := admissionregistrationv1.Fail + sideEffectsNone := admissionregistrationv1.SideEffectClassNone + webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "load-balance-endpoints.integration.test"}, + Webhooks: []admissionregistrationv1.ValidatingWebhook{{ + Name: "load-balance-endpoints.integration.test", + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + CABundle: localhostCert, + Service: &admissionregistrationv1.ServiceReference{ + Namespace: "test", + Name: "webhook", + Port: ptr.To[int32](443), + }, + }, + Rules: []admissionregistrationv1.RuleWithOperations{{ + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }}, + FailurePolicy: &fail, + SideEffects: &sideEffectsNone, + AdmissionReviewVersions: []string{"v1"}, + }}, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create validating webhook config: %v", err) + } + defer func() { + _ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{}) + }() + + // 5. Warm up the webhook until it is ready/active + podFixture := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "warmup-pod", Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + _, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{}) + if err == nil { + _ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{}) + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Webhook failed to become active: %v", err) + } + + // Reset counts + atomic.StoreInt64(&serverACalls, 0) + atomic.StoreInt64(&serverBCalls, 0) + + // 6. Send 100 sequential requests to trigger validating webhook + for i := range 100 { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-pod-%d", i), Namespace: "default"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "nginx"}}, + }, + } + _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Request %d failed: %v", i, err) + } + _ = client.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + } + + // 7. Assertions + callsA := atomic.LoadInt64(&serverACalls) + callsB := atomic.LoadInt64(&serverBCalls) + + t.Logf("Sequential requests distribution: ServerA = %d calls, ServerB = %d calls", callsA, callsB) + + // Both servers MUST receive a balanced portion of calls under the pool isolation fix. + // We assert a minimum 70-30 distribution split to verify dynamic load-balancing. + if callsA < 30 || callsB < 30 { + t.Errorf("Expected balanced load-balancing across both servers (minimum 70-30 split), but got ServerA = %d calls, ServerB = %d calls. Stale connection pool reuse or starvation detected!", callsA, callsB) + } +} + +type funcServiceResolver func(namespace, name string, port int32) (*url.URL, error) + +func (f funcServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + return f(namespace, name, port) +}