Merge pull request #136060 from tallclair/config-mux

Clean up Pod config sources: remove dead code
This commit is contained in:
Kubernetes Prow Robot 2026-01-08 07:05:39 +05:30 committed by GitHub
commit ca1f339c3e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 240 additions and 406 deletions

View file

@ -27,14 +27,13 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
// WaitForAPIServerSyncPeriod is the period between checks for the node list/watch initial sync
const WaitForAPIServerSyncPeriod = 1 * time.Second
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- sourceUpdate) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
// The Reflector responsible for watching pods at the apiserver should be run only after
@ -55,13 +54,13 @@ func NewSourceApiserver(logger klog.Logger, c clientset.Interface, nodeName type
}
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- sourceUpdate) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
updates <- sourceUpdate{Pods: pods}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)

View file

@ -19,13 +19,12 @@ package config
import (
"testing"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
type fakePodLW struct {
@ -70,71 +69,66 @@ func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) {
watchResp: fakeWatch,
}
ch := make(chan interface{})
ch := make(chan sourceUpdate)
newSourceApiserverFromLW(lw, ch)
got, ok := <-ch
update, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod1v1)
expected := createSourceUpdate(pod1v1)
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v; Got %#v", expected, update)
t.Errorf("Expected %#v; update %#v", expected, update)
}
// Add another pod
fakeWatch.Add(pod2)
got, ok = <-ch
update, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubetypes.PodUpdate)
// Could be sorted either of these two ways:
expectedA := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod1v1, pod2)
expectedB := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod2, pod1v1)
expectedA := createSourceUpdate(pod1v1, pod2)
expectedB := createSourceUpdate(pod2, pod1v1)
if !apiequality.Semantic.DeepEqual(expectedA, update) && !apiequality.Semantic.DeepEqual(expectedB, update) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update)
t.Errorf("Expected %#v or %#v, update %#v", expectedA, expectedB, update)
}
// Modify pod1
fakeWatch.Modify(pod1v2)
got, ok = <-ch
update, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubetypes.PodUpdate)
expectedA = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod1v2, pod2)
expectedB = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod2, pod1v2)
expectedA = createSourceUpdate(pod1v2, pod2)
expectedB = createSourceUpdate(pod2, pod1v2)
if !apiequality.Semantic.DeepEqual(expectedA, update) && !apiequality.Semantic.DeepEqual(expectedB, update) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update)
t.Errorf("Expected %#v or %#v, update %#v", expectedA, expectedB, update)
}
// Delete pod1
fakeWatch.Delete(pod1v2)
got, ok = <-ch
update, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubetypes.PodUpdate)
expected = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource, pod2)
expected = createSourceUpdate(pod2)
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
t.Errorf("Expected %#v, update %#v", expected, update)
}
// Delete pod2
fakeWatch.Delete(pod2)
got, ok = <-ch
update, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubetypes.PodUpdate)
expected = CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource)
expected = createSourceUpdate() // Empty update.
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
t.Errorf("Expected %#v, update %#v", expected, update)
}
}
@ -153,29 +147,27 @@ func TestNewSourceApiserver_TwoNamespacesSameName(t *testing.T) {
watchResp: fakeWatch,
}
ch := make(chan interface{})
ch := make(chan sourceUpdate)
newSourceApiserverFromLW(lw, ch)
got, ok := <-ch
update, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubetypes.PodUpdate)
// Make sure that we get both pods. Catches bug #2294.
if !(len(update.Pods) == 2) {
t.Errorf("Expected %d, Got %d", 2, len(update.Pods))
t.Errorf("Expected %d, update %d", 2, len(update.Pods))
}
// Delete pod1
fakeWatch.Delete(&pod1)
got, ok = <-ch
update, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update = got.(kubetypes.PodUpdate)
if !(len(update.Pods) == 1) {
t.Errorf("Expected %d, Got %d", 1, len(update.Pods))
t.Errorf("Expected %d, update %d", 1, len(update.Pods))
}
}
@ -187,17 +179,16 @@ func TestNewSourceApiserverInitialEmptySendsEmptyPodUpdate(t *testing.T) {
watchResp: fakeWatch,
}
ch := make(chan interface{})
ch := make(chan sourceUpdate)
newSourceApiserverFromLW(lw, ch)
got, ok := <-ch
update, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
update := got.(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.ApiserverSource)
expected := createSourceUpdate() // Expect empty update.
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v; Got %#v", expected, update)
t.Errorf("Expected %#v; update %#v", expected, update)
}
}

View file

@ -18,7 +18,6 @@ package config
import (
"context"
"fmt"
"reflect"
"sync"
"time"
@ -34,23 +33,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
// PodConfigNotificationMode describes how changes are sent to the update channel.
type PodConfigNotificationMode int
const (
// PodConfigNotificationUnknown is the default value for
// PodConfigNotificationMode when uninitialized.
PodConfigNotificationUnknown PodConfigNotificationMode = iota
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
// any change occurs.
PodConfigNotificationSnapshot
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
// changed, and a SET message if there are any additions or removals.
PodConfigNotificationSnapshotAndUpdates
// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
PodConfigNotificationIncremental
)
type podStartupSLIObserver interface {
ObservedPodOnWatch(pod *v1.Pod, when time.Time)
}
@ -70,11 +52,15 @@ type PodConfig struct {
sources sets.Set[string]
}
type sourceUpdate struct {
Pods []*v1.Pod
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig {
func NewPodConfig(recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
storage := newPodStorage(updates, recorder, startupSLIObserver)
podConfig := &PodConfig{
pods: storage,
mux: newMux(storage),
@ -86,7 +72,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder,
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- sourceUpdate {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
@ -113,11 +99,6 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
return c.updates
}
// Sync requests the full configuration be delivered to the update channel.
func (c *PodConfig) Sync() {
c.pods.sync()
}
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
@ -126,7 +107,6 @@ type podStorage struct {
podLock sync.RWMutex
// map of source name to pod uid to pod reference
pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
// on the updates channel
@ -146,10 +126,9 @@ type podStorage struct {
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage {
func newPodStorage(updates chan<- kubetypes.PodUpdate, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage {
return &podStorage{
pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode,
updates: updates,
sourcesSeen: sets.Set[string]{},
recorder: recorder,
@ -157,69 +136,45 @@ func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificatio
}
}
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// Merge normalizes a set of incoming updates from different sources into a map of all Pods
// and ensures that redundant updates are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
func (s *podStorage) Merge(ctx context.Context, source string, change interface{}) error {
func (s *podStorage) Merge(ctx context.Context, source string, update sourceUpdate) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles := s.merge(ctx, source, change)
adds, updates, deletes, removes, reconciles := s.merge(ctx, source, update)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
case PodConfigNotificationSnapshotAndUpdates:
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
case PodConfigNotificationUnknown:
fallthrough
default:
panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
return nil
}
func (s *podStorage) merge(ctx context.Context, source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
func (s *podStorage) merge(ctx context.Context, source string, update sourceUpdate) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
logger := klog.FromContext(ctx)
@ -268,47 +223,17 @@ func (s *podStorage) merge(ctx context.Context, source string, change interface{
}
}
update := change.(kubetypes.PodUpdate)
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
logger.V(4).Info("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
} else if update.Op == kubetypes.DELETE {
logger.V(4).Info("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
} else {
logger.V(4).Info("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
logger.V(4).Info("Setting pods for source", "source", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
updatePodsFunc(update.Pods, pods, pods)
case kubetypes.REMOVE:
logger.V(4).Info("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
for _, value := range update.Pods {
if existing, found := pods[value.UID]; found {
// this is a delete
delete(pods, value.UID)
removePods = append(removePods, existing)
continue
}
// this is a no-op
}
case kubetypes.SET:
logger.V(4).Info("Setting pods for source", "source", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
default:
logger.Info("Received invalid update type", "type", update)
}
s.pods[source] = pods
@ -475,25 +400,6 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr
return
}
// sync sends a copy of the current state through the update channel.
func (s *podStorage) sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
}
func (s *podStorage) mergedState() interface{} {
s.podLock.RLock()
defer s.podLock.RUnlock()
pods := make([]*v1.Pod, 0)
for _, sourcePods := range s.pods {
for _, podRef := range sourcePods {
pods = append(pods, podRef.DeepCopy())
}
}
return pods
}
func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
pods := []*v1.Pod{}
for _, source := range sourcePods {

View file

@ -42,7 +42,7 @@ const (
TestSource = "test"
)
func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
func expectEmptyChannel(t *testing.T, ch <-chan sourceUpdate) {
select {
case update := <-ch:
t.Errorf("Expected no update in channel, Got %v", update)
@ -93,15 +93,20 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
}
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
func createSourceUpdate(pods ...*v1.Pod) sourceUpdate {
return sourceUpdate{pods}
}
func createPodConfigTester(ctx context.Context) (chan<- sourceUpdate, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
config := NewPodConfig(eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
channel := config.Channel(ctx, TestSource)
ch := config.Updates()
return channel, ch, config
}
func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...kubetypes.PodUpdate) {
t.Helper()
for i := range expected {
update := <-ch
sort.Sort(sortedPods(update.Pods))
@ -139,127 +144,77 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
func TestNewPodAdded(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
podUpdate := createSourceUpdate(CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
}
func TestNewPodAddedInvalidNamespace(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
podUpdate := createSourceUpdate(CreateValidPod("foo", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "")))
}
func TestNewPodAddedDefaultNamespace(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
podUpdate := createSourceUpdate(CreateValidPod("foo", "default"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default")))
}
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
pod1 := CreateValidPod("foo", "default")
podUpdate := createSourceUpdate(pod1)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")))
// see an update in another namespace
podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
pod2 := CreateValidPod("foo", "new")
podUpdate = createSourceUpdate(pod1, pod2)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default"), CreateValidPod("foo", "new")))
}
func TestInvalidPodFiltered(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
podUpdate := createSourceUpdate(CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
// add an invalid update, pod with the same name
podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
podUpdate = createSourceUpdate(CreateValidPod("foo", "new"))
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
// container updates are separated as UPDATE
pod := *podUpdate.Pods[0]
pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, &pod))
}
func TestNewPodAddedSnapshot(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
// container updates are separated as UPDATE
pod := *podUpdate.Pods[0]
pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, &pod))
}
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
podUpdate := createSourceUpdate(CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
@ -269,11 +224,11 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
// an kubetypes.ADD should be converted to kubetypes.UPDATE
pod := CreateValidPod("foo", "new")
pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, pod)
podUpdate = createSourceUpdate(pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new"))
podUpdate = createSourceUpdate()
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
}
@ -281,11 +236,11 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
func TestNewPodAddedDelete(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// should register an add
addedPod := CreateValidPod("foo", "new")
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)
podUpdate := createSourceUpdate(addedPod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod))
@ -293,7 +248,7 @@ func TestNewPodAddedDelete(t *testing.T) {
timestamp := metav1.NewTime(time.Now())
deletedPod := CreateValidPod("foo", "new")
deletedPod.ObjectMeta.DeletionTimestamp = &timestamp
podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod)
podUpdate = createSourceUpdate(deletedPod)
channel <- podUpdate
// the existing pod should be gracefully deleted
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod))
@ -302,20 +257,21 @@ func TestNewPodAddedDelete(t *testing.T) {
func TestNewPodAddedUpdatedSet(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
podUpdate := createSourceUpdate(CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")))
// should ignore ADDs that are identical
channel <- podUpdate
expectNoPodUpdate(t, ch)
// should be converted to an kubetypes.ADD, kubetypes.REMOVE, and kubetypes.UPDATE
pod := CreateValidPod("foo2", "new")
pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new"))
podUpdate = createSourceUpdate(pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new"))
channel <- podUpdate
expectPodUpdate(t, ch,
CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new")),
@ -342,65 +298,51 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
}
return pods, pods[0]
}
for _, op := range []kubetypes.PodOperation{
kubetypes.ADD,
kubetypes.SET,
} {
var podWithStatusChange *v1.Pod
pods, _ := newTestPods(false, false)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
var podWithStatusChange *v1.Pod
pods, _ := newTestPods(false, false)
channel, ch, _ := createPodConfigTester(tCtx)
// Use SET to initialize the config, especially initialize the source set
channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...))
// Use SET to initialize the config, especially initialize the source set
channel <- createSourceUpdate(pods...)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...))
// If status is not changed, no reconcile should be triggered
channel <- CreatePodUpdate(op, TestSource, pods...)
expectNoPodUpdate(t, ch)
// If status is not changed, no reconcile should be triggered
channel <- createSourceUpdate(pods...)
expectNoPodUpdate(t, ch)
// If the pod status is changed and not updated, a reconcile should be triggered
pods, podWithStatusChange = newTestPods(true, false)
channel <- CreatePodUpdate(op, TestSource, pods...)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange))
// If the pod status is changed and not updated, a reconcile should be triggered
pods, podWithStatusChange = newTestPods(true, false)
channel <- createSourceUpdate(pods...)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange))
// If the pod status is changed, but the pod is also updated, no reconcile should be triggered
pods, podWithStatusChange = newTestPods(true, true)
channel <- CreatePodUpdate(op, TestSource, pods...)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange))
}
// If the pod status is changed, but the pod is also updated, no reconcile should be triggered
pods, podWithStatusChange = newTestPods(true, true)
channel <- createSourceUpdate(pods...)
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange))
}
func TestInitialEmptySet(t *testing.T) {
tCtx := ktesting.Init(t)
for _, test := range []struct {
mode PodConfigNotificationMode
op kubetypes.PodOperation
}{
{PodConfigNotificationIncremental, kubetypes.ADD},
{PodConfigNotificationSnapshot, kubetypes.SET},
{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
} {
channel, ch, _ := createPodConfigTester(tCtx, test.mode)
channel, ch, _ := createPodConfigTester(tCtx)
// should register an empty PodUpdate operation
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource))
// should register an empty PodUpdate operation
podUpdate := createSourceUpdate()
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource))
// should ignore following empty sets
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource)
channel <- podUpdate
podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new")))
}
// should ignore following empty sets
podUpdate = createSourceUpdate()
channel <- podUpdate
podUpdate = createSourceUpdate(CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
}
func TestPodUpdateAnnotations(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
pod := CreateValidPod("foo2", "new")
pod.Annotations = make(map[string]string)
@ -408,22 +350,22 @@ func TestPodUpdateAnnotations(t *testing.T) {
clone := pod.DeepCopy()
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), clone, CreateValidPod("foo3", "new"))
podUpdate := createSourceUpdate(CreateValidPod("foo1", "new"), clone, CreateValidPod("foo3", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")))
pod.Annotations["kubernetes.io/blah"] = "superblah"
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
podUpdate = createSourceUpdate(CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
pod.Annotations["kubernetes.io/otherblah"] = "doh"
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
podUpdate = createSourceUpdate(CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
delete(pod.Annotations, "kubernetes.io/blah")
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
podUpdate = createSourceUpdate(CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
}
@ -431,7 +373,7 @@ func TestPodUpdateAnnotations(t *testing.T) {
func TestPodUpdateLabels(t *testing.T) {
tCtx := ktesting.Init(t)
channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(tCtx)
pod := CreateValidPod("foo2", "new")
pod.Labels = make(map[string]string)
@ -439,12 +381,12 @@ func TestPodUpdateLabels(t *testing.T) {
clone := pod.DeepCopy()
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, clone)
podUpdate := createSourceUpdate(clone)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pod))
pod.Labels["key"] = "newValue"
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod)
podUpdate = createSourceUpdate(pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
@ -454,7 +396,7 @@ func TestPodConfigRace(t *testing.T) {
tCtx := ktesting.Init(t)
eventBroadcaster := record.NewBroadcaster(record.WithContext(tCtx))
config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
config := NewPodConfig(eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
seenSources := sets.New[string](TestSource)
var wg sync.WaitGroup
const iterations = 100

View file

@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utilio "k8s.io/utils/io"
)
@ -55,12 +54,12 @@ type sourceFile struct {
period time.Duration
store cache.Store
fileKeyMapping map[string]string
updates chan<- interface{}
updates chan<- sourceUpdate
watchEvents chan *watchEvent
}
// NewSourceFile watches a config file for changes.
func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, period time.Duration, updates chan<- sourceUpdate) {
// "github.com/sigma/go-inotify" requires a path without trailing "/"
path = strings.TrimRight(path, string(os.PathSeparator))
@ -69,13 +68,13 @@ func NewSourceFile(logger klog.Logger, path string, nodeName types.NodeName, per
config.run(logger)
}
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- sourceUpdate) *sourceFile {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
updates <- sourceUpdate{Pods: pods}
}
store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
return &sourceFile{
@ -126,7 +125,7 @@ func (s *sourceFile) listConfig(logger klog.Logger) error {
return err
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
s.updates <- sourceUpdate{Pods: []*v1.Pod{}}
return fmt.Errorf("path does not exist, ignoring")
}
@ -138,7 +137,7 @@ func (s *sourceFile) listConfig(logger klog.Logger) error {
}
if len(pods) == 0 {
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
s.updates <- sourceUpdate{Pods: pods}
return nil
}
return s.replaceStore(pods...)

View file

@ -26,12 +26,11 @@ import (
"time"
"github.com/fsnotify/fsnotify"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
const (
@ -72,7 +71,7 @@ func (s *sourceFile) doWatch(logger klog.Logger) error {
return err
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
s.updates <- sourceUpdate{Pods: []*v1.Pod{}}
return &retryableError{"path does not exist, ignoring"}
}

View file

@ -44,7 +44,7 @@ import (
func TestExtractFromNonExistentFile(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
err := lw.doWatch(logger)
if err == nil {
@ -54,12 +54,11 @@ func TestExtractFromNonExistentFile(t *testing.T) {
func TestUpdateOnNonExistentFile(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{})
ch := make(chan sourceUpdate)
NewSourceFile(logger, "random_non_existent_path", "localhost", time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
case update := <-ch:
expected := createSourceUpdate() // Expect empty update.
if !apiequality.Semantic.DeepDerivative(expected, update) {
t.Fatalf("expected %#v, Got %#v", expected, update)
}
@ -83,11 +82,10 @@ func TestReadPodsFromFileExistAlready(t *testing.T) {
defer os.RemoveAll(dirName)
file := testCase.writeToFile(dirName, "test_pod_manifest", t)
ch := make(chan interface{})
ch := make(chan sourceUpdate)
NewSourceFile(logger, file, hostname, time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
case update := <-ch:
for _, pod := range update.Pods {
// TODO: remove the conversion when validation is performed on versioned objects.
internalPod := &api.Pod{}
@ -141,7 +139,7 @@ type testCase struct {
lock *sync.Mutex
desc string
pod runtime.Object
expected kubetypes.PodUpdate
expected sourceUpdate
}
func getTestCases(hostname types.NodeName) []*testCase {
@ -170,7 +168,7 @@ func getTestCases(hostname types.NodeName) []*testCase {
Phase: v1.PodPending,
},
},
expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &v1.Pod{
expected: createSourceUpdate(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-" + string(hostname),
UID: "12345",
@ -255,7 +253,7 @@ func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
createSymbolicLink(dirName, linkedDirName, fileName, t)
}
ch := make(chan interface{})
ch := make(chan sourceUpdate)
if watchDir {
NewSourceFile(logger, dirName, hostname, 100*time.Millisecond, ch)
} else {
@ -310,7 +308,7 @@ func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *test
}
var file string
ch := make(chan interface{})
ch := make(chan sourceUpdate)
func() {
testCase.lock.Lock()
defer testCase.lock.Unlock()
@ -364,12 +362,11 @@ func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *test
}
}
func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
func expectUpdate(t *testing.T, ch chan sourceUpdate, testCase *testCase) {
timer := time.After(5 * time.Second)
for {
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
case update := <-ch:
if len(update.Pods) == 0 {
// filter out the empty updates from reading a non-existing path
continue
@ -395,12 +392,11 @@ func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
}
}
func expectEmptyUpdate(t *testing.T, ch chan interface{}) {
func expectEmptyUpdate(t *testing.T, ch chan sourceUpdate) {
timer := time.After(5 * time.Second)
for {
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
case update := <-ch:
if len(update.Pods) != 0 {
t.Fatalf("expected empty update, got %#v", update)
}

View file

@ -23,7 +23,6 @@ import (
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/utils/ktesting"
)
@ -41,7 +40,7 @@ func TestExtractFromBadDataFile(t *testing.T) {
t.Fatalf("unable to write test file %#v", err)
}
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
lw := newSourceFile(fileName, "localhost", time.Millisecond, ch)
err = lw.listConfig(logger)
if err == nil {
@ -58,18 +57,18 @@ func TestExtractFromEmptyDir(t *testing.T) {
defer removeAll(dirName, t)
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
lw := newSourceFile(dirName, "localhost", time.Millisecond, ch)
err = lw.listConfig(logger)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
update, ok := (<-ch).(kubetypes.PodUpdate)
update, ok := <-ch
if !ok {
t.Fatalf("unexpected type: %#v", update)
}
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
expected := createSourceUpdate() // Expect empty update.
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Fatalf("expected %#v, got %#v", expected, update)
}

View file

@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
api "k8s.io/kubernetes/pkg/apis/core"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
@ -36,14 +35,14 @@ type sourceURL struct {
url string
header http.Header
nodeName types.NodeName
updates chan<- interface{}
updates chan<- sourceUpdate
data []byte
failureLogs int
client *http.Client
}
// NewSourceURL specifies the URL where to read the Pod configuration from, then watches it for changes.
func NewSourceURL(logger klog.Logger, url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
func NewSourceURL(logger klog.Logger, url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- sourceUpdate) {
config := &sourceURL{
url: url,
header: header,
@ -102,7 +101,7 @@ func (s *sourceURL) extractFromURL(logger klog.Logger) error {
}
if len(data) == 0 {
// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
s.updates <- sourceUpdate{Pods: []*v1.Pod{}}
return fmt.Errorf("zero-length data received from %v", s.url)
}
// Short circuit if the data has not changed since the last time it was read.
@ -118,7 +117,7 @@ func (s *sourceURL) extractFromURL(logger klog.Logger) error {
// It parsed but could not be used.
return singlePodErr
}
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
s.updates <- sourceUpdate{Pods: []*v1.Pod{pod}}
return nil
}
@ -133,7 +132,7 @@ func (s *sourceURL) extractFromURL(logger klog.Logger) error {
for i := range podList.Items {
pods = append(pods, &podList.Items[i])
}
s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
s.updates <- sourceUpdate{Pods: pods}
return nil
}

View file

@ -23,7 +23,7 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -39,7 +39,7 @@ import (
func TestURLErrorNotExistNoUpdate(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{})
ch := make(chan sourceUpdate)
NewSourceURL(logger, "http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch)
select {
case got := <-ch:
@ -50,7 +50,7 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) {
func TestExtractFromHttpBadness(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
c := sourceURL{"http://localhost:49575/_not_found_", http.Header{}, "other", ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(logger); err == nil {
t.Errorf("Expected error")
@ -120,7 +120,7 @@ func TestExtractInvalidPods(t *testing.T) {
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
c := sourceURL{testServer.URL, http.Header{}, "localhost", ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(logger); err == nil {
t.Errorf("%s: Expected error", testCase.desc)
@ -137,7 +137,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
var testCases = []struct {
desc string
pods runtime.Object
expected kubetypes.PodUpdate
expected sourceUpdate
}{
{
desc: "Single pod",
@ -161,8 +161,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
Phase: v1.PodPending,
},
},
expected: CreatePodUpdate(kubetypes.SET,
kubetypes.HTTPSource,
expected: createSourceUpdate(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "111",
@ -232,8 +231,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
},
},
},
expected: CreatePodUpdate(kubetypes.SET,
kubetypes.HTTPSource,
expected: createSourceUpdate(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "111",
@ -304,13 +302,13 @@ func TestExtractPodsFromHTTP(t *testing.T) {
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
c := sourceURL{testServer.URL, http.Header{}, types.NodeName(nodeName), ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(logger); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue
}
update := (<-ch).(kubetypes.PodUpdate)
update := <-ch
if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
@ -356,14 +354,14 @@ func TestURLWithHeader(t *testing.T) {
defer testServer.Close()
logger, _ := ktesting.NewTestContext(t)
ch := make(chan interface{}, 1)
ch := make(chan sourceUpdate, 1)
header := make(http.Header)
header.Set("Metadata-Flavor", "Google")
c := sourceURL{testServer.URL, header, "localhost", ch, nil, 0, http.DefaultClient}
if err := c.extractFromURL(logger); err != nil {
t.Fatalf("Unexpected error extracting from URL: %v", err)
}
update := (<-ch).(kubetypes.PodUpdate)
update := <-ch
headerVal := fakeHandler.RequestReceived.Header["Metadata-Flavor"]
if len(headerVal) != 1 || headerVal[0] != "Google" {

View file

@ -28,7 +28,7 @@ type merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(ctx context.Context, source string, update interface{}) error
Merge(ctx context.Context, source string, update sourceUpdate) error
}
// mux is a class for merging configuration from multiple sources. Changes are
@ -40,13 +40,13 @@ type mux struct {
// Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
sources map[string]chan sourceUpdate
}
// newMux creates a new mux that can merge changes from multiple sources.
func newMux(merger merger) *mux {
mux := &mux{
sources: make(map[string]chan interface{}),
sources: make(map[string]chan sourceUpdate),
merger: merger,
}
return mux
@ -57,7 +57,7 @@ func newMux(merger merger) *mux {
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
func (m *mux) ChannelWithContext(ctx context.Context, source string) chan sourceUpdate {
if len(source) == 0 {
panic("Channel given an empty name")
}
@ -67,14 +67,14 @@ func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interf
if exists {
return channel
}
newChannel := make(chan interface{})
newChannel := make(chan sourceUpdate)
m.sources[source] = newChannel
go wait.Until(func() { m.listen(ctx, source, newChannel) }, 0, ctx.Done())
return newChannel
}
func (m *mux) listen(ctx context.Context, source string, listenChannel <-chan interface{}) {
func (m *mux) listen(ctx context.Context, source string, listenChannel <-chan sourceUpdate) {
logger := klog.FromContext(ctx)
for update := range listenChannel {
if err := m.merger.Merge(ctx, source, update); err != nil {

View file

@ -18,9 +18,13 @@ package config
import (
"context"
"fmt"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/test/utils/ktesting"
)
@ -40,36 +44,42 @@ func TestConfigurationChannels(t *testing.T) {
}
}
type MergeMock struct {
source string
update interface{}
t *testing.T
}
func (m MergeMock) Merge(ctx context.Context, source string, update interface{}) error {
if m.source != source {
m.t.Errorf("Expected %s, Got %s", m.source, source)
}
if !reflect.DeepEqual(m.update, update) {
m.t.Errorf("Expected %s, Got %s", m.update, update)
}
return nil
}
func TestMergeInvoked(t *testing.T) {
ctx := ktesting.Init(t)
ctx = ktesting.WithCancel(ctx)
defer ctx.Cancel("TestMergeInvoked completed")
merger := MergeMock{"one", "test", t}
const expectedSource = "one"
done := make(chan interface{})
var merger mergeFunc = func(ctx context.Context, source string, update sourceUpdate) error {
if expectedSource != source {
t.Errorf("Expected %s, Got %s", expectedSource, source)
}
expectedUpdate := fakeUpdate(expectedSource)
if !reflect.DeepEqual(expectedUpdate, update) {
t.Errorf("Expected %v, Got %v", expectedUpdate, update)
}
close(done)
return nil
}
mux := newMux(&merger)
mux.ChannelWithContext(ctx, "one") <- "test"
mux.ChannelWithContext(ctx, expectedSource) <- fakeUpdate(expectedSource)
// Wait for Merge call.
select {
case <-done:
// Test complete.
case <-ctx.Done():
t.Fatal("Test context canceled before completion")
}
}
// mergeFunc implements the Merger interface
type mergeFunc func(ctx context.Context, source string, update interface{}) error
type mergeFunc func(ctx context.Context, source string, update sourceUpdate) error
func (f mergeFunc) Merge(ctx context.Context, source string, update interface{}) error {
func (f mergeFunc) Merge(ctx context.Context, source string, update sourceUpdate) error {
return f(ctx, source, update)
}
@ -79,26 +89,27 @@ func TestSimultaneousMerge(t *testing.T) {
defer ctx.Cancel("TestSimultaneousMerge completed")
ch := make(chan bool, 2)
mux := newMux(mergeFunc(func(ctx context.Context, source string, update interface{}) error {
switch source {
case "one":
if update.(string) != "test" {
t.Errorf("Expected %s, Got %s", "test", update)
}
case "two":
if update.(string) != "test2" {
t.Errorf("Expected %s, Got %s", "test2", update)
}
default:
t.Errorf("Unexpected source, Got %s", update)
mux := newMux(mergeFunc(func(ctx context.Context, source string, update sourceUpdate) error {
if nsSource := update.Pods[0].Namespace; nsSource != source {
t.Errorf("Expected %s, Got %s", source, nsSource)
}
ch <- true
return nil
}))
source := mux.ChannelWithContext(ctx, "one")
source2 := mux.ChannelWithContext(ctx, "two")
source <- "test"
source2 <- "test2"
source <- fakeUpdate("one")
source2 <- fakeUpdate("two")
<-ch
<-ch
}
func fakeUpdate(source string) sourceUpdate {
return sourceUpdate{[]*v1.Pod{{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pod", source),
Namespace: source,
UID: types.UID(fmt.Sprintf("%s-pod-uid", source)),
},
}}}
}

View file

@ -373,7 +373,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)
cfg := config.NewPodConfig(kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
@ -2611,9 +2611,6 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}

View file

@ -38,10 +38,8 @@ type PodOperation int
// These constants identify the PodOperations that can be made on a pod configuration.
const (
// SET is the current pod configuration.
SET PodOperation = iota
// ADD signifies pods that are new to this source.
ADD
ADD PodOperation = iota
// DELETE signifies pods that are gracefully deleted from this source.
DELETE
// REMOVE signifies pods that have been removed from this source.