Merge pull request #137065 from richabanker/mvp-metrics2

Add more metrics for Mixed Version Proxy
This commit is contained in:
Kubernetes Prow Robot 2026-02-26 01:58:29 +05:30 committed by GitHub
commit 070823cce0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 182 additions and 34 deletions

View file

@ -30,6 +30,19 @@ const (
group = "group"
version = "version"
resource = "resource"
errorType = "type"
// ProxyErrorEndpointResolution indicates a failure to resolve the network address of a peer apiserver.
ProxyErrorEndpointResolution = "endpoint_resolution"
// ProxyErrorTransport indicates a failure to build the proxy transport for the request.
ProxyErrorTransport = "proxy_transport"
// DiscoveryErrorLeaseList indicates a failure to list apiserver identity leases.
DiscoveryErrorLeaseList = "lease_list"
// DiscoveryErrorHostPortResolution indicates a failure to resolve host/port from an identity lease.
DiscoveryErrorHostPortResolution = "hostport_resolution"
// DiscoveryErrorFetch indicates a failure to fetch discovery document from a peer.
DiscoveryErrorFetch = "fetch_discovery"
)
var registerMetricsOnce sync.Once
@ -45,11 +58,35 @@ var (
},
[]string{statuscode, group, version, resource},
)
// peerProxyErrorsTotal counts the number of errors encountered while proxying requests to a peer kube-apiserver.
peerProxyErrorsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: subsystem,
Name: "peer_proxy_errors_total",
Help: "Total number of errors encountered while proxying requests to a peer kube apiserver",
StabilityLevel: metrics.ALPHA,
},
[]string{errorType, group, version, resource},
)
// peerDiscoverySyncErrorsTotal counts the number of errors encountered while syncing discovery information from a peer kube-apiserver.
peerDiscoverySyncErrorsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: subsystem,
Name: "peer_discovery_sync_errors_total",
Help: "Total number of errors encountered while syncing discovery information from a peer kube-apiserver",
StabilityLevel: metrics.ALPHA,
},
[]string{errorType},
)
)
func Register() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(peerProxiedRequestsTotal)
legacyregistry.MustRegister(peerProxyErrorsTotal)
legacyregistry.MustRegister(peerDiscoverySyncErrorsTotal)
})
}
@ -62,3 +99,13 @@ func Reset() {
func IncPeerProxiedRequest(ctx context.Context, status, g, v, r string) {
	// Label values are passed in the order the vector declares its labels:
	// status code, group, version, resource.
	counter := peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status, g, v, r)
	counter.Add(1)
}
// IncPeerProxyError records one error hit while proxying a request to a peer
// kube-apiserver. e is the value of the error type label (for example
// ProxyErrorEndpointResolution); g, v and r are the group, version and
// resource label values.
func IncPeerProxyError(ctx context.Context, e, g, v, r string) {
	scoped := peerProxyErrorsTotal.WithContext(ctx)
	scoped.WithLabelValues(e, g, v, r).Add(1)
}
// IncPeerDiscoverySyncError records one error hit while syncing discovery
// information from a peer kube-apiserver. e is the value of the error type
// label (for example DiscoveryErrorLeaseList).
func IncPeerDiscoverySyncError(ctx context.Context, e string) {
	scoped := peerDiscoverySyncErrorsTotal.WithContext(ctx)
	scoped.WithLabelValues(e).Add(1)
}

View file

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
peerproxymetrics "k8s.io/apiserver/pkg/util/peerproxy/metrics"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@ -94,6 +95,7 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
// Rebuild the peer discovery cache from available leases.
leases, err := h.apiserverIdentityInformer.Lister().List(h.identityLeaseLabelSelector)
if err != nil {
peerproxymetrics.IncPeerDiscoverySyncError(ctx, peerproxymetrics.DiscoveryErrorLeaseList)
utilruntime.HandleError(err)
return err
}
@ -125,6 +127,7 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) {
hostport, err := h.hostportInfo(serverID)
if err != nil {
peerproxymetrics.IncPeerDiscoverySyncError(ctx, peerproxymetrics.DiscoveryErrorHostPortResolution)
return PeerDiscoveryCacheEntry{}, fmt.Errorf("failed to get host port info from identity lease for server %s: %w", serverID, err)
}
@ -142,6 +145,7 @@ func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID st
for _, path := range discoveryPaths {
discoveryResponse, discoveryErr = h.aggregateDiscovery(ctx, path, hostport)
if discoveryErr != nil {
peerproxymetrics.IncPeerDiscoverySyncError(ctx, peerproxymetrics.DiscoveryErrorFetch)
klog.ErrorS(discoveryErr, "error querying discovery endpoint for serverID", "path", path, "serverID", serverID)
continue
}

View file

@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
@ -36,16 +37,17 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
v1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
peerproxymetrics "k8s.io/apiserver/pkg/util/peerproxy/metrics"
)
func TestRunPeerDiscoveryCacheSync(t *testing.T) {
localServerID := "local-server"
testCases := []struct {
desc string
leases []*v1.Lease
@ -136,31 +138,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
leaseInformer := fakeInformerFactory.Coordination().V1().Leases()
fakeReconciler := newFakeReconciler()
negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme())
loopbackConfig := &rest.Config{}
proxyConfig := &transport.Config{
TLS: transport.TLSConfig{Insecure: true},
}
h, err := NewPeerProxyHandler(
localServerID,
tt.labelSelectorString,
leaseInformer,
fakeReconciler,
negotiatedSerializer,
loopbackConfig,
proxyConfig,
)
if err != nil {
t.Fatalf("failed to create handler: %v", err)
}
h, fakeReconciler, leaseInformer, fakeClient, fakeInformerFactory := setupPeerProxyHandler(t, tt.labelSelectorString)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -170,13 +148,13 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Add(lease); err != nil {
if err = leaseInformer.GetIndexer().Add(lease); err != nil {
t.Fatalf("failed to create lease: %v", err)
}
}
go fakeInformerFactory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), leaseInformer.Informer().HasSynced)
cache.WaitForCacheSync(ctx.Done(), leaseInformer.HasSynced)
// Create test servers based on leases
testServers := make(map[string]*httptest.Server)
@ -199,7 +177,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
for _, lease := range tt.leases {
initialCache[lease.Name] = makePeerDiscoveryCacheEntry("testgroup", "v1", "testresources")
}
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
@ -219,7 +197,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
if err != nil {
t.Fatalf("failed to update lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Update(updatedLease); err != nil {
if err = leaseInformer.GetIndexer().Update(updatedLease); err != nil {
t.Fatalf("failed to update lease: %v", err)
}
}
@ -227,7 +205,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
// Delete leases if indicated.
if len(tt.deletedLeaseNames) > 0 {
for _, leaseName := range tt.deletedLeaseNames {
lease, exists, err := leaseInformer.Informer().GetIndexer().GetByKey("default/" + leaseName)
lease, exists, err := leaseInformer.GetIndexer().GetByKey("default/" + leaseName)
if err != nil {
t.Fatalf("failed to get lease from indexer: %v", err)
}
@ -239,7 +217,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
if err != nil {
t.Fatalf("failed to delete lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Delete(deletedLease); err != nil {
if err = leaseInformer.GetIndexer().Delete(deletedLease); err != nil {
t.Fatalf("failed to delete lease: %v", err)
}
@ -264,6 +242,118 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
}
}
// TestPeerDiscoveryMetrics checks that syncPeerDiscoveryCache increments
// apiserver_peer_discovery_sync_errors_total with the expected "type" label
// when resolving a peer's host/port fails or when fetching discovery from a
// peer fails.
func TestPeerDiscoveryMetrics(t *testing.T) {
	testCases := []struct {
		desc   string
		leases []*v1.Lease
		// peerServerConfig maps a lease name to the handler served by that
		// peer's test server. A lease without an entry gets no endpoint.
		peerServerConfig map[string]http.HandlerFunc
		// wantMetrics is the expected exposition text for the counter.
		wantMetrics string
	}{
		{
			desc: "hostport resolution error",
			leases: []*v1.Lease{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:   "remote-resolution-error",
						Labels: map[string]string{"apiserver-identity": "testserver"},
					},
					Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-error")},
				},
			},
			// No peer server configured means no endpoint will be registered in the reconciler.
			// This should cause GetEndpoint to fail, triggering the "hostport_resolution" error.
			peerServerConfig: nil,
			wantMetrics: `
# HELP apiserver_peer_discovery_sync_errors_total [ALPHA] Total number of errors encountered while syncing discovery information from a peer kube-apiserver
# TYPE apiserver_peer_discovery_sync_errors_total counter
apiserver_peer_discovery_sync_errors_total{type="hostport_resolution"} 1
`,
		},
		{
			desc: "fetch discovery error",
			leases: []*v1.Lease{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:   "remote-fetch-error",
						Labels: map[string]string{"apiserver-identity": "testserver"},
					},
					Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-fetch-error")},
				},
			},
			// The peer answers every request with a 500.
			// NOTE(review): the expected count of 2 presumably reflects one
			// failed fetch per discovery path tried — confirm against the
			// handler's discoveryPaths.
			peerServerConfig: map[string]http.HandlerFunc{
				"remote-fetch-error": func(w http.ResponseWriter, r *http.Request) {
					w.WriteHeader(http.StatusInternalServerError)
				},
			},
			wantMetrics: `
# HELP apiserver_peer_discovery_sync_errors_total [ALPHA] Total number of errors encountered while syncing discovery information from a peer kube-apiserver
# TYPE apiserver_peer_discovery_sync_errors_total counter
apiserver_peer_discovery_sync_errors_total{type="fetch_discovery"} 2
`,
		},
	}

	peerproxymetrics.Register()
	for _, tt := range testCases {
		t.Run(tt.desc, func(t *testing.T) {
			// The counters are gathered from the process-wide legacy registry,
			// so start from a clean slate in case an earlier test in this
			// package already recorded errors, and clean up afterwards.
			// NOTE(review): assumes Reset clears the discovery sync counter — confirm.
			peerproxymetrics.Reset()
			t.Cleanup(peerproxymetrics.Reset)

			h, fakeReconciler, leaseInformer, _, _ := setupPeerProxyHandler(t, "apiserver-identity=testserver")

			for _, lease := range tt.leases {
				if err := leaseInformer.GetIndexer().Add(lease); err != nil {
					t.Fatalf("failed to create lease: %v", err)
				}
			}

			for leaseName, handler := range tt.peerServerConfig {
				if handler == nil {
					handler = func(w http.ResponseWriter, r *http.Request) {}
				}
				ts := httptest.NewServer(handler)
				// Servers must outlive the sync below; close them when the subtest ends.
				t.Cleanup(ts.Close)
				// The reconciler is given host:port only, so drop the URL scheme.
				fakeReconciler.setEndpoint(leaseName, strings.TrimPrefix(ts.URL, "http://"))
			}

			// Directly call syncPeerDiscoveryCache
			// We don't care about the return error of syncPeerDiscoveryCache for this test,
			// we only care that metrics are incremented.
			_ = h.syncPeerDiscoveryCache(context.Background())

			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetrics), "apiserver_peer_discovery_sync_errors_total"); err != nil {
				t.Error(err)
			}
		})
	}
}
// setupPeerProxyHandler builds a peerProxyHandler for tests. The handler is
// wired to a fake clientset, a lease informer taken from a shared informer
// factory on that clientset, a fake reconciler, and an insecure-TLS proxy
// transport config. labelSelector is passed through as the identity lease
// label selector.
//
// It returns the handler together with the fake reconciler, the underlying
// lease SharedIndexInformer, the fake clientset, and the informer factory so
// callers can seed leases/endpoints and start the informers themselves. The
// test is failed immediately if the handler cannot be constructed.
func setupPeerProxyHandler(t *testing.T, labelSelector string) (*peerProxyHandler, *fakeReconciler, cache.SharedIndexInformer, *fake.Clientset, informers.SharedInformerFactory) {
	// Attribute a construction failure to the calling test, not this helper.
	t.Helper()

	localServerID := "local-server"
	fakeClient := fake.NewSimpleClientset()
	fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
	leaseInformer := fakeInformerFactory.Coordination().V1().Leases()
	fakeReconciler := newFakeReconciler()
	negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme())
	loopbackConfig := &rest.Config{}
	proxyConfig := &transport.Config{
		TLS: transport.TLSConfig{Insecure: true},
	}

	h, err := NewPeerProxyHandler(
		localServerID,
		labelSelector,
		leaseInformer,
		fakeReconciler,
		negotiatedSerializer,
		loopbackConfig,
		proxyConfig,
	)
	if err != nil {
		t.Fatalf("failed to create handler: %v", err)
	}
	return h, fakeReconciler, leaseInformer.Informer(), fakeClient, fakeInformerFactory
}
// newTestTLSServer creates a new httptest.NewTLSServer that serves discovery endpoints.
func newTestTLSServer(t *testing.T) *httptest.Server {
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View file

@ -189,6 +189,7 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
peerEndpoints, err := h.resolveServingLocation(peerServerIDs)
if err != nil {
metrics.IncPeerProxyError(ctx, metrics.ProxyErrorEndpointResolution, gvr.Group, gvr.Version, gvr.Resource)
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
@ -252,6 +253,7 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
proxyRoundTripper, err := h.buildProxyRoundtripper(req)
if err != nil {
metrics.IncPeerProxyError(req.Context(), metrics.ProxyErrorTransport, gvr.Group, gvr.Version, gvr.Resource)
klog.Errorf("failed to build proxy round tripper: %v", err)
return
}

View file

@ -125,6 +125,11 @@ func TestPeerProxy(t *testing.T) {
},
},
wantStatus: http.StatusServiceUnavailable,
wantMetricsData: `
# HELP apiserver_peer_proxy_errors_total [ALPHA] Total number of errors encountered while proxying requests to a peer kube apiserver
# TYPE apiserver_peer_proxy_errors_total counter
apiserver_peer_proxy_errors_total{group="",resource="bar",type="endpoint_resolution",version="foo"} 1
`,
},
{
desc: "503 unreachable peer bind address",
@ -230,7 +235,7 @@ func TestPeerProxy(t *testing.T) {
// compare metric
if tt.wantMetricsData != "" {
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total"}...); err != nil {
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total", "apiserver_peer_proxy_errors_total"}...); err != nil {
t.Fatal(err)
}
}