From 8316349cf8f3b9104bd61034c99ca36fe2c77662 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 20 Sep 2019 14:03:53 +0200 Subject: [PATCH 1/2] Pre-create prometheus.Observers with constant labels refs #5 --- connection/mysql.go | 21 +++++++++++++++++---- connection/redis.go | 13 +++++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/connection/mysql.go b/connection/mysql.go index 75a8c384..f8d71a9b 100644 --- a/connection/mysql.go +++ b/connection/mysql.go @@ -6,6 +6,7 @@ import ( "database/sql" "fmt" "git.icinga.com/icingadb/icingadb-main/utils" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "strings" "sync" @@ -13,6 +14,18 @@ import ( "time" ) +var mysqlObservers = struct { + begin prometheus.Observer + commit prometheus.Observer + rollback prometheus.Observer + transaction prometheus.Observer +}{ + DbIoSeconds.WithLabelValues("mysql", "begin"), + DbIoSeconds.WithLabelValues("mysql", "commit"), + DbIoSeconds.WithLabelValues("mysql", "rollback"), + DbIoSeconds.WithLabelValues("mysql", "transaction"), +} + // This is used in SqlFetchAll and SqlFetchAllQuiet type DbClientOrTransaction interface { Query(query string, args ...interface{}) (*sql.Rows, error) @@ -195,7 +208,7 @@ func (dbw *DBWrapper) SqlBegin(concurrencySafety bool, quiet bool) (DbTransactio tx, err = dbw.Db.BeginTx(context.Background(), &sql.TxOptions{Isolation: isoLvl}) benchmarc.Stop() - DbIoSeconds.WithLabelValues("mysql", "begin").Observe(benchmarc.Seconds()) + mysqlObservers.begin.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "sql", @@ -230,7 +243,7 @@ func (dbw *DBWrapper) SqlCommit(tx DbTransaction, quiet bool) error { err = tx.Commit() benchmarc.Stop() - DbIoSeconds.WithLabelValues("mysql", "commit").Observe(benchmarc.Seconds()) + mysqlObservers.commit.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "sql", @@ -263,7 +276,7 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error { err = tx.Rollback() benchmarc.Stop() - DbIoSeconds.WithLabelValues("mysql", "rollback").Observe(benchmarc.Seconds()) + mysqlObservers.rollback.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "sql", @@ -484,7 +497,7 @@ func (dbw DBWrapper) SqlTransaction(concurrencySafety bool, retryOnConnectionFai errTx := dbw.sqlTryTransaction(f, concurrencySafety, false) if !quiet { benchmarc.Stop() - DbIoSeconds.WithLabelValues("mysql", "transaction").Observe(benchmarc.Seconds()) + mysqlObservers.transaction.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "sql", diff --git a/connection/redis.go b/connection/redis.go index bac48db9..543539f2 100644 --- a/connection/redis.go +++ b/connection/redis.go @@ -4,6 +4,7 @@ import ( "fmt" "git.icinga.com/icingadb/icingadb-main/utils" "github.com/go-redis/redis" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "sync" "sync/atomic" @@ -58,6 +59,14 @@ var RedisWriter = Icinga2RedisWriter{ }, } +var redisObservers = struct { + hgetall prometheus.Observer + multi prometheus.Observer +}{ + DbIoSeconds.WithLabelValues("redis", "hgetall"), + DbIoSeconds.WithLabelValues("redis", "multi"), +} + type RedisClient interface { Ping() *redis.StatusCmd Publish(channel string, message interface{}) *redis.IntCmd @@ -304,7 +313,7 @@ func (rdbw *RDBWrapper) HGetAll(key string) *redis.StringStringMapCmd { benchmarc.Stop() - DbIoSeconds.WithLabelValues("redis", "hgetall").Observe(benchmarc.Seconds()) + redisObservers.hgetall.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "redis", @@ -336,7 +345,7 @@ func (rdbw *RDBWrapper) TxPipelined(fn func(pipeliner redis.Pipeliner) error) ([ benchmarc.Stop() - DbIoSeconds.WithLabelValues("redis", "multi").Observe(benchmarc.Seconds()) + redisObservers.multi.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "redis", From 82956a139c2e45fe99fc1abf3e3f3f0188353d77 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 20 Sep 2019 15:08:44 +0200 Subject: [PATCH 2/2] Pre-create all prometheus.Observers refs #5 --- configobject/statesync/statesync.go | 13 ++++++- connection/mysql.go | 56 ++++++++++++++++------------- connection/mysql_test.go | 6 ++-- ha/ha.go | 22 +++++++++--- 4 files changed, 65 insertions(+), 32 deletions(-) diff --git a/configobject/statesync/statesync.go b/configobject/statesync/statesync.go index 177181ac..5f2dc3e3 100644 --- a/configobject/statesync/statesync.go +++ b/configobject/statesync/statesync.go @@ -5,6 +5,7 @@ import ( "git.icinga.com/icingadb/icingadb-main/connection" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/go-redis/redis" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "time" ) @@ -12,6 +13,16 @@ import ( //Counter on how many host/service states have synced since the last logSyncCounters() var syncCounter = make(map[string]int) +var mysqlObservers = func() (mysqlObservers map[string]prometheus.Observer) { + mysqlObservers = map[string]prometheus.Observer{} + + for _, objectType := range [2]string{"host", "service"} { + mysqlObservers[objectType] = connection.DbIoSeconds.WithLabelValues("mysql", "replace into "+objectType+"_state") + } + + return +}() + //Start the sync goroutines for hosts and services func StartStateSync(super *supervisor.Supervisor) { go func() { @@ -79,7 +90,7 @@ func syncStates(super *supervisor.Supervisor, objectType string) { _, errExec := super.Dbw.SqlExecTx( tx, - "replace into "+objectType+"_state", + mysqlObservers[objectType], `REPLACE INTO `+objectType+`_state (`+objectType+`_id, env_id, state_type, soft_state, hard_state, attempt, severity, output, long_output, performance_data,`+ `check_commandline, is_problem, is_handled, is_reachable, is_flapping, is_acknowledged, acknowledgement_comment_id,`+ `in_downtime, execution_time, latency, timeout, last_update, last_state_change, last_soft_state,`+ diff --git a/connection/mysql.go b/connection/mysql.go index f8d71a9b..4252c05d 100644 --- a/connection/mysql.go +++ b/connection/mysql.go @@ -19,11 +19,17 @@ var mysqlObservers = struct { commit prometheus.Observer rollback prometheus.Observer transaction prometheus.Observer + bulkInsert prometheus.Observer + bulkDelete prometheus.Observer + bulkUpdate prometheus.Observer }{ DbIoSeconds.WithLabelValues("mysql", "begin"), DbIoSeconds.WithLabelValues("mysql", "commit"), DbIoSeconds.WithLabelValues("mysql", "rollback"), DbIoSeconds.WithLabelValues("mysql", "transaction"), + DbIoSeconds.WithLabelValues("mysql", "Bulk insert"), + DbIoSeconds.WithLabelValues("mysql", "Bulk delete"), + DbIoSeconds.WithLabelValues("mysql", "Bulk update"), } // This is used in SqlFetchAll and SqlFetchAllQuiet @@ -297,43 +303,43 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error { } // Wrapper around sql.Exec() for auto-logging -func (dbw *DBWrapper) SqlExec(opDescription string, sql string, args ...interface{}) (sql.Result, error) { - return dbw.sqlExecInternal(dbw.Db, opDescription, sql, false, args...) +func (dbw *DBWrapper) SqlExec(opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) { + return dbw.sqlExecInternal(dbw.Db, opObserver, sql, false, args...) } // No logging, no benchmarking -func (dbw *DBWrapper) SqlExecQuiet(opDescription string, sql string, args ...interface{}) (sql.Result, error) { - return dbw.sqlExecInternal(dbw.Db, opDescription, sql, true, args...) +func (dbw *DBWrapper) SqlExecQuiet(opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) { + return dbw.sqlExecInternal(dbw.Db, opObserver, sql, true, args...) } // Wrapper around tx.Exec() for auto-logging -func (dbw *DBWrapper) SqlExecTx(tx DbTransaction, opDescription string, sql string, args ...interface{}) (sql.Result, error) { - return dbw.sqlExecInternal(tx, opDescription, sql, false, args...) +func (dbw *DBWrapper) SqlExecTx(tx DbTransaction, opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) { + return dbw.sqlExecInternal(tx, opObserver, sql, false, args...) } // No logging, no benchmarking -func (dbw *DBWrapper) SqlExecTxQuiet(tx DbTransaction, opDescription string, sql string, args ...interface{}) (sql.Result, error) { - return dbw.sqlExecInternal(tx, opDescription, sql, true, args...) +func (dbw *DBWrapper) SqlExecTxQuiet(tx DbTransaction, opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) { + return dbw.sqlExecInternal(tx, opObserver, sql, true, args...) } -func (dbw *DBWrapper) SqlFetchAll(queryDescription string, query string, args ...interface{}) ([][]interface{}, error) { - return dbw.sqlFetchAllInternal(dbw.Db, queryDescription, query, false, args...) +func (dbw *DBWrapper) SqlFetchAll(queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) { + return dbw.sqlFetchAllInternal(dbw.Db, queryObserver, query, false, args...) } -func (dbw *DBWrapper) SqlFetchAllQuiet(queryDescription string, query string, args ...interface{}) ([][]interface{}, error) { - return dbw.sqlFetchAllInternal(dbw.Db, queryDescription, query, true, args...) +func (dbw *DBWrapper) SqlFetchAllQuiet(queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) { + return dbw.sqlFetchAllInternal(dbw.Db, queryObserver, query, true, args...) } -func (dbw *DBWrapper) SqlFetchAllTx(tx DbTransaction, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) { - return dbw.sqlFetchAllInternal(tx, queryDescription, query, false, args...) +func (dbw *DBWrapper) SqlFetchAllTx(tx DbTransaction, queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) { + return dbw.sqlFetchAllInternal(tx, queryObserver, query, false, args...) } -func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) { - return dbw.sqlFetchAllInternal(tx, queryDescription, query, true, args...) +func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) { + return dbw.sqlFetchAllInternal(tx, queryObserver, query, true, args...) } // Wrapper around sql.Exec() for auto-logging -func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opDescription string, sql string, quiet bool, args ...interface{}) (sql.Result, error) { +func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, sql string, quiet bool, args ...interface{}) (sql.Result, error) { for { if !dbw.IsConnected() { dbw.WaitForConnection() @@ -353,7 +359,7 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opDescription st } if !quiet { - DbIoSeconds.WithLabelValues("mysql", opDescription).Observe(benchmarc.Seconds()) + opObserver.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "sql", "benchmark": benchmarc, @@ -375,14 +381,14 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opDescription st } // Wrapper around Db.SqlQuery() for auto-logging -func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryDescription string, query string, quiet bool, args ...interface{}) ([][]interface{}, error) { +func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryObserver prometheus.Observer, query string, quiet bool, args ...interface{}) ([][]interface{}, error) { for { if !dbw.IsConnected() { dbw.WaitForConnection() continue } - res, err := sqlTryFetchAll(db, queryDescription, query, quiet, args...) + res, err := sqlTryFetchAll(db, queryObserver, query, quiet, args...) if err != nil { if _, isDb := db.(*sql.DB); isDb { @@ -396,7 +402,7 @@ func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryDescrip } } -func sqlTryFetchAll(db DbClientOrTransaction, queryDescription string, query string, quiet bool, args ...interface{}) ([][]interface{}, error) { +func sqlTryFetchAll(db DbClientOrTransaction, queryObserver prometheus.Observer, query string, quiet bool, args ...interface{}) ([][]interface{}, error) { var benchmarc *utils.Benchmark if !quiet { benchmarc = utils.NewBenchmark() @@ -410,7 +416,7 @@ func sqlTryFetchAll(db DbClientOrTransaction, queryDescription string, query str defer func() { if !quiet { - DbIoSeconds.WithLabelValues("mysql", queryDescription).Observe(benchmarc.Seconds()) + queryObserver.Observe(benchmarc.Seconds()) log.WithFields(log.Fields{ "context": "sql", "benchmark": benchmarc, @@ -666,7 +672,7 @@ func (dbw *DBWrapper) SqlBulkInsert(rows []Row, stmt *BulkInsertStmt) error { query := fmt.Sprintf(stmt.Format, strings.Join(placeholders, ", ")) _, err := dbw.WithRetry(func() (result sql.Result, e error) { - return dbw.SqlExec("Bulk insert", query, values...) + return dbw.SqlExec(mysqlObservers.bulkInsert, query, values...) }) if err != nil { @@ -697,7 +703,7 @@ func (dbw *DBWrapper) SqlBulkDelete(keys []string, stmt *BulkDeleteStmt) error { query := fmt.Sprintf(stmt.Format, placeholders) _, err := dbw.WithRetry(func() (result sql.Result, e error) { - return dbw.SqlExec("Bulk delete", query, values...) + return dbw.SqlExec(mysqlObservers.bulkDelete, query, values...) }) if err != nil { return err @@ -730,7 +736,7 @@ func (dbw *DBWrapper) SqlBulkUpdate(rows []Row, stmt *BulkUpdateStmt) error { query := fmt.Sprintf(stmt.Format, strings.Join(placeholders, ", ")) _, err := dbw.WithRetry(func() (result sql.Result, e error) { - return dbw.SqlExec("Bulk update", query, values...) + return dbw.SqlExec(mysqlObservers.bulkUpdate, query, values...) }) if err != nil { return err diff --git a/connection/mysql_test.go b/connection/mysql_test.go index e73812a6..6d9abe40 100644 --- a/connection/mysql_test.go +++ b/connection/mysql_test.go @@ -230,6 +230,8 @@ func TestDBWrapper_SqlQuery(t *testing.T) { mockDb.AssertExpectations(t) } +var mysqlTestObserver = DbIoSeconds.WithLabelValues("mysql", "test") + func TestDBWrapper_SqlExec(t *testing.T) { mockDb := new(DbMock) dbw := NewTestDBW(mockDb) @@ -243,7 +245,7 @@ func TestDBWrapper_SqlExec(t *testing.T) { dbw.CompareAndSetConnected(true) go func() { - _, err = dbw.SqlExec("test", "test") + _, err = dbw.SqlExec(mysqlTestObserver, "test") done <- true }() @@ -304,7 +306,7 @@ func TestDBWrapper_SqlFetchAll(t *testing.T) { done := make(chan bool) dbw.CompareAndSetConnected(false) go func() { - res, err = dbw.SqlFetchAll("test", "SELECT * FROM testing0815") + res, err = dbw.SqlFetchAll(mysqlTestObserver, "SELECT * FROM testing0815") done <- true }() diff --git a/ha/ha.go b/ha/ha.go index 71882eaf..6e553d34 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -4,9 +4,11 @@ import ( "bytes" "encoding/hex" "errors" + "git.icinga.com/icingadb/icingadb-main/connection" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/go-redis/redis" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "sync" "time" @@ -43,6 +45,18 @@ func NewHA(super *supervisor.Supervisor) (*HA, error) { return &ho, nil } +var mysqlObservers = struct { + updateIcingadbInstanceById prometheus.Observer + updateIcingadbInstanceByEnvironmentId prometheus.Observer + insertIntoIcingadbInstance prometheus.Observer + selectIdHeartbeatFromIcingadbInstanceByEnvironmentId prometheus.Observer +}{ + connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"), + connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"), + connection.DbIoSeconds.WithLabelValues("mysql", "insert into icingadb_instance"), + connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat from icingadb_instance where environment_id = ourEnvID"), +} + func (h *HA) icinga2HeartBeat() { h.icinga2MTime = time.Now().Unix() } @@ -52,27 +66,27 @@ func (h *HA) AreWeActive() bool { } func (h *HA) updateOwnInstance() error { - _, err := h.super.Dbw.SqlExec("update icingadb_instance by id", + _, err := h.super.Dbw.SqlExec(mysqlObservers.updateIcingadbInstanceById, "UPDATE icingadb_instance SET heartbeat = ? WHERE id = ?", h.icinga2MTime, h.uid[:]) return err } func (h *HA) takeOverInstance() error { - _, err := h.super.Dbw.SqlExec("update icingadb_instance by environment_id", + _, err := h.super.Dbw.SqlExec(mysqlObservers.updateIcingadbInstanceByEnvironmentId, "UPDATE icingadb_instance SET id = ?, heartbeat = ? WHERE environment_id = ?", h.uid[:], h.icinga2MTime, h.super.EnvId) return err } func (h *HA) insertInstance() error { - _, err := h.super.Dbw.SqlExec("insert into icingadb_instance", + _, err := h.super.Dbw.SqlExec(mysqlObservers.insertIntoIcingadbInstance, "INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, 'y')", h.uid[:], h.super.EnvId, h.icinga2MTime) return err } func (h *HA) getInstance() (bool, uuid.UUID, int64, error) { - rows, err := h.super.Dbw.SqlFetchAll("select id, heartbeat from icingadb_instance where environment_id = ourEnvID", + rows, err := h.super.Dbw.SqlFetchAll(mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId, "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", h.super.EnvId, )