diff --git a/mysql.go b/mysql.go index 95ddac16..dc6a1c68 100644 --- a/mysql.go +++ b/mysql.go @@ -191,6 +191,39 @@ func (dbw *DBWrapper) sqlTryTransaction(f func(*sql.Tx) error, concurrencySafety return dbw.SqlCommit(tx, quiet) } +// SqlTransaction executes the given function inside a transaction. +func (dbw *DBWrapper) SqlTransactionQuiet(concurrencySafety bool, retryOnConnectionFailure bool, f func(*sql.Tx) error) error { + for { + if !dbw.IsConnected() { + dbw.WaitForConnection() + continue + } + + errTx := dbw.sqlTryTransaction(f, concurrencySafety, true) + if errTx != nil { + //TODO: Do this only for concurrencySafety = true, once we figure out the serialization errors. + if isSerializationFailure(errTx) { + continue + } + + if !dbw.checkConnection(false) { + if retryOnConnectionFailure { + continue + } else { + return MysqlConnectionError{"Transaction failed duo to a connection error"} + } + } + + // We still log errors + log.WithFields(log.Fields{ + "context": "sql", + "error": errTx, + }).Warn("SQL error occurred") + } + return errTx + } +} + // Wrapper around Db.BeginTx() for auto-logging func (dbw *DBWrapper) SqlBegin(concurrencySafety bool, quiet bool) (*sql.Tx, error) { var isoLvl sql.IsolationLevel @@ -363,6 +396,89 @@ func sqlTryFetchAll(db DbClient, queryDescription string, query string, args ... return res, nil } +// No logging, no benchmarking +func (dbw *DBWrapper) SqlFetchAllQuiet(db DbClient, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) { + for { + if !dbw.IsConnected() { + dbw.WaitForConnection() + continue + } + + res, err := sqlTryFetchAllQuiet(db, queryDescription, query, args...) + + if err != nil { + if _, isDb := db.(*sql.DB); isDb { + if !dbw.checkConnection(false) { + continue + } + } + } + + return res, err + } +} + +func sqlTryFetchAllQuiet(db DbClient, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) { + rows, errQuery := db.Query(query, args...) + + if errQuery != nil { + return [][]interface{}{}, errQuery + } + + defer rows.Close() + + columnTypes, errCT := rows.ColumnTypes() + if errCT != nil { + return [][]interface{}{}, errCT + } + + colsPerRow := len(columnTypes) + buf := list.New() + bridges := make([]dbTypeBridge, colsPerRow) + scanDest := make([]interface{}, colsPerRow) + + for i, columnType := range columnTypes { + typ := columnType.DatabaseTypeName() + factory, hasFactory := dbTypeBridgeFactories[typ] + if hasFactory { + bridges[i] = factory() + } else { + bridges[i] = &dbBrokenBridge{typ: typ} + } + + scanDest[i] = bridges[i] + } + + for { + if rows.Next() { + if errScan := rows.Scan(scanDest...); errScan != nil { + return [][]interface{}{}, errScan + } + + row := make([]interface{}, colsPerRow) + + for i, bridge := range bridges { + row[i] = bridge.Result() + } + + buf.PushBack(row) + } else if errNx := rows.Err(); errNx == nil { + break + } else { + return nil, errNx + } + } + + res := make([][]interface{}, buf.Len()) + + for current, i := buf.Front(), 0; current != nil; current = current.Next() { + res[i] = current.Value.([]interface{}) + i++ + } + + return res, nil +} + // Wrapper around tx.SqlExec() for auto-logging func (dbw *DBWrapper) SqlExec(tx *sql.Tx, opDescription string, sql string, args ...interface{}) (sql.Result, error) { benchmarc := benchmark.NewBenchmark() @@ -381,3 +497,9 @@ func (dbw *DBWrapper) SqlExec(tx *sql.Tx, opDescription string, sql string, args return res, err } + +// No logging, no benchmarking +func (dbw *DBWrapper) SqlExecQuiet(tx *sql.Tx, opDescription string, sql string, args ...interface{}) (sql.Result, error) { + res, err := tx.Exec(sql, args...) + return res, err +}