Add and use configobject.ObjectInformation instead of SyncContext

This commit is contained in:
Noah Hilverling 2019-05-15 14:03:15 +02:00
parent 2d8d01fc2b
commit c052a3199f
12 changed files with 163 additions and 177 deletions

View file

@ -1,10 +1,13 @@
package configobject
type Row interface {
InsertValues() []interface{}
UpdateValues() []interface{}
GetId() string
SetId(id string)
}
import (
"git.icinga.com/icingadb/icingadb-main/connection"
)
type RowFactory func() Row
type ObjectInformation struct {
ObjectType string
Factory connection.RowFactory
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
}

View file

@ -14,15 +14,6 @@ import (
"sync/atomic"
)
// Context provides an Operator with all necessary information to sync a config type
type Context struct {
ObjectType string
Factory configobject.RowFactory
InsertStmt *connection.BulkInsertStmt
DeleteStmt *connection.BulkDeleteStmt
UpdateStmt *connection.BulkUpdateStmt
}
type Checksums struct {
NameChecksum string `json:"name_checksum"`
PropertiesChecksum string `json:"properties_checksum"`
@ -32,10 +23,10 @@ type Checksums struct {
// Operator is the main worker for each config type. It takes a reference to a supervisor super, holding all required
// connection information and other control mechanisms, a channel chHA, which informs the Operator of the current HA
// state, and a Context reference ctx defining the type and providing the necessary factories.
func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
//insert, update, delete := GetDelta(super, ctx)
//log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete))
// state, and a ObjectInformation reference defining the type and providing the necessary factories.
func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *configobject.ObjectInformation) error {
//insert, update, delete := GetDelta(super, objectInformation)
//log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete))
var (
// If this IcingaDB-Instance looses responsibility, this channel will be
@ -46,7 +37,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chInsert chan []string
// Used by the JsonDecodePool to provide the InsertExecWorker with decoded rows, ready to be inserted
// JsonDecodePool -> InsertExecWorker
chInsertBack chan []configobject.Row
chInsertBack chan []connection.Row
// Used by this Operator to provide the DeleteExecWorker with IDs to delete
// Operator -> DeleteExecWorker
chDelete chan []string
@ -58,7 +49,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chUpdate chan []string
// Used by the JsonDecodePool to provide the UpdateExecWorker with decoded rows, ready to be updated
// JsonDecodePool -> UpdateExecWorker
chUpdateBack chan []configobject.Row
chUpdateBack chan []connection.Row
wgInsert *sync.WaitGroup
wgDelete *sync.WaitGroup
wgUpdate *sync.WaitGroup
@ -67,41 +58,41 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
switch msg {
// Icinga 2 probably died, stop operations and tell all workers to shut down.
case ha.Notify_StopSync:
log.Info(fmt.Sprintf("%s: Lost responsibility", ctx.ObjectType))
log.Info(fmt.Sprintf("%s: Lost responsibility", objectInformation.ObjectType))
if done != nil {
close(done)
done = nil
}
// Starts up the whole sync process.
case ha.Notify_StartSync:
log.Infof("%s: Got responsibility", ctx.ObjectType)
log.Infof("%s: Got responsibility", objectInformation.ObjectType)
//TODO: This should only be done, if HA was taken over from another instance
insert, update, delete := GetDelta(super, ctx)
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete))
insert, update, delete := GetDelta(super, objectInformation)
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete))
// Clean up all channels and wait groups for a fresh config dump
done = make(chan struct{})
chInsert = make(chan []string)
chInsertBack = make(chan []configobject.Row)
chInsertBack = make(chan []connection.Row)
chDelete = make(chan []string)
chUpdateComp = make(chan []string)
chUpdate = make(chan []string)
chUpdateBack = make(chan []configobject.Row)
chUpdateBack = make(chan []connection.Row)
wgInsert = &sync.WaitGroup{}
wgDelete = &sync.WaitGroup{}
wgUpdate = &sync.WaitGroup{}
updateCounter := new(uint32)
go InsertPrepWorker(super, ctx, done, chInsert, chInsertBack)
go InsertExecWorker(super, ctx, done, chInsertBack, wgInsert)
go InsertPrepWorker(super, objectInformation, done, chInsert, chInsertBack)
go InsertExecWorker(super, objectInformation, done, chInsertBack, wgInsert)
go DeleteExecWorker(super, ctx, done, chDelete, wgDelete)
go DeleteExecWorker(super, objectInformation, done, chDelete, wgDelete)
go UpdateCompWorker(super, ctx, done, chUpdateComp, chUpdate, wgUpdate)
go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack)
go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate, updateCounter)
go UpdateCompWorker(super, objectInformation, done, chUpdateComp, chUpdate, wgUpdate)
go UpdatePrepWorker(super, objectInformation, done, chUpdate, chUpdateBack)
go UpdateExecWorker(super, objectInformation, done, chUpdateBack, wgUpdate, updateCounter)
waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) {
waitDone := make(chan bool)
@ -130,11 +121,11 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
benchmarc.Stop()
if !kill {
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"type": objectInformation.ObjectType,
"count": len(insert),
"benchmark": benchmarc.String(),
"action": "insert",
}).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String())
}).Infof("Inserted %v %ss in %v", len(insert), objectInformation.ObjectType, benchmarc.String())
}
}()
@ -150,11 +141,11 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
benchmarc.Stop()
if !kill {
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"type": objectInformation.ObjectType,
"count": len(delete),
"benchmark": benchmarc.String(),
"action": "delete",
}).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String())
}).Infof("Deleted %v %ss in %v", len(delete), objectInformation.ObjectType, benchmarc.String())
}
}()
@ -170,11 +161,11 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
benchmarc.Stop()
if !kill {
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"type": objectInformation.ObjectType,
"count": atomic.LoadUint32(updateCounter),
"benchmark": benchmarc.String(),
"action": "update",
}).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String())
}).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), objectInformation.ObjectType, benchmarc.String())
}
}()
}
@ -183,12 +174,12 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
return nil
}
// GetDelta takes a config Context (host, service, checkcommand, etc.) and fetches the ids from MySQL and Redis. It
// GetDelta takes the ObjectInformation (host, service, checkcommand, etc.) and fetches the ids from MySQL and Redis. It
// returns three string slices:
// 1. IDs which are in the Redis but not in the MySQL (to insert)
// 2. IDs which are in both (to possibly update)
// 3. IDs which are in the MySQL but not the Redis (to delete)
func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, []string) {
func GetDelta(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation) ([]string, []string, []string) {
var (
redisIds []string
mysqlIds []string
@ -200,7 +191,7 @@ func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, [
go func() {
defer wg.Done()
var err error
res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", ctx.ObjectType)).Result()
res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", objectInformation.ObjectType)).Result()
if err != nil {
super.ChErr <- err
return
@ -214,7 +205,7 @@ func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, [
defer wg.Done()
var err error
super.EnvLock.Lock()
mysqlIds, err = super.Dbw.SqlFetchIds(super.EnvId, ctx.ObjectType)
mysqlIds, err = super.Dbw.SqlFetchIds(super.EnvId, objectInformation.ObjectType)
super.EnvLock.Unlock()
if err != nil {
super.ChErr <- err
@ -227,8 +218,8 @@ func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, [
}
// InsertPrepWorker fetches config for IDs(chInsert) from Redis, wraps it into JsonDecodePackages and throws it into the JsonDecodePool
func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) {
defer log.Infof("%s: Insert preparation routine stopped", ctx.ObjectType)
func InsertPrepWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []connection.Row) {
defer log.Infof("%s: Insert preparation routine stopped", objectInformation.ObjectType)
prep := func(chunk *connection.ConfigChunk) {
pkgs := jsondecoder.JsonDecodePackages{
@ -242,8 +233,8 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
Id: key,
ChecksumsRaw: chunk.Checksums[i].(string),
ConfigRaw: chunk.Configs[i].(string),
Factory: ctx.Factory,
ObjectType: ctx.ObjectType,
Factory: objectInformation.Factory,
ObjectType: objectInformation.ObjectType,
}
pkgs.Packages = append(pkgs.Packages, pkg)
}
@ -260,7 +251,7 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
default:
}
ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType)
ch := super.Rdbw.PipeConfigChunks(done, keys, objectInformation.ObjectType)
go func() {
for chunk := range ch {
go prep(chunk)
@ -269,8 +260,8 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
}
}
// InsertExecWorker gets decoded configobject.Row objects from the JsonDecodePool and inserts them into MySQL
func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) {
// InsertExecWorker gets decoded connection.Row objects from the JsonDecodePool and inserts them into MySQL
func InsertExecWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chInsertBack <-chan []connection.Row, wg *sync.WaitGroup) {
for rows := range chInsertBack {
select {
case _, ok := <-done:
@ -280,15 +271,15 @@ func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
default:
}
go func(rows []configobject.Row) {
super.ChErr <- super.Dbw.SqlBulkInsert(rows, ctx.InsertStmt)
go func(rows []connection.Row) {
super.ChErr <- super.Dbw.SqlBulkInsert(rows, objectInformation.BulkInsertStmt)
wg.Add(-len(rows))
}(rows)
}
}
// DeleteExecWorker deletes IDs(chDelete) from MySQL
func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chDelete <-chan []string, wg *sync.WaitGroup) {
func DeleteExecWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chDelete <-chan []string, wg *sync.WaitGroup) {
for keys := range chDelete {
select {
case _, ok := <-done:
@ -299,7 +290,7 @@ func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
}
go func(keys []string) {
super.ChErr <- super.Dbw.SqlBulkDelete(keys, ctx.DeleteStmt)
super.ChErr <- super.Dbw.SqlBulkDelete(keys, objectInformation.BulkDeleteStmt)
wg.Add(-len(keys))
}(keys)
}
@ -307,7 +298,7 @@ func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
// UpdateCompWorker gets IDs(chUpdateComp) that might need an update, fetches the corresponding checksums for Redis and MySQL,
// compares them and inserts changed IDs into chUpdate.
func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdateComp <-chan []string, chUpdate chan<- []string, wg *sync.WaitGroup) {
func UpdateCompWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdateComp <-chan []string, chUpdate chan<- []string, wg *sync.WaitGroup) {
prep := func(chunk *connection.ChecksumChunk, mysqlChecksums map[string]map[string]string) {
changed := make([]string, 0)
for i, key := range chunk.Keys {
@ -340,8 +331,8 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
default:
}
ch := super.Rdbw.PipeChecksumChunks(done, keys, ctx.ObjectType)
checksums, err := super.Dbw.SqlFetchChecksums(ctx.ObjectType, keys)
ch := super.Rdbw.PipeChecksumChunks(done, keys, objectInformation.ObjectType)
checksums, err := super.Dbw.SqlFetchChecksums(objectInformation.ObjectType, keys)
if err != nil {
super.ChErr <- err
}
@ -355,7 +346,7 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
}
// UpdatePrepWorker fetches config for IDs(chUpdate) from Redis, wraps it into JsonDecodePackages and throws it into the JsonDecodePool
func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []configobject.Row) {
func UpdatePrepWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []connection.Row) {
prep := func(chunk *connection.ConfigChunk) {
pkgs := jsondecoder.JsonDecodePackages{
ChBack: chUpdateBack,
@ -368,8 +359,8 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
Id: key,
ChecksumsRaw: chunk.Checksums[i].(string),
ConfigRaw: chunk.Configs[i].(string),
Factory: ctx.Factory,
ObjectType: ctx.ObjectType,
Factory: objectInformation.Factory,
ObjectType: objectInformation.ObjectType,
}
pkgs.Packages = append(pkgs.Packages, pkg)
}
@ -386,7 +377,7 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
default:
}
ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType)
ch := super.Rdbw.PipeConfigChunks(done, keys, objectInformation.ObjectType)
go func() {
for chunk := range ch {
go prep(chunk)
@ -395,8 +386,8 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
}
}
// UpdateExecWorker gets decoded configobject.Row objects from the JsonDecodePool and updates them in MySQL
func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup, updateCounter *uint32) {
// UpdateExecWorker gets decoded connection.Row objects from the JsonDecodePool and updates them in MySQL
func UpdateExecWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdateBack <-chan []connection.Row, wg *sync.WaitGroup, updateCounter *uint32) {
for rows := range chUpdateBack {
select {
case _, ok := <-done:
@ -406,8 +397,8 @@ func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru
default:
}
go func(rows []configobject.Row) {
super.ChErr <- super.Dbw.SqlBulkUpdate(rows, ctx.UpdateStmt)
go func(rows []connection.Row) {
super.ChErr <- super.Dbw.SqlBulkUpdate(rows, objectInformation.BulkUpdateStmt)
wg.Add(-len(rows))
atomic.AddUint32(updateCounter, uint32(len(rows)))
}(rows)

View file

@ -7,9 +7,7 @@ import (
)
var (
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
ObjectInformation configobject.ObjectInformation
Fields = []string{
"id",
"env_id",
@ -99,7 +97,7 @@ type Host struct {
CommandEndpointId string `json:"command_endpoint_id"`
}
func NewHost() configobject.Row {
func NewHost() connection.Row {
h := Host{}
h.NameCi = &h.Name
@ -171,7 +169,11 @@ func (h *Host) SetId(id string) {
}
func init() {
BulkInsertStmt = connection.NewBulkInsertStmt("host", Fields)
BulkDeleteStmt = connection.NewBulkDeleteStmt("host")
BulkUpdateStmt = connection.NewBulkUpdateStmt("host", Fields)
ObjectInformation = configobject.ObjectInformation{
ObjectType: "host",
Factory: NewHost,
BulkInsertStmt: connection.NewBulkInsertStmt("host", Fields),
BulkDeleteStmt: connection.NewBulkDeleteStmt("host"),
BulkUpdateStmt: connection.NewBulkUpdateStmt("host", Fields),
}
}

View file

@ -7,9 +7,7 @@ import (
)
var (
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
ObjectInformation configobject.ObjectInformation
Fields = []string{
"id",
"env_id",
@ -35,7 +33,7 @@ type Hostgroup struct {
ZoneId string `json:"zone_id"`
}
func NewHostgroup() configobject.Row {
func NewHostgroup() connection.Row {
h := Hostgroup{}
h.NameCi = &h.Name
@ -75,7 +73,11 @@ func (h *Hostgroup) SetId(id string) {
}
func init() {
BulkInsertStmt = connection.NewBulkInsertStmt("hostgroup", Fields)
BulkDeleteStmt = connection.NewBulkDeleteStmt("hostgroup")
BulkUpdateStmt = connection.NewBulkUpdateStmt("hostgroup", Fields)
ObjectInformation = configobject.ObjectInformation{
ObjectType: "hostgroup",
Factory: NewHostgroup,
BulkInsertStmt: connection.NewBulkInsertStmt("hostgroup", Fields),
BulkDeleteStmt: connection.NewBulkDeleteStmt("hostgroup"),
BulkUpdateStmt: connection.NewBulkUpdateStmt("hostgroup", Fields),
}
}

View file

@ -7,9 +7,7 @@ import (
)
var (
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
ObjectInformation configobject.ObjectInformation
Fields = []string{
"id",
"env_id",
@ -93,7 +91,7 @@ type Service struct {
CommandEndpointId string `json:"command_endpoint_id"`
}
func NewService() configobject.Row {
func NewService() connection.Row {
s := Service{}
s.NameCi = &s.Name
@ -162,7 +160,11 @@ func (s *Service) SetId(id string) {
}
func init() {
BulkInsertStmt = connection.NewBulkInsertStmt("service", Fields)
BulkDeleteStmt = connection.NewBulkDeleteStmt("service")
BulkUpdateStmt = connection.NewBulkUpdateStmt("service", Fields)
ObjectInformation = configobject.ObjectInformation{
ObjectType: "service",
Factory: NewService,
BulkInsertStmt: connection.NewBulkInsertStmt("service", Fields),
BulkDeleteStmt: connection.NewBulkDeleteStmt("service"),
BulkUpdateStmt: connection.NewBulkUpdateStmt("service", Fields),
}
}

View file

@ -7,9 +7,7 @@ import (
)
var (
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
ObjectInformation configobject.ObjectInformation
Fields = []string{
"id",
"env_id",
@ -35,7 +33,7 @@ type Servicegroup struct {
ZoneId string `json:"zone_id"`
}
func NewServicegroup() configobject.Row {
func NewServicegroup() connection.Row {
s := Servicegroup{}
s.NameCi = &s.Name
@ -75,7 +73,11 @@ func (s *Servicegroup) SetId(id string) {
}
func init() {
BulkInsertStmt = connection.NewBulkInsertStmt("servicegroup", Fields)
BulkDeleteStmt = connection.NewBulkDeleteStmt("servicegroup")
BulkUpdateStmt = connection.NewBulkUpdateStmt("servicegroup", Fields)
ObjectInformation = configobject.ObjectInformation{
ObjectType: "servicegroup",
Factory: NewServicegroup,
BulkInsertStmt: connection.NewBulkInsertStmt("servicegroup", Fields),
BulkDeleteStmt: connection.NewBulkDeleteStmt("servicegroup"),
BulkUpdateStmt: connection.NewBulkUpdateStmt("servicegroup", Fields),
}
}

View file

@ -7,9 +7,7 @@ import (
)
var (
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
ObjectInformation configobject.ObjectInformation
Fields = []string{
"id",
"env_id",
@ -49,7 +47,7 @@ type User struct {
ZoneId string `json:"zone_id"`
}
func NewUser() configobject.Row {
func NewUser() connection.Row {
u := User{}
u.NameCi = &u.Name
@ -96,7 +94,11 @@ func (u *User) SetId(id string) {
}
func init() {
BulkInsertStmt = connection.NewBulkInsertStmt("user", Fields)
BulkDeleteStmt = connection.NewBulkDeleteStmt("user")
BulkUpdateStmt = connection.NewBulkUpdateStmt("user", Fields)
ObjectInformation = configobject.ObjectInformation{
ObjectType: "user",
Factory: NewUser,
BulkInsertStmt: connection.NewBulkInsertStmt("user", Fields),
BulkDeleteStmt: connection.NewBulkDeleteStmt("user"),
BulkUpdateStmt: connection.NewBulkUpdateStmt("user", Fields),
}
}

View file

@ -5,7 +5,6 @@ import (
"context"
"database/sql"
"fmt"
"git.icinga.com/icingadb/icingadb-main/configobject"
"git.icinga.com/icingadb/icingadb-main/utils"
log "github.com/sirupsen/logrus"
"strings"
@ -613,7 +612,7 @@ func (dbw *DBWrapper) SqlFetchChecksums(table string, ids []string) (map[string]
return checksums, nil
}
func (dbw *DBWrapper) SqlBulkInsert(rows []configobject.Row, stmt *BulkInsertStmt) error {
func (dbw *DBWrapper) SqlBulkInsert(rows []Row, stmt *BulkInsertStmt) error {
if len(rows) == 0 {
return nil
}
@ -673,7 +672,7 @@ func (dbw *DBWrapper) SqlBulkDelete(keys []string, stmt *BulkDeleteStmt) error {
return nil
}
func (dbw *DBWrapper) SqlBulkUpdate(rows []configobject.Row, stmt *BulkUpdateStmt) error {
func (dbw *DBWrapper) SqlBulkUpdate(rows []Row, stmt *BulkUpdateStmt) error {
if len(rows) == 0 {
return nil
}

View file

@ -6,13 +6,13 @@ import (
"errors"
"fmt"
"github.com/go-sql-driver/mysql"
log "github.com/sirupsen/logrus"
"io/ioutil"
oldlog "log"
"reflect"
"sort"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
oldlog "log"
)
// mkMysql creates a new MySQL client.
@ -422,4 +422,13 @@ func NewBulkUpdateStmt(table string, fields []string) *BulkUpdateStmt {
}
return &stmt
}
}
type Row interface {
InsertValues() []interface{}
UpdateValues() []interface{}
GetId() string
SetId(id string)
}
type RowFactory func() Row

View file

@ -1,7 +1,7 @@
package jsondecoder
import (
"git.icinga.com/icingadb/icingadb-main/configobject"
"git.icinga.com/icingadb/icingadb-main/connection"
"github.com/json-iterator/go"
)
@ -15,21 +15,21 @@ type JsonDecodePackage struct {
// Json strings from Redis
ConfigRaw string
// Unmarshaled config object ready to be used in SQL
Row configobject.Row
Row connection.Row
// Package will be sent back through this channel
Factory configobject.RowFactory
Factory connection.RowFactory
// Object type (host, service, endpoint, command...)
ObjectType string
}
type JsonDecodePackages struct {
Packages []JsonDecodePackage
ChBack chan<- []configobject.Row
ChBack chan<- []connection.Row
}
// decodeString unmarshals the string toDecode using the json package. The decoded json will be written to row.
// Returns error, if not successful.
func decodeString(toDecode string, row configobject.Row) error {
func decodeString(toDecode string, row connection.Row) error {
return json.Unmarshal([]byte(toDecode), row)
}
@ -49,7 +49,7 @@ func decodePackage(chInput <-chan *JsonDecodePackages) error {
var err error
for pkgs := range chInput {
var rows []configobject.Row
var rows []connection.Row
for _, pkg := range pkgs.Packages {
row := pkg.Factory()
row.SetId(pkg.Id)

View file

@ -1,8 +1,8 @@
package jsondecoder
import (
"git.icinga.com/icingadb/icingadb-main/configobject"
"git.icinga.com/icingadb/icingadb-main/configobject/host"
"git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/host"
"git.icinga.com/icingadb/icingadb-main/connection"
"github.com/stretchr/testify/assert"
"testing"
)
@ -22,7 +22,7 @@ func Test_decodeString(t *testing.T) {
func Test_DecodePool(t *testing.T) {
var chInput = make(chan *JsonDecodePackages)
var chOutput = make(chan []configobject.Row)
var chOutput = make(chan []connection.Row)
var chError = make(chan error)
var TestPackageA = JsonDecodePackage{
@ -30,7 +30,7 @@ func Test_DecodePool(t *testing.T) {
"{\"checkcommand_id\":\"0bba6ab6747f1c0de3bf80932d10bc7b603e27fc\",\"customvars_checksum\":\"5ba93c9db0cff93f52b521d7420e43f6eda2784f\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[],\"groups_checksum\":\"a0930d202ae77bbafbc4d898e3d060f462160904\",\"name_checksum\":\"21021feda571e19d8afb53fb11ca089db7578cee\",\"properties_checksum\":\"8d515de29444d9df3e374bcc1b890040fcc48be5\"}",
"{\"active_checks_enabled\":true,\"address\":\"\",\"address6\":\"\",\"check_interval\":60.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"random\",\"display_name\":\"aa3derphosta469\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"aa3derphosta469\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true}",
nil,
host.NewHost,
host.ObjectInformation.Factory,
"host",
}
@ -39,7 +39,7 @@ func Test_DecodePool(t *testing.T) {
"{\"checkcommand_id\":\"0bba6ab6747f1c0de3bf80932d10bc7b603e27fc\",\"customvars_checksum\":\"5ba93c9db0cff93f52b521d7420e43f6eda2784f\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[],\"groups_checksum\":\"a0930d202ae77bbafbc4d898e3d060f462160904\",\"name_checksum\":\"f14feab0710d05e4ca9ffd712f8c0af5d8f5119a\",\"properties_checksum\":\"bd3e124054427700571dae7552aa90e3fe4f1fde\"}",
"{\"active_checks_enabled\":true,\"address\":\"\",\"address6\":\"\",\"check_interval\":60.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"random\",\"display_name\":\"aa3derphosta378\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"aa3derphosta378\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true}",
nil,
host.NewHost,
host.ObjectInformation.Factory,
"host",
}

92
main.go
View file

@ -4,12 +4,12 @@ import (
"flag"
"git.icinga.com/icingadb/icingadb-main/config"
"git.icinga.com/icingadb/icingadb-main/configobject/configsync"
"git.icinga.com/icingadb/icingadb-main/configobject/host"
"git.icinga.com/icingadb/icingadb-main/configobject/hostgroup"
"git.icinga.com/icingadb/icingadb-main/configobject/service"
"git.icinga.com/icingadb/icingadb-main/configobject/servicegroup"
"git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/host"
"git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/hostgroup"
"git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/service"
"git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/servicegroup"
"git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/user"
"git.icinga.com/icingadb/icingadb-main/configobject/statesync"
"git.icinga.com/icingadb/icingadb-main/configobject/user"
"git.icinga.com/icingadb/icingadb-main/connection"
"git.icinga.com/icingadb/icingadb-main/ha"
"git.icinga.com/icingadb/icingadb-main/jsondecoder"
@ -61,60 +61,7 @@ func main() {
go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16)
chHAHost := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(&super, chHAHost, &configsync.Context{
ObjectType: "host",
Factory: host.NewHost,
InsertStmt: host.BulkInsertStmt,
DeleteStmt: host.BulkDeleteStmt,
UpdateStmt: host.BulkUpdateStmt,
})
}()
chHAService := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(&super, chHAService, &configsync.Context{
ObjectType: "service",
Factory: service.NewService,
InsertStmt: service.BulkInsertStmt,
DeleteStmt: service.BulkDeleteStmt,
UpdateStmt: service.BulkUpdateStmt,
})
}()
chHAHostgroup := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(&super, chHAHostgroup, &configsync.Context{
ObjectType: "hostgroup",
Factory: hostgroup.NewHostgroup,
InsertStmt: hostgroup.BulkInsertStmt,
DeleteStmt: hostgroup.BulkDeleteStmt,
UpdateStmt: hostgroup.BulkUpdateStmt,
})
}()
chHAServicegroup := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(&super, chHAServicegroup, &configsync.Context{
ObjectType: "servicegroup",
Factory: servicegroup.NewServicegroup,
InsertStmt: servicegroup.BulkInsertStmt,
DeleteStmt: servicegroup.BulkDeleteStmt,
UpdateStmt: servicegroup.BulkUpdateStmt,
})
}()
chHAUser := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(&super, chHAUser, &configsync.Context{
ObjectType: "user",
Factory: user.NewUser,
InsertStmt: user.BulkInsertStmt,
DeleteStmt: user.BulkDeleteStmt,
UpdateStmt: user.BulkUpdateStmt,
})
}()
startConfigSyncOperators(&super, haInstance)
statesync.StartStateSync(&super)
@ -129,3 +76,30 @@ func main() {
}
}
}
func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) {
chHAHost := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(super, chHAHost, &host.ObjectInformation)
}()
chHAService := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(super, chHAService, &service.ObjectInformation)
}()
chHAHostgroup := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(super, chHAHostgroup, &hostgroup.ObjectInformation)
}()
chHAServicegroup := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(super, chHAServicegroup, &servicegroup.ObjectInformation)
}()
chHAUser := haInstance.RegisterNotificationListener()
go func() {
super.ChErr <- configsync.Operator(super, chHAUser, &user.ObjectInformation)
}()
}