Runtime Updates: Use proper buffer channel sizes

This commit is contained in:
Eric Lippmann 2021-11-11 20:16:22 +01:00
parent 7d6474f6b5
commit 6a8163cdbc

View file

@ -36,8 +36,6 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger
}
}
const bulkSize = 1 << 14
// Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync.
func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) {
config = icingaredis.Streams{"icinga:runtime": "0-0"}
@ -70,22 +68,29 @@ func (r *RuntimeUpdates) Sync(
for _, factoryFunc := range factoryFuncs {
s := common.NewSyncSubject(factoryFunc)
updateMessages := make(chan redis.XMessage, bulkSize)
upsertEntities := make(chan contracts.Entity, bulkSize)
deleteIds := make(chan interface{}, bulkSize)
updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount)
deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
var upserted chan contracts.Entity
var upsertedFifo chan contracts.Entity
var deleted chan interface{}
var deletedFifo chan interface{}
var upsertCount int
var deleteCount int
upsertStmt, upsertPlaceholders := r.db.BuildUpsertStmt(s.Entity())
if !allowParallel {
upserted = make(chan contracts.Entity, 1)
upsertedFifo = make(chan contracts.Entity, 1)
deleted = make(chan interface{}, 1)
deletedFifo = make(chan interface{}, 1)
upsertCount = 1
deleteCount = 1
} else {
upserted = make(chan contracts.Entity)
deleted = make(chan interface{})
upsertCount = r.db.BatchSizeByPlaceholders(upsertPlaceholders)
deleteCount = r.db.Options.MaxPlaceholdersPerStatement
upserted = make(chan contracts.Entity, upsertCount)
deleted = make(chan interface{}, deleteCount)
}
updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(s.Name(), ':'))] = updateMessages
@ -100,12 +105,11 @@ func (r *RuntimeUpdates) Sync(
g.Go(func() error {
defer close(upserted)
stmt, placeholders := r.db.BuildUpsertStmt(s.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted,
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted,
)
})
g.Go(func() error {
@ -141,7 +145,11 @@ func (r *RuntimeUpdates) Sync(
g.Go(func() error {
defer close(deleted)
return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted)
sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity()))
return r.db.BulkExec(
ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, deleted,
)
})
g.Go(func() error {
var counter com.Counter
@ -176,9 +184,9 @@ func (r *RuntimeUpdates) Sync(
// customvar and customvar_flat sync.
{
updateMessages := make(chan redis.XMessage, bulkSize)
upsertEntities := make(chan contracts.Entity, bulkSize)
deleteIds := make(chan interface{}, bulkSize)
updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount)
deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
cv := common.NewSyncSubject(v1.NewCustomvar)
cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat)
@ -195,14 +203,18 @@ func (r *RuntimeUpdates) Sync(
customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities)
com.ErrgroupReceive(g, errs)
upsertedCustomvars := make(chan contracts.Entity)
cvStmt, cvPlaceholders := r.db.BuildUpsertStmt(cv.Entity())
cvCount := r.db.BatchSizeByPlaceholders(cvPlaceholders)
upsertedCustomvars := make(chan contracts.Entity, cvCount)
g.Go(func() error {
defer close(upsertedCustomvars)
stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars)
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars,
)
})
g.Go(func() error {
var counter com.Counter
@ -226,14 +238,18 @@ func (r *RuntimeUpdates) Sync(
}
})
cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity())
cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders)
upsertedFlatCustomvars := make(chan contracts.Entity)
g.Go(func() error {
defer close(upsertedFlatCustomvars)
stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars)
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, upsertedFlatCustomvars,
)
})
g.Go(func() error {
var counter com.Counter
@ -294,7 +310,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
for {
xra := &redis.XReadArgs{
Streams: streams.Option(),
Count: bulkSize,
Count: int64(r.redis.Options.XReadCount),
Block: 0,
}