mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-08 16:30:57 -04:00
Plumb ctx down to sync() and syncStatus() methods
Use contextual logging in sync() and syncStatus() by passing ctx from the caller instead of context.Background(). Replace klog.Infof calls with logger.Info/Error from klog.FromContext(ctx). Update tests to use ktesting.NewTestContext. Signed-off-by: ChengHao Yang <17496418+tico88612@users.noreply.github.com>
This commit is contained in:
parent
92a8387751
commit
90912e6e10
2 changed files with 36 additions and 25 deletions
|
|
@ -100,16 +100,17 @@ type Controller struct {
|
|||
// Start will not return until the default ServiceCIDR exists or stopCh is closed.
|
||||
func (c *Controller) Start(ctx context.Context) {
|
||||
defer utilruntime.HandleCrashWithContext(ctx)
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
|
||||
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
|
||||
c.eventBroadcaster.StartStructuredLogging(0)
|
||||
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
|
||||
|
||||
klog.Infof("Starting %s", controllerName)
|
||||
logger.Info("Starting", "controller", controllerName)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
klog.Infof("Shutting down %s", controllerName)
|
||||
logger.Info("Shutting down", "controller", controllerName)
|
||||
c.eventBroadcaster.Shutdown()
|
||||
}()
|
||||
|
||||
|
|
@ -121,40 +122,41 @@ func (c *Controller) Start(ctx context.Context) {
|
|||
// wait until first successfully sync
|
||||
// this blocks apiserver startup so poll with a short interval
|
||||
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||
syncErr := c.sync()
|
||||
syncErr := c.sync(ctx)
|
||||
return syncErr == nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
klog.Infof("error initializing the default ServiceCIDR: %v", err)
|
||||
|
||||
logger.Error(err, "error initializing the default ServiceCIDR")
|
||||
}
|
||||
|
||||
// run the sync loop in the background with the defined interval
|
||||
go wait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||
err := c.sync()
|
||||
err := c.sync(ctx)
|
||||
if err != nil {
|
||||
klog.Infof("error trying to sync the default ServiceCIDR: %v", err)
|
||||
logger.Error(err, "error trying to sync the default ServiceCIDR")
|
||||
}
|
||||
}, c.interval)
|
||||
}
|
||||
|
||||
func (c *Controller) sync() error {
|
||||
func (c *Controller) sync(ctx context.Context) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// check if the default ServiceCIDR already exist
|
||||
serviceCIDR, err := c.serviceCIDRLister.Get(DefaultServiceCIDRName)
|
||||
// if exists
|
||||
if err == nil {
|
||||
// single to dual stack upgrade
|
||||
if len(c.cidrs) == 2 && len(serviceCIDR.Spec.CIDRs) == 1 && c.cidrs[0] == serviceCIDR.Spec.CIDRs[0] {
|
||||
klog.Infof("Updating default ServiceCIDR from single-stack (%v) to dual-stack (%v)", serviceCIDR.Spec.CIDRs, c.cidrs)
|
||||
logger.Info("Updating default ServiceCIDR from single-stack to dual-stack", "from", serviceCIDR.Spec.CIDRs, "to", c.cidrs)
|
||||
serviceCIDRcopy := serviceCIDR.DeepCopy()
|
||||
serviceCIDRcopy.Spec.CIDRs = c.cidrs
|
||||
_, err := c.client.NetworkingV1().ServiceCIDRs().Update(context.Background(), serviceCIDRcopy, metav1.UpdateOptions{})
|
||||
_, err := c.client.NetworkingV1().ServiceCIDRs().Update(ctx, serviceCIDRcopy, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
klog.Infof("The default ServiceCIDR can not be updated from %s to dual stack %v : %v", c.cidrs[0], c.cidrs, err)
|
||||
logger.Error(err, "The default ServiceCIDR can not be updated to dual stack", "from", c.cidrs[0], "to", c.cidrs)
|
||||
c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, "KubernetesDefaultServiceCIDRError", "The default ServiceCIDR can not be upgraded from %s to dual stack %v : %v", c.cidrs[0], c.cidrs, err)
|
||||
}
|
||||
} else {
|
||||
c.syncStatus(serviceCIDR)
|
||||
c.syncStatus(ctx, serviceCIDR)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -165,7 +167,7 @@ func (c *Controller) sync() error {
|
|||
}
|
||||
|
||||
// default ServiceCIDR does not exist
|
||||
klog.Infof("Creating default ServiceCIDR with CIDRs: %v", c.cidrs)
|
||||
logger.Info("Creating default ServiceCIDR", "CIDRs", c.cidrs)
|
||||
serviceCIDR = &networkingapiv1.ServiceCIDR{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: DefaultServiceCIDRName,
|
||||
|
|
@ -174,20 +176,22 @@ func (c *Controller) sync() error {
|
|||
CIDRs: c.cidrs,
|
||||
},
|
||||
}
|
||||
serviceCIDR, err = c.client.NetworkingV1().ServiceCIDRs().Create(context.Background(), serviceCIDR, metav1.CreateOptions{})
|
||||
serviceCIDR, err = c.client.NetworkingV1().ServiceCIDRs().Create(ctx, serviceCIDR, metav1.CreateOptions{})
|
||||
if err != nil && !apierrors.IsAlreadyExists(err) {
|
||||
c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, "KubernetesDefaultServiceCIDRError", "The default ServiceCIDR can not be created")
|
||||
return err
|
||||
}
|
||||
c.syncStatus(serviceCIDR)
|
||||
c.syncStatus(ctx, serviceCIDR)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) syncStatus(serviceCIDR *networkingapiv1.ServiceCIDR) {
|
||||
func (c *Controller) syncStatus(ctx context.Context, serviceCIDR *networkingapiv1.ServiceCIDR) {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// don't sync the status of the ServiceCIDR if is being deleted,
|
||||
// deletion must be handled by the controller-manager
|
||||
if !serviceCIDR.GetDeletionTimestamp().IsZero() {
|
||||
klog.V(6).Infof("ServiceCIDR %s is being deleted, skipping status sync", serviceCIDR.Name)
|
||||
logger.V(6).Info("ServiceCIDR is being deleted, skipping status sync", "serviceCIDR", klog.KObj(serviceCIDR))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -199,7 +203,7 @@ func (c *Controller) syncStatus(serviceCIDR *networkingapiv1.ServiceCIDR) {
|
|||
// Handle inconsistent configuration
|
||||
if !sameConfig {
|
||||
if !c.reportedMismatchedCIDRs {
|
||||
klog.Infof("Inconsistent ServiceCIDR status for %s, controller configuration: %v, ServiceCIDR configuration: %v. Configure the flags to match current ServiceCIDR or manually delete it.", serviceCIDR.Name, c.cidrs, serviceCIDR.Spec.CIDRs)
|
||||
logger.Info("Inconsistent ServiceCIDR status", "serviceCIDR", klog.KObj(serviceCIDR), "controllerCIDRs", c.cidrs, "serviceCIDRs", serviceCIDR.Spec.CIDRs)
|
||||
c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, "KubernetesDefaultServiceCIDRInconsistent", "The default ServiceCIDR %v does not match the controller flag configurations %s", serviceCIDR.Spec.CIDRs, c.cidrs)
|
||||
c.reportedMismatchedCIDRs = true
|
||||
}
|
||||
|
|
@ -216,18 +220,18 @@ func (c *Controller) syncStatus(serviceCIDR *networkingapiv1.ServiceCIDR) {
|
|||
// is the case, then it will require an intervention by the cluster administrator.
|
||||
case currentReadyCondition != nil && currentReadyCondition.Status == metav1.ConditionFalse:
|
||||
if !c.reportedNotReadyCondition {
|
||||
klog.InfoS("Default ServiceCIDR condition Ready is False, but controller configuration matches. Please validate your cluster's network configuration.", "serviceCIDR", klog.KObj(serviceCIDR), "status", currentReadyCondition.Status, "reason", currentReadyCondition.Reason, "message", currentReadyCondition.Message)
|
||||
logger.Info("Default ServiceCIDR condition Ready is False, but controller configuration matches. Please validate your cluster's network configuration.", "serviceCIDR", klog.KObj(serviceCIDR), "status", currentReadyCondition.Status, "reason", currentReadyCondition.Reason, "message", currentReadyCondition.Message)
|
||||
c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, currentReadyCondition.Reason, "Configuration matches, but %s", currentReadyCondition.Message)
|
||||
c.reportedNotReadyCondition = true
|
||||
}
|
||||
|
||||
// Current Ready=True and config matches, nothing to do.
|
||||
case currentReadyCondition != nil && currentReadyCondition.Status == metav1.ConditionTrue:
|
||||
klog.V(6).Infof("ServiceCIDR %s is Ready and configuration matches. No status update needed.", serviceCIDR.Name)
|
||||
logger.V(6).Info("ServiceCIDR is Ready and configuration matches. No status update needed.", "serviceCIDR", klog.KObj(serviceCIDR))
|
||||
|
||||
// No condition set and ServiceCIDR matches this apiserver configuration, set condition to True
|
||||
case currentReadyCondition == nil || currentReadyCondition.Status == metav1.ConditionUnknown:
|
||||
klog.Infof("Setting default ServiceCIDR condition Ready to True")
|
||||
logger.Info("Setting default ServiceCIDR condition Ready to True")
|
||||
svcApplyStatus := networkingapiv1apply.ServiceCIDRStatus().WithConditions(
|
||||
metav1apply.Condition().
|
||||
WithType(networkingapiv1.ServiceCIDRConditionReady).
|
||||
|
|
@ -235,8 +239,8 @@ func (c *Controller) syncStatus(serviceCIDR *networkingapiv1.ServiceCIDR) {
|
|||
WithMessage("Kubernetes default Service CIDR is ready").
|
||||
WithLastTransitionTime(metav1.Now()))
|
||||
svcApply := networkingapiv1apply.ServiceCIDR(DefaultServiceCIDRName).WithStatus(svcApplyStatus)
|
||||
if _, errApply := c.client.NetworkingV1().ServiceCIDRs().ApplyStatus(context.Background(), svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true}); errApply != nil {
|
||||
klog.Infof("error updating default ServiceCIDR status: %v", errApply)
|
||||
if _, errApply := c.client.NetworkingV1().ServiceCIDRs().ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true}); errApply != nil {
|
||||
logger.Error(errApply, "error updating default ServiceCIDR status")
|
||||
c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, "KubernetesDefaultServiceCIDRError", "The default ServiceCIDR Status can not be set to Ready=True")
|
||||
}
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
"k8s.io/client-go/kubernetes/fake"
|
||||
k8stesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
|
|
@ -156,8 +157,12 @@ func TestControllerSync(t *testing.T) {
|
|||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
client, controller := newController(t, []string{defaultIPv4CIDR, defaultIPv6CIDR}, tc.cidrs...)
|
||||
controller.sync()
|
||||
err := controller.sync(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
expectAction(t, client.Actions(), tc.actions)
|
||||
})
|
||||
}
|
||||
|
|
@ -330,11 +335,13 @@ func TestControllerSyncConversions(t *testing.T) {
|
|||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
|
||||
// Initialize controller and client with the existing ServiceCIDR
|
||||
client, controller := newController(t, tc.controllerCIDRs, tc.existingCIDR)
|
||||
|
||||
// Call the syncStatus method directly
|
||||
err := controller.sync()
|
||||
err := controller.sync(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue