Merge branch 'feature/pre-create-observers-5' into 'master'

Pre-create all prometheus.Observers

Closes #5

See merge request icingadb/icingadb-main!13
This commit is contained in:
Noah Hilverling 2019-09-23 11:23:15 +02:00
commit f7ca393d29
5 changed files with 93 additions and 38 deletions

View file

@ -5,6 +5,7 @@ import (
"git.icinga.com/icingadb/icingadb-main/connection"
"git.icinga.com/icingadb/icingadb-main/supervisor"
"github.com/go-redis/redis"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"time"
)
@ -12,6 +13,16 @@ import (
//Counter on how many host/service states have synced since the last logSyncCounters()
var syncCounter = make(map[string]int)
var mysqlObservers = func() (mysqlObservers map[string]prometheus.Observer) {
mysqlObservers = map[string]prometheus.Observer{}
for _, objectType := range [2]string{"host", "service"} {
mysqlObservers[objectType] = connection.DbIoSeconds.WithLabelValues("mysql", "replace into "+objectType+"_state")
}
return
}()
//Start the sync goroutines for hosts and services
func StartStateSync(super *supervisor.Supervisor) {
go func() {
@ -79,7 +90,7 @@ func syncStates(super *supervisor.Supervisor, objectType string) {
_, errExec := super.Dbw.SqlExecTx(
tx,
"replace into "+objectType+"_state",
mysqlObservers[objectType],
`REPLACE INTO `+objectType+`_state (`+objectType+`_id, env_id, state_type, soft_state, hard_state, attempt, severity, output, long_output, performance_data,`+
`check_commandline, is_problem, is_handled, is_reachable, is_flapping, is_acknowledged, acknowledgement_comment_id,`+
`in_downtime, execution_time, latency, timeout, last_update, last_state_change, last_soft_state,`+

View file

@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"git.icinga.com/icingadb/icingadb-main/utils"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"strings"
"sync"
@ -13,6 +14,24 @@ import (
"time"
)
var mysqlObservers = struct {
begin prometheus.Observer
commit prometheus.Observer
rollback prometheus.Observer
transaction prometheus.Observer
bulkInsert prometheus.Observer
bulkDelete prometheus.Observer
bulkUpdate prometheus.Observer
}{
DbIoSeconds.WithLabelValues("mysql", "begin"),
DbIoSeconds.WithLabelValues("mysql", "commit"),
DbIoSeconds.WithLabelValues("mysql", "rollback"),
DbIoSeconds.WithLabelValues("mysql", "transaction"),
DbIoSeconds.WithLabelValues("mysql", "Bulk insert"),
DbIoSeconds.WithLabelValues("mysql", "Bulk delete"),
DbIoSeconds.WithLabelValues("mysql", "Bulk update"),
}
// This is used in SqlFetchAll and SqlFetchAllQuiet
type DbClientOrTransaction interface {
Query(query string, args ...interface{}) (*sql.Rows, error)
@ -195,7 +214,7 @@ func (dbw *DBWrapper) SqlBegin(concurrencySafety bool, quiet bool) (DbTransactio
tx, err = dbw.Db.BeginTx(context.Background(), &sql.TxOptions{Isolation: isoLvl})
benchmarc.Stop()
DbIoSeconds.WithLabelValues("mysql", "begin").Observe(benchmarc.Seconds())
mysqlObservers.begin.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "sql",
@ -230,7 +249,7 @@ func (dbw *DBWrapper) SqlCommit(tx DbTransaction, quiet bool) error {
err = tx.Commit()
benchmarc.Stop()
DbIoSeconds.WithLabelValues("mysql", "commit").Observe(benchmarc.Seconds())
mysqlObservers.commit.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "sql",
@ -263,7 +282,7 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error {
err = tx.Rollback()
benchmarc.Stop()
DbIoSeconds.WithLabelValues("mysql", "rollback").Observe(benchmarc.Seconds())
mysqlObservers.rollback.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "sql",
@ -284,43 +303,43 @@ func (dbw *DBWrapper) SqlRollback(tx DbTransaction, quiet bool) error {
}
// Wrapper around sql.Exec() for auto-logging
func (dbw *DBWrapper) SqlExec(opDescription string, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(dbw.Db, opDescription, sql, false, args...)
func (dbw *DBWrapper) SqlExec(opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(dbw.Db, opObserver, sql, false, args...)
}
// No logging, no benchmarking
func (dbw *DBWrapper) SqlExecQuiet(opDescription string, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(dbw.Db, opDescription, sql, true, args...)
func (dbw *DBWrapper) SqlExecQuiet(opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(dbw.Db, opObserver, sql, true, args...)
}
// Wrapper around tx.Exec() for auto-logging
func (dbw *DBWrapper) SqlExecTx(tx DbTransaction, opDescription string, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(tx, opDescription, sql, false, args...)
func (dbw *DBWrapper) SqlExecTx(tx DbTransaction, opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(tx, opObserver, sql, false, args...)
}
// No logging, no benchmarking
func (dbw *DBWrapper) SqlExecTxQuiet(tx DbTransaction, opDescription string, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(tx, opDescription, sql, true, args...)
func (dbw *DBWrapper) SqlExecTxQuiet(tx DbTransaction, opObserver prometheus.Observer, sql string, args ...interface{}) (sql.Result, error) {
return dbw.sqlExecInternal(tx, opObserver, sql, true, args...)
}
func (dbw *DBWrapper) SqlFetchAll(queryDescription string, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(dbw.Db, queryDescription, query, false, args...)
func (dbw *DBWrapper) SqlFetchAll(queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(dbw.Db, queryObserver, query, false, args...)
}
func (dbw *DBWrapper) SqlFetchAllQuiet(queryDescription string, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(dbw.Db, queryDescription, query, true, args...)
func (dbw *DBWrapper) SqlFetchAllQuiet(queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(dbw.Db, queryObserver, query, true, args...)
}
func (dbw *DBWrapper) SqlFetchAllTx(tx DbTransaction, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(tx, queryDescription, query, false, args...)
func (dbw *DBWrapper) SqlFetchAllTx(tx DbTransaction, queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(tx, queryObserver, query, false, args...)
}
func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryDescription string, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(tx, queryDescription, query, true, args...)
func (dbw *DBWrapper) SqlFetchAllTxQuiet(tx DbTransaction, queryObserver prometheus.Observer, query string, args ...interface{}) ([][]interface{}, error) {
return dbw.sqlFetchAllInternal(tx, queryObserver, query, true, args...)
}
// Wrapper around sql.Exec() for auto-logging
func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opDescription string, sql string, quiet bool, args ...interface{}) (sql.Result, error) {
func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opObserver prometheus.Observer, sql string, quiet bool, args ...interface{}) (sql.Result, error) {
for {
if !dbw.IsConnected() {
dbw.WaitForConnection()
@ -340,7 +359,7 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opDescription st
}
if !quiet {
DbIoSeconds.WithLabelValues("mysql", opDescription).Observe(benchmarc.Seconds())
opObserver.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "sql",
"benchmark": benchmarc,
@ -362,14 +381,14 @@ func (dbw *DBWrapper) sqlExecInternal(db DbClientOrTransaction, opDescription st
}
// Wrapper around Db.SqlQuery() for auto-logging
func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryDescription string, query string, quiet bool, args ...interface{}) ([][]interface{}, error) {
func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryObserver prometheus.Observer, query string, quiet bool, args ...interface{}) ([][]interface{}, error) {
for {
if !dbw.IsConnected() {
dbw.WaitForConnection()
continue
}
res, err := sqlTryFetchAll(db, queryDescription, query, quiet, args...)
res, err := sqlTryFetchAll(db, queryObserver, query, quiet, args...)
if err != nil {
if _, isDb := db.(*sql.DB); isDb {
@ -383,7 +402,7 @@ func (dbw *DBWrapper) sqlFetchAllInternal(db DbClientOrTransaction, queryDescrip
}
}
func sqlTryFetchAll(db DbClientOrTransaction, queryDescription string, query string, quiet bool, args ...interface{}) ([][]interface{}, error) {
func sqlTryFetchAll(db DbClientOrTransaction, queryObserver prometheus.Observer, query string, quiet bool, args ...interface{}) ([][]interface{}, error) {
var benchmarc *utils.Benchmark
if !quiet {
benchmarc = utils.NewBenchmark()
@ -397,7 +416,7 @@ func sqlTryFetchAll(db DbClientOrTransaction, queryDescription string, query str
defer func() {
if !quiet {
DbIoSeconds.WithLabelValues("mysql", queryDescription).Observe(benchmarc.Seconds())
queryObserver.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "sql",
"benchmark": benchmarc,
@ -484,7 +503,7 @@ func (dbw DBWrapper) SqlTransaction(concurrencySafety bool, retryOnConnectionFai
errTx := dbw.sqlTryTransaction(f, concurrencySafety, false)
if !quiet {
benchmarc.Stop()
DbIoSeconds.WithLabelValues("mysql", "transaction").Observe(benchmarc.Seconds())
mysqlObservers.transaction.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "sql",
@ -653,7 +672,7 @@ func (dbw *DBWrapper) SqlBulkInsert(rows []Row, stmt *BulkInsertStmt) error {
query := fmt.Sprintf(stmt.Format, strings.Join(placeholders, ", "))
_, err := dbw.WithRetry(func() (result sql.Result, e error) {
return dbw.SqlExec("Bulk insert", query, values...)
return dbw.SqlExec(mysqlObservers.bulkInsert, query, values...)
})
if err != nil {
@ -684,7 +703,7 @@ func (dbw *DBWrapper) SqlBulkDelete(keys []string, stmt *BulkDeleteStmt) error {
query := fmt.Sprintf(stmt.Format, placeholders)
_, err := dbw.WithRetry(func() (result sql.Result, e error) {
return dbw.SqlExec("Bulk delete", query, values...)
return dbw.SqlExec(mysqlObservers.bulkDelete, query, values...)
})
if err != nil {
return err
@ -717,7 +736,7 @@ func (dbw *DBWrapper) SqlBulkUpdate(rows []Row, stmt *BulkUpdateStmt) error {
query := fmt.Sprintf(stmt.Format, strings.Join(placeholders, ", "))
_, err := dbw.WithRetry(func() (result sql.Result, e error) {
return dbw.SqlExec("Bulk update", query, values...)
return dbw.SqlExec(mysqlObservers.bulkUpdate, query, values...)
})
if err != nil {
return err

View file

@ -230,6 +230,8 @@ func TestDBWrapper_SqlQuery(t *testing.T) {
mockDb.AssertExpectations(t)
}
var mysqlTestObserver = DbIoSeconds.WithLabelValues("mysql", "test")
func TestDBWrapper_SqlExec(t *testing.T) {
mockDb := new(DbMock)
dbw := NewTestDBW(mockDb)
@ -243,7 +245,7 @@ func TestDBWrapper_SqlExec(t *testing.T) {
dbw.CompareAndSetConnected(true)
go func() {
_, err = dbw.SqlExec("test", "test")
_, err = dbw.SqlExec(mysqlTestObserver, "test")
done <- true
}()
@ -304,7 +306,7 @@ func TestDBWrapper_SqlFetchAll(t *testing.T) {
done := make(chan bool)
dbw.CompareAndSetConnected(false)
go func() {
res, err = dbw.SqlFetchAll("test", "SELECT * FROM testing0815")
res, err = dbw.SqlFetchAll(mysqlTestObserver, "SELECT * FROM testing0815")
done <- true
}()

View file

@ -4,6 +4,7 @@ import (
"fmt"
"git.icinga.com/icingadb/icingadb-main/utils"
"github.com/go-redis/redis"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"sync"
"sync/atomic"
@ -58,6 +59,14 @@ var RedisWriter = Icinga2RedisWriter{
},
}
var redisObservers = struct {
hgetall prometheus.Observer
multi prometheus.Observer
}{
DbIoSeconds.WithLabelValues("redis", "hgetall"),
DbIoSeconds.WithLabelValues("redis", "multi"),
}
type RedisClient interface {
Ping() *redis.StatusCmd
Publish(channel string, message interface{}) *redis.IntCmd
@ -304,7 +313,7 @@ func (rdbw *RDBWrapper) HGetAll(key string) *redis.StringStringMapCmd {
benchmarc.Stop()
DbIoSeconds.WithLabelValues("redis", "hgetall").Observe(benchmarc.Seconds())
redisObservers.hgetall.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "redis",
@ -336,7 +345,7 @@ func (rdbw *RDBWrapper) TxPipelined(fn func(pipeliner redis.Pipeliner) error) ([
benchmarc.Stop()
DbIoSeconds.WithLabelValues("redis", "multi").Observe(benchmarc.Seconds())
redisObservers.multi.Observe(benchmarc.Seconds())
log.WithFields(log.Fields{
"context": "redis",

View file

@ -4,9 +4,11 @@ import (
"bytes"
"encoding/hex"
"errors"
"git.icinga.com/icingadb/icingadb-main/connection"
"git.icinga.com/icingadb/icingadb-main/supervisor"
"github.com/go-redis/redis"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"sync"
"time"
@ -43,6 +45,18 @@ func NewHA(super *supervisor.Supervisor) (*HA, error) {
return &ho, nil
}
var mysqlObservers = struct {
updateIcingadbInstanceById prometheus.Observer
updateIcingadbInstanceByEnvironmentId prometheus.Observer
insertIntoIcingadbInstance prometheus.Observer
selectIdHeartbeatFromIcingadbInstanceByEnvironmentId prometheus.Observer
}{
connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by id"),
connection.DbIoSeconds.WithLabelValues("mysql", "update icingadb_instance by environment_id"),
connection.DbIoSeconds.WithLabelValues("mysql", "insert into icingadb_instance"),
connection.DbIoSeconds.WithLabelValues("mysql", "select id, heartbeat from icingadb_instance where environment_id = ourEnvID"),
}
func (h *HA) icinga2HeartBeat() {
h.icinga2MTime = time.Now().Unix()
}
@ -52,27 +66,27 @@ func (h *HA) AreWeActive() bool {
}
func (h *HA) updateOwnInstance() error {
_, err := h.super.Dbw.SqlExec("update icingadb_instance by id",
_, err := h.super.Dbw.SqlExec(mysqlObservers.updateIcingadbInstanceById,
"UPDATE icingadb_instance SET heartbeat = ? WHERE id = ?", h.icinga2MTime, h.uid[:])
return err
}
func (h *HA) takeOverInstance() error {
_, err := h.super.Dbw.SqlExec("update icingadb_instance by environment_id",
_, err := h.super.Dbw.SqlExec(mysqlObservers.updateIcingadbInstanceByEnvironmentId,
"UPDATE icingadb_instance SET id = ?, heartbeat = ? WHERE environment_id = ?",
h.uid[:], h.icinga2MTime, h.super.EnvId)
return err
}
func (h *HA) insertInstance() error {
_, err := h.super.Dbw.SqlExec("insert into icingadb_instance",
_, err := h.super.Dbw.SqlExec(mysqlObservers.insertIntoIcingadbInstance,
"INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, 'y')",
h.uid[:], h.super.EnvId, h.icinga2MTime)
return err
}
func (h *HA) getInstance() (bool, uuid.UUID, int64, error) {
rows, err := h.super.Dbw.SqlFetchAll("select id, heartbeat from icingadb_instance where environment_id = ourEnvID",
rows, err := h.super.Dbw.SqlFetchAll(mysqlObservers.selectIdHeartbeatFromIcingadbInstanceByEnvironmentId,
"SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1",
h.super.EnvId,
)