diff --git a/configobject/configobject.go b/configobject/configobject.go index bc7a27b6..651f7332 100644 --- a/configobject/configobject.go +++ b/configobject/configobject.go @@ -1,10 +1,13 @@ package configobject -type Row interface { - InsertValues() []interface{} - UpdateValues() []interface{} - GetId() string - SetId(id string) -} +import ( + "git.icinga.com/icingadb/icingadb-main/connection" +) -type RowFactory func() Row \ No newline at end of file +type ObjectInformation struct { + ObjectType string + Factory connection.RowFactory + BulkInsertStmt *connection.BulkInsertStmt + BulkDeleteStmt *connection.BulkDeleteStmt + BulkUpdateStmt *connection.BulkUpdateStmt +} \ No newline at end of file diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index eefb3733..ab048ba1 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -14,15 +14,6 @@ import ( "sync/atomic" ) -// Context provides an Operator with all necessary information to sync a config type -type Context struct { - ObjectType string - Factory configobject.RowFactory - InsertStmt *connection.BulkInsertStmt - DeleteStmt *connection.BulkDeleteStmt - UpdateStmt *connection.BulkUpdateStmt -} - type Checksums struct { NameChecksum string `json:"name_checksum"` PropertiesChecksum string `json:"properties_checksum"` @@ -32,10 +23,10 @@ type Checksums struct { // Operator is the main worker for each config type. It takes a reference to a supervisor super, holding all required // connection information and other control mechanisms, a channel chHA, which informs the Operator of the current HA -// state, and a Context reference ctx defining the type and providing the necessary factories. -func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { - //insert, update, delete := GetDelta(super, ctx) - //log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete)) +// state, and a ObjectInformation reference defining the type and providing the necessary factories. +func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *configobject.ObjectInformation) error { + //insert, update, delete := GetDelta(super, objectInformation) + //log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete)) var ( // If this IcingaDB-Instance looses responsibility, this channel will be @@ -46,7 +37,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chInsert chan []string // Used by the JsonDecodePool to provide the InsertExecWorker with decoded rows, ready to be inserted // JsonDecodePool -> InsertExecWorker - chInsertBack chan []configobject.Row + chInsertBack chan []connection.Row // Used by this Operator to provide the DeleteExecWorker with IDs to delete // Operator -> DeleteExecWorker chDelete chan []string @@ -58,7 +49,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chUpdate chan []string // Used by the JsonDecodePool to provide the UpdateExecWorker with decoded rows, ready to be updated // JsonDecodePool -> UpdateExecWorker - chUpdateBack chan []configobject.Row + chUpdateBack chan []connection.Row wgInsert *sync.WaitGroup wgDelete *sync.WaitGroup wgUpdate *sync.WaitGroup @@ -67,41 +58,41 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { switch msg { // Icinga 2 probably died, stop operations and tell all workers to shut down. case ha.Notify_StopSync: - log.Info(fmt.Sprintf("%s: Lost responsibility", ctx.ObjectType)) + log.Info(fmt.Sprintf("%s: Lost responsibility", objectInformation.ObjectType)) if done != nil { close(done) done = nil } // Starts up the whole sync process. case ha.Notify_StartSync: - log.Infof("%s: Got responsibility", ctx.ObjectType) + log.Infof("%s: Got responsibility", objectInformation.ObjectType) //TODO: This should only be done, if HA was taken over from another instance - insert, update, delete := GetDelta(super, ctx) - log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete)) + insert, update, delete := GetDelta(super, objectInformation) + log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete)) // Clean up all channels and wait groups for a fresh config dump done = make(chan struct{}) chInsert = make(chan []string) - chInsertBack = make(chan []configobject.Row) + chInsertBack = make(chan []connection.Row) chDelete = make(chan []string) chUpdateComp = make(chan []string) chUpdate = make(chan []string) - chUpdateBack = make(chan []configobject.Row) + chUpdateBack = make(chan []connection.Row) wgInsert = &sync.WaitGroup{} wgDelete = &sync.WaitGroup{} wgUpdate = &sync.WaitGroup{} updateCounter := new(uint32) - go InsertPrepWorker(super, ctx, done, chInsert, chInsertBack) - go InsertExecWorker(super, ctx, done, chInsertBack, wgInsert) + go InsertPrepWorker(super, objectInformation, done, chInsert, chInsertBack) + go InsertExecWorker(super, objectInformation, done, chInsertBack, wgInsert) - go DeleteExecWorker(super, ctx, done, chDelete, wgDelete) + go DeleteExecWorker(super, objectInformation, done, chDelete, wgDelete) - go UpdateCompWorker(super, ctx, done, chUpdateComp, chUpdate, wgUpdate) - go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack) - go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate, updateCounter) + go UpdateCompWorker(super, objectInformation, done, chUpdateComp, chUpdate, wgUpdate) + go UpdatePrepWorker(super, objectInformation, done, chUpdate, chUpdateBack) + go UpdateExecWorker(super, objectInformation, done, chUpdateBack, wgUpdate, updateCounter) waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) { waitDone := make(chan bool) @@ -130,11 +121,11 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { benchmarc.Stop() if !kill { log.WithFields(log.Fields{ - "type": ctx.ObjectType, + "type": objectInformation.ObjectType, "count": len(insert), "benchmark": benchmarc.String(), "action": "insert", - }).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String()) + }).Infof("Inserted %v %ss in %v", len(insert), objectInformation.ObjectType, benchmarc.String()) } }() @@ -150,11 +141,11 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { benchmarc.Stop() if !kill { log.WithFields(log.Fields{ - "type": ctx.ObjectType, + "type": objectInformation.ObjectType, "count": len(delete), "benchmark": benchmarc.String(), "action": "delete", - }).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String()) + }).Infof("Deleted %v %ss in %v", len(delete), objectInformation.ObjectType, benchmarc.String()) } }() @@ -170,11 +161,11 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { benchmarc.Stop() if !kill { log.WithFields(log.Fields{ - "type": ctx.ObjectType, + "type": objectInformation.ObjectType, "count": atomic.LoadUint32(updateCounter), "benchmark": benchmarc.String(), "action": "update", - }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String()) + }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), objectInformation.ObjectType, benchmarc.String()) } }() } @@ -183,12 +174,12 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { return nil } -// GetDelta takes a config Context (host, service, checkcommand, etc.) and fetches the ids from MySQL and Redis. It +// GetDelta takes the ObjectInformation (host, service, checkcommand, etc.) and fetches the ids from MySQL and Redis. It // returns three string slices: // 1. IDs which are in the Redis but not in the MySQL (to insert) // 2. IDs which are in both (to possibly update) // 3. IDs which are in the MySQL but not the Redis (to delete) -func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, []string) { +func GetDelta(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation) ([]string, []string, []string) { var ( redisIds []string mysqlIds []string @@ -200,7 +191,7 @@ func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, [ go func() { defer wg.Done() var err error - res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", ctx.ObjectType)).Result() + res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", objectInformation.ObjectType)).Result() if err != nil { super.ChErr <- err return @@ -214,7 +205,7 @@ func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, [ defer wg.Done() var err error super.EnvLock.Lock() - mysqlIds, err = super.Dbw.SqlFetchIds(super.EnvId, ctx.ObjectType) + mysqlIds, err = super.Dbw.SqlFetchIds(super.EnvId, objectInformation.ObjectType) super.EnvLock.Unlock() if err != nil { super.ChErr <- err @@ -227,8 +218,8 @@ func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, [ } // InsertPrepWorker fetches config for IDs(chInsert) from Redis, wraps it into JsonDecodePackages and throws it into the JsonDecodePool -func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) { - defer log.Infof("%s: Insert preparation routine stopped", ctx.ObjectType) +func InsertPrepWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []connection.Row) { + defer log.Infof("%s: Insert preparation routine stopped", objectInformation.ObjectType) prep := func(chunk *connection.ConfigChunk) { pkgs := jsondecoder.JsonDecodePackages{ @@ -242,8 +233,8 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru Id: key, ChecksumsRaw: chunk.Checksums[i].(string), ConfigRaw: chunk.Configs[i].(string), - Factory: ctx.Factory, - ObjectType: ctx.ObjectType, + Factory: objectInformation.Factory, + ObjectType: objectInformation.ObjectType, } pkgs.Packages = append(pkgs.Packages, pkg) } @@ -260,7 +251,7 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru default: } - ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType) + ch := super.Rdbw.PipeConfigChunks(done, keys, objectInformation.ObjectType) go func() { for chunk := range ch { go prep(chunk) @@ -269,8 +260,8 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru } } -// InsertExecWorker gets decoded configobject.Row objects from the JsonDecodePool and inserts them into MySQL -func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) { +// InsertExecWorker gets decoded connection.Row objects from the JsonDecodePool and inserts them into MySQL +func InsertExecWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chInsertBack <-chan []connection.Row, wg *sync.WaitGroup) { for rows := range chInsertBack { select { case _, ok := <-done: @@ -280,15 +271,15 @@ func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru default: } - go func(rows []configobject.Row) { - super.ChErr <- super.Dbw.SqlBulkInsert(rows, ctx.InsertStmt) + go func(rows []connection.Row) { + super.ChErr <- super.Dbw.SqlBulkInsert(rows, objectInformation.BulkInsertStmt) wg.Add(-len(rows)) }(rows) } } // DeleteExecWorker deletes IDs(chDelete) from MySQL -func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chDelete <-chan []string, wg *sync.WaitGroup) { +func DeleteExecWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chDelete <-chan []string, wg *sync.WaitGroup) { for keys := range chDelete { select { case _, ok := <-done: @@ -299,7 +290,7 @@ func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru } go func(keys []string) { - super.ChErr <- super.Dbw.SqlBulkDelete(keys, ctx.DeleteStmt) + super.ChErr <- super.Dbw.SqlBulkDelete(keys, objectInformation.BulkDeleteStmt) wg.Add(-len(keys)) }(keys) } @@ -307,7 +298,7 @@ func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru // UpdateCompWorker gets IDs(chUpdateComp) that might need an update, fetches the corresponding checksums for Redis and MySQL, // compares them and inserts changed IDs into chUpdate. -func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdateComp <-chan []string, chUpdate chan<- []string, wg *sync.WaitGroup) { +func UpdateCompWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdateComp <-chan []string, chUpdate chan<- []string, wg *sync.WaitGroup) { prep := func(chunk *connection.ChecksumChunk, mysqlChecksums map[string]map[string]string) { changed := make([]string, 0) for i, key := range chunk.Keys { @@ -340,8 +331,8 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan stru default: } - ch := super.Rdbw.PipeChecksumChunks(done, keys, ctx.ObjectType) - checksums, err := super.Dbw.SqlFetchChecksums(ctx.ObjectType, keys) + ch := super.Rdbw.PipeChecksumChunks(done, keys, objectInformation.ObjectType) + checksums, err := super.Dbw.SqlFetchChecksums(objectInformation.ObjectType, keys) if err != nil { super.ChErr <- err } @@ -355,7 +346,7 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan stru } // UpdatePrepWorker fetches config for IDs(chUpdate) from Redis, wraps it into JsonDecodePackages and throws it into the JsonDecodePool -func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []configobject.Row) { +func UpdatePrepWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []connection.Row) { prep := func(chunk *connection.ConfigChunk) { pkgs := jsondecoder.JsonDecodePackages{ ChBack: chUpdateBack, @@ -368,8 +359,8 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru Id: key, ChecksumsRaw: chunk.Checksums[i].(string), ConfigRaw: chunk.Configs[i].(string), - Factory: ctx.Factory, - ObjectType: ctx.ObjectType, + Factory: objectInformation.Factory, + ObjectType: objectInformation.ObjectType, } pkgs.Packages = append(pkgs.Packages, pkg) } @@ -386,7 +377,7 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru default: } - ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType) + ch := super.Rdbw.PipeConfigChunks(done, keys, objectInformation.ObjectType) go func() { for chunk := range ch { go prep(chunk) @@ -395,8 +386,8 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan stru } } -// UpdateExecWorker gets decoded configobject.Row objects from the JsonDecodePool and updates them in MySQL -func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup, updateCounter *uint32) { +// UpdateExecWorker gets decoded connection.Row objects from the JsonDecodePool and updates them in MySQL +func UpdateExecWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdateBack <-chan []connection.Row, wg *sync.WaitGroup, updateCounter *uint32) { for rows := range chUpdateBack { select { case _, ok := <-done: @@ -406,8 +397,8 @@ func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, done chan stru default: } - go func(rows []configobject.Row) { - super.ChErr <- super.Dbw.SqlBulkUpdate(rows, ctx.UpdateStmt) + go func(rows []connection.Row) { + super.ChErr <- super.Dbw.SqlBulkUpdate(rows, objectInformation.BulkUpdateStmt) wg.Add(-len(rows)) atomic.AddUint32(updateCounter, uint32(len(rows))) }(rows) diff --git a/configobject/host/host.go b/configobject/objecttypes/host/host.go similarity index 92% rename from configobject/host/host.go rename to configobject/objecttypes/host/host.go index 6db38612..9f1a9666 100644 --- a/configobject/host/host.go +++ b/configobject/objecttypes/host/host.go @@ -7,9 +7,7 @@ import ( ) var ( - BulkInsertStmt *connection.BulkInsertStmt - BulkDeleteStmt *connection.BulkDeleteStmt - BulkUpdateStmt *connection.BulkUpdateStmt + ObjectInformation configobject.ObjectInformation Fields = []string{ "id", "env_id", @@ -99,7 +97,7 @@ type Host struct { CommandEndpointId string `json:"command_endpoint_id"` } -func NewHost() configobject.Row { +func NewHost() connection.Row { h := Host{} h.NameCi = &h.Name @@ -171,7 +169,11 @@ func (h *Host) SetId(id string) { } func init() { - BulkInsertStmt = connection.NewBulkInsertStmt("host", Fields) - BulkDeleteStmt = connection.NewBulkDeleteStmt("host") - BulkUpdateStmt = connection.NewBulkUpdateStmt("host", Fields) + ObjectInformation = configobject.ObjectInformation{ + ObjectType: "host", + Factory: NewHost, + BulkInsertStmt: connection.NewBulkInsertStmt("host", Fields), + BulkDeleteStmt: connection.NewBulkDeleteStmt("host"), + BulkUpdateStmt: connection.NewBulkUpdateStmt("host", Fields), + } } \ No newline at end of file diff --git a/configobject/hostgroup/hostgroup.go b/configobject/objecttypes/hostgroup/hostgroup.go similarity index 79% rename from configobject/hostgroup/hostgroup.go rename to configobject/objecttypes/hostgroup/hostgroup.go index 0ed2298f..4c0b9a1a 100644 --- a/configobject/hostgroup/hostgroup.go +++ b/configobject/objecttypes/hostgroup/hostgroup.go @@ -7,9 +7,7 @@ import ( ) var ( - BulkInsertStmt *connection.BulkInsertStmt - BulkDeleteStmt *connection.BulkDeleteStmt - BulkUpdateStmt *connection.BulkUpdateStmt + ObjectInformation configobject.ObjectInformation Fields = []string{ "id", "env_id", @@ -35,7 +33,7 @@ type Hostgroup struct { ZoneId string `json:"zone_id"` } -func NewHostgroup() configobject.Row { +func NewHostgroup() connection.Row { h := Hostgroup{} h.NameCi = &h.Name @@ -75,7 +73,11 @@ func (h *Hostgroup) SetId(id string) { } func init() { - BulkInsertStmt = connection.NewBulkInsertStmt("hostgroup", Fields) - BulkDeleteStmt = connection.NewBulkDeleteStmt("hostgroup") - BulkUpdateStmt = connection.NewBulkUpdateStmt("hostgroup", Fields) + ObjectInformation = configobject.ObjectInformation{ + ObjectType: "hostgroup", + Factory: NewHostgroup, + BulkInsertStmt: connection.NewBulkInsertStmt("hostgroup", Fields), + BulkDeleteStmt: connection.NewBulkDeleteStmt("hostgroup"), + BulkUpdateStmt: connection.NewBulkUpdateStmt("hostgroup", Fields), + } } \ No newline at end of file diff --git a/configobject/service/service.go b/configobject/objecttypes/service/service.go similarity index 92% rename from configobject/service/service.go rename to configobject/objecttypes/service/service.go index 86721181..6a422c1b 100644 --- a/configobject/service/service.go +++ b/configobject/objecttypes/service/service.go @@ -7,9 +7,7 @@ import ( ) var ( - BulkInsertStmt *connection.BulkInsertStmt - BulkDeleteStmt *connection.BulkDeleteStmt - BulkUpdateStmt *connection.BulkUpdateStmt + ObjectInformation configobject.ObjectInformation Fields = []string{ "id", "env_id", @@ -93,7 +91,7 @@ type Service struct { CommandEndpointId string `json:"command_endpoint_id"` } -func NewService() configobject.Row { +func NewService() connection.Row { s := Service{} s.NameCi = &s.Name @@ -162,7 +160,11 @@ func (s *Service) SetId(id string) { } func init() { - BulkInsertStmt = connection.NewBulkInsertStmt("service", Fields) - BulkDeleteStmt = connection.NewBulkDeleteStmt("service") - BulkUpdateStmt = connection.NewBulkUpdateStmt("service", Fields) + ObjectInformation = configobject.ObjectInformation{ + ObjectType: "service", + Factory: NewService, + BulkInsertStmt: connection.NewBulkInsertStmt("service", Fields), + BulkDeleteStmt: connection.NewBulkDeleteStmt("service"), + BulkUpdateStmt: connection.NewBulkUpdateStmt("service", Fields), + } } \ No newline at end of file diff --git a/configobject/servicegroup/servicegroup.go b/configobject/objecttypes/servicegroup/servicegroup.go similarity index 78% rename from configobject/servicegroup/servicegroup.go rename to configobject/objecttypes/servicegroup/servicegroup.go index 5e3899ca..56390b97 100644 --- a/configobject/servicegroup/servicegroup.go +++ b/configobject/objecttypes/servicegroup/servicegroup.go @@ -7,9 +7,7 @@ import ( ) var ( - BulkInsertStmt *connection.BulkInsertStmt - BulkDeleteStmt *connection.BulkDeleteStmt - BulkUpdateStmt *connection.BulkUpdateStmt + ObjectInformation configobject.ObjectInformation Fields = []string{ "id", "env_id", @@ -35,7 +33,7 @@ type Servicegroup struct { ZoneId string `json:"zone_id"` } -func NewServicegroup() configobject.Row { +func NewServicegroup() connection.Row { s := Servicegroup{} s.NameCi = &s.Name @@ -75,7 +73,11 @@ func (s *Servicegroup) SetId(id string) { } func init() { - BulkInsertStmt = connection.NewBulkInsertStmt("servicegroup", Fields) - BulkDeleteStmt = connection.NewBulkDeleteStmt("servicegroup") - BulkUpdateStmt = connection.NewBulkUpdateStmt("servicegroup", Fields) + ObjectInformation = configobject.ObjectInformation{ + ObjectType: "servicegroup", + Factory: NewServicegroup, + BulkInsertStmt: connection.NewBulkInsertStmt("servicegroup", Fields), + BulkDeleteStmt: connection.NewBulkDeleteStmt("servicegroup"), + BulkUpdateStmt: connection.NewBulkUpdateStmt("servicegroup", Fields), + } } \ No newline at end of file diff --git a/configobject/user/user.go b/configobject/objecttypes/user/user.go similarity index 84% rename from configobject/user/user.go rename to configobject/objecttypes/user/user.go index 3c37ba1c..f6525002 100644 --- a/configobject/user/user.go +++ b/configobject/objecttypes/user/user.go @@ -7,9 +7,7 @@ import ( ) var ( - BulkInsertStmt *connection.BulkInsertStmt - BulkDeleteStmt *connection.BulkDeleteStmt - BulkUpdateStmt *connection.BulkUpdateStmt + ObjectInformation configobject.ObjectInformation Fields = []string{ "id", "env_id", @@ -49,7 +47,7 @@ type User struct { ZoneId string `json:"zone_id"` } -func NewUser() configobject.Row { +func NewUser() connection.Row { u := User{} u.NameCi = &u.Name @@ -96,7 +94,11 @@ func (u *User) SetId(id string) { } func init() { - BulkInsertStmt = connection.NewBulkInsertStmt("user", Fields) - BulkDeleteStmt = connection.NewBulkDeleteStmt("user") - BulkUpdateStmt = connection.NewBulkUpdateStmt("user", Fields) + ObjectInformation = configobject.ObjectInformation{ + ObjectType: "user", + Factory: NewUser, + BulkInsertStmt: connection.NewBulkInsertStmt("user", Fields), + BulkDeleteStmt: connection.NewBulkDeleteStmt("user"), + BulkUpdateStmt: connection.NewBulkUpdateStmt("user", Fields), + } } \ No newline at end of file diff --git a/connection/mysql.go b/connection/mysql.go index 401f270c..331c3e0f 100644 --- a/connection/mysql.go +++ b/connection/mysql.go @@ -5,7 +5,6 @@ import ( "context" "database/sql" "fmt" - "git.icinga.com/icingadb/icingadb-main/configobject" "git.icinga.com/icingadb/icingadb-main/utils" log "github.com/sirupsen/logrus" "strings" @@ -613,7 +612,7 @@ func (dbw *DBWrapper) SqlFetchChecksums(table string, ids []string) (map[string] return checksums, nil } -func (dbw *DBWrapper) SqlBulkInsert(rows []configobject.Row, stmt *BulkInsertStmt) error { +func (dbw *DBWrapper) SqlBulkInsert(rows []Row, stmt *BulkInsertStmt) error { if len(rows) == 0 { return nil } @@ -673,7 +672,7 @@ func (dbw *DBWrapper) SqlBulkDelete(keys []string, stmt *BulkDeleteStmt) error { return nil } -func (dbw *DBWrapper) SqlBulkUpdate(rows []configobject.Row, stmt *BulkUpdateStmt) error { +func (dbw *DBWrapper) SqlBulkUpdate(rows []Row, stmt *BulkUpdateStmt) error { if len(rows) == 0 { return nil } diff --git a/connection/mysql_utils.go b/connection/mysql_utils.go index 8220d5e9..849cb008 100644 --- a/connection/mysql_utils.go +++ b/connection/mysql_utils.go @@ -6,13 +6,13 @@ import ( "errors" "fmt" "github.com/go-sql-driver/mysql" + log "github.com/sirupsen/logrus" "io/ioutil" + oldlog "log" "reflect" "sort" "strconv" "strings" - log "github.com/sirupsen/logrus" - oldlog "log" ) // mkMysql creates a new MySQL client. @@ -422,4 +422,13 @@ func NewBulkUpdateStmt(table string, fields []string) *BulkUpdateStmt { } return &stmt -} \ No newline at end of file +} + +type Row interface { + InsertValues() []interface{} + UpdateValues() []interface{} + GetId() string + SetId(id string) +} + +type RowFactory func() Row \ No newline at end of file diff --git a/jsondecoder/json-decoder.go b/jsondecoder/json-decoder.go index 4bab0893..18409b07 100644 --- a/jsondecoder/json-decoder.go +++ b/jsondecoder/json-decoder.go @@ -1,7 +1,7 @@ package jsondecoder import ( - "git.icinga.com/icingadb/icingadb-main/configobject" + "git.icinga.com/icingadb/icingadb-main/connection" "github.com/json-iterator/go" ) @@ -15,21 +15,21 @@ type JsonDecodePackage struct { // Json strings from Redis ConfigRaw string // Unmarshaled config object ready to be used in SQL - Row configobject.Row + Row connection.Row // Package will be sent back through this channel - Factory configobject.RowFactory + Factory connection.RowFactory // Object type (host, service, endpoint, command...) ObjectType string } type JsonDecodePackages struct { Packages []JsonDecodePackage - ChBack chan<- []configobject.Row + ChBack chan<- []connection.Row } // decodeString unmarshals the string toDecode using the json package. The decoded json will be written to row. // Returns error, if not successful. -func decodeString(toDecode string, row configobject.Row) error { +func decodeString(toDecode string, row connection.Row) error { return json.Unmarshal([]byte(toDecode), row) } @@ -49,7 +49,7 @@ func decodePackage(chInput <-chan *JsonDecodePackages) error { var err error for pkgs := range chInput { - var rows []configobject.Row + var rows []connection.Row for _, pkg := range pkgs.Packages { row := pkg.Factory() row.SetId(pkg.Id) diff --git a/jsondecoder/json-decoder_test.go b/jsondecoder/json-decoder_test.go index 9a0ef037..8d332303 100644 --- a/jsondecoder/json-decoder_test.go +++ b/jsondecoder/json-decoder_test.go @@ -1,8 +1,8 @@ package jsondecoder import ( - "git.icinga.com/icingadb/icingadb-main/configobject" - "git.icinga.com/icingadb/icingadb-main/configobject/host" + "git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/host" + "git.icinga.com/icingadb/icingadb-main/connection" "github.com/stretchr/testify/assert" "testing" ) @@ -22,7 +22,7 @@ func Test_decodeString(t *testing.T) { func Test_DecodePool(t *testing.T) { var chInput = make(chan *JsonDecodePackages) - var chOutput = make(chan []configobject.Row) + var chOutput = make(chan []connection.Row) var chError = make(chan error) var TestPackageA = JsonDecodePackage{ @@ -30,7 +30,7 @@ func Test_DecodePool(t *testing.T) { "{\"checkcommand_id\":\"0bba6ab6747f1c0de3bf80932d10bc7b603e27fc\",\"customvars_checksum\":\"5ba93c9db0cff93f52b521d7420e43f6eda2784f\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[],\"groups_checksum\":\"a0930d202ae77bbafbc4d898e3d060f462160904\",\"name_checksum\":\"21021feda571e19d8afb53fb11ca089db7578cee\",\"properties_checksum\":\"8d515de29444d9df3e374bcc1b890040fcc48be5\"}", "{\"active_checks_enabled\":true,\"address\":\"\",\"address6\":\"\",\"check_interval\":60.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"random\",\"display_name\":\"aa3derphosta469\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"aa3derphosta469\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true}", nil, - host.NewHost, + host.ObjectInformation.Factory, "host", } @@ -39,7 +39,7 @@ func Test_DecodePool(t *testing.T) { "{\"checkcommand_id\":\"0bba6ab6747f1c0de3bf80932d10bc7b603e27fc\",\"customvars_checksum\":\"5ba93c9db0cff93f52b521d7420e43f6eda2784f\",\"environment_id\":\"90a8834de76326869f3e703cd61513081ad73d3c\",\"group_ids\":[],\"groups_checksum\":\"a0930d202ae77bbafbc4d898e3d060f462160904\",\"name_checksum\":\"f14feab0710d05e4ca9ffd712f8c0af5d8f5119a\",\"properties_checksum\":\"bd3e124054427700571dae7552aa90e3fe4f1fde\"}", "{\"active_checks_enabled\":true,\"address\":\"\",\"address6\":\"\",\"check_interval\":60.0,\"check_retry_interval\":60.0,\"check_timeout\":null,\"checkcommand\":\"random\",\"display_name\":\"aa3derphosta378\",\"event_handler_enabled\":true,\"flapping_enabled\":false,\"flapping_threshold_high\":30.0,\"flapping_threshold_low\":25.0,\"icon_image_alt\":\"\",\"is_volatile\":false,\"max_check_attempts\":3.0,\"name\":\"aa3derphosta378\",\"notes\":\"\",\"notifications_enabled\":true,\"passive_checks_enabled\":true,\"perfdata_enabled\":true}", nil, - host.NewHost, + host.ObjectInformation.Factory, "host", } diff --git a/main.go b/main.go index 3265abc4..5bdc80e5 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,12 @@ import ( "flag" "git.icinga.com/icingadb/icingadb-main/config" "git.icinga.com/icingadb/icingadb-main/configobject/configsync" - "git.icinga.com/icingadb/icingadb-main/configobject/host" - "git.icinga.com/icingadb/icingadb-main/configobject/hostgroup" - "git.icinga.com/icingadb/icingadb-main/configobject/service" - "git.icinga.com/icingadb/icingadb-main/configobject/servicegroup" + "git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/host" + "git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/hostgroup" + "git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/service" + "git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/servicegroup" + "git.icinga.com/icingadb/icingadb-main/configobject/objecttypes/user" "git.icinga.com/icingadb/icingadb-main/configobject/statesync" - "git.icinga.com/icingadb/icingadb-main/configobject/user" "git.icinga.com/icingadb/icingadb-main/connection" "git.icinga.com/icingadb/icingadb-main/ha" "git.icinga.com/icingadb/icingadb-main/jsondecoder" @@ -61,60 +61,7 @@ func main() { go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16) - chHAHost := haInstance.RegisterNotificationListener() - go func() { - super.ChErr <- configsync.Operator(&super, chHAHost, &configsync.Context{ - ObjectType: "host", - Factory: host.NewHost, - InsertStmt: host.BulkInsertStmt, - DeleteStmt: host.BulkDeleteStmt, - UpdateStmt: host.BulkUpdateStmt, - }) - }() - - chHAService := haInstance.RegisterNotificationListener() - go func() { - super.ChErr <- configsync.Operator(&super, chHAService, &configsync.Context{ - ObjectType: "service", - Factory: service.NewService, - InsertStmt: service.BulkInsertStmt, - DeleteStmt: service.BulkDeleteStmt, - UpdateStmt: service.BulkUpdateStmt, - }) - }() - - chHAHostgroup := haInstance.RegisterNotificationListener() - go func() { - super.ChErr <- configsync.Operator(&super, chHAHostgroup, &configsync.Context{ - ObjectType: "hostgroup", - Factory: hostgroup.NewHostgroup, - InsertStmt: hostgroup.BulkInsertStmt, - DeleteStmt: hostgroup.BulkDeleteStmt, - UpdateStmt: hostgroup.BulkUpdateStmt, - }) - }() - - chHAServicegroup := haInstance.RegisterNotificationListener() - go func() { - super.ChErr <- configsync.Operator(&super, chHAServicegroup, &configsync.Context{ - ObjectType: "servicegroup", - Factory: servicegroup.NewServicegroup, - InsertStmt: servicegroup.BulkInsertStmt, - DeleteStmt: servicegroup.BulkDeleteStmt, - UpdateStmt: servicegroup.BulkUpdateStmt, - }) - }() - - chHAUser := haInstance.RegisterNotificationListener() - go func() { - super.ChErr <- configsync.Operator(&super, chHAUser, &configsync.Context{ - ObjectType: "user", - Factory: user.NewUser, - InsertStmt: user.BulkInsertStmt, - DeleteStmt: user.BulkDeleteStmt, - UpdateStmt: user.BulkUpdateStmt, - }) - }() + startConfigSyncOperators(&super, haInstance) statesync.StartStateSync(&super) @@ -129,3 +76,30 @@ func main() { } } } + +func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) { + chHAHost := haInstance.RegisterNotificationListener() + go func() { + super.ChErr <- configsync.Operator(super, chHAHost, &host.ObjectInformation) + }() + + chHAService := haInstance.RegisterNotificationListener() + go func() { + super.ChErr <- configsync.Operator(super, chHAService, &service.ObjectInformation) + }() + + chHAHostgroup := haInstance.RegisterNotificationListener() + go func() { + super.ChErr <- configsync.Operator(super, chHAHostgroup, &hostgroup.ObjectInformation) + }() + + chHAServicegroup := haInstance.RegisterNotificationListener() + go func() { + super.ChErr <- configsync.Operator(super, chHAServicegroup, &servicegroup.ObjectInformation) + }() + + chHAUser := haInstance.RegisterNotificationListener() + go func() { + super.ChErr <- configsync.Operator(super, chHAUser, &user.ObjectInformation) + }() +}