Guard against unexpected delta types in batch processing

This commit is contained in:
Michael Aspinwall 2026-01-14 12:41:04 -05:00
parent b8470beda4
commit 64f780c1ec
2 changed files with 19 additions and 0 deletions

View file

@ -679,6 +679,8 @@ func processDeltasInBatch(
callbacks = append(callbacks, func() {
handler.OnDelete(obj)
})
default:
return fmt.Errorf("Delta type %s is not supported in batch processing", d.Type)
}
}

View file

@ -264,6 +264,15 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
return Deltas{item}, err
}
// batchable stores the delta types that can be batched
var batchable = map[DeltaType]bool{
Sync: true,
Replaced: true,
Added: true,
Updated: true,
Deleted: true,
}
// PopBatch pops as many items as possible to be processed as a batch using processBatch,
// or pop a single item using processSingle if multiple items cannot be batched.
func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProcessFunc) error {
@ -295,6 +304,14 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
break
}
item := f.items[i]
if !batchable[item.Type] {
if len(deltas) == 0 {
// if an unbatchable delta is first in the list, process just that one by itself
moveDeltaToProcessList(i)
}
// close the batch when an unbatchable delta is encountered
break
}
id, err := f.keyOf(item)
if err != nil {
// close the batch here if error happens