fix missing involvedObject.apiVersion in event

This commit is contained in:
novahe 2025-10-21 22:58:14 +08:00
parent fb7774f22a
commit 9d48e7d2d4
10 changed files with 115 additions and 35 deletions

View file

@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
@ -236,10 +235,10 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, kubeProxy)
s.NodeRef = &v1.ObjectReference{
Kind: "Node",
Name: s.NodeName,
UID: types.UID(s.NodeName),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: s.NodeName,
Namespace: "",
}
if len(config.HealthzBindAddress) > 0 {

View file

@ -28,7 +28,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
@ -320,9 +319,9 @@ func (cm *containerManagerImpl) validateNodeAllocatable() error {
// Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail.
func nodeRefFromNode(nodeName string) *v1.ObjectReference {
return &v1.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: nodeName,
Namespace: "",
}
}

View file

@ -402,6 +402,54 @@ func getEphemeralStorageResourceList(storage string) v1.ResourceList {
return res
}
func TestNodeRefFromNode(t *testing.T) {
testCases := []struct {
name string
nodeName string
expected *v1.ObjectReference
}{
{
name: "normal node name",
nodeName: "test-node",
expected: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Node",
Name: "test-node",
Namespace: "",
},
},
{
name: "empty node name",
nodeName: "",
expected: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Node",
Name: "",
UID: "",
Namespace: "",
},
},
{
name: "node name with special characters",
nodeName: "test-node-123.domain.local",
expected: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Node",
Name: "test-node-123.domain.local",
Namespace: "",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := nodeRefFromNode(tc.nodeName)
assert.Equal(t, tc.expected, result, "test case %q failed", tc.name)
})
}
}
func TestGetCgroupConfig(t *testing.T) {
cases := []struct {
name string

View file

@ -548,10 +548,10 @@ func NewMainKubelet(ctx context.Context,
// construct a node reference used for events
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: string(nodeName),
UID: types.UID(nodeName),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: string(nodeName),
Namespace: "",
}
oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)

View file

@ -136,10 +136,10 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
if hcs.recorder != nil {
hcs.recorder.Eventf(
&v1.ObjectReference{
Kind: "Service",
Namespace: nsn.Namespace,
Name: nsn.Name,
UID: types.UID(nsn.String()),
APIVersion: "v1",
Kind: "Service",
Namespace: nsn.Namespace,
Name: nsn.Name,
}, nil, api.EventTypeWarning, "FailedToStartServiceHealthcheck", "Listen", msg)
}
klog.ErrorS(err, "Failed to start healthcheck", "node", hcs.nodeName, "service", nsn, "port", port)

View file

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/events"
@ -76,10 +75,10 @@ func NewHollowProxy(
Broadcaster: broadcaster,
Recorder: recorder,
NodeRef: &v1.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: nodeName,
Namespace: "",
},
},
}

View file

@ -163,10 +163,11 @@ func (c *CloudNodeLifecycleController) MonitorNodes(ctx context.Context) {
klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: node.Name,
UID: node.UID,
Namespace: "",
}
c.recorder.Eventf(ref, v1.EventTypeNormal, deleteNodeEvent,

View file

@ -909,8 +909,14 @@ func Test_NodesShutdown(t *testing.T) {
nodeMonitorPeriod: 1 * time.Second,
}
w := eventBroadcaster.StartStructuredLogging(0)
defer w.Stop()
e := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
loggerV := klog.FromContext(t.Context()).V(0)
loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
if e.InvolvedObject.APIVersion == "" {
t.Fatalf("event involvedObject.apiVersion is empty")
}
})
defer e.Stop()
cloudNodeLifecycleController.MonitorNodes(ctx)
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, testcase.existingNode.Name, metav1.GetOptions{})

View file

@ -307,10 +307,11 @@ func (rc *RouteController) reconcile(ctx context.Context, nodes []*v1.Node, rout
if rc.recorder != nil {
rc.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: string(nodeName),
UID: types.UID(nodeName),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: string(nodeName),
UID: node.UID,
Namespace: "",
}, v1.EventTypeWarning, "FailedToCreateRoute", msg)
klog.V(4).Info(msg)
return err

View file

@ -27,7 +27,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
fakecloud "k8s.io/cloud-provider/fake"
nodeutil "k8s.io/component-helpers/node/util"
@ -93,6 +95,7 @@ func TestReconcile(t *testing.T) {
node3 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-3", UID: "03"}, Spec: v1.NodeSpec{PodCIDR: "10.120.0.0/24", PodCIDRs: []string{"10.120.0.0/24", "a00:100::/24"}}, Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.3.1"}}}}
node4 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-4", UID: "04"}, Spec: v1.NodeSpec{PodCIDR: "10.120.1.0/24", PodCIDRs: []string{"10.120.1.0/24", "a00:200::/24"}}, Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.4.1"}}}}
nodeDuplicateCIDR := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-4", UID: "04"}, Spec: v1.NodeSpec{PodCIDR: "10.120.1.0/24", PodCIDRs: []string{"10.120.1.0/24", "10.120.1.0/24"}}, Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.4.1"}}}}
testCases := []struct {
description string
@ -102,6 +105,7 @@ func TestReconcile(t *testing.T) {
expectedNetworkUnavailable []bool
clientset *fake.Clientset
dualStack bool
expectError bool
}{
{
description: "routes have no TargetNodeAddresses at the beginning",
@ -414,6 +418,17 @@ func TestReconcile(t *testing.T) {
expectedNetworkUnavailable: []bool{true, true},
clientset: fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
},
{
description: "duplicate pod cidr",
nodes: []*v1.Node{
&nodeDuplicateCIDR,
},
initialRoutes: []*cloudprovider.Route{},
expectedRoutes: []*cloudprovider.Route{},
expectedNetworkUnavailable: []bool{true, false},
expectError: true,
clientset: fake.NewClientset(&v1.NodeList{Items: []v1.Node{nodeDuplicateCIDR}}),
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
@ -439,6 +454,16 @@ func TestReconcile(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(testCase.clientset, 0)
rc := New(routes, testCase.clientset, informerFactory.Core().V1().Nodes(), cluster, cidrs)
recorder := record.NewBroadcaster(record.WithContext(ctx))
rc.recorder = recorder.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
e := recorder.StartEventWatcher(func(e *v1.Event) {
if e.InvolvedObject.APIVersion == "" {
t.Fatalf("event involvedObject.apiVersion is empty")
}
})
defer e.Stop()
rc.nodeListerSynced = alwaysReady
require.NoError(t, rc.reconcile(ctx, testCase.nodes, testCase.initialRoutes), "failed to reconcile")
for _, action := range testCase.clientset.Actions() {
@ -476,8 +501,10 @@ func TestReconcile(t *testing.T) {
break poll
}
case <-timeoutChan:
t.Errorf("rc.reconcile() err is %v,\nfound routes:\n%v\nexpected routes:\n%v\n",
err, flatten(finalRoutes), flatten(testCase.expectedRoutes))
if !testCase.expectError {
t.Errorf("rc.reconcile() err is %v,\nfound routes:\n%v\nexpected routes:\n%v\n",
err, flatten(finalRoutes), flatten(testCase.expectedRoutes))
}
break poll
}
}