diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 2ce35f71..6616d9f2 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -36,8 +36,6 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger } } -const bulkSize = 1 << 14 - // Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync. func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) { config = icingaredis.Streams{"icinga:runtime": "0-0"} @@ -70,22 +68,29 @@ func (r *RuntimeUpdates) Sync( for _, factoryFunc := range factoryFuncs { s := common.NewSyncSubject(factoryFunc) - updateMessages := make(chan redis.XMessage, bulkSize) - upsertEntities := make(chan contracts.Entity, bulkSize) - deleteIds := make(chan interface{}, bulkSize) + updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) + upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) + deleteIds := make(chan interface{}, r.redis.Options.XReadCount) + var upserted chan contracts.Entity var upsertedFifo chan contracts.Entity var deleted chan interface{} var deletedFifo chan interface{} - + var upsertCount int + var deleteCount int + upsertStmt, upsertPlaceholders := r.db.BuildUpsertStmt(s.Entity()) if !allowParallel { upserted = make(chan contracts.Entity, 1) upsertedFifo = make(chan contracts.Entity, 1) deleted = make(chan interface{}, 1) deletedFifo = make(chan interface{}, 1) + upsertCount = 1 + deleteCount = 1 } else { - upserted = make(chan contracts.Entity) - deleted = make(chan interface{}) + upsertCount = r.db.BatchSizeByPlaceholders(upsertPlaceholders) + deleteCount = r.db.Options.MaxPlaceholdersPerStatement + upserted = make(chan contracts.Entity, upsertCount) + deleted = make(chan interface{}, deleteCount) } updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(s.Name(), ':'))] = updateMessages @@ -100,12 +105,11 @@ func (r *RuntimeUpdates) Sync( g.Go(func() error { defer close(upserted) - stmt, placeholders := r.db.BuildUpsertStmt(s.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted, + ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, ) }) g.Go(func() error { @@ -141,7 +145,11 @@ func (r *RuntimeUpdates) Sync( g.Go(func() error { defer close(deleted) - return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted) + sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) + + return r.db.BulkExec( + ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, deleted, + ) }) g.Go(func() error { var counter com.Counter @@ -176,9 +184,9 @@ func (r *RuntimeUpdates) Sync( // customvar and customvar_flat sync. { - updateMessages := make(chan redis.XMessage, bulkSize) - upsertEntities := make(chan contracts.Entity, bulkSize) - deleteIds := make(chan interface{}, bulkSize) + updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) + upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) + deleteIds := make(chan interface{}, r.redis.Options.XReadCount) cv := common.NewSyncSubject(v1.NewCustomvar) cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat) @@ -195,14 +203,18 @@ func (r *RuntimeUpdates) Sync( customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities) com.ErrgroupReceive(g, errs) - upsertedCustomvars := make(chan contracts.Entity) + cvStmt, cvPlaceholders := r.db.BuildUpsertStmt(cv.Entity()) + cvCount := r.db.BatchSizeByPlaceholders(cvPlaceholders) + upsertedCustomvars := make(chan contracts.Entity, cvCount) g.Go(func() error { defer close(upsertedCustomvars) - stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars) + + return r.db.NamedBulkExec( + ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, + ) }) g.Go(func() error { var counter com.Counter @@ -226,14 +238,18 @@ func (r *RuntimeUpdates) Sync( } }) + cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity()) + cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders) upsertedFlatCustomvars := make(chan contracts.Entity) g.Go(func() error { defer close(upsertedFlatCustomvars) - stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars) + + return r.db.NamedBulkExec( + ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars, + ) }) g.Go(func() error { var counter com.Counter @@ -294,7 +310,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri for { xra := &redis.XReadArgs{ Streams: streams.Option(), - Count: bulkSize, + Count: int64(r.redis.Options.XReadCount), Block: 0, }