diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go index 8ff886a0500..9c7c5e04109 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go @@ -30,6 +30,19 @@ const ( group = "group" version = "version" resource = "resource" + errorType = "type" + + // ProxyErrorEndpointResolution indicates a failure to resolve the network address of a peer apiserver. + ProxyErrorEndpointResolution = "endpoint_resolution" + // ProxyErrorTransport indicates a failure to build the proxy transport for the request. + ProxyErrorTransport = "proxy_transport" + + // DiscoveryErrorLeaseList indicates a failure to list apiserver identity leases. + DiscoveryErrorLeaseList = "lease_list" + // DiscoveryErrorHostPortResolution indicates a failure to resolve host/port from an identity lease. + DiscoveryErrorHostPortResolution = "hostport_resolution" + // DiscoveryErrorFetch indicates a failure to fetch discovery document from a peer. + DiscoveryErrorFetch = "fetch_discovery" ) var registerMetricsOnce sync.Once @@ -45,11 +58,35 @@ var ( }, []string{statuscode, group, version, resource}, ) + + // peerProxyErrorsTotal counts the number of errors encountered while proxying requests to a peer kube-apiserver. + peerProxyErrorsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: subsystem, + Name: "peer_proxy_errors_total", + Help: "Total number of errors encountered while proxying requests to a peer kube apiserver", + StabilityLevel: metrics.ALPHA, + }, + []string{errorType, group, version, resource}, + ) + + // peerDiscoverySyncErrorsTotal counts the number of errors encountered while syncing discovery information from a peer kube-apiserver. + peerDiscoverySyncErrorsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: subsystem, + Name: "peer_discovery_sync_errors_total", + Help: "Total number of errors encountered while syncing discovery information from a peer kube-apiserver", + StabilityLevel: metrics.ALPHA, + }, + []string{errorType}, + ) ) func Register() { registerMetricsOnce.Do(func() { legacyregistry.MustRegister(peerProxiedRequestsTotal) + legacyregistry.MustRegister(peerProxyErrorsTotal) + legacyregistry.MustRegister(peerDiscoverySyncErrorsTotal) }) } @@ -62,3 +99,13 @@ func Reset() { func IncPeerProxiedRequest(ctx context.Context, status, g, v, r string) { peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status, g, v, r).Add(1) } + +// IncPeerProxyError increments the # of errors encountered during peer proxying +func IncPeerProxyError(ctx context.Context, e, g, v, r string) { + peerProxyErrorsTotal.WithContext(ctx).WithLabelValues(e, g, v, r).Add(1) +} + +// IncPeerDiscoverySyncError increments the # of errors encountered during peer discovery sync +func IncPeerDiscoverySyncError(ctx context.Context, e string) { + peerDiscoverySyncErrorsTotal.WithContext(ctx).WithLabelValues(e).Add(1) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go index 80997bf554f..693d5faca23 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" + peerproxymetrics "k8s.io/apiserver/pkg/util/peerproxy/metrics" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -94,6 +95,7 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error { // Rebuild the peer discovery cache from available leases. leases, err := h.apiserverIdentityInformer.Lister().List(h.identityLeaseLabelSelector) if err != nil { + peerproxymetrics.IncPeerDiscoverySyncError(ctx, peerproxymetrics.DiscoveryErrorLeaseList) utilruntime.HandleError(err) return err } @@ -125,6 +127,7 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error { func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) { hostport, err := h.hostportInfo(serverID) if err != nil { + peerproxymetrics.IncPeerDiscoverySyncError(ctx, peerproxymetrics.DiscoveryErrorHostPortResolution) return PeerDiscoveryCacheEntry{}, fmt.Errorf("failed to get host port info from identity lease for server %s: %w", serverID, err) } @@ -142,6 +145,7 @@ func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID st for _, path := range discoveryPaths { discoveryResponse, discoveryErr = h.aggregateDiscovery(ctx, path, hostport) if discoveryErr != nil { + peerproxymetrics.IncPeerDiscoverySyncError(ctx, peerproxymetrics.DiscoveryErrorFetch) klog.ErrorS(discoveryErr, "error querying discovery endpoint for serverID", "path", path, "serverID", serverID) continue } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go index a275e112ff6..d1cfc7b016d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -36,16 +37,17 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/transport" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" apidiscoveryv2 "k8s.io/api/apidiscovery/v2" v1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + peerproxymetrics "k8s.io/apiserver/pkg/util/peerproxy/metrics" ) func TestRunPeerDiscoveryCacheSync(t *testing.T) { - localServerID := "local-server" - testCases := []struct { desc string leases []*v1.Lease @@ -136,31 +138,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { for _, tt := range testCases { t.Run(tt.desc, func(t *testing.T) { - fakeClient := fake.NewSimpleClientset() - fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) - leaseInformer := fakeInformerFactory.Coordination().V1().Leases() - - fakeReconciler := newFakeReconciler() - - negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme()) - loopbackConfig := &rest.Config{} - proxyConfig := &transport.Config{ - TLS: transport.TLSConfig{Insecure: true}, - } - - h, err := NewPeerProxyHandler( - localServerID, - tt.labelSelectorString, - leaseInformer, - fakeReconciler, - negotiatedSerializer, - loopbackConfig, - proxyConfig, - ) - if err != nil { - t.Fatalf("failed to create handler: %v", err) - } - + h, fakeReconciler, leaseInformer, fakeClient, fakeInformerFactory := setupPeerProxyHandler(t, tt.labelSelectorString) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -170,13 +148,13 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { if err != nil { t.Fatalf("failed to create lease: %v", err) } - if err = leaseInformer.Informer().GetIndexer().Add(lease); err != nil { + if err = leaseInformer.GetIndexer().Add(lease); err != nil { t.Fatalf("failed to create lease: %v", err) } } go fakeInformerFactory.Start(ctx.Done()) - cache.WaitForCacheSync(ctx.Done(), leaseInformer.Informer().HasSynced) + cache.WaitForCacheSync(ctx.Done(), leaseInformer.HasSynced) // Create test servers based on leases testServers := make(map[string]*httptest.Server) @@ -199,7 +177,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { for _, lease := range tt.leases { initialCache[lease.Name] = makePeerDiscoveryCacheEntry("testgroup", "v1", "testresources") } - err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { select { case <-ctx.Done(): return false, ctx.Err() @@ -219,7 +197,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { if err != nil { t.Fatalf("failed to update lease: %v", err) } - if err = leaseInformer.Informer().GetIndexer().Update(updatedLease); err != nil { + if err = leaseInformer.GetIndexer().Update(updatedLease); err != nil { t.Fatalf("failed to update lease: %v", err) } } @@ -227,7 +205,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { // Delete leases if indicated. if len(tt.deletedLeaseNames) > 0 { for _, leaseName := range tt.deletedLeaseNames { - lease, exists, err := leaseInformer.Informer().GetIndexer().GetByKey("default/" + leaseName) + lease, exists, err := leaseInformer.GetIndexer().GetByKey("default/" + leaseName) if err != nil { t.Fatalf("failed to get lease from indexer: %v", err) } @@ -239,7 +217,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { if err != nil { t.Fatalf("failed to delete lease: %v", err) } - if err = leaseInformer.Informer().GetIndexer().Delete(deletedLease); err != nil { + if err = leaseInformer.GetIndexer().Delete(deletedLease); err != nil { t.Fatalf("failed to delete lease: %v", err) } @@ -264,6 +242,118 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { } } +func TestPeerDiscoveryMetrics(t *testing.T) { + testCases := []struct { + desc string + leases []*v1.Lease + peerServerConfig map[string]http.HandlerFunc + wantMetrics string + }{ + { + desc: "hostport resolution error", + leases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-resolution-error", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-error")}, + }, + }, + // No peer server configured means no endpoint will be registered in the reconciler. + // This should cause GetEndpoint to fail, triggering the "hostport_resolution" error. + peerServerConfig: nil, + wantMetrics: ` + # HELP apiserver_peer_discovery_sync_errors_total [ALPHA] Total number of errors encountered while syncing discovery information from a peer kube-apiserver + # TYPE apiserver_peer_discovery_sync_errors_total counter + apiserver_peer_discovery_sync_errors_total{type="hostport_resolution"} 1 + `, + }, + { + desc: "fetch discovery error", + leases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-fetch-error", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-fetch-error")}, + }, + }, + peerServerConfig: map[string]http.HandlerFunc{ + "remote-fetch-error": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + }, + wantMetrics: ` + # HELP apiserver_peer_discovery_sync_errors_total [ALPHA] Total number of errors encountered while syncing discovery information from a peer kube-apiserver + # TYPE apiserver_peer_discovery_sync_errors_total counter + apiserver_peer_discovery_sync_errors_total{type="fetch_discovery"} 2 + `, + }, + } + + peerproxymetrics.Register() + for _, tt := range testCases { + t.Run(tt.desc, func(t *testing.T) { + defer peerproxymetrics.Reset() + h, fakeReconciler, leaseInformer, _, _ := setupPeerProxyHandler(t, "apiserver-identity=testserver") + + for _, lease := range tt.leases { + if err := leaseInformer.GetIndexer().Add(lease); err != nil { + t.Fatalf("failed to create lease: %v", err) + } + } + + for leaseName, handler := range tt.peerServerConfig { + if handler == nil { + handler = func(w http.ResponseWriter, r *http.Request) {} + } + ts := httptest.NewServer(handler) + defer ts.Close() + fakeReconciler.setEndpoint(leaseName, ts.URL[7:]) + } + + // Directly call syncPeerDiscoveryCache + // We don't care about the return error of syncPeerDiscoveryCache for this test, + // we only care that metrics are incremented. + _ = h.syncPeerDiscoveryCache(context.Background()) + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetrics), "apiserver_peer_discovery_sync_errors_total"); err != nil { + t.Error(err) + } + }) + } +} + +func setupPeerProxyHandler(t *testing.T, labelSelector string) (*peerProxyHandler, *fakeReconciler, cache.SharedIndexInformer, *fake.Clientset, informers.SharedInformerFactory) { + localServerID := "local-server" + fakeClient := fake.NewSimpleClientset() + fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + leaseInformer := fakeInformerFactory.Coordination().V1().Leases() + + fakeReconciler := newFakeReconciler() + negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme()) + loopbackConfig := &rest.Config{} + proxyConfig := &transport.Config{ + TLS: transport.TLSConfig{Insecure: true}, + } + + h, err := NewPeerProxyHandler( + localServerID, + labelSelector, + leaseInformer, + fakeReconciler, + negotiatedSerializer, + loopbackConfig, + proxyConfig, + ) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + + return h, fakeReconciler, leaseInformer.Informer(), fakeClient, fakeInformerFactory +} + // newTestTLSServer creates a new httptest.NewTLSServer that serves discovery endpoints. func newTestTLSServer(t *testing.T) *httptest.Server { return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go index ace834b6ee5..ea85c205881 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go @@ -189,6 +189,7 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler { peerEndpoints, err := h.resolveServingLocation(peerServerIDs) if err != nil { + metrics.IncPeerProxyError(ctx, metrics.ProxyErrorEndpointResolution, gvr.Group, gvr.Version, gvr.Resource) gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version} klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr) responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r) @@ -252,6 +253,7 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, proxyRoundTripper, err := h.buildProxyRoundtripper(req) if err != nil { + metrics.IncPeerProxyError(req.Context(), metrics.ProxyErrorTransport, gvr.Group, gvr.Version, gvr.Resource) klog.Errorf("failed to build proxy round tripper: %v", err) return } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go index cd4417f9436..01f90f07ffb 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go @@ -125,6 +125,11 @@ func TestPeerProxy(t *testing.T) { }, }, wantStatus: http.StatusServiceUnavailable, + wantMetricsData: ` + # HELP apiserver_peer_proxy_errors_total [ALPHA] Total number of errors encountered while proxying requests to a peer kube apiserver + # TYPE apiserver_peer_proxy_errors_total counter + apiserver_peer_proxy_errors_total{group="",resource="bar",type="endpoint_resolution",version="foo"} 1 + `, }, { desc: "503 unreachable peer bind address", @@ -230,7 +235,7 @@ func TestPeerProxy(t *testing.T) { // compare metric if tt.wantMetricsData != "" { - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total"}...); err != nil { + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total", "apiserver_peer_proxy_errors_total"}...); err != nil { t.Fatal(err) } }