From d1c20b6946e8086ea01821040d466efb68d5d257 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 08:50:30 +0200 Subject: [PATCH] Add missing doc in db --- pkg/icingadb/db.go | 70 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index c13bbd95..e471d6dc 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -33,8 +33,14 @@ type DB struct { tableSemaphoresMu sync.Mutex } +// Options define user-configurable database options. type Options struct { - MaxConnections int `yaml:"max_connections" default:"16"` + // Maximum number of open connections to the database. + MaxConnections int `yaml:"max_connections" default:"16"` + + // Maximum number of connections per table, + // regardless of what the connection is actually doing, + // e.g. INSERT, UPDATE, DELETE. MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"` // MaxPlaceholdersPerStatement defines the maximum number of placeholders in an @@ -59,6 +65,7 @@ func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB { } } +// BuildColumns returns all columns of the given struct. func (db *DB) BuildColumns(subject interface{}) []string { fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names columns := make([]string, 0, len(fields)) @@ -72,6 +79,7 @@ func (db *DB) BuildColumns(subject interface{}) []string { return columns } +// BuildDeleteStmt returns a DELETE statement for the given struct. func (db *DB) BuildDeleteStmt(from interface{}) string { return fmt.Sprintf( `DELETE FROM %s WHERE id IN (?)`, @@ -79,6 +87,7 @@ func (db *DB) BuildDeleteStmt(from interface{}) string { ) } +// BuildInsertStmt returns an INSERT INTO statement for the given struct and the number of placeholders (one per column) it contains.
func (db *DB) BuildInsertStmt(into interface{}) (string, int) { columns := db.BuildColumns(into) @@ -90,14 +99,17 @@ func (db *DB) BuildInsertStmt(into interface{}) (string, int) { ), len(columns) } -func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string { +// BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct +// and the column list from the specified columns struct. +func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string { return fmt.Sprintf( `SELECT %s FROM %s`, - strings.Join(db.BuildColumns(into), ", "), - utils.TableName(from), + strings.Join(db.BuildColumns(columns), ", "), + utils.TableName(table), ) } +// BuildUpdateStmt returns an UPDATE statement for the given struct and the number of placeholders it contains (one per column plus one for WHERE id = :id). func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { columns := db.BuildColumns(update) set := make([]string, 0, len(columns)) @@ -113,6 +125,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { ), len(columns) + 1 // +1 because of WHERE id = :id } +// BuildUpsertStmt returns an upsert statement for the given struct and the number of placeholders it contains. func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { insertColumns := db.BuildColumns(subject) var updateColumns []string @@ -138,6 +151,11 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in ), len(insertColumns) } +// BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. +// Takes in up to the number of arguments specified in count from the arg stream, +// derives and expands a query and executes it with this set of arguments until the arg stream has been processed. +// The derived queries are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -193,6 +211,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return g.Wait() } +// NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely +// in the format INSERT ... VALUES. Takes in up to the number of entities specified in count +// from the arg stream, derives and executes a new query with the VALUES clause expanded to +// this set of arguments, until the arg stream has been processed. +// The queries are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. +// Entities for which the query ran successfully will be streamed on the succeeded channel. func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, @@ -207,11 +232,6 @@ func (db *DB) NamedBulkExec( }) g.Go(func() error { - // stmt, err := db.PrepareNamedContext(ctx, query) - // if err != nil { - // return err - // } - for { select { case b, ok := <-bulk: @@ -265,6 +285,12 @@ func (db *DB) NamedBulkExec( return g.Wait() } +// NamedBulkExecTx bulk executes queries with named placeholders in separate transactions. +// Takes in up to the number of entities specified in count from the arg stream and +// executes a new transaction that runs a new query for each entity in this set of arguments, +// until the arg stream has been processed. +// The transactions are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. 
func (db *DB) NamedBulkExecTx( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, ) error { @@ -346,6 +372,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int { return 1 } +// YieldAll executes the query with the supplied args, +// scans each resulting row into an entity returned by the factory function, +// and streams them into a returned channel. func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) { var cnt com.Counter entities := make(chan contracts.Entity, 1) @@ -387,6 +416,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF return entities, com.WaitAsync(g) } +// CreateStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { @@ -399,6 +432,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil) } +// UpsertStreamed bulk upserts the specified entities via NamedBulkExec. +// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. 
func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { @@ -411,6 +448,10 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded) } +// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. +// The update statement is created using BuildUpdateStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxRowsPerTransaction and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { @@ -422,11 +463,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExecTx(ctx, stmt, db.options.MaxRowsPerTransaction, sem, forward) } +// DeleteStreamed bulk deletes the specified ids via BulkExec. +// The delete statement is created using BuildDeleteStmt with the passed entityType. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { sem := db.getSemaphoreForTable(utils.TableName(entityType)) return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.options.MaxPlaceholdersPerStatement, sem, ids) } +// Delete creates a channel from the specified ids and +// bulk deletes them by passing the channel along with the entityType to DeleteStreamed. 
func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { idsCh := make(chan interface{}, len(ids)) for _, id := range ids { @@ -437,6 +484,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int return db.DeleteStreamed(ctx, entityType, idsCh) } +// IsRetryable checks whether the given error is retryable. func IsRetryable(err error) bool { if errors.Is(err, driver.ErrBadConn) { return true @@ -451,8 +499,8 @@ func IsRetryable(err error) bool { switch e.Number { case 1053, 1205, 1213, 2006: // 1053: Server shutdown in progress - // 1205: - // 1213: + // 1205: Lock wait timeout + // 1213: Deadlock found when trying to get lock // 2006: MySQL server has gone away return true }