Add missing doc in db

This commit is contained in:
Eric Lippmann 2021-06-22 08:50:30 +02:00
parent 4977d8f1f4
commit d1c20b6946

View file

@ -33,8 +33,14 @@ type DB struct {
tableSemaphoresMu sync.Mutex
}
// Options define user configurable database options.
type Options struct {
MaxConnections int `yaml:"max_connections" default:"16"`
// Maximum number of open connections to the database.
MaxConnections int `yaml:"max_connections" default:"16"`
// Maximum number of connections per table,
// regardless of what the connection is actually doing,
// e.g. INSERT, UPDATE, DELETE.
MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`
// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
@ -59,6 +65,7 @@ func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB {
}
}
// BuildColumns returns all columns of the given struct.
func (db *DB) BuildColumns(subject interface{}) []string {
fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names
columns := make([]string, 0, len(fields))
@ -72,6 +79,7 @@ func (db *DB) BuildColumns(subject interface{}) []string {
return columns
}
// BuildDeleteStmt returns a DELETE statement for the given struct.
func (db *DB) BuildDeleteStmt(from interface{}) string {
return fmt.Sprintf(
`DELETE FROM %s WHERE id IN (?)`,
@ -79,6 +87,7 @@ func (db *DB) BuildDeleteStmt(from interface{}) string {
)
}
// BuildInsertStmt returns an INSERT INTO statement for the given struct.
func (db *DB) BuildInsertStmt(into interface{}) (string, int) {
columns := db.BuildColumns(into)
@ -90,14 +99,17 @@ func (db *DB) BuildInsertStmt(into interface{}) (string, int) {
), len(columns)
}
func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string {
// BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct
// and the column list from the specified columns struct.
func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string {
return fmt.Sprintf(
`SELECT %s FROM %s`,
strings.Join(db.BuildColumns(into), ", "),
utils.TableName(from),
strings.Join(db.BuildColumns(columns), ", "),
utils.TableName(table),
)
}
// BuildUpdateStmt returns an UPDATE statement for the given struct.
func (db *DB) BuildUpdateStmt(update interface{}) (string, int) {
columns := db.BuildColumns(update)
set := make([]string, 0, len(columns))
@ -113,6 +125,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) {
), len(columns) + 1 // +1 because of WHERE id = :id
}
// BuildUpsertStmt returns an upsert statement for the given struct.
func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) {
insertColumns := db.BuildColumns(subject)
var updateColumns []string
@ -138,6 +151,11 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in
), len(insertColumns)
}
// BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`.
// Takes in up to the number of arguments specified in count from the arg stream,
// derives and expands a query and executes it with this set of arguments until the arg stream has been processed.
// The derived queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error {
var cnt com.Counter
g, ctx := errgroup.WithContext(ctx)
@ -193,6 +211,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
return g.Wait()
}
// NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely
// in the format INSERT ... VALUES. Takes in up to the number of entities specified in count
// from the arg stream, derives and executes a new query with the VALUES clause expanded to
// this set of arguments, until the arg stream has been processed.
// The queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
// Entities for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
@ -207,11 +232,6 @@ func (db *DB) NamedBulkExec(
})
g.Go(func() error {
// stmt, err := db.PrepareNamedContext(ctx, query)
// if err != nil {
// return err
// }
for {
select {
case b, ok := <-bulk:
@ -265,6 +285,12 @@ func (db *DB) NamedBulkExec(
return g.Wait()
}
// NamedBulkExecTx bulk executes queries with named placeholders in separate transactions.
// Takes in up to the number of entities specified in count from the arg stream and
// executes a new transaction that runs a new query for each entity in this set of arguments,
// until the arg stream has been processed.
// The transactions are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) NamedBulkExecTx(
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
) error {
@ -346,6 +372,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
return 1
}
// YieldAll executes the query with the supplied args,
// scans each resulting row into an entity returned by the factory function,
// and streams them into a returned channel.
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) {
var cnt com.Counter
entities := make(chan contracts.Entity, 1)
@ -387,6 +416,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
return entities, com.WaitAsync(g)
}
// CreateStreamed bulk creates the specified entities via NamedBulkExec.
// The insert statement is created using BuildInsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
@ -399,6 +432,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil)
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
@ -411,6 +448,10 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded)
}
// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx.
// The update statement is created using BuildUpdateStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxRowsPerTransaction and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
@ -422,11 +463,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExecTx(ctx, stmt, db.options.MaxRowsPerTransaction, sem, forward)
}
// DeleteStreamed bulk deletes the specified ids via BulkExec.
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
sem := db.getSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.options.MaxPlaceholdersPerStatement, sem, ids)
}
// Delete creates a channel from the specified ids and
// bulk deletes them by passing the channel along with the entityType to DeleteStreamed.
func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error {
idsCh := make(chan interface{}, len(ids))
for _, id := range ids {
@ -437,6 +484,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int
return db.DeleteStreamed(ctx, entityType, idsCh)
}
// IsRetryable checks whether the given error is retryable.
func IsRetryable(err error) bool {
if errors.Is(err, driver.ErrBadConn) {
return true
@ -451,8 +499,8 @@ func IsRetryable(err error) bool {
switch e.Number {
case 1053, 1205, 1213, 2006:
// 1053: Server shutdown in progress
// 1205:
// 1213:
// 1205: Lock wait timeout
// 1213: Deadlock found when trying to get lock
// 2006: MySQL server has gone away
return true
}