Export DB.getSemaphoreForTable()

This commit is contained in:
Eric Lippmann 2021-09-30 15:27:46 +02:00
parent 16dd4663ad
commit d017a05d05

View file

@ -438,7 +438,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
return errors.Wrap(err, "can't copy first entity")
}
sem := db.getSemaphoreForTable(utils.TableName(first))
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildInsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil)
@ -454,7 +454,7 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
return errors.Wrap(err, "can't copy first entity")
}
sem := db.getSemaphoreForTable(utils.TableName(first))
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, placeholders := db.BuildUpsertStmt(first)
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded)
@ -469,7 +469,7 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
if first == nil {
return errors.Wrap(err, "can't copy first entity")
}
sem := db.getSemaphoreForTable(utils.TableName(first))
sem := db.GetSemaphoreForTable(utils.TableName(first))
stmt, _ := db.BuildUpdateStmt(first)
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
@ -480,7 +480,7 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
sem := db.getSemaphoreForTable(utils.TableName(entityType))
sem := db.GetSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids)
}
@ -496,7 +496,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int
return db.DeleteStreamed(ctx, entityType, idsCh)
}
func (db *DB) getSemaphoreForTable(table string) *semaphore.Weighted {
func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
db.tableSemaphoresMu.Lock()
defer db.tableSemaphoresMu.Unlock()