mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-11 09:53:38 -04:00
Merge pull request #139237 from aojea/webhook_idle_
webhook use resolved endpoint IP instead of cached
This commit is contained in:
commit
b4e4d2cfc1
9 changed files with 837 additions and 13 deletions
|
|
@ -2222,6 +2222,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
|||
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
genericfeatures.WebhookRoundTripLoadBalancing: {
|
||||
{Version: version.MustParse("1.37"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
kcmfeatures.CloudControllerManagerWatchBasedRoutesReconciliation: {
|
||||
{Version: version.MustParse("1.35"), Default: false, PreRelease: featuregate.Alpha},
|
||||
},
|
||||
|
|
@ -2663,6 +2667,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
|
|||
|
||||
genericfeatures.WatchList: {},
|
||||
|
||||
genericfeatures.WebhookRoundTripLoadBalancing: {},
|
||||
|
||||
kcmfeatures.CloudControllerManagerWatchBasedRoutesReconciliation: {},
|
||||
|
||||
kcmfeatures.CloudControllerManagerWebhook: {},
|
||||
|
|
|
|||
|
|
@ -276,6 +276,14 @@ const (
|
|||
//
|
||||
// Allow the API server to stream individual items instead of chunking
|
||||
WatchList featuregate.Feature = "WatchList"
|
||||
|
||||
// owner: @aojea
|
||||
// beta: v1.37
|
||||
//
|
||||
// Enables using a custom resolving RoundTripper to load-balance admission
|
||||
// webhook requests across service endpoints instead of caching connections
|
||||
// by service name.
|
||||
WebhookRoundTripLoadBalancing featuregate.Feature = "WebhookRoundTripLoadBalancing"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
@ -452,4 +460,8 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
|||
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Beta},
|
||||
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
WebhookRoundTripLoadBalancing: {
|
||||
{Version: version.MustParse("1.37"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -30,6 +31,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/apiserver/pkg/util/x509metrics"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/utils/lru"
|
||||
|
|
@ -194,20 +197,36 @@ func (cm *ClientManager) hookClientConfig(cc ClientConfig) (*rest.Config, error)
|
|||
cfg.TLSClientConfig.ServerName = serverName
|
||||
}
|
||||
|
||||
delegateDialer := cfg.Dial
|
||||
if delegateDialer == nil {
|
||||
var d net.Dialer
|
||||
delegateDialer = d.DialContext
|
||||
}
|
||||
cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
if addr == host {
|
||||
u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addr = u.Host
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WebhookRoundTripLoadBalancing) {
|
||||
delegateDialer := cfg.Dial
|
||||
if delegateDialer == nil {
|
||||
var d net.Dialer
|
||||
delegateDialer = d.DialContext
|
||||
}
|
||||
return delegateDialer(ctx, network, addr)
|
||||
cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
if addr == host {
|
||||
u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addr = u.Host
|
||||
}
|
||||
return delegateDialer(ctx, network, addr)
|
||||
}
|
||||
} else {
|
||||
// Use a custom roundtripper since http transport caches
|
||||
// the connections by the URL. Host. The service resolver
|
||||
// provides the actual endpoint address, if we use a custom
|
||||
// dialer then the cached connection may not be closed.
|
||||
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
return &resolvingRoundTripper{
|
||||
delegate: rt,
|
||||
serviceResolver: cm.serviceResolver,
|
||||
namespace: cc.Service.Namespace,
|
||||
serviceName: cc.Service.Name,
|
||||
port: port,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return complete(cfg)
|
||||
|
|
@ -255,3 +274,36 @@ func isLocalHost(u *url.URL) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// resolvingRoundTripper is a roundtripper that resolves the endpoint address
|
||||
// for the given service and updates the request URL to use the resolved endpoint address.
|
||||
type resolvingRoundTripper struct {
|
||||
delegate http.RoundTripper
|
||||
serviceResolver ServiceResolver
|
||||
namespace string
|
||||
serviceName string
|
||||
port int32
|
||||
}
|
||||
|
||||
func (r *resolvingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
serverName := r.serviceName + "." + r.namespace + ".svc"
|
||||
host := net.JoinHostPort(serverName, strconv.Itoa(int(r.port)))
|
||||
if req.URL.Host != host {
|
||||
return r.delegate.RoundTrip(req)
|
||||
}
|
||||
u, err := r.serviceResolver.ResolveEndpoint(r.namespace, r.serviceName, r.port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newReq := req.Clone(req.Context())
|
||||
// Preserve the original Host header
|
||||
if len(newReq.Host) == 0 {
|
||||
newReq.Host = req.URL.Host
|
||||
}
|
||||
newReq.URL.Host = u.Host
|
||||
return r.delegate.RoundTrip(newReq)
|
||||
}
|
||||
|
||||
func (r *resolvingRoundTripper) WrappedRoundTripper() http.RoundTripper {
|
||||
return r.delegate
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,10 +17,20 @@ limitations under the License.
|
|||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/pem"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/rest"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
)
|
||||
|
||||
func TestWebhookClientConfig(t *testing.T) {
|
||||
|
|
@ -89,3 +99,155 @@ func allowHTTP2(nextProtos []string) bool {
|
|||
// the transport explicitly set NextProtos and excluded http/2
|
||||
return false
|
||||
}
|
||||
|
||||
type fakeAuthInfoResolver struct{}
|
||||
|
||||
func (f *fakeAuthInfoResolver) ClientConfigFor(server string) (*rest.Config, error) {
|
||||
return &rest.Config{
|
||||
TLSClientConfig: rest.TLSClientConfig{
|
||||
ServerName: "example.com",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *fakeAuthInfoResolver) ClientConfigForService(serviceName, namespace string, port int) (*rest.Config, error) {
|
||||
return &rest.Config{
|
||||
TLSClientConfig: rest.TLSClientConfig{
|
||||
ServerName: "example.com",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// fakeDynamicServiceResolver returns the next endpoint in the list for each request.
|
||||
type fakeDynamicServiceResolver struct {
|
||||
endpoints []*url.URL
|
||||
counter int32
|
||||
}
|
||||
|
||||
func (f *fakeDynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
val := atomic.AddInt32(&f.counter, 1) - 1
|
||||
if val >= int32(len(f.endpoints)) {
|
||||
val = int32(len(f.endpoints)) - 1
|
||||
}
|
||||
return f.endpoints[val], nil
|
||||
}
|
||||
|
||||
// TestWebhookClientIdleConnectionIPReuse tests that the webhook client follow the resolver
|
||||
// endpoint instead of reusing the previous endpoint when there are IP address changes.
|
||||
func TestWebhookClientIdleConnectionIPReuse(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
enableFeatureGate bool
|
||||
expectedServerACalls int32
|
||||
expectedServerBCalls int32
|
||||
expectedSecondResponse string
|
||||
}{
|
||||
{
|
||||
name: "feature gate enabled - round-trip load balancing routes to Server B",
|
||||
enableFeatureGate: true,
|
||||
expectedServerACalls: 1,
|
||||
expectedServerBCalls: 1,
|
||||
expectedSecondResponse: "ServerB",
|
||||
},
|
||||
{
|
||||
name: "feature gate disabled - dialer resolution caches to Server A",
|
||||
enableFeatureGate: false,
|
||||
expectedServerACalls: 2,
|
||||
expectedServerBCalls: 0,
|
||||
expectedSecondResponse: "ServerA",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WebhookRoundTripLoadBalancing, tc.enableFeatureGate)
|
||||
|
||||
var serverACalls int32
|
||||
serverA := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverACalls, 1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("ServerA"))
|
||||
}))
|
||||
defer serverA.Close()
|
||||
|
||||
var serverBCalls int32
|
||||
serverB := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverBCalls, 1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("ServerB"))
|
||||
}))
|
||||
defer serverB.Close()
|
||||
|
||||
urlA, err := url.Parse(serverA.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
urlB, err := url.Parse(serverB.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Combine CAs from both test servers
|
||||
var caBundle []byte
|
||||
for _, cert := range serverA.TLS.Certificates[0].Certificate {
|
||||
caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...)
|
||||
}
|
||||
for _, cert := range serverB.TLS.Certificates[0].Certificate {
|
||||
caBundle = append(caBundle, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})...)
|
||||
}
|
||||
|
||||
resolver := &fakeDynamicServiceResolver{
|
||||
endpoints: []*url.URL{urlA, urlB},
|
||||
}
|
||||
|
||||
cm, err := NewClientManager([]schema.GroupVersion{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cm.SetAuthenticationInfoResolver(&fakeAuthInfoResolver{})
|
||||
cm.SetServiceResolver(resolver)
|
||||
|
||||
cc := ClientConfig{
|
||||
Name: "test-webhook",
|
||||
CABundle: caBundle,
|
||||
Service: &ClientConfigService{
|
||||
Name: "test-service",
|
||||
Namespace: "default",
|
||||
Port: 443,
|
||||
},
|
||||
}
|
||||
|
||||
client, err := cm.HookClient(cc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Request 1: Resolves to Server A
|
||||
req1 := client.Post().Body([]byte("test"))
|
||||
res1, err := req1.DoRaw(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("First request failed: %v", err)
|
||||
}
|
||||
if string(res1) != "ServerA" {
|
||||
t.Errorf("Expected Response ServerA, got %s", string(res1))
|
||||
}
|
||||
|
||||
// Request 2: Resolves to Server B if feature gate enabled, Server A if disabled
|
||||
req2 := client.Post().Body([]byte("test"))
|
||||
res2, err := req2.DoRaw(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("Second request failed: %v", err)
|
||||
}
|
||||
if string(res2) != tc.expectedSecondResponse {
|
||||
t.Errorf("Expected Response %s, got %s", tc.expectedSecondResponse, string(res2))
|
||||
}
|
||||
|
||||
if callsA := atomic.LoadInt32(&serverACalls); callsA != tc.expectedServerACalls {
|
||||
t.Errorf("Expected %d calls to Server A, got %d", tc.expectedServerACalls, callsA)
|
||||
}
|
||||
if callsB := atomic.LoadInt32(&serverBCalls); callsB != tc.expectedServerBCalls {
|
||||
t.Errorf("Expected %d calls to Server B, got %d", tc.expectedServerBCalls, callsB)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -219,6 +219,7 @@
|
|||
| WatchCacheInitializationPostStartHook | :ballot_box_with_check: 1.36+ | :closed_lock_with_key: 1.37+ | | 1.31–1.36 | 1.37– | | | [code](https://cs.k8s.io/?q=%5CbWatchCacheInitializationPostStartHook%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWatchCacheInitializationPostStartHook%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| WatchList | :ballot_box_with_check: 1.32+ | | 1.27–1.31 | 1.32– | | | | [code](https://cs.k8s.io/?q=%5CbWatchList%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWatchList%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| WatchListClient | :ballot_box_with_check: 1.35+ | | | 1.30– | | | | [code](https://cs.k8s.io/?q=%5CbWatchListClient%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWatchListClient%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| WebhookRoundTripLoadBalancing | :ballot_box_with_check: 1.37+ | | | 1.37– | | | | [code](https://cs.k8s.io/?q=%5CbWebhookRoundTripLoadBalancing%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWebhookRoundTripLoadBalancing%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| WinDSR | :ballot_box_with_check: 1.33+ | :closed_lock_with_key: 1.34+ | 1.14–1.32 | 1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbWinDSR%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWinDSR%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| WinOverlay | :ballot_box_with_check: 1.20+ | :closed_lock_with_key: 1.34+ | 1.14–1.19 | 1.20–1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbWinOverlay%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWinOverlay%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| WindowsCPUAndMemoryAffinity | | | 1.32– | | | | | [code](https://cs.k8s.io/?q=%5CbWindowsCPUAndMemoryAffinity%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbWindowsCPUAndMemoryAffinity%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
|
|
|
|||
|
|
@ -2047,6 +2047,12 @@
|
|||
lockToDefault: false
|
||||
preRelease: Beta
|
||||
version: "1.34"
|
||||
- name: WebhookRoundTripLoadBalancing
|
||||
versionedSpecs:
|
||||
- default: true
|
||||
lockToDefault: false
|
||||
preRelease: Beta
|
||||
version: "1.37"
|
||||
- name: WindowsCPUAndMemoryAffinity
|
||||
versionedSpecs:
|
||||
- default: false
|
||||
|
|
|
|||
|
|
@ -392,6 +392,77 @@ var _ = SIGDescribe("AdmissionWebhook [Privileged:ClusterAdmin]", func() {
|
|||
slowWebhookCleanup(ctx)
|
||||
})
|
||||
|
||||
/*
|
||||
Release: v1.31
|
||||
Testname: Admission webhook, connection pool isolation on pod restart
|
||||
Description: Verify that the admission webhook connection pool is isolated by resolved IP address.
|
||||
When the backend pod behind the service is recreated and its IP address changes, subsequent admission
|
||||
requests must successfully route to the new pod IP address without hitting the old connection pool.
|
||||
*/
|
||||
ginkgo.It("should isolate connection pool by resolved backend IP address on pod restart", func(ctx context.Context) {
|
||||
client := f.ClientSet
|
||||
|
||||
ginkgo.By("Registering a validating webhook")
|
||||
registerWebhook(ctx, f, markersNamespaceName, f.UniqueName, certCtx, servicePort)
|
||||
|
||||
// Get the active webhook pod details
|
||||
ginkgo.By("Getting the current webhook pod details")
|
||||
pods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"})
|
||||
framework.ExpectNoError(err)
|
||||
gomega.Expect(pods.Items).To(gomega.HaveLen(1))
|
||||
oldPod := pods.Items[0]
|
||||
oldIP := oldPod.Status.PodIP
|
||||
framework.Logf("Webhook currently running on pod %s with IP %s", oldPod.Name, oldIP)
|
||||
|
||||
// Recreate/restart the webhook pod by rolling out a change to the deployment
|
||||
ginkgo.By("Triggering a rolling update of the webhook deployment")
|
||||
deployment, err := client.AppsV1().Deployments(f.Namespace.Name).Get(ctx, deploymentName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Update the deployment template labels or env to trigger a rolling update
|
||||
if deployment.Spec.Template.Annotations == nil {
|
||||
deployment.Spec.Template.Annotations = make(map[string]string)
|
||||
}
|
||||
deployment.Spec.Template.Annotations["restartedAt"] = time.Now().Format(time.RFC3339)
|
||||
|
||||
deployment, err = client.AppsV1().Deployments(f.Namespace.Name).Update(ctx, deployment, metav1.UpdateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("Wait for the deployment rolling update to complete")
|
||||
err = e2edeployment.WaitForDeploymentComplete(client, deployment)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Get the new webhook pod details
|
||||
ginkgo.By("Getting the new webhook pod details")
|
||||
newPods, err := client.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: "app=sample-webhook"})
|
||||
framework.ExpectNoError(err)
|
||||
gomega.Expect(newPods.Items).To(gomega.HaveLen(1))
|
||||
newPod := newPods.Items[0]
|
||||
newIP := newPod.Status.PodIP
|
||||
framework.Logf("Webhook now running on pod %s with IP %s", newPod.Name, newIP)
|
||||
|
||||
// Make sure the IP actually changed (or at least the pod was recreated)
|
||||
gomega.Expect(newPod.Name).ToNot(gomega.Equal(oldPod.Name), "expected webhook pod to be recreated")
|
||||
|
||||
// Execute new admission request and verify it succeeds
|
||||
ginkgo.By("Sending a new admission request to verify the webhook connection pool successfully routes to the new pod IP")
|
||||
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
cm := &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: string(uuid.NewUUID()),
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().ConfigMaps(f.Namespace.Name).Create(ctx, cm, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
_ = client.CoreV1().ConfigMaps(f.Namespace.Name).Delete(ctx, cm.Name, metav1.DeleteOptions{})
|
||||
return true, nil
|
||||
}
|
||||
framework.Logf("Retrying admission request, last error: %v", err)
|
||||
return false, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "admission request failed after webhook pod restart")
|
||||
})
|
||||
|
||||
/*
|
||||
Release: v1.16
|
||||
Testname: Admission webhook, update validating webhook
|
||||
|
|
|
|||
288
test/integration/apiserver/admissionwebhook/ip_reuse_test.go
Normal file
288
test/integration/apiserver/admissionwebhook/ip_reuse_test.go
Normal file
|
|
@ -0,0 +1,288 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package admissionwebhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
admissionv1 "k8s.io/api/admission/v1"
|
||||
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
type dynamicServiceResolver struct {
|
||||
lock sync.Mutex
|
||||
targetURL string
|
||||
}
|
||||
|
||||
func (r *dynamicServiceResolver) SetTarget(url string) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.targetURL = url
|
||||
}
|
||||
|
||||
func (r *dynamicServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
return url.Parse(r.targetURL)
|
||||
}
|
||||
|
||||
// TestWebhookConnectionPoolIPReuse asserts that the API Server's webhook client
|
||||
// isolates its HTTP connection pool by the resolved backend IP address.
|
||||
// If a service's resolved endpoint changes to a new IP, requests must not
|
||||
// be routed to the old connection in the pool, preventing IP-reuse security bypasses.
|
||||
func TestWebhookConnectionPoolIPReuse(t *testing.T) {
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(localhostCert) {
|
||||
t.Fatal("Failed to append Cert from PEM")
|
||||
}
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to build cert: %v", err)
|
||||
}
|
||||
|
||||
// Start Server A
|
||||
var serverACalls int32
|
||||
serverAListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverA := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverACalls, 1)
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "admission.k8s.io/v1",
|
||||
Kind: "AdmissionReview",
|
||||
},
|
||||
Response: &admissionv1.AdmissionResponse{
|
||||
UID: types.UID(review.Request.UID),
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverA.ServeTLS(serverAListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverA.Close() }()
|
||||
|
||||
// Start Server B
|
||||
var serverBCalls int32
|
||||
serverBListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverB := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&serverBCalls, 1)
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "admission.k8s.io/v1",
|
||||
Kind: "AdmissionReview",
|
||||
},
|
||||
Response: &admissionv1.AdmissionResponse{
|
||||
UID: types.UID(review.Request.UID),
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverB.ServeTLS(serverBListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverB.Close() }()
|
||||
|
||||
urlA := "https://" + serverAListener.Addr().String()
|
||||
urlB := "https://" + serverBListener.Addr().String()
|
||||
|
||||
// Set up dynamic service resolver starting with Server A
|
||||
resolver := &dynamicServiceResolver{targetURL: urlA}
|
||||
t.Cleanup(app.SetServiceResolverForTests(resolver))
|
||||
|
||||
// Start Test API Server
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
}, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
||||
clientConfig := rest.CopyConfig(s.ClientConfig)
|
||||
client, err := clientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error creating client: %v", err)
|
||||
}
|
||||
|
||||
// Register Validating Webhook Configuration using a ServiceReference
|
||||
fail := admissionregistrationv1.Fail
|
||||
sideEffectsNone := admissionregistrationv1.SideEffectClassNone
|
||||
webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "ip-reuse.integration.test"},
|
||||
Webhooks: []admissionregistrationv1.ValidatingWebhook{{
|
||||
Name: "ip-reuse.integration.test",
|
||||
ClientConfig: admissionregistrationv1.WebhookClientConfig{
|
||||
CABundle: localhostCert,
|
||||
Service: &admissionregistrationv1.ServiceReference{
|
||||
Namespace: "test",
|
||||
Name: "webhook",
|
||||
Port: ptr.To[int32](443),
|
||||
},
|
||||
},
|
||||
Rules: []admissionregistrationv1.RuleWithOperations{{
|
||||
Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create},
|
||||
Rule: admissionregistrationv1.Rule{
|
||||
APIGroups: []string{""},
|
||||
APIVersions: []string{"v1"},
|
||||
Resources: []string{"pods"},
|
||||
},
|
||||
}},
|
||||
FailurePolicy: &fail,
|
||||
SideEffects: &sideEffectsNone,
|
||||
AdmissionReviewVersions: []string{"v1"},
|
||||
}},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create validating webhook config: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{})
|
||||
}()
|
||||
|
||||
// Request 1: Resolves to Server A. Trigger hook.
|
||||
podFixture := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test-pod-1", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
|
||||
// Wait for webhook to become ready/active
|
||||
err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
_, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
_ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{})
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Webhook failed to become active: %v", err)
|
||||
}
|
||||
|
||||
// Reset call counts after warm-up
|
||||
atomic.StoreInt32(&serverACalls, 0)
|
||||
atomic.StoreInt32(&serverBCalls, 0)
|
||||
|
||||
// Execute Request 1 (Must go to Server A)
|
||||
pod1 := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("First pod creation failed: %v", err)
|
||||
}
|
||||
_ = client.CoreV1().Pods("default").Delete(context.TODO(), pod1.Name, metav1.DeleteOptions{})
|
||||
|
||||
if atomic.LoadInt32(&serverACalls) != 1 {
|
||||
t.Fatalf("Expected 1 call to Server A, got %d", serverACalls)
|
||||
}
|
||||
|
||||
// Update Endpoint Resolution to Server B (simulating Pod upgrade/IP change)
|
||||
resolver.SetTarget(urlB)
|
||||
|
||||
// Execute Request 2 (Must go to Server B)
|
||||
pod2 := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-b", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Second pod creation failed: %v", err)
|
||||
}
|
||||
_ = client.CoreV1().Pods("default").Delete(context.TODO(), pod2.Name, metav1.DeleteOptions{})
|
||||
|
||||
// Assertions
|
||||
// If the connection pool was NOT isolated by IP, the second request would reuse
|
||||
// the idle connection to Server A.
|
||||
// With the fix, it must correctly connect to Server B.
|
||||
if atomic.LoadInt32(&serverBCalls) != 1 {
|
||||
t.Errorf("Expected 1 call to Server B (New Pod), got %d. The request was incorrectly routed/stuck!", serverBCalls)
|
||||
}
|
||||
if atomic.LoadInt32(&serverACalls) != 1 {
|
||||
t.Errorf("Expected exactly 1 call to Server A (Old Pod), got %d. Stale connection reuse detected!", serverACalls)
|
||||
}
|
||||
}
|
||||
|
|
@ -31,6 +31,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
admissionv1 "k8s.io/api/admission/v1"
|
||||
"k8s.io/api/admission/v1beta1"
|
||||
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
|
@ -42,6 +43,7 @@ import (
|
|||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -359,3 +361,227 @@ func (c *connectionTrackingListener) Close() error {
|
|||
func (c *connectionTrackingListener) Addr() net.Addr {
|
||||
return c.delegate.Addr()
|
||||
}
|
||||
|
||||
// TestWebhookLoadBalanceAcrossEndpoints ensures that the webhook client distributes sequential requests
|
||||
// across different endpoint IPs behind a service when using a ServiceReference.
|
||||
// Prior to the host-rewrite pool isolation fix, all sequential requests would reuse the first established
|
||||
// TCP connection from the single shared pool, completely starving other endpoints.
|
||||
func TestWebhookLoadBalanceAcrossEndpoints(t *testing.T) {
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(localhostCert) {
|
||||
t.Fatal("Failed to append Cert from PEM")
|
||||
}
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to build cert with error: %+v", err)
|
||||
}
|
||||
|
||||
// 1. Start Server A
|
||||
var serverACalls int64
|
||||
serverAListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverA := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt64(&serverACalls, 1)
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "admission.k8s.io/v1",
|
||||
Kind: "AdmissionReview",
|
||||
},
|
||||
Response: &admissionv1.AdmissionResponse{
|
||||
UID: types.UID(review.Request.UID),
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverA.ServeTLS(serverAListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverA.Close() }()
|
||||
|
||||
// 2. Start Server B
|
||||
var serverBCalls int64
|
||||
serverBListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
serverB := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt64(&serverBCalls, 1)
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var review struct {
|
||||
Request struct {
|
||||
UID string `json:"uid"`
|
||||
} `json:"request"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &review); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(&admissionv1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "admission.k8s.io/v1",
|
||||
Kind: "AdmissionReview",
|
||||
},
|
||||
Response: &admissionv1.AdmissionResponse{
|
||||
UID: types.UID(review.Request.UID),
|
||||
Allowed: true,
|
||||
},
|
||||
})
|
||||
}),
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
go func() {
|
||||
_ = serverB.ServeTLS(serverBListener, "", "")
|
||||
}()
|
||||
defer func() { _ = serverB.Close() }()
|
||||
|
||||
// Parse URLs to extract target hosts (IP:Port)
|
||||
urlA, _ := url.Parse("https://" + serverAListener.Addr().String())
|
||||
urlB, _ := url.Parse("https://" + serverBListener.Addr().String())
|
||||
|
||||
// Set up a round-robin service resolver that returns alternate endpoints for each resolve call
|
||||
var resolveIndex int64
|
||||
resolver := funcServiceResolver(func(namespace, name string, port int32) (*url.URL, error) {
|
||||
idx := atomic.AddInt64(&resolveIndex, 1)
|
||||
if idx%2 == 0 {
|
||||
return urlA, nil
|
||||
}
|
||||
return urlB, nil
|
||||
})
|
||||
|
||||
t.Cleanup(app.SetServiceResolverForTests(resolver))
|
||||
|
||||
// 3. Start Test API Server
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
}, framework.SharedEtcd())
|
||||
defer s.TearDownFn()
|
||||
|
||||
clientConfig := rest.CopyConfig(s.ClientConfig)
|
||||
client, err := clientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error creating client: %v", err)
|
||||
}
|
||||
|
||||
// 4. Register a Validating Webhook with ServiceReference
|
||||
fail := admissionregistrationv1.Fail
|
||||
sideEffectsNone := admissionregistrationv1.SideEffectClassNone
|
||||
webhookConfig, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), &admissionregistrationv1.ValidatingWebhookConfiguration{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "load-balance-endpoints.integration.test"},
|
||||
Webhooks: []admissionregistrationv1.ValidatingWebhook{{
|
||||
Name: "load-balance-endpoints.integration.test",
|
||||
ClientConfig: admissionregistrationv1.WebhookClientConfig{
|
||||
CABundle: localhostCert,
|
||||
Service: &admissionregistrationv1.ServiceReference{
|
||||
Namespace: "test",
|
||||
Name: "webhook",
|
||||
Port: ptr.To[int32](443),
|
||||
},
|
||||
},
|
||||
Rules: []admissionregistrationv1.RuleWithOperations{{
|
||||
Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create},
|
||||
Rule: admissionregistrationv1.Rule{
|
||||
APIGroups: []string{""},
|
||||
APIVersions: []string{"v1"},
|
||||
Resources: []string{"pods"},
|
||||
},
|
||||
}},
|
||||
FailurePolicy: &fail,
|
||||
SideEffects: &sideEffectsNone,
|
||||
AdmissionReviewVersions: []string{"v1"},
|
||||
}},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create validating webhook config: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Delete(context.TODO(), webhookConfig.Name, metav1.DeleteOptions{})
|
||||
}()
|
||||
|
||||
// 5. Warm up the webhook until it is ready/active
|
||||
podFixture := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "warmup-pod", Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
_, err = client.CoreV1().Pods("default").Create(ctx, podFixture, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
_ = client.CoreV1().Pods("default").Delete(ctx, podFixture.Name, metav1.DeleteOptions{})
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Webhook failed to become active: %v", err)
|
||||
}
|
||||
|
||||
// Reset counts
|
||||
atomic.StoreInt64(&serverACalls, 0)
|
||||
atomic.StoreInt64(&serverBCalls, 0)
|
||||
|
||||
// 6. Send 100 sequential requests to trigger validating webhook
|
||||
for i := range 100 {
|
||||
pod := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("test-pod-%d", i), Namespace: "default"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "test", Image: "nginx"}},
|
||||
},
|
||||
}
|
||||
_, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Request %d failed: %v", i, err)
|
||||
}
|
||||
_ = client.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
// 7. Assertions
|
||||
callsA := atomic.LoadInt64(&serverACalls)
|
||||
callsB := atomic.LoadInt64(&serverBCalls)
|
||||
|
||||
t.Logf("Sequential requests distribution: ServerA = %d calls, ServerB = %d calls", callsA, callsB)
|
||||
|
||||
// Both servers MUST receive a balanced portion of calls under the pool isolation fix.
|
||||
// We assert a minimum 70-30 distribution split to verify dynamic load-balancing.
|
||||
if callsA < 30 || callsB < 30 {
|
||||
t.Errorf("Expected balanced load-balancing across both servers (minimum 70-30 split), but got ServerA = %d calls, ServerB = %d calls. Stale connection pool reuse or starvation detected!", callsA, callsB)
|
||||
}
|
||||
}
|
||||
|
||||
type funcServiceResolver func(namespace, name string, port int32) (*url.URL, error)
|
||||
|
||||
func (f funcServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
return f(namespace, name, port)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue