diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 16e277a3143..353897cfb6f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -679,6 +679,8 @@ func processDeltasInBatch( callbacks = append(callbacks, func() { handler.OnDelete(obj) }) + default: + return fmt.Errorf("Delta type %s is not supported in batch processing", d.Type) } } diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go index 3afd79af59a..857634c0c9f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go @@ -264,6 +264,15 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { return Deltas{item}, err } +// batchable stores the delta types that can be batched +var batchable = map[DeltaType]bool{ + Sync: true, + Replaced: true, + Added: true, + Updated: true, + Deleted: true, +} + // PopBatch pops as many items as possible to be processed as a batch using processBatch, // or pop a single item using processSingle if multiple items cannot be batched. func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProcessFunc) error { @@ -295,6 +304,14 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc break } item := f.items[i] + if !batchable[item.Type] { + if len(deltas) == 0 { + // if an unbatchable delta is first in the list, process just that one by itself + moveDeltaToProcessList(i) + } + // close the batch when an unbatchable delta is encountered + break + } id, err := f.keyOf(item) if err != nil { // close the batch here if error happens