mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Reimplement quiet functions
This commit is contained in:
parent
f1d834e54b
commit
1c2d41f447
1 changed files with 122 additions and 0 deletions
122
mysql.go
122
mysql.go
|
|
@ -191,6 +191,39 @@ func (dbw *DBWrapper) sqlTryTransaction(f func(*sql.Tx) error, concurrencySafety
|
|||
return dbw.SqlCommit(tx, quiet)
|
||||
}
|
||||
|
||||
// SqlTransaction executes the given function inside a transaction.
|
||||
func (dbw *DBWrapper) SqlTransactionQuiet(concurrencySafety bool, retryOnConnectionFailure bool, f func(*sql.Tx) error) error {
|
||||
for {
|
||||
if !dbw.IsConnected() {
|
||||
dbw.WaitForConnection()
|
||||
continue
|
||||
}
|
||||
|
||||
errTx := dbw.sqlTryTransaction(f, concurrencySafety, true)
|
||||
if errTx != nil {
|
||||
//TODO: Do this only for concurrencySafety = true, once we figure out the serialization errors.
|
||||
if isSerializationFailure(errTx) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !dbw.checkConnection(false) {
|
||||
if retryOnConnectionFailure {
|
||||
continue
|
||||
} else {
|
||||
return MysqlConnectionError{"Transaction failed duo to a connection error"}
|
||||
}
|
||||
}
|
||||
|
||||
// We still log errors
|
||||
log.WithFields(log.Fields{
|
||||
"context": "sql",
|
||||
"error": errTx,
|
||||
}).Warn("SQL error occurred")
|
||||
}
|
||||
return errTx
|
||||
}
|
||||
}
|
||||
|
||||
// Wrapper around Db.BeginTx() for auto-logging
|
||||
func (dbw *DBWrapper) SqlBegin(concurrencySafety bool, quiet bool) (*sql.Tx, error) {
|
||||
var isoLvl sql.IsolationLevel
|
||||
|
|
@ -363,6 +396,89 @@ func sqlTryFetchAll(db DbClient, queryDescription string, query string, args ...
|
|||
return res, nil
|
||||
}
|
||||
|
||||
// No logging, no benchmarking
|
||||
func (dbw *DBWrapper) SqlFetchAllQuiet(db DbClient, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) {
|
||||
for {
|
||||
if !dbw.IsConnected() {
|
||||
dbw.WaitForConnection()
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := sqlTryFetchAllQuiet(db, queryDescription, query, args...)
|
||||
|
||||
if err != nil {
|
||||
if _, isDb := db.(*sql.DB); isDb {
|
||||
if !dbw.checkConnection(false) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
}
|
||||
|
||||
func sqlTryFetchAllQuiet(db DbClient, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) {
|
||||
rows, errQuery := db.Query(query, args...)
|
||||
|
||||
if errQuery != nil {
|
||||
return [][]interface{}{}, errQuery
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
columnTypes, errCT := rows.ColumnTypes()
|
||||
if errCT != nil {
|
||||
return [][]interface{}{}, errCT
|
||||
}
|
||||
|
||||
colsPerRow := len(columnTypes)
|
||||
buf := list.New()
|
||||
bridges := make([]dbTypeBridge, colsPerRow)
|
||||
scanDest := make([]interface{}, colsPerRow)
|
||||
|
||||
for i, columnType := range columnTypes {
|
||||
typ := columnType.DatabaseTypeName()
|
||||
factory, hasFactory := dbTypeBridgeFactories[typ]
|
||||
if hasFactory {
|
||||
bridges[i] = factory()
|
||||
} else {
|
||||
bridges[i] = &dbBrokenBridge{typ: typ}
|
||||
}
|
||||
|
||||
scanDest[i] = bridges[i]
|
||||
}
|
||||
|
||||
for {
|
||||
if rows.Next() {
|
||||
if errScan := rows.Scan(scanDest...); errScan != nil {
|
||||
return [][]interface{}{}, errScan
|
||||
}
|
||||
|
||||
row := make([]interface{}, colsPerRow)
|
||||
|
||||
for i, bridge := range bridges {
|
||||
row[i] = bridge.Result()
|
||||
}
|
||||
|
||||
buf.PushBack(row)
|
||||
} else if errNx := rows.Err(); errNx == nil {
|
||||
break
|
||||
} else {
|
||||
return nil, errNx
|
||||
}
|
||||
}
|
||||
|
||||
res := make([][]interface{}, buf.Len())
|
||||
|
||||
for current, i := buf.Front(), 0; current != nil; current = current.Next() {
|
||||
res[i] = current.Value.([]interface{})
|
||||
i++
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Wrapper around tx.SqlExec() for auto-logging
|
||||
func (dbw *DBWrapper) SqlExec(tx *sql.Tx, opDescription string, sql string, args ...interface{}) (sql.Result, error) {
|
||||
benchmarc := benchmark.NewBenchmark()
|
||||
|
|
@ -381,3 +497,9 @@ func (dbw *DBWrapper) SqlExec(tx *sql.Tx, opDescription string, sql string, args
|
|||
|
||||
return res, err
|
||||
}
|
||||
|
||||
// No logging, no benchmarking
|
||||
func (dbw *DBWrapper) SqlExecQuiet(tx *sql.Tx, opDescription string, sql string, args ...interface{}) (sql.Result, error) {
|
||||
res, err := tx.Exec(sql, args...)
|
||||
return res, err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue