From 6ca9d36c8bf231c8e29cbac21f5515913fec5e6c Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 4 Mar 2021 00:50:24 +0100 Subject: [PATCH] Add cmd --- cmd/icingadb/main.go | 158 ++++++++++++++++++++++++++++++++++++ internal/command/command.go | 74 +++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 cmd/icingadb/main.go create mode 100644 internal/command/command.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go new file mode 100644 index 00000000..54c0e079 --- /dev/null +++ b/cmd/icingadb/main.go @@ -0,0 +1,158 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/icinga/icingadb/internal/command" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/flatten" + "github.com/icinga/icingadb/pkg/icingadb" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "runtime" +) + +func main() { + cmd := command.New() + instanceId := cmd.InstanceId() + logger := cmd.Logger + defer logger.Sync() + defer func() { + if err := recover(); err != nil { + type stackTracer interface { + StackTrace() errors.StackTrace + } + if err, ok := err.(stackTracer); ok { + for _, f := range err.StackTrace() { + fmt.Printf("%+s:%d\n", f, f) + } + } + } + }() + db := cmd.Database() + defer db.Close() + rc := cmd.Redis() + + ctx := context.Background() + heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) + ha := icingadb.NewHA(ctx, instanceId, db, heartbeat, logger) + s := icingadb.NewSync(db, rc, logger) + + // For temporary exit after sync + done := make(chan interface{}, 0) + + // Main loop + for { + hactx, cancel := context.WithCancel(ctx) + for { + select { + case <-ha.Takeover(): + go func() { + g, synctx := errgroup.WithContext(hactx) + + g.Go(func() error { + return nil + // TODO(el). This code is a draft for trying to synchronize customvar_flat from the customvar + // delta, which actually doesn't really make sense, since both synchronizations must always be + // completed without errors. The synchronization of customar and customvar_flat should only + // fetch the desired entities once and multiplex them to the synchronization of customvar and + // customvar_flat. + delta := s.GetDelta(synctx, v1.NewCustomvar) + if err := delta.Wait(); err != nil { + return err + } + + entities := utils.SyncMapEntities(delta.Create) + flat := make(chan contracts.Entity, 0) + + cvg, _ := errgroup.WithContext(synctx) + + g.Go(func() error { + defer close(flat) + + for i := 0; i < 1<