From b5e024e68d0ad892b0101217a59555555c8f47fc Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 5 Oct 2021 16:34:43 +0200 Subject: [PATCH] Sync state runtime updates ASAP I.e. don't wait for the complete initial sync first. --- cmd/icingadb/main.go | 46 +++++++++++++++++++++++---------- pkg/icingadb/runtime_updates.go | 22 +++++++++------- pkg/icingadb/v1/v1.go | 6 ++--- 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 8d948d00..e62f471e 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -118,13 +118,14 @@ func run() int { for hactx.Err() == nil { synctx, cancelSynctx := context.WithCancel(hactx) g, synctx := errgroup.WithContext(synctx) - // WaitGroup for configuration synchronization. - // Runtime updates must wait for configuration synchronization to complete. - wg := sync.WaitGroup{} + // WaitGroups for initial synchronization. + // Runtime updates must wait for initial synchronization to complete. + configInitSync := sync.WaitGroup{} + stateInitSync := &sync.WaitGroup{} // Get the last IDs of the runtime update streams before starting anything else, // otherwise updates may be lost. - runtimeUpdateStreams, err := rt.Streams(ctx) + runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.Streams(ctx) if err != nil { logger.Fatalf("%+v", err) } @@ -161,20 +162,32 @@ func run() int { }) logger.Info("Starting config sync") - for _, factory := range v1.Factories { + + for _, factory := range v1.ConfigFactories { factory := factory - wg.Add(1) + configInitSync.Add(1) g.Go(func() error { - defer wg.Done() + defer configInitSync.Done() return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump) }) } - wg.Add(1) + for _, factory := range v1.StateFactories { + factory := factory + + stateInitSync.Add(1) + g.Go(func() error { + defer stateInitSync.Done() + + return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump) + }) + } + + configInitSync.Add(1) g.Go(func() error { - defer wg.Done() + defer configInitSync.Done() <-dump.Done("icinga:customvar") @@ -234,19 +247,24 @@ func run() int { }) g.Go(func() error { - wg.Wait() - - logger.Info("Starting runtime updates sync") + configInitSync.Wait() + logger.Info("Starting config runtime updates sync") // @TODO(el): The customvar runtime update sync may change because the customvar flat // runtime update sync is not yet implemented. return rt.Sync( synctx, - append([]contracts.EntityFactoryFunc{v1.NewCustomvar}, v1.Factories...), - runtimeUpdateStreams, + append([]contracts.EntityFactoryFunc{v1.NewCustomvar}, v1.ConfigFactories...), + runtimeConfigUpdateStreams, ) }) + g.Go(func() error { + stateInitSync.Wait() + logger.Info("Starting state runtime updates sync") + return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams) + }) + if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { logger.Fatalf("%+v", err) } diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 1f62d61f..c100b7b3 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -34,18 +34,22 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *zap.SugaredLog const bulkSize = 1 << 14 // Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync. -func (r *RuntimeUpdates) Streams(ctx context.Context) (icingaredis.Streams, error) { - keys := [...]string{"icinga:runtime", "icinga:runtime:state"} - streams := make(map[string]string, len(keys)) - for _, key := range keys { - id, err := r.redis.StreamLastId(ctx, key) - if err != nil { - return nil, err +func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) { + config = icingaredis.Streams{"icinga:runtime": "0-0"} + state = icingaredis.Streams{"icinga:runtime:state": "0-0"} + + for _, streams := range [...]icingaredis.Streams{config, state} { + for key := range streams { + id, err := r.redis.StreamLastId(ctx, key) + if err != nil { + return nil, nil, err + } + + streams[key] = id } - streams[key] = id } - return streams, nil + return } // Sync synchronizes runtime update streams from s.redis to s.db and deletes the original data on success. diff --git a/pkg/icingadb/v1/v1.go b/pkg/icingadb/v1/v1.go index 337c5c46..7149cbf3 100644 --- a/pkg/icingadb/v1/v1.go +++ b/pkg/icingadb/v1/v1.go @@ -4,7 +4,9 @@ import ( "github.com/icinga/icingadb/pkg/contracts" ) -var Factories = []contracts.EntityFactoryFunc{ +var StateFactories = []contracts.EntityFactoryFunc{NewHostState, NewServiceState} + +var ConfigFactories = []contracts.EntityFactoryFunc{ NewActionUrl, NewCheckcommand, NewCheckcommandArgument, @@ -19,7 +21,6 @@ var Factories = []contracts.EntityFactoryFunc{ NewEventcommandEnvvar, NewHost, NewHostCustomvar, - NewHostState, NewHostgroup, NewHostgroupCustomvar, NewHostgroupMember, @@ -36,7 +37,6 @@ var Factories = []contracts.EntityFactoryFunc{ NewNotificationUsergroup, NewService, NewServiceCustomvar, - NewServiceState, NewServicegroup, NewServicegroupCustomvar, NewServicegroupMember,