diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 9446dea0..130f7098 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -98,7 +98,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { `UPDATE %s SET %s WHERE id = :id`, utils.TableName(update), strings.Join(set, ", "), - ), len(columns) + ), len(columns) + 1 // +1 because of WHERE id = :id } func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 80f0a07b..8682e05e 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -140,10 +140,9 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { com.ErrgroupReceive(g, errs) g.Go(func() error { - // TODO (el): This is very slow in high latency scenarios. - // Use strings.Repeat() on the query and create a stmt - // with a size near the default value of max_allowed_packet. - return s.db.UpdateStreamed(ctx, entities) + // Using upsert here on purpose as this is the fastest way to do bulk updates. + // However, there is a risk that errors in the sync implementation could silently insert new rows. + return s.db.UpsertStreamed(ctx, entities, nil) }) }