mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
webhook use resolved endpoint IP instead of cached
Previously, the webhook transport was switched from HTTP/2 to HTTP/1.1 to work around HTTP/2's single-connection multiplexing, which prevented concurrent requests from load-balancing across multiple backend pods. However, under HTTP/1.1, connections are kept alive and cached as idle in the transport's pool. Because Go's http.Transport keys its connection cache by the request's URL Host (in this case the service name) and we overrode the DialContext to perform dynamic endpoint resolution, when a new request is sent, if there is an idle connection in the pool matching the service hostname, the connection is reused and the dialer is skipped.
This commit is contained in:
parent
f196ee6f5f
commit
59faa4ece9
5 changed files with 737 additions and 15 deletions
|
|
@ -17,11 +17,11 @@ limitations under the License.
|
|||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -194,21 +194,19 @@ func (cm *ClientManager) hookClientConfig(cc ClientConfig) (*rest.Config, error)
|
|||
cfg.TLSClientConfig.ServerName = serverName
|
||||
}
|
||||
|
||||
delegateDialer := cfg.Dial
|
||||
if delegateDialer == nil {
|
||||
var d net.Dialer
|
||||
delegateDialer = d.DialContext
|
||||
}
|
||||
cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
if addr == host {
|
||||
u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addr = u.Host
|
||||
// Use a custom roundtripper since http transport caches
|
||||
// the connections by the URL. Host. The service resolver
|
||||
// provides the actual endpoint address, if we use a custom
|
||||
// dialer then the cached connection may not be closed.
|
||||
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
return &resolvingRoundTripper{
|
||||
delegate: rt,
|
||||
serviceResolver: cm.serviceResolver,
|
||||
namespace: cc.Service.Namespace,
|
||||
serviceName: cc.Service.Name,
|
||||
port: port,
|
||||
}
|
||||
return delegateDialer(ctx, network, addr)
|
||||
}
|
||||
})
|
||||
|
||||
return complete(cfg)
|
||||
}
|
||||
|
|
@ -255,3 +253,31 @@ func isLocalHost(u *url.URL) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// resolvingRoundTripper is a roundtripper that resolves the endpoint address
|
||||
// for the given service and updates the request URL to use the resolved endpoint address.
|
||||
type resolvingRoundTripper struct {
|
||||
delegate http.RoundTripper
|
||||
serviceResolver ServiceResolver
|
||||
namespace string
|
||||
serviceName string
|
||||
port int32
|
||||
}
|
||||
|
||||
func (r *resolvingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
u, err := r.serviceResolver.ResolveEndpoint(r.namespace, r.serviceName, r.port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newReq := req.Clone(req.Context())
|
||||
// Preserve the original Host header
|
||||
if len(newReq.Host) == 0 {
|
||||
newReq.Host = req.URL.Host
|
||||
}
|
||||
newReq.URL.Host = u.Host
|
||||
return r.delegate.RoundTrip(newReq)
|
||||
}
|
||||
|
||||
func (r *resolvingRoundTripper) WrappedRoundTripper() http.RoundTripper {
|
||||
return r.delegate
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,10 +17,17 @@ limitations under the License.
|
|||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/pem"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func TestWebhookClientConfig(t *testing.T) {
|
||||
|
|
@ -89,3 +96,126 @@ func allowHTTP2(nextProtos []string) bool {
|
|||
// the transport explicitly set NextProtos and excluded http/2
|
||||
return false
|
||||
}
|
||||
|
||||
type fakeAuthInfoResolver struct{}
|
||||
|
||||
func (f *fakeAuthInfoResolver) ClientConfigFor(server string) (*rest.Config, error) {
|
||||
return &rest.Config{
|
||||
TLSClientConfig: rest.TLSClientConfig{
|
||||
ServerName: "example.com",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *fakeAuthInfoResolver) ClientConfigForService(serviceName, namespace string, port int) (*rest.Config, error) {
|
||||
return &rest.Config{
|
||||
TLSClientConfig: rest.TLSClientConfig{
|
||||
ServerName: "example.com",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// fakeDynamicServiceResolver returns the next endpoint in the list for each request.
|
||||
type fakeDynamicServiceResolver struct {
|
||||
endpoints []*url.URL
|
||||
counter int32
|
||||
}
|
||||
|
||||
func (f *fakeDynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
val := atomic.AddInt32(&f.counter, 1) - 1
|
||||
if val >= int32(len(f.endpoints)) {
|
||||
val = int32(len(f.endpoints)) - 1
|
||||
}
|
||||
return f.endpoints[val], nil
|
||||
}
|
||||
|
||||
// TestWebhookClientIdleConnectionIPReuse tests that the webhook client follow the resolver
|
||||
// endpoint instead of reusing the previous endpoint when there are IP address changes.
|
||||
func TestWebhookClientIdleConnectionIPReuse(t *testing.T) {
|
||||
var serverACalls int32
|
||||
serverA := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverACalls, 1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("ServerA"))
|
||||
}))
|
||||
defer serverA.Close()
|
||||
|
||||
var serverBCalls int32
|
||||
serverB := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverBCalls, 1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("ServerB"))
|
||||
}))
|
||||
defer serverB.Close()
|
||||
|
||||
urlA, err := url.Parse(serverA.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
urlB, err := url.Parse(serverB.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Combine CAs from both test servers
|
||||
var caBundle []byte
|
||||
for _, cert := range serverA.TLS.Certificates[0].Certificate {
|
||||
caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...)
|
||||
}
|
||||
for _, cert := range serverB.TLS.Certificates[0].Certificate {
|
||||
caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...)
|
||||
}
|
||||
|
||||
resolver := &fakeDynamicServiceResolver{
|
||||
endpoints: []*url.URL{urlA, urlB},
|
||||
}
|
||||
|
||||
cm, err := NewClientManager([]schema.GroupVersion{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cm.SetAuthenticationInfoResolver(&fakeAuthInfoResolver{})
|
||||
cm.SetServiceResolver(resolver)
|
||||
|
||||
cc := ClientConfig{
|
||||
Name: "test-webhook",
|
||||
CABundle: caBundle,
|
||||
Service: &ClientConfigService{
|
||||
Name: "test-service",
|
||||
Namespace: "default",
|
||||
Port: 443,
|
||||
},
|
||||
}
|
||||
|
||||
client, err := cm.HookClient(cc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Request 1: Resolves to Server A
|
||||
req1 := client.Post().Body([]byte("test"))
|
||||
res1, err := req1.DoRaw(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("First request failed: %v", err)
|
||||
}
|
||||
if string(res1) != "ServerA" {
|
||||
t.Errorf("Expected Response ServerA, got %s", string(res1))
|
||||
}
|
||||
|
||||
// Request 2: Resolves to Server B
|
||||
req2 := client.Post().Body([]byte("test"))
|
||||
res2, err := req2.DoRaw(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("Second request failed: %v", err)
|
||||
}
|
||||
if string(res2) != "ServerB" {
|
||||
t.Errorf("Expected Response ServerB, got %s", string(res2))
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&serverACalls) != 1 {
|
||||
t.Errorf("Expected 1 call to Server A, got %d", serverACalls)
|
||||
}
|
||||
if atomic.LoadInt32(&serverBCalls) != 1 {
|
||||
t.Errorf("Expected 1 call to Server B, got %d", serverBCalls)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -392,6 +392,77 @@ var _ = SIGDescribe("AdmissionWebhook [Privileged:ClusterAdmin]", func() {
|
|||
slowWebhookCleanup(ctx)
|
||||
})
|
||||
|
||||
/*
|
||||
Release: v1.31
|
||||
Testname: Admission webhook, connection pool isolation on pod restart
|
||||
Description: Verify that the admission webhook connection pool is isolated by resolved IP address.
|
||||
When the backend pod behind the service is recreated and its IP address changes, subsequent admission
|
||||
requests must successfully route to the new pod IP address without hitting the old connection pool.
|
||||
*/
|
||||
ginkgo.It("should isolate connection pool by resolved backend IP address on pod restart", func(ctx context.Context) {
|
||||
client := f.ClientSet
|
||||
|
||||
ginkgo.By("Registering a validating webhook")
|
||||
registerWebhook(ctx, f, markersNamespaceName, f.UniqueName, certCtx, servicePort)
|
||||
|
||||
// Get the active webhook pod details
|
||||
ginkgo.By("Getting the current webhook pod details")
|
||||
pods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"})
|
||||
framework.ExpectNoError(err)
|
||||
gomega.Expect(pods.Items).To(gomega.HaveLen(1))
|
||||
oldPod := pods.Items[0]
|
||||
oldIP := oldPod.Status.PodIP
|
||||
framework.Logf("Webhook currently running on pod %s with IP %s", oldPod.Name, oldIP)
|
||||
|
||||
// Recreate/restart the webhook pod by rolling out a change to the deployment
|
||||
ginkgo.By("Triggering a rolling update of the webhook deployment")
|
||||
deployment, err := client.AppsV1().Deployments(f.Namespace.Name).Get(ctx, deploymentName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Update the deployment template labels or env to trigger a rolling update
|
||||
if deployment.Spec.Template.Annotations == nil {
|
||||
deployment.Spec.Template.Annotations = make(map[string]string)
|
||||
}
|
||||
deployment.Spec.Template.Annotations["restartedAt"] = time.Now().Format(time.RFC3339)
|
||||
|
||||
deployment, err = client.AppsV1().Deployments(f.Namespace.Name).Update(ctx, deployment, metav1.UpdateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("Wait for the deployment rolling update to complete")
|
||||
err = e2edeployment.WaitForDeploymentComplete(client, deployment)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Get the new webhook pod details
|
||||
ginkgo.By("Getting the new webhook pod details")
|
||||
newPods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"})
|
||||
framework.ExpectNoError(err)
|
||||
gomega.Expect(newPods.Items).To(gomega.HaveLen(1))
|
||||
newPod := newPods.Items[0]
|
||||
newIP := newPod.Status.PodIP
|
||||
framework.Logf("Webhook now running on pod %s with IP %s", newPod.Name, newIP)
|
||||
|
||||
// Make sure the IP actually changed (or at least the pod was recreated)
|
||||
gomega.Expect(newPod.Name).ToNot(gomega.Equal(oldPod.Name), "expected webhook pod to be recreated")
|
||||
|
||||
// Execute new admission request and verify it succeeds
|
||||
ginkgo.By("Sending a new admission request to verify the webhook connection pool successfully routes to the new pod IP")
|
||||
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
cm := &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: string(uuid.NewUUID()),
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().ConfigMaps(f.Namespace.Name).Create(ctx, cm, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
_ = client.CoreV1().ConfigMaps(f.Namespace.Name).Delete(ctx, cm.Name, metav1.DeleteOptions{})
|
||||
return true, nil
|
||||
}
|
||||
framework.Logf("Retrying admission request, last error: %v", err)
|
||||
return false, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "admission request failed after webhook pod restart")
|
||||
})
|
||||
|
||||
/*
|
||||
Release: v1.16
|
||||
Testname: Admission webhook, update validating webhook
|
||||
|
|
|
|||
288
test/integration/apiserver/admissionwebhook/ip_reuse_test.go
Normal file
288
test/integration/apiserver/admissionwebhook/ip_reuse_test.go
Normal file
|
|
@ -0,0 +1,288 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package admissionwebhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
admissionv1 "k8s.io/api/admission/v1"
|
||||
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
type dynamicServiceResolver struct {
|
||||
lock sync.Mutex
|
||||
targetURL string
|
||||
}
|
||||
|
||||
func (r *dynamicServiceResolver) SetTarget(url string) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.targetURL = url
|
||||
}
|
||||
|
||||
func (r *dynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
return url.Parse(r.targetURL)
|
||||
}
|
||||
|
||||
// TestWebhookConnectionPoolIPReuse asserts that the API Server's webhook client
|
||||
// isolates its HTTP connection pool by the resolved backend IP address.
|
||||
// If a service's resolved endpoint changes to a new IP, requests must not
|
||||
// be routed to the old connection in the pool, preventing IP-reuse security bypasses.
|
||||
func TestWebhookConnectionPoolIPReuse(t *testing.T) {
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(localhostCert) {
|
||||
t.Fatal("Failed to append Cert from PEM")
|
||||
}
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to build cert: %v", err)
|
||||
}
|
||||
|
||||
// Start Server A
|
||||
var serverACalls int32
|
||||
serverAListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverA := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverACalls, 1)
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "admission.k8s.io/v1",
|
||||
Kind: "AdmissionReview",
|
||||
},
|
||||
Response: &admissionv1.AdmissionResponse{
|
||||
UID: types.UID(review.Request.UID),
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverA.ServeTLS(serverAListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverA.Close() }()
|
||||
|
||||
// Start Server B
|
||||
var serverBCalls int32
|
||||
serverBListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverB := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverBCalls, 1)
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "admission.k8s.io/v1",
|
||||
Kind: "AdmissionReview",
|
||||
},
|
||||
Response: &admissionv1.AdmissionResponse{
|
||||
UID: types.UID(review.Request.UID),
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverB.ServeTLS(serverBListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverB.Close() }()
|
||||
|
||||
urlA := "https://" + serverAListener.Addr().String()
|
||||
urlB := "https://" + serverBListener.Addr().String()
|
||||
|
||||
// Set up dynamic service resolver starting with Server A
|
||||
resolver := &dynamicServiceResolver{targetURL: urlA}
|
||||
t.Cleanup(app.SetServiceResolverForTests(resolver))
|
||||
|
||||
// Start Test API Server
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
}, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
||||
clientConfig := rest.CopyConfig(s.ClientConfig)
|
||||
client, err := clientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error creating client: %v", err)
|
||||
}
|
||||
|
||||
// Register Validating Webhook Configuration using a ServiceReference
|
||||
fail := admissionregistrationv1.Fail
|
||||
sideEffectsNone := admissionregistrationv1.SideEffectClassNone
|
||||
webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "ip-reuse.integration.test"},
|
||||
Webhooks: []admissionregistrationv1.ValidatingWebhook{{
|
||||
Name: "ip-reuse.integration.test",
|
||||
ClientConfig: admissionregistrationv1.WebhookClientConfig{
|
||||
CABundle: localhostCert,
|
||||
Service: &admissionregistrationv1.ServiceReference{
|
||||
Namespace: "test",
|
||||
Name: "webhook",
|
||||
Port: ptr.To[int32](443),
|
||||
},
|
||||
},
|
||||
Rules: []admissionregistrationv1.RuleWithOperations{{
|
||||
Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create},
|
||||
Rule: admissionregistrationv1.Rule{
|
||||
APIGroups: []string{""},
|
||||
APIVersions: []string{"v1"},
|
||||
Resources: []string{"pods"},
|
||||
},
|
||||
}},
|
||||
FailurePolicy: &fail,
|
||||
SideEffects: &sideEffectsNone,
|
||||
AdmissionReviewVersions: []string{"v1"},
|
||||
}},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create validating webhook config: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{})
|
||||
}()
|
||||
|
||||
// Request 1: Resolves to Server A. Trigger hook.
|
||||
podFixture := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test-pod-1", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
|
||||
// Wait for webhook to become ready/active
|
||||
err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
_, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
_ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{})
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Webhook failed to become active: %v", err)
|
||||
}
|
||||
|
||||
// Reset call counts after warm-up
|
||||
atomic.StoreInt32(&serverACalls, 0)
|
||||
atomic.StoreInt32(&serverBCalls, 0)
|
||||
|
||||
// Execute Request 1 (Must go to Server A)
|
||||
pod1 := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("First pod creation failed: %v", err)
|
||||
}
|
||||
_ = client.CoreV1().Pods("default").Delete(context.TODO(), pod1.Name, metav1.DeleteOptions{})
|
||||
|
||||
if atomic.LoadInt32(&serverACalls) != 1 {
|
||||
t.Fatalf("Expected 1 call to Server A, got %d", serverACalls)
|
||||
}
|
||||
|
||||
// Update Endpoint Resolution to Server B (simulating Pod upgrade/IP change)
|
||||
resolver.SetTarget(urlB)
|
||||
|
||||
// Execute Request 2 (Must go to Server B)
|
||||
pod2 := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-b", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Second pod creation failed: %v", err)
|
||||
}
|
||||
_ = client.CoreV1().Pods("default").Delete(context.TODO(), pod2.Name, metav1.DeleteOptions{})
|
||||
|
||||
// Assertions
|
||||
// If the connection pool was NOT isolated by IP, the second request would reuse
|
||||
// the idle connection to Server A.
|
||||
// With the fix, it must correctly connect to Server B.
|
||||
if atomic.LoadInt32(&serverBCalls) != 1 {
|
||||
t.Errorf("Expected 1 call to Server B (New Pod), got %d. The request was incorrectly routed/stuck!", serverBCalls)
|
||||
}
|
||||
if atomic.LoadInt32(&serverACalls) != 1 {
|
||||
t.Errorf("Expected exactly 1 call to Server A (Old Pod), got %d. Stale connection reuse detected!", serverACalls)
|
||||
}
|
||||
}
|
||||
|
|
@ -42,6 +42,7 @@ import (
|
|||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -359,3 +360,209 @@ func (c *connectionTrackingListener) Close() error {
|
|||
func (c *connectionTrackingListener) Addr() net.Addr {
|
||||
return c.delegate.Addr()
|
||||
}
|
||||
|
||||
// TestWebhookLoadBalanceAcrossEndpoints ensures that the webhook client distributes sequential requests
|
||||
// across different endpoint IPs behind a service when using a ServiceReference.
|
||||
// Prior to the host-rewrite pool isolation fix, all sequential requests would reuse the first established
|
||||
// TCP connection from the single shared pool, completely starving other endpoints.
|
||||
func TestWebhookLoadBalanceAcrossEndpoints(t *testing.T) {
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(localhostCert) {
|
||||
t.Fatal("Failed to append Cert from PEM")
|
||||
}
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to build cert with error: %+v", err)
|
||||
}
|
||||
|
||||
// 1. Start Server A
|
||||
var serverACalls int64
|
||||
serverAListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverA := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt64(&serverACalls, 1)
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`{"apiVersion":"admission.k8s.io/v1","kind":"AdmissionReview","response":{"uid":"%s","allowed":true}}`, review.Request.UID)))
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverA.ServeTLS(serverAListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverA.Close() }()
|
||||
|
||||
// 2. Start Server B
|
||||
var serverBCalls int64
|
||||
serverBListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverB := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt64(&serverBCalls, 1)
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`{"apiVersion":"admission.k8s.io/v1","kind":"AdmissionReview","response":{"uid":"%s","allowed":true}}`, review.Request.UID)))
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverB.ServeTLS(serverBListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverB.Close() }()
|
||||
|
||||
// Parse URLs to extract target hosts (IP:Port)
|
||||
urlA, _ := url.Parse("https://" + serverAListener.Addr().String())
|
||||
urlB, _ := url.Parse("https://" + serverBListener.Addr().String())
|
||||
|
||||
// Set up a round-robin service resolver that returns alternate endpoints for each resolve call
|
||||
var resolveIndex int64
|
||||
resolver := funcServiceResolver(func(namespace, name string, port int32) (*url.URL, error) {
|
||||
idx := atomic.AddInt64(&resolveIndex, 1)
|
||||
if idx%2 == 0 {
|
||||
return urlA, nil
|
||||
}
|
||||
return urlB, nil
|
||||
})
|
||||
|
||||
t.Cleanup(app.SetServiceResolverForTests(resolver))
|
||||
|
||||
// 3. Start Test API Server
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
}, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
||||
clientConfig := rest.CopyConfig(s.ClientConfig)
|
||||
client, err := clientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error creating client: %v", err)
|
||||
}
|
||||
|
||||
// 4. Register a Validating Webhook with ServiceReference
|
||||
fail := admissionregistrationv1.Fail
|
||||
sideEffectsNone := admissionregistrationv1.SideEffectClassNone
|
||||
webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "load-balance-endpoints.integration.test"},
|
||||
Webhooks: []admissionregistrationv1.ValidatingWebhook{{
|
||||
Name: "load-balance-endpoints.integration.test",
|
||||
ClientConfig: admissionregistrationv1.WebhookClientConfig{
|
||||
CABundle: localhostCert,
|
||||
Service: &admissionregistrationv1.ServiceReference{
|
||||
Namespace: "test",
|
||||
Name: "webhook",
|
||||
Port: ptr.To[int32](443),
|
||||
},
|
||||
},
|
||||
Rules: []admissionregistrationv1.RuleWithOperations{{
|
||||
Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create},
|
||||
Rule: admissionregistrationv1.Rule{
|
||||
APIGroups: []string{""},
|
||||
APIVersions: []string{"v1"},
|
||||
Resources: []string{"pods"},
|
||||
},
|
||||
}},
|
||||
FailurePolicy: &fail,
|
||||
SideEffects: &sideEffectsNone,
|
||||
AdmissionReviewVersions: []string{"v1"},
|
||||
}},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create validating webhook config: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{})
|
||||
}()
|
||||
|
||||
// 5. Warm up the webhook until it is ready/active
|
||||
podFixture := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "warmup-pod", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
_, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
_ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{})
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Webhook failed to become active: %v", err)
|
||||
}
|
||||
|
||||
// Reset counts
|
||||
atomic.StoreInt64(&serverACalls, 0)
|
||||
atomic.StoreInt64(&serverBCalls, 0)
|
||||
|
||||
// 6. Send 100 sequential requests to trigger validating webhook
|
||||
for i := range 100 {
|
||||
pod := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-pod-%d", i), Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Request %d failed: %v", i, err)
|
||||
}
|
||||
_ = client.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
// 7. Assertions
|
||||
callsA := atomic.LoadInt64(&serverACalls)
|
||||
callsB := atomic.LoadInt64(&serverBCalls)
|
||||
|
||||
t.Logf("Sequential requests distribution: ServerA = %d calls, ServerB = %d calls", callsA, callsB)
|
||||
|
||||
// Both servers MUST receive a balanced portion of calls under the pool isolation fix.
|
||||
// We assert a minimum 70-30 distribution split to verify dynamic load-balancing.
|
||||
if callsA < 30 || callsB < 30 {
|
||||
t.Errorf("Expected balanced load-balancing across both servers (minimum 70-30 split), but got ServerA = %d calls, ServerB = %d calls. Stale connection pool reuse or starvation detected!", callsA, callsB)
|
||||
}
|
||||
}
|
||||
|
||||
type funcServiceResolver func(namespace, name string, port int32) (*url.URL, error)
|
||||
|
||||
func (f funcServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
return f(namespace, name, port)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue