Update {host,service}_state#is_overdue

refs #38
This commit is contained in:
Alexander A. Klimov 2019-11-27 18:36:30 +01:00
parent 894fb81de4
commit 486af84c3e
4 changed files with 320 additions and 0 deletions

View file

@ -0,0 +1,181 @@
// IcingaDB | (c) 2019 Icinga GmbH | GPLv2+
package statesync
import (
"encoding/hex"
"fmt"
"github.com/Icinga/icingadb/connection"
"github.com/Icinga/icingadb/supervisor"
"github.com/Icinga/icingadb/utils"
"github.com/go-redis/redis"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"strconv"
"strings"
"sync/atomic"
"time"
)
// luaGetOverdues is a Redis Lua script which determines the monitored objects
// whose overdue indicator has to be changed.
//
// It takes the following KEYS:
// * KEYS[1]: the sorted set of next updates, either icinga:nextupdate:host or icinga:nextupdate:service
// * KEYS[2]: the set of objects already marked overdue, either icingadb:overdue:host or icingadb:overdue:service
// * KEYS[3]: a random one, used as temporary set (deleted both before and after use)
//
// It takes the following ARGV:
// * ARGV[1]: the current date and time as *nix timestamp float in seconds
//
// The script copies all members of KEYS[1] with a score strictly lower than ARGV[1]
// into the temporary set and returns an array of two arrays:
// * [1]: members of the temporary set missing in KEYS[2],
//   i.e. overdue monitored objects not yet marked overdue
// * [2]: members of KEYS[2] missing in the temporary set,
//   i.e. not overdue monitored objects not yet unmarked overdue
var luaGetOverdues = redis.NewScript(`
local icingaNextupdate = KEYS[1]
local icingadbOverdue = KEYS[2]
local tempOverdue = KEYS[3]
local now = ARGV[1]
redis.call('DEL', tempOverdue)
local zrbs = redis.call('ZRANGEBYSCORE', icingaNextupdate, '-inf', '(' .. now)
for i = 1, #zrbs do
redis.call('SADD', tempOverdue, zrbs[i])
end
zrbs = nil
local res = {redis.call('SDIFF', tempOverdue, icingadbOverdue), redis.call('SDIFF', icingadbOverdue, tempOverdue)}
redis.call('DEL', tempOverdue)
return res
`)
// overdueSyncCounters holds the number of host and service is_overdue indicators
// synced since the last run of logOverdueSyncCounters().
// Both fields are accessed via sync/atomic only.
var overdueSyncCounters struct {
	host    uint64
	service uint64
}
// Prometheus observers taken from connection.DbIoSeconds, one per object type.
// startOverdueSync() hands them to syncOverdue(), which passes them on to the
// SQL statements updating is_overdue.
var (
	updateHostOverdue    = connection.DbIoSeconds.WithLabelValues("mysql", "update host_state by host_id")
	updateServiceOverdue = connection.DbIoSeconds.WithLabelValues("mysql", "update service_state by service_id")
)
// startOverdueSync launches the background goroutines of the overdue sync:
// one syncOverdue() per object type (host and service) plus logOverdueSyncCounters().
// It returns immediately and doesn't wait for any of them.
func startOverdueSync(super *supervisor.Supervisor) {
	counters := &overdueSyncCounters

	go logOverdueSyncCounters()
	go syncOverdue(super, "service", &counters.service, updateServiceOverdue)
	go syncOverdue(super, "host", &counters.host, updateHostOverdue)
}
// logOverdueSyncCounters reports every 20 seconds how many host and service
// is_overdue indicators have been synced during that period.
// The counters are reset on every tick; nothing is logged if both are zero.
// It never returns.
func logOverdueSyncCounters() {
	const period = 20 * time.Second

	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for range ticker.C {
		// Read and reset atomically so no increment made by the sync goroutines gets lost.
		hosts := atomic.SwapUint64(&overdueSyncCounters.host, 0)
		services := atomic.SwapUint64(&overdueSyncCounters.service, 0)

		if hosts == 0 && services == 0 {
			continue
		}

		log.WithFields(log.Fields{
			"host":    hosts,
			"service": services,
			"period":  period,
		}).Infof("Synced some host and service overdue indicators")
	}
}
// syncOverdue keeps is_overdue of all objects of the given type ("host" or "service")
// up to date. Once super.EnvId is set, it runs luaGetOverdues about once per second
// and passes both result lists to updateOverdue(). Errors of the script are sent to
// super.ChErr and the run is retried after a second. It never returns.
func syncOverdue(super *supervisor.Supervisor, objectType string, counter *uint64, observer prometheus.Observer) {
	for super.EnvId == nil {
		log.Debug("OverdueSync: Waiting for EnvId to be set")
		time.Sleep(time.Second)
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// The Lua script needs a scratch key. Use a random UUID, generated once per goroutine.
	var tempKey string
	for tempKey == "" {
		id, err := uuid.NewRandom()
		if err != nil {
			log.WithFields(log.Fields{"error": err}).Error("Couldn't generate a new UUIDv4")
			time.Sleep(time.Second)
			continue
		}

		tempKey = id.String()
	}

	keys := []string{"icinga:nextupdate:" + objectType, "icingadb:overdue:" + objectType, tempKey}

	for {
		now := strconv.FormatFloat(utils.TimeToFloat(time.Now()), 'f', -1, 64)

		res, err := luaGetOverdues.Run(super.Rdbw, keys, now).Result()
		if err != nil {
			super.ChErr <- err
			time.Sleep(time.Second)
			continue
		}

		// The script returns two lists: the objects to mark overdue, then the ones to unmark.
		lists := res.([]interface{})
		for i, overdue := range [...]bool{true, false} {
			updateOverdue(super, objectType, counter, observer, lists[i].([]interface{}), overdue)
		}

		<-ticker.C
	}
}
// updateOverdue sets objectType_state#is_overdue for ids to overdue
// and updates icingadb:overdue:objectType respectively.
//
// ids are the hex-encoded object IDs as returned by Redis (each one has to be a string).
// They are decoded to binary for the SQL statement, but passed on to Redis as they are.
// On success counter is atomically increased by len(ids).
//
// Any error (hex decoding, SQL, Redis) is sent to super.ChErr. Redis is only updated
// after the database has been updated successfully, so a failed run is picked up
// again by the next one. Nothing is done if ids is empty.
func updateOverdue(super *supervisor.Supervisor, objectType string, counter *uint64, observer prometheus.Observer, ids []interface{}, overdue bool) {
	if len(ids) == 0 {
		return
	}

	args := make([]interface{}, 0, len(ids))
	for _, hexId := range ids {
		id, errHD := hex.DecodeString(hexId.(string))
		if errHD != nil {
			super.ChErr <- errHD
			return
		}

		args = append(args, id)
	}

	// One "?" per ID, comma-separated.
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(ids)), ",")

	_, errSE := super.Dbw.SqlExec(
		observer,
		fmt.Sprintf(
			"UPDATE %s_state SET is_overdue='%s' WHERE %s_id IN (%s)",
			objectType, utils.Bool[overdue], objectType, placeholders,
		),
		args...,
	)
	if errSE != nil {
		super.ChErr <- errSE
		return
	}

	atomic.AddUint64(counter, uint64(len(ids)))

	// Remember in Redis which objects are marked overdue in the database now.
	op := super.Rdbw.SRem
	if overdue {
		op = super.Rdbw.SAdd
	}

	if _, errOp := op("icingadb:overdue:"+objectType, ids...).Result(); errOp != nil {
		// Report the Redis error itself. errSE is always nil at this point.
		super.ChErr <- errOp
	}
}

View file

@ -31,6 +31,8 @@ var mysqlObservers = func() (mysqlObservers map[string]prometheus.Observer) {
// StartStateSync starts the sync goroutines for hosts and services.
func StartStateSync(super *supervisor.Supervisor) {
startOverdueSync(super)
go func() {
for {
syncStates(super, "host")

View file

@ -81,6 +81,12 @@ type RedisClient interface {
TxPipelined(fn func(redis.Pipeliner) error) ([]redis.Cmder, error)
Pipeline() redis.Pipeliner
Subscribe(channels ...string) *redis.PubSub
Eval(script string, keys []string, args ...interface{}) *redis.Cmd
EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
ScriptExists(hashes ...string) *redis.BoolSliceCmd
ScriptLoad(script string) *redis.StringCmd
SAdd(key string, members ...interface{}) *redis.IntCmd
SRem(key string, members ...interface{}) *redis.IntCmd
}
type StatusCmd interface {
@ -339,6 +345,132 @@ func (rdbw *RDBWrapper) HGetAll(key string) *redis.StringStringMapCmd {
}
}
// Eval runs EVAL on the wrapped client with connection handling:
// it waits while rdbw isn't connected and repeats the command if it failed
// and CheckConnection(false) reports false. In any other case the command
// is returned as is, including its error (if any).
func (rdbw *RDBWrapper) Eval(script string, keys []string, args ...interface{}) *redis.Cmd {
	for {
		if rdbw.IsConnected() {
			res := rdbw.Rdb.Eval(script, keys, args...)
			if _, err := res.Result(); err == nil || rdbw.CheckConnection(false) {
				return res
			}

			continue
		}

		rdbw.WaitForConnection()
	}
}
// EvalSha runs EVALSHA on the wrapped client with connection handling:
// it waits while rdbw isn't connected and repeats the command if it failed
// and CheckConnection(false) reports false. In any other case the command
// is returned as is, including its error (if any).
func (rdbw *RDBWrapper) EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd {
	for {
		if rdbw.IsConnected() {
			res := rdbw.Rdb.EvalSha(sha1, keys, args...)
			if _, err := res.Result(); err == nil || rdbw.CheckConnection(false) {
				return res
			}

			continue
		}

		rdbw.WaitForConnection()
	}
}
// ScriptExists runs SCRIPT EXISTS on the wrapped client with connection handling:
// it waits while rdbw isn't connected and repeats the command if it failed
// and CheckConnection(false) reports false. In any other case the command
// is returned as is, including its error (if any).
func (rdbw *RDBWrapper) ScriptExists(hashes ...string) *redis.BoolSliceCmd {
	for {
		if rdbw.IsConnected() {
			res := rdbw.Rdb.ScriptExists(hashes...)
			if _, err := res.Result(); err == nil || rdbw.CheckConnection(false) {
				return res
			}

			continue
		}

		rdbw.WaitForConnection()
	}
}
// ScriptLoad runs SCRIPT LOAD on the wrapped client with connection handling:
// it waits while rdbw isn't connected and repeats the command if it failed
// and CheckConnection(false) reports false. In any other case the command
// is returned as is, including its error (if any).
func (rdbw *RDBWrapper) ScriptLoad(script string) *redis.StringCmd {
	for {
		if rdbw.IsConnected() {
			res := rdbw.Rdb.ScriptLoad(script)
			if _, err := res.Result(); err == nil || rdbw.CheckConnection(false) {
				return res
			}

			continue
		}

		rdbw.WaitForConnection()
	}
}
// SAdd runs SADD on the wrapped client with connection handling:
// it waits while rdbw isn't connected and repeats the command if it failed
// and CheckConnection(false) reports false. In any other case the command
// is returned as is, including its error (if any).
func (rdbw *RDBWrapper) SAdd(key string, members ...interface{}) *redis.IntCmd {
	for {
		if rdbw.IsConnected() {
			res := rdbw.Rdb.SAdd(key, members...)
			if _, err := res.Result(); err == nil || rdbw.CheckConnection(false) {
				return res
			}

			continue
		}

		rdbw.WaitForConnection()
	}
}
// SRem runs SREM on the wrapped client with connection handling:
// it waits while rdbw isn't connected and repeats the command if it failed
// and CheckConnection(false) reports false. In any other case the command
// is returned as is, including its error (if any).
func (rdbw *RDBWrapper) SRem(key string, members ...interface{}) *redis.IntCmd {
	for {
		if rdbw.IsConnected() {
			res := rdbw.Rdb.SRem(key, members...)
			if _, err := res.Result(); err == nil || rdbw.CheckConnection(false) {
				return res
			}

			continue
		}

		rdbw.WaitForConnection()
	}
}
// TxPipelined is a wrapper for auto-logging and connection handling.
func (rdbw *RDBWrapper) TxPipelined(fn func(pipeliner redis.Pipeliner) error) ([]redis.Cmder, error) {
for {

View file

@ -139,3 +139,8 @@ func MillisecsToTime(millis float64) time.Time {
return time.Unix(int64(wholeSecs), int64((secs-wholeSecs)*(float64(time.Second)/float64(time.Nanosecond))))
}
// TimeToFloat converts t to a *nix timestamp in seconds,
// carrying the sub-second part of t as the fraction.
func TimeToFloat(t time.Time) float64 {
	wholeSecs := float64(t.Unix())
	fraction := float64(t.Nanosecond()) / float64(time.Second)

	return wholeSecs + fraction
}