Sync state runtime updates ASAP

I.e. don't wait for the complete initial sync first.
This commit is contained in:
Alexander A. Klimov 2021-10-05 16:34:43 +02:00
parent 9216c10625
commit b5e024e68d
3 changed files with 48 additions and 26 deletions

View file

@ -118,13 +118,14 @@ func run() int {
for hactx.Err() == nil {
synctx, cancelSynctx := context.WithCancel(hactx)
g, synctx := errgroup.WithContext(synctx)
// WaitGroup for configuration synchronization.
// Runtime updates must wait for configuration synchronization to complete.
wg := sync.WaitGroup{}
// WaitGroups for initial synchronization.
// Runtime updates must wait for initial synchronization to complete.
configInitSync := sync.WaitGroup{}
stateInitSync := &sync.WaitGroup{}
// Get the last IDs of the runtime update streams before starting anything else,
// otherwise updates may be lost.
runtimeUpdateStreams, err := rt.Streams(ctx)
runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.Streams(ctx)
if err != nil {
logger.Fatalf("%+v", err)
}
@ -161,20 +162,32 @@ func run() int {
})
logger.Info("Starting config sync")
for _, factory := range v1.Factories {
for _, factory := range v1.ConfigFactories {
factory := factory
wg.Add(1)
configInitSync.Add(1)
g.Go(func() error {
defer wg.Done()
defer configInitSync.Done()
return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump)
})
}
wg.Add(1)
for _, factory := range v1.StateFactories {
factory := factory
stateInitSync.Add(1)
g.Go(func() error {
defer stateInitSync.Done()
return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump)
})
}
configInitSync.Add(1)
g.Go(func() error {
defer wg.Done()
defer configInitSync.Done()
<-dump.Done("icinga:customvar")
@ -234,19 +247,24 @@ func run() int {
})
g.Go(func() error {
wg.Wait()
logger.Info("Starting runtime updates sync")
configInitSync.Wait()
logger.Info("Starting config runtime updates sync")
// @TODO(el): The customvar runtime update sync may change because the customvar flat
// runtime update sync is not yet implemented.
return rt.Sync(
synctx,
append([]contracts.EntityFactoryFunc{v1.NewCustomvar}, v1.Factories...),
runtimeUpdateStreams,
append([]contracts.EntityFactoryFunc{v1.NewCustomvar}, v1.ConfigFactories...),
runtimeConfigUpdateStreams,
)
})
g.Go(func() error {
stateInitSync.Wait()
logger.Info("Starting state runtime updates sync")
return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams)
})
if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
logger.Fatalf("%+v", err)
}

View file

@ -34,18 +34,22 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *zap.SugaredLog
const bulkSize = 1 << 14
// Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync.
func (r *RuntimeUpdates) Streams(ctx context.Context) (icingaredis.Streams, error) {
keys := [...]string{"icinga:runtime", "icinga:runtime:state"}
streams := make(map[string]string, len(keys))
for _, key := range keys {
id, err := r.redis.StreamLastId(ctx, key)
if err != nil {
return nil, err
func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) {
config = icingaredis.Streams{"icinga:runtime": "0-0"}
state = icingaredis.Streams{"icinga:runtime:state": "0-0"}
for _, streams := range [...]icingaredis.Streams{config, state} {
for key := range streams {
id, err := r.redis.StreamLastId(ctx, key)
if err != nil {
return nil, nil, err
}
streams[key] = id
}
streams[key] = id
}
return streams, nil
return
}
// Sync synchronizes runtime update streams from s.redis to s.db and deletes the original data on success.

View file

@ -4,7 +4,9 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
)
var Factories = []contracts.EntityFactoryFunc{
var StateFactories = []contracts.EntityFactoryFunc{NewHostState, NewServiceState}
var ConfigFactories = []contracts.EntityFactoryFunc{
NewActionUrl,
NewCheckcommand,
NewCheckcommandArgument,
@ -19,7 +21,6 @@ var Factories = []contracts.EntityFactoryFunc{
NewEventcommandEnvvar,
NewHost,
NewHostCustomvar,
NewHostState,
NewHostgroup,
NewHostgroupCustomvar,
NewHostgroupMember,
@ -36,7 +37,6 @@ var Factories = []contracts.EntityFactoryFunc{
NewNotificationUsergroup,
NewService,
NewServiceCustomvar,
NewServiceState,
NewServicegroup,
NewServicegroupCustomvar,
NewServicegroupMember,