diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index 60c040299c4..c02cc9c8ff4 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -27,14 +27,13 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) // WaitForAPIServerSyncPeriod is the period between checks for the node list/watch initial sync const WaitForAPIServerSyncPeriod = 1 * time.Second // NewSourceApiserver creates a config source that watches and pulls from the apiserver. -func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) { +func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- sourceUpdate) { lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName))) // The Reflector responsible for watching pods at the apiserver should be run only after @@ -55,13 +54,13 @@ func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName type } // newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver. -func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) { +func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- sourceUpdate) { send := func(objs []interface{}) { var pods []*v1.Pod for _, o := range objs { pods = append(pods, o.(*v1.Pod)) } - updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource} + updates <- sourceUpdate{Pods: pods} } r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0) go r.Run(wait.NeverStop) diff --git a/pkg/kubelet/config/apiserver_test.go b/pkg/kubelet/config/apiserver_test.go index 9bbafecc6ec..3d80af68641 100644 --- a/pkg/kubelet/config/apiserver_test.go +++ b/pkg/kubelet/config/apiserver_test.go @@ -19,13 +19,12 @@ package config import ( "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) type fakePodLW struct { @@ -70,71 +69,66 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan interface{}) + ch := make(chan sourceUpdate) newSourceApiserverFromLW(lw, ch) - got, ok := <-ch + update, ok := <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update := got.(kubetypes.PodUpdate) - expected := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod1v1) + expected := createSourceUpdate(pod1v1) if !apiequality.Semantic.DeepEqual(expected, update) { - t.Errorf("Expected %#v; Got %#v", expected, update) + t.Errorf("Expected %#v; update %#v", expected, update) } // Add another pod fakeWatch.Add(pod2) - got, ok = <-ch + update, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update = got.(kubetypes.PodUpdate) // Could be sorted either of these two ways: - expectedA := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod1v1, pod2) - expectedB := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod2, pod1v1) + expectedA := createSourceUpdate(pod1v1, pod2) + expectedB := createSourceUpdate(pod2, pod1v1) if !apiequality.Semantic.DeepEqual(expectedA, update) && !apiequality.Semantic.DeepEqual(expectedB, update) { - t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update) + t.Errorf("Expected %#v or %#v, update %#v", expectedA, expectedB, update) } // Modify pod1 fakeWatch.Modify(pod1v2) - got, ok = <-ch + update, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update = got.(kubetypes.PodUpdate) - expectedA = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod1v2, pod2) - expectedB = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod2, pod1v2) + expectedA = createSourceUpdate(pod1v2, pod2) + expectedB = createSourceUpdate(pod2, pod1v2) if !apiequality.Semantic.DeepEqual(expectedA, update) && !apiequality.Semantic.DeepEqual(expectedB, update) { - t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update) + t.Errorf("Expected %#v or %#v, update %#v", expectedA, expectedB, update) } // Delete pod1 fakeWatch.Delete(pod1v2) - got, ok = <-ch + update, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update = got.(kubetypes.PodUpdate) - expected = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod2) + expected = createSourceUpdate(pod2) if !apiequality.Semantic.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) + t.Errorf("Expected %#v, update %#v", expected, update) } // Delete pod2 fakeWatch.Delete(pod2) - got, ok = <-ch + update, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update = got.(kubetypes.PodUpdate) - expected = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource) + expected = createSourceUpdate() // Empty update. if !apiequality.Semantic.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) + t.Errorf("Expected %#v, update %#v", expected, update) } } @@ -153,29 +147,27 @@ func TestNewSourceApiserver_TwoNamespacesSameName(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan interface{}) + ch := make(chan sourceUpdate) newSourceApiserverFromLW(lw, ch) - got, ok := <-ch + update, ok := <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update := got.(kubetypes.PodUpdate) // Make sure that we get both pods. Catches bug #2294. if !(len(update.Pods) == 2) { - t.Errorf("Expected %d, Got %d", 2, len(update.Pods)) + t.Errorf("Expected %d, update %d", 2, len(update.Pods)) } // Delete pod1 fakeWatch.Delete(&pod1) - got, ok = <-ch + update, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update = got.(kubetypes.PodUpdate) if !(len(update.Pods) == 1) { - t.Errorf("Expected %d, Got %d", 1, len(update.Pods)) + t.Errorf("Expected %d, update %d", 1, len(update.Pods)) } } @@ -187,17 +179,16 @@ func TestNewSourceApiserverInitialEmptySendsEmptyPodUpdate(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan interface{}) + ch := make(chan sourceUpdate) newSourceApiserverFromLW(lw, ch) - got, ok := <-ch + update, ok := <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - update := got.(kubetypes.PodUpdate) - expected := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource) + expected := createSourceUpdate() // Expect empty update. if !apiequality.Semantic.DeepEqual(expected, update) { - t.Errorf("Expected %#v; Got %#v", expected, update) + t.Errorf("Expected %#v; update %#v", expected, update) } } diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index dc0728c8f36..c793d2c7a45 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -18,7 +18,6 @@ package config import ( "context" - "fmt" "reflect" "sync" "time" @@ -34,23 +33,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util/format" ) -// PodConfigNotificationMode describes how changes are sent to the update channel. -type PodConfigNotificationMode int - -const ( - // PodConfigNotificationUnknown is the default value for - // PodConfigNotificationMode when uninitialized. - PodConfigNotificationUnknown PodConfigNotificationMode = iota - // PodConfigNotificationSnapshot delivers the full configuration as a SET whenever - // any change occurs. - PodConfigNotificationSnapshot - // PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are - // changed, and a SET message if there are any additions or removals. - PodConfigNotificationSnapshotAndUpdates - // PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel. - PodConfigNotificationIncremental -) - type podStartupSLIObserver interface { ObservedPodOnWatch(pod *v1.Pod, when time.Time) } @@ -70,11 +52,15 @@ type PodConfig struct { sources sets.Set[string] } +type sourceUpdate struct { + Pods []*v1.Pod +} + // NewPodConfig creates an object that can merge many configuration sources into a stream // of normalized updates to a pod configuration. -func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig { +func NewPodConfig(recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig { updates := make(chan kubetypes.PodUpdate, 50) - storage := newPodStorage(updates, mode, recorder, startupSLIObserver) + storage := newPodStorage(updates, recorder, startupSLIObserver) podConfig := &PodConfig{ pods: storage, mux: newMux(storage), @@ -86,7 +72,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, // Channel creates or returns a config source channel. The channel // only accepts PodUpdates -func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} { +func (c *PodConfig) Channel(ctx context.Context, source string) chan<- sourceUpdate { c.sourcesLock.Lock() defer c.sourcesLock.Unlock() c.sources.Insert(source) @@ -113,11 +99,6 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate { return c.updates } -// Sync requests the full configuration be delivered to the update channel. -func (c *PodConfig) Sync() { - c.pods.sync() -} - // podStorage manages the current pod state at any point in time and ensures updates // to the channel are delivered in order. Note that this object is an in-memory source of // "truth" and on creation contains zero entries. Once all previously read sources are @@ -126,7 +107,6 @@ type podStorage struct { podLock sync.RWMutex // map of source name to pod uid to pod reference pods map[string]map[types.UID]*v1.Pod - mode PodConfigNotificationMode // ensures that updates are delivered in strict order // on the updates channel @@ -146,10 +126,9 @@ type podStorage struct { // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel // in the future, especially with multiple listeners. // TODO: allow initialization of the current state of the store with snapshotted version. -func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage { +func newPodStorage(updates chan<- kubetypes.PodUpdate, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage { return &podStorage{ pods: make(map[string]map[types.UID]*v1.Pod), - mode: mode, updates: updates, sourcesSeen: sets.Set[string]{}, recorder: recorder, @@ -157,69 +136,45 @@ func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificatio } } -// Merge normalizes a set of incoming changes from different sources into a map of all Pods -// and ensures that redundant changes are filtered out, and then pushes zero or more minimal +// Merge normalizes a set of incoming updates from different sources into a map of all Pods +// and ensures that redundant updates are filtered out, and then pushes zero or more minimal // updates onto the update channel. Ensures that updates are delivered in order. -func (s *podStorage) Merge(ctx context.Context, source string, change interface{}) error { +func (s *podStorage) Merge(ctx context.Context, source string, update sourceUpdate) error { s.updateLock.Lock() defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) - adds, updates, deletes, removes, reconciles := s.merge(ctx, source, change) + adds, updates, deletes, removes, reconciles := s.merge(ctx, source, update) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications - switch s.mode { - case PodConfigNotificationIncremental: - if len(removes.Pods) > 0 { - s.updates <- *removes - } - if len(adds.Pods) > 0 { - s.updates <- *adds - } - if len(updates.Pods) > 0 { - s.updates <- *updates - } - if len(deletes.Pods) > 0 { - s.updates <- *deletes - } - if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { - // Send an empty update when first seeing the source and there are - // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that - // the source is ready. - s.updates <- *adds - } - // Only add reconcile support here, because kubelet doesn't support Snapshot update now. - if len(reconciles.Pods) > 0 { - s.updates <- *reconciles - } - - case PodConfigNotificationSnapshotAndUpdates: - if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} - } - if len(updates.Pods) > 0 { - s.updates <- *updates - } - if len(deletes.Pods) > 0 { - s.updates <- *deletes - } - - case PodConfigNotificationSnapshot: - if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} - } - - case PodConfigNotificationUnknown: - fallthrough - default: - panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode)) + if len(removes.Pods) > 0 { + s.updates <- *removes + } + if len(adds.Pods) > 0 { + s.updates <- *adds + } + if len(updates.Pods) > 0 { + s.updates <- *updates + } + if len(deletes.Pods) > 0 { + s.updates <- *deletes + } + if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { + // Send an empty update when first seeing the source and there are + // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that + // the source is ready. + s.updates <- *adds + } + // Only add reconcile support here, because kubelet doesn't support Snapshot update now. + if len(reconciles.Pods) > 0 { + s.updates <- *reconciles } return nil } -func (s *podStorage) merge(ctx context.Context, source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { +func (s *podStorage) merge(ctx context.Context, source string, update sourceUpdate) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() logger := klog.FromContext(ctx) @@ -268,47 +223,17 @@ func (s *podStorage) merge(ctx context.Context, source string, change interface{ } } - update := change.(kubetypes.PodUpdate) - switch update.Op { - case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE: - if update.Op == kubetypes.ADD { - logger.V(4).Info("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) - } else if update.Op == kubetypes.DELETE { - logger.V(4).Info("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) - } else { - logger.V(4).Info("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) + logger.V(4).Info("Setting pods for source", "source", source) + s.markSourceSet(source) + // Clear the old map entries by just creating a new map + oldPods := pods + pods = make(map[types.UID]*v1.Pod) + updatePodsFunc(update.Pods, oldPods, pods) + for uid, existing := range oldPods { + if _, found := pods[uid]; !found { + // this is a delete + removePods = append(removePods, existing) } - updatePodsFunc(update.Pods, pods, pods) - - case kubetypes.REMOVE: - logger.V(4).Info("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods)) - for _, value := range update.Pods { - if existing, found := pods[value.UID]; found { - // this is a delete - delete(pods, value.UID) - removePods = append(removePods, existing) - continue - } - // this is a no-op - } - - case kubetypes.SET: - logger.V(4).Info("Setting pods for source", "source", source) - s.markSourceSet(source) - // Clear the old map entries by just creating a new map - oldPods := pods - pods = make(map[types.UID]*v1.Pod) - updatePodsFunc(update.Pods, oldPods, pods) - for uid, existing := range oldPods { - if _, found := pods[uid]; !found { - // this is a delete - removePods = append(removePods, existing) - } - } - - default: - logger.Info("Received invalid update type", "type", update) - } s.pods[source] = pods @@ -475,25 +400,6 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr return } -// sync sends a copy of the current state through the update channel. -func (s *podStorage) sync() { - s.updateLock.Lock() - defer s.updateLock.Unlock() - s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource} -} - -func (s *podStorage) mergedState() interface{} { - s.podLock.RLock() - defer s.podLock.RUnlock() - pods := make([]*v1.Pod, 0) - for _, sourcePods := range s.pods { - for _, podRef := range sourcePods { - pods = append(pods, podRef.DeepCopy()) - } - } - return pods -} - func copyPods(sourcePods []*v1.Pod) []*v1.Pod { pods := []*v1.Pod{} for _, source := range sourcePods { diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index b2e652c6c2e..24692fb6eb1 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -42,7 +42,7 @@ const ( TestSource = "test" ) -func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { +func expectEmptyChannel(t *testing.T, ch <-chan sourceUpdate) { select { case update := <-ch: t.Errorf("Expected no update in channel, Got %v", update) @@ -93,15 +93,20 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source} } -func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { +func createSourceUpdate(pods ...*v1.Pod) sourceUpdate { + return sourceUpdate{pods} +} + +func createPodConfigTester(ctx context.Context) (chan<- sourceUpdate, <-chan kubetypes.PodUpdate, *PodConfig) { eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) - config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) + config := NewPodConfig(eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) channel := config.Channel(ctx, TestSource) ch := config.Updates() return channel, ch, config } func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...kubetypes.PodUpdate) { + t.Helper() for i := range expected { update := <-ch sort.Sort(sortedPods(update.Pods)) @@ -139,127 +144,77 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) { func TestNewPodAdded(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // see an update - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + podUpdate := createSourceUpdate(CreateValidPod("foo", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))) - - config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new"))) } func TestNewPodAddedInvalidNamespace(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // see an update - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "")) + podUpdate := createSourceUpdate(CreateValidPod("foo", "")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))) - - config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", ""))) } func TestNewPodAddedDefaultNamespace(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // see an update - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")) + podUpdate := createSourceUpdate(CreateValidPod("foo", "default")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))) - - config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default"))) } func TestNewPodAddedDifferentNamespaces(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // see an update - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")) + pod1 := CreateValidPod("foo", "default") + podUpdate := createSourceUpdate(pod1) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))) // see an update in another namespace - podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + pod2 := CreateValidPod("foo", "new") + podUpdate = createSourceUpdate(pod1, pod2) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))) - - config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default"), CreateValidPod("foo", "new"))) } func TestInvalidPodFiltered(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // see an update - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + podUpdate := createSourceUpdate(CreateValidPod("foo", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))) // add an invalid update, pod with the same name - podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + podUpdate = createSourceUpdate(CreateValidPod("foo", "new")) channel <- podUpdate expectNoPodUpdate(t, ch) } -func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { - tCtx := ktesting.Init(t) - - channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshotAndUpdates) - - // see an set - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) - channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new"))) - - config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new"))) - - // container updates are separated as UPDATE - pod := *podUpdate.Pods[0] - pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}} - channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod) - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, &pod)) -} - -func TestNewPodAddedSnapshot(t *testing.T) { - tCtx := ktesting.Init(t) - - channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshot) - - // see an set - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) - channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new"))) - - config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new"))) - - // container updates are separated as UPDATE - pod := *podUpdate.Pods[0] - pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}} - channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod) - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, &pod)) -} - func TestNewPodAddedUpdatedRemoved(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // should register an add - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) + podUpdate := createSourceUpdate(CreateValidPod("foo", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))) @@ -269,11 +224,11 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { // an kubetypes.ADD should be converted to kubetypes.UPDATE pod := CreateValidPod("foo", "new") pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}} - podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, pod) + podUpdate = createSourceUpdate(pod) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) - podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new")) + podUpdate = createSourceUpdate() channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod)) } @@ -281,11 +236,11 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { func TestNewPodAddedDelete(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // should register an add addedPod := CreateValidPod("foo", "new") - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod) + podUpdate := createSourceUpdate(addedPod) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)) @@ -293,7 +248,7 @@ func TestNewPodAddedDelete(t *testing.T) { timestamp := metav1.NewTime(time.Now()) deletedPod := CreateValidPod("foo", "new") deletedPod.ObjectMeta.DeletionTimestamp = ×tamp - podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod) + podUpdate = createSourceUpdate(deletedPod) channel <- podUpdate // the existing pod should be gracefully deleted expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod)) @@ -302,20 +257,21 @@ func TestNewPodAddedDelete(t *testing.T) { func TestNewPodAddedUpdatedSet(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) // should register an add - podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")) + podUpdate := createSourceUpdate(CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))) // should ignore ADDs that are identical + channel <- podUpdate expectNoPodUpdate(t, ch) // should be converted to an kubetypes.ADD, kubetypes.REMOVE, and kubetypes.UPDATE pod := CreateValidPod("foo2", "new") pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}} - podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new")) + podUpdate = createSourceUpdate(pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new")), @@ -342,65 +298,51 @@ func TestNewPodAddedSetReconciled(t *testing.T) { } return pods, pods[0] } - for _, op := range []kubetypes.PodOperation{ - kubetypes.ADD, - kubetypes.SET, - } { - var podWithStatusChange *v1.Pod - pods, _ := newTestPods(false, false) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + var podWithStatusChange *v1.Pod + pods, _ := newTestPods(false, false) + channel, ch, _ := createPodConfigTester(tCtx) - // Use SET to initialize the config, especially initialize the source set - channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...) - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...)) + // Use SET to initialize the config, especially initialize the source set + channel <- createSourceUpdate(pods...) + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...)) - // If status is not changed, no reconcile should be triggered - channel <- CreatePodUpdate(op, TestSource, pods...) - expectNoPodUpdate(t, ch) + // If status is not changed, no reconcile should be triggered + channel <- createSourceUpdate(pods...) + expectNoPodUpdate(t, ch) - // If the pod status is changed and not updated, a reconcile should be triggered - pods, podWithStatusChange = newTestPods(true, false) - channel <- CreatePodUpdate(op, TestSource, pods...) - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange)) + // If the pod status is changed and not updated, a reconcile should be triggered + pods, podWithStatusChange = newTestPods(true, false) + channel <- createSourceUpdate(pods...) + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange)) - // If the pod status is changed, but the pod is also updated, no reconcile should be triggered - pods, podWithStatusChange = newTestPods(true, true) - channel <- CreatePodUpdate(op, TestSource, pods...) - expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange)) - } + // If the pod status is changed, but the pod is also updated, no reconcile should be triggered + pods, podWithStatusChange = newTestPods(true, true) + channel <- createSourceUpdate(pods...) + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange)) } func TestInitialEmptySet(t *testing.T) { tCtx := ktesting.Init(t) - for _, test := range []struct { - mode PodConfigNotificationMode - op kubetypes.PodOperation - }{ - {PodConfigNotificationIncremental, kubetypes.ADD}, - {PodConfigNotificationSnapshot, kubetypes.SET}, - {PodConfigNotificationSnapshotAndUpdates, kubetypes.SET}, - } { - channel, ch, _ := createPodConfigTester(tCtx, test.mode) + channel, ch, _ := createPodConfigTester(tCtx) - // should register an empty PodUpdate operation - podUpdate := CreatePodUpdate(kubetypes.SET, TestSource) - channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource)) + // should register an empty PodUpdate operation + podUpdate := createSourceUpdate() + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource)) - // should ignore following empty sets - podUpdate = CreatePodUpdate(kubetypes.SET, TestSource) - channel <- podUpdate - podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) - channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new"))) - } + // should ignore following empty sets + podUpdate = createSourceUpdate() + channel <- podUpdate + podUpdate = createSourceUpdate(CreateValidPod("foo", "new")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))) } func TestPodUpdateAnnotations(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) pod := CreateValidPod("foo2", "new") pod.Annotations = make(map[string]string) @@ -408,22 +350,22 @@ func TestPodUpdateAnnotations(t *testing.T) { clone := pod.DeepCopy() - podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), clone, CreateValidPod("foo3", "new")) + podUpdate := createSourceUpdate(CreateValidPod("foo1", "new"), clone, CreateValidPod("foo3", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))) pod.Annotations["kubernetes.io/blah"] = "superblah" - podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) + podUpdate = createSourceUpdate(CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) pod.Annotations["kubernetes.io/otherblah"] = "doh" - podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) + podUpdate = createSourceUpdate(CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) delete(pod.Annotations, "kubernetes.io/blah") - podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) + podUpdate = createSourceUpdate(CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } @@ -431,7 +373,7 @@ func TestPodUpdateAnnotations(t *testing.T) { func TestPodUpdateLabels(t *testing.T) { tCtx := ktesting.Init(t) - channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(tCtx) pod := CreateValidPod("foo2", "new") pod.Labels = make(map[string]string) @@ -439,12 +381,12 @@ func TestPodUpdateLabels(t *testing.T) { clone := pod.DeepCopy() - podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, clone) + podUpdate := createSourceUpdate(clone) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pod)) pod.Labels["key"] = "newValue" - podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod) + podUpdate = createSourceUpdate(pod) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) @@ -454,7 +396,7 @@ func TestPodConfigRace(t *testing.T) { tCtx := ktesting.Init(t) eventBroadcaster := record.NewBroadcaster(record.WithContext(tCtx)) - config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) + config := NewPodConfig(eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) seenSources := sets.New[string](TestSource) var wg sync.WaitGroup const iterations = 100 diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index bf48553acbc..4f20da07250 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" api "k8s.io/kubernetes/pkg/apis/core" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" utilio "k8s.io/utils/io" ) @@ -55,12 +54,12 @@ type sourceFile struct { period time.Duration store cache.Store fileKeyMapping map[string]string - updates chan<- interface{} + updates chan<- sourceUpdate watchEvents chan *watchEvent } // NewSourceFile watches a config file for changes. -func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { +func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, period time.Duration, updates chan<- sourceUpdate) { // "github.com/sigma/go-inotify" requires a path without trailing "/" path = strings.TrimRight(path, string(os.PathSeparator)) @@ -69,13 +68,13 @@ func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, per config.run(logger) } -func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile { +func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- sourceUpdate) *sourceFile { send := func(objs []interface{}) { var pods []*v1.Pod for _, o := range objs { pods = append(pods, o.(*v1.Pod)) } - updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} + updates <- sourceUpdate{Pods: pods} } store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc) return &sourceFile{ @@ -126,7 +125,7 @@ func (s *sourceFile) listConfig(logger klog.Logger) error { return err } // Emit an update with an empty PodList to allow FileSource to be marked as seen - s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource} + s.updates <- sourceUpdate{Pods: []*v1.Pod{}} return fmt.Errorf("path does not exist, ignoring") } @@ -138,7 +137,7 @@ func (s *sourceFile) listConfig(logger klog.Logger) error { } if len(pods) == 0 { // Emit an update with an empty PodList to allow FileSource to be marked as seen - s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} + s.updates <- sourceUpdate{Pods: pods} return nil } return s.replaceStore(pods...) diff --git a/pkg/kubelet/config/file_linux.go b/pkg/kubelet/config/file_linux.go index 20055e6a8cf..84ff81c9cd4 100644 --- a/pkg/kubelet/config/file_linux.go +++ b/pkg/kubelet/config/file_linux.go @@ -26,12 +26,11 @@ import ( "time" "github.com/fsnotify/fsnotify" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/flowcontrol" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) const ( @@ -72,7 +71,7 @@ func (s *sourceFile) doWatch(logger klog.Logger) error { return err } // Emit an update with an empty PodList to allow FileSource to be marked as seen - s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource} + s.updates <- sourceUpdate{Pods: []*v1.Pod{}} return &retryableError{"path does not exist, ignoring"} } diff --git a/pkg/kubelet/config/file_linux_test.go b/pkg/kubelet/config/file_linux_test.go index cda1aee393b..2da167f8d17 100644 --- a/pkg/kubelet/config/file_linux_test.go +++ b/pkg/kubelet/config/file_linux_test.go @@ -44,7 +44,7 @@ import ( func TestExtractFromNonExistentFile(t *testing.T) { logger, _ := ktesting.NewTestContext(t) - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch) err := lw.doWatch(logger) if err == nil { @@ -54,12 +54,11 @@ func TestExtractFromNonExistentFile(t *testing.T) { func TestUpdateOnNonExistentFile(t *testing.T) { logger, _ := ktesting.NewTestContext(t) - ch := make(chan interface{}) + ch := make(chan sourceUpdate) NewSourceFile(logger, "random_non_existent_path", "localhost", time.Millisecond, ch) select { - case got := <-ch: - update := got.(kubetypes.PodUpdate) - expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) + case update := <-ch: + expected := createSourceUpdate() // Expect empty update. if !apiequality.Semantic.DeepDerivative(expected, update) { t.Fatalf("expected %#v, Got %#v", expected, update) } @@ -83,11 +82,10 @@ func TestReadPodsFromFileExistAlready(t *testing.T) { defer os.RemoveAll(dirName) file := testCase.writeToFile(dirName, "test_pod_manifest", t) - ch := make(chan interface{}) + ch := make(chan sourceUpdate) NewSourceFile(logger, file, hostname, time.Millisecond, ch) select { - case got := <-ch: - update := got.(kubetypes.PodUpdate) + case update := <-ch: for _, pod := range update.Pods { // TODO: remove the conversion when validation is performed on versioned objects. internalPod := &api.Pod{} @@ -141,7 +139,7 @@ type testCase struct { lock *sync.Mutex desc string pod runtime.Object - expected kubetypes.PodUpdate + expected sourceUpdate } func getTestCases(hostname types.NodeName) []*testCase { @@ -170,7 +168,7 @@ func getTestCases(hostname types.NodeName) []*testCase { Phase: v1.PodPending, }, }, - expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &v1.Pod{ + expected: createSourceUpdate(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-" + string(hostname), UID: "12345", @@ -255,7 +253,7 @@ func watchFileAdded(watchDir bool, symlink bool, t *testing.T) { createSymbolicLink(dirName, linkedDirName, fileName, t) } - ch := make(chan interface{}) + ch := make(chan sourceUpdate) if watchDir { NewSourceFile(logger, dirName, hostname, 100*time.Millisecond, ch) } else { @@ -310,7 +308,7 @@ func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *test } var file string - ch := make(chan interface{}) + ch := make(chan sourceUpdate) func() { testCase.lock.Lock() defer testCase.lock.Unlock() @@ -364,12 +362,11 @@ func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *test } } -func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) { +func expectUpdate(t *testing.T, ch chan sourceUpdate, testCase *testCase) { timer := time.After(5 * time.Second) for { select { - case got := <-ch: - update := got.(kubetypes.PodUpdate) + case update := <-ch: if len(update.Pods) == 0 { // filter out the empty updates from reading a non-existing path continue @@ -395,12 +392,11 @@ func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) { } } -func expectEmptyUpdate(t *testing.T, ch chan interface{}) { +func expectEmptyUpdate(t *testing.T, ch chan sourceUpdate) { timer := time.After(5 * time.Second) for { select { - case got := <-ch: - update := got.(kubetypes.PodUpdate) + case update := <-ch: if len(update.Pods) != 0 { t.Fatalf("expected empty update, got %#v", update) } diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index 8aecb35af78..2cad9e15ea0 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -23,7 +23,6 @@ import ( "time" apiequality "k8s.io/apimachinery/pkg/api/equality" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -41,7 +40,7 @@ func TestExtractFromBadDataFile(t *testing.T) { t.Fatalf("unable to write test file %#v", err) } - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) lw := newSourceFile(fileName, "localhost", time.Millisecond, ch) err = lw.listConfig(logger) if err == nil { @@ -58,18 +57,18 @@ func TestExtractFromEmptyDir(t *testing.T) { defer removeAll(dirName, t) logger, _ := ktesting.NewTestContext(t) - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) lw := newSourceFile(dirName, "localhost", time.Millisecond, ch) err = lw.listConfig(logger) if err != nil { t.Fatalf("unexpected error: %v", err) } - update, ok := (<-ch).(kubetypes.PodUpdate) + update, ok := <-ch if !ok { t.Fatalf("unexpected type: %#v", update) } - expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) + expected := createSourceUpdate() // Expect empty update. if !apiequality.Semantic.DeepEqual(expected, update) { t.Fatalf("expected %#v, got %#v", expected, update) } diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index 123f5edf01e..13ed0d6b826 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -25,7 +25,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" api "k8s.io/kubernetes/pkg/apis/core" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" @@ -36,14 +35,14 @@ type sourceURL struct { url string header http.Header nodeName types.NodeName - updates chan<- interface{} + updates chan<- sourceUpdate data []byte failureLogs int client *http.Client } // NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes. -func NewSourceURL(logger klog.Logger, url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { +func NewSourceURL(logger klog.Logger, url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- sourceUpdate) { config := &sourceURL{ url: url, header: header, @@ -102,7 +101,7 @@ func (s *sourceURL) extractFromURL(logger klog.Logger) error { } if len(data) == 0 { // Emit an update with an empty PodList to allow HTTPSource to be marked as seen - s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource} + s.updates <- sourceUpdate{Pods: []*v1.Pod{}} return fmt.Errorf("zero-length data received from %v", s.url) } // Short circuit if the data has not changed since the last time it was read. @@ -118,7 +117,7 @@ func (s *sourceURL) extractFromURL(logger klog.Logger) error { // It parsed but could not be used. return singlePodErr } - s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource} + s.updates <- sourceUpdate{Pods: []*v1.Pod{pod}} return nil } @@ -133,7 +132,7 @@ func (s *sourceURL) extractFromURL(logger klog.Logger) error { for i := range podList.Items { pods = append(pods, &podList.Items[i]) } - s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource} + s.updates <- sourceUpdate{Pods: pods} return nil } diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index d637451e2f2..ad867a26446 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -39,7 +39,7 @@ import ( func TestURLErrorNotExistNoUpdate(t *testing.T) { logger, _ := ktesting.NewTestContext(t) - ch := make(chan interface{}) + ch := make(chan sourceUpdate) NewSourceURL(logger, "http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch) select { case got := <-ch: @@ -50,7 +50,7 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) { func TestExtractFromHttpBadness(t *testing.T) { logger, _ := ktesting.NewTestContext(t) - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) c := sourceURL{"http://localhost:49575/_not_found_", http.Header{}, "other", ch, nil, 0, http.DefaultClient} if err := c.extractFromURL(logger); err == nil { t.Errorf("Expected error") @@ -120,7 +120,7 @@ func TestExtractInvalidPods(t *testing.T) { } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) c := sourceURL{testServer.URL, http.Header{}, "localhost", ch, nil, 0, http.DefaultClient} if err := c.extractFromURL(logger); err == nil { t.Errorf("%s: Expected error", testCase.desc) @@ -137,7 +137,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { var testCases = []struct { desc string pods runtime.Object - expected kubetypes.PodUpdate + expected sourceUpdate }{ { desc: "Single pod", @@ -161,8 +161,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { Phase: v1.PodPending, }, }, - expected: CreatePodUpdate(kubetypes.SET, - kubetypes.HTTPSource, + expected: createSourceUpdate( &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: "111", @@ -232,8 +231,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { }, }, }, - expected: CreatePodUpdate(kubetypes.SET, - kubetypes.HTTPSource, + expected: createSourceUpdate( &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: "111", @@ -304,13 +302,13 @@ func TestExtractPodsFromHTTP(t *testing.T) { } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) c := sourceURL{testServer.URL, http.Header{}, types.NodeName(nodeName), ch, nil, 0, http.DefaultClient} if err := c.extractFromURL(logger); err != nil { t.Errorf("%s: Unexpected error: %v", testCase.desc, err) continue } - update := (<-ch).(kubetypes.PodUpdate) + update := <-ch if !apiequality.Semantic.DeepEqual(testCase.expected, update) { t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update) @@ -356,14 +354,14 @@ func TestURLWithHeader(t *testing.T) { defer testServer.Close() logger, _ := ktesting.NewTestContext(t) - ch := make(chan interface{}, 1) + ch := make(chan sourceUpdate, 1) header := make(http.Header) header.Set("Metadata-Flavor", "Google") c := sourceURL{testServer.URL, header, "localhost", ch, nil, 0, http.DefaultClient} if err := c.extractFromURL(logger); err != nil { t.Fatalf("Unexpected error extracting from URL: %v", err) } - update := (<-ch).(kubetypes.PodUpdate) + update := <-ch headerVal := fakeHandler.RequestReceived.Header["Metadata-Flavor"] if len(headerVal) != 1 || headerVal[0] != "Google" { diff --git a/pkg/kubelet/config/mux.go b/pkg/kubelet/config/mux.go index 091148f950f..6482562055a 100644 --- a/pkg/kubelet/config/mux.go +++ b/pkg/kubelet/config/mux.go @@ -28,7 +28,7 @@ type merger interface { // Invoked when a change from a source is received. May also function as an incremental // merger if you wish to consume changes incrementally. Must be reentrant when more than // one source is defined. - Merge(ctx context.Context, source string, update interface{}) error + Merge(ctx context.Context, source string, update sourceUpdate) error } // mux is a class for merging configuration from multiple sources. Changes are @@ -40,13 +40,13 @@ type mux struct { // Sources and their lock. sourceLock sync.RWMutex // Maps source names to channels - sources map[string]chan interface{} + sources map[string]chan sourceUpdate } // newMux creates a new mux that can merge changes from multiple sources. func newMux(merger merger) *mux { mux := &mux{ - sources: make(map[string]chan interface{}), + sources: make(map[string]chan sourceUpdate), merger: merger, } return mux @@ -57,7 +57,7 @@ func newMux(merger merger) *mux { // source will return the same channel. This allows change and state based sources // to use the same channel. Different source names however will be treated as a // union. -func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { +func (m *mux) ChannelWithContext(ctx context.Context, source string) chan sourceUpdate { if len(source) == 0 { panic("Channel given an empty name") } @@ -67,14 +67,14 @@ func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interf if exists { return channel } - newChannel := make(chan interface{}) + newChannel := make(chan sourceUpdate) m.sources[source] = newChannel go wait.Until(func() { m.listen(ctx, source, newChannel) }, 0, ctx.Done()) return newChannel } -func (m *mux) listen(ctx context.Context, source string, listenChannel <-chan interface{}) { +func (m *mux) listen(ctx context.Context, source string, listenChannel <-chan sourceUpdate) { logger := klog.FromContext(ctx) for update := range listenChannel { if err := m.merger.Merge(ctx, source, update); err != nil { diff --git a/pkg/kubelet/config/mux_test.go b/pkg/kubelet/config/mux_test.go index 6033023c871..022f8e5e3fa 100644 --- a/pkg/kubelet/config/mux_test.go +++ b/pkg/kubelet/config/mux_test.go @@ -18,9 +18,13 @@ package config import ( "context" + "fmt" "reflect" "testing" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -40,36 +44,42 @@ func TestConfigurationChannels(t *testing.T) { } } -type MergeMock struct { - source string - update interface{} - t *testing.T -} - -func (m MergeMock) Merge(ctx context.Context, source string, update interface{}) error { - if m.source != source { - m.t.Errorf("Expected %s, Got %s", m.source, source) - } - if !reflect.DeepEqual(m.update, update) { - m.t.Errorf("Expected %s, Got %s", m.update, update) - } - return nil -} - func TestMergeInvoked(t *testing.T) { ctx := ktesting.Init(t) ctx = ktesting.WithCancel(ctx) defer ctx.Cancel("TestMergeInvoked completed") - merger := MergeMock{"one", "test", t} + const expectedSource = "one" + done := make(chan interface{}) + var merger mergeFunc = func(ctx context.Context, source string, update sourceUpdate) error { + if expectedSource != source { + t.Errorf("Expected %s, Got %s", expectedSource, source) + } + expectedUpdate := fakeUpdate(expectedSource) + if !reflect.DeepEqual(expectedUpdate, update) { + t.Errorf("Expected %v, Got %v", expectedUpdate, update) + } + close(done) + return nil + } + mux := newMux(&merger) - mux.ChannelWithContext(ctx, "one") <- "test" + + mux.ChannelWithContext(ctx, expectedSource) <- fakeUpdate(expectedSource) + + // Wait for Merge call. + select { + case <-done: + // Test complete. + case <-ctx.Done(): + t.Fatal("Test context canceled before completion") + } } // mergeFunc implements the Merger interface -type mergeFunc func(ctx context.Context, source string, update interface{}) error +type mergeFunc func(ctx context.Context, source string, update sourceUpdate) error -func (f mergeFunc) Merge(ctx context.Context, source string, update interface{}) error { +func (f mergeFunc) Merge(ctx context.Context, source string, update sourceUpdate) error { return f(ctx, source, update) } @@ -79,26 +89,27 @@ func TestSimultaneousMerge(t *testing.T) { defer ctx.Cancel("TestSimultaneousMerge completed") ch := make(chan bool, 2) - mux := newMux(mergeFunc(func(ctx context.Context, source string, update interface{}) error { - switch source { - case "one": - if update.(string) != "test" { - t.Errorf("Expected %s, Got %s", "test", update) - } - case "two": - if update.(string) != "test2" { - t.Errorf("Expected %s, Got %s", "test2", update) - } - default: - t.Errorf("Unexpected source, Got %s", update) + mux := newMux(mergeFunc(func(ctx context.Context, source string, update sourceUpdate) error { + if nsSource := update.Pods[0].Namespace; nsSource != source { + t.Errorf("Expected %s, Got %s", source, nsSource) } ch <- true return nil })) source := mux.ChannelWithContext(ctx, "one") source2 := mux.ChannelWithContext(ctx, "two") - source <- "test" - source2 <- "test2" + source <- fakeUpdate("one") + source2 <- fakeUpdate("two") <-ch <-ch } + +func fakeUpdate(source string) sourceUpdate { + return sourceUpdate{[]*v1.Pod{{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pod", source), + Namespace: source, + UID: types.UID(fmt.Sprintf("%s-pod-uid", source)), + }, + }}} +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9acfc620c32..77aa516994b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -373,7 +373,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku } // source of all configuration - cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker) + cfg := config.NewPodConfig(kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker) // TODO: it needs to be replaced by a proper context in the future ctx := context.TODO() @@ -2611,9 +2611,6 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) // DELETE is treated as a UPDATE because of graceful deletion. handler.HandlePodUpdates(u.Pods) - case kubetypes.SET: - // TODO: Do we want to support this? - klog.ErrorS(nil, "Kubelet does not support snapshot update") default: klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op) } diff --git a/pkg/kubelet/types/pod_update.go b/pkg/kubelet/types/pod_update.go index ee4bd58c7e6..281efd20d33 100644 --- a/pkg/kubelet/types/pod_update.go +++ b/pkg/kubelet/types/pod_update.go @@ -38,10 +38,8 @@ type PodOperation int // These constants identify the PodOperations that can be made on a pod configuration. const ( - // SET is the current pod configuration. - SET PodOperation = iota // ADD signifies pods that are new to this source. - ADD + ADD PodOperation = iota // DELETE signifies pods that are gracefully deleted from this source. DELETE // REMOVE signifies pods that have been removed from this source.