diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 5c108af9..00a993d8 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -70,6 +70,8 @@ func run() int { } ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) ha := icingadb.NewHA(ctx, db, heartbeat, logger) // Closing ha on exit ensures that this instance retracts its heartbeat @@ -80,7 +82,7 @@ func run() int { rt := icingadb.NewRuntimeUpdates(db, rc, logger) ods := overdue.NewSync(db, rc, logger) - sig := make(chan os.Signal) + sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) // Main loop @@ -234,14 +236,18 @@ func run() int { // otherwise there is no way to get Icinga DB back into a working state. panic(errors.New("HA exited without an error but main context isn't cancelled")) } + + cancelHactx() return ExitFailure case <-ctx.Done(): panic(errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) - cancelCtx() + cancelHactx() return ExitSuccess } } + + cancelHactx() } } diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index d52855ea..d209deab 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -11,7 +11,6 @@ type Bulker struct { ch chan []interface{} ctx context.Context mu sync.Mutex - err error } func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { diff --git a/pkg/com/cond.go b/pkg/com/cond.go index a70bf3f1..63ef1950 100644 --- a/pkg/com/cond.go +++ b/pkg/com/cond.go @@ -5,7 +5,7 @@ import ( "errors" ) -var CondClosed = errors.New("condition closed") +var ErrCondClosed = errors.New("condition closed") // Cond implements a condition variable, a rendezvous point // for goroutines waiting for or announcing the occurrence @@ -64,7 +64,7 @@ func (c *Cond) Wait() <-chan struct{} { case l := <-c.listeners: return l case <-c.ctx.Done(): - panic(CondClosed) + panic(ErrCondClosed) } } @@ -73,7 +73,7 @@ func (c *Cond) Broadcast() { select { case c.broadcast <- struct{}{}: case <-c.ctx.Done(): - panic(CondClosed) + panic(ErrCondClosed) } } diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go index 443063fd..1377a3c8 100644 --- a/pkg/com/entity_bulker.go +++ b/pkg/com/entity_bulker.go @@ -12,7 +12,6 @@ type EntityBulker struct { ch chan []contracts.Entity ctx context.Context mu sync.Mutex - err error } func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker { diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 7f21a496..0f2ace67 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -18,7 +18,6 @@ type Delta struct { Delete EntitiesById Subject *common.SyncSubject done chan error - err error logger *zap.SugaredLogger } diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go index 2f7788d6..f8f71be7 100644 --- a/pkg/icingadb/entitiesbyid.go +++ b/pkg/icingadb/entitiesbyid.go @@ -26,7 +26,7 @@ func (ebi EntitiesById) IDs() []interface{} { } func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity { - entities := make(chan contracts.Entity, 0) + entities := make(chan contracts.Entity) go func() { defer close(entities) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index e753eb34..e6b05ba3 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,26 +170,26 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo Isolation: sql.LevelSerializable, }) if err != nil { + cancel() return err } rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) if err != nil { + cancel() return err } takeover := true - for rows.Next() { + if rows.Next() { instance := &v1.IcingadbInstance{} err := rows.StructScan(instance) if err != nil { h.logger.Errorw("Can't scan currently active instance", zap.Error(err)) - break + } else { + if shouldLog { + h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) + } + takeover = false } - - if shouldLog { - h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time()))) - } - takeover = false - break } _ = rows.Close() i := v1.IcingadbInstance{ @@ -232,6 +232,9 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo continue } } + + cancel() + if err := tx.Commit(); err != nil { return err } diff --git a/pkg/icingadb/objectpacker/objectpacker_test.go b/pkg/icingadb/objectpacker/objectpacker_test.go index 6afb5882..33d60934 100644 --- a/pkg/icingadb/objectpacker/objectpacker_test.go +++ b/pkg/icingadb/objectpacker/objectpacker_test.go @@ -5,7 +5,6 @@ import ( "github.com/icinga/icingadb/pkg/types" "io" "testing" - "unsafe" ) // limitedWriter allows writing a specific amount of data. @@ -113,7 +112,7 @@ func TestPackAny(t *testing.T) { 3, 0x40, 0x45, 0, 0, 0, 0, 0, 0, }) - assertPackAny(t, map[[1]byte]bool{[1]byte{42}: true}, []byte{ + assertPackAny(t, map[[1]byte]bool{{42}: true}, []byte{ 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 42, 2, @@ -147,10 +146,10 @@ func TestPackAny(t *testing.T) { assertPackAnyPanic(t, complex64(0+0i), 0) assertPackAnyPanic(t, 0+0i, 0) - assertPackAnyPanic(t, make(chan struct{}, 0), 0) + assertPackAnyPanic(t, make(chan struct{}), 0) assertPackAnyPanic(t, func() {}, 0) assertPackAnyPanic(t, struct{}{}, 0) - assertPackAnyPanic(t, unsafe.Pointer(uintptr(0)), 0) + assertPackAnyPanic(t, uintptr(0), 0) } func assertPackAny(t *testing.T, in interface{}, out []byte) { @@ -159,8 +158,8 @@ func assertPackAny(t *testing.T, in interface{}, out []byte) { { buf := &bytes.Buffer{} if err := PackAny(in, buf); err == nil { - if bytes.Compare(buf.Bytes(), out) != 0 { - t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); bytes.Compare(buf.Bytes(), %#v) != 0", in, out) + if !bytes.Equal(buf.Bytes(), out) { + t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); !bytes.Equal(buf.Bytes(), %#v)", in, out) } } else { t.Errorf("packAny(%#v, &bytes.Buffer{}) != nil", in) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 3b0e0eed..6c5dbe0c 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -46,7 +46,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du s.logger.Infow("Waiting for dump done signal", zap.String("type", typeName), zap.String("key", key), - zap.Duration("duration", time.Now().Sub(startTime))) + zap.Duration("duration", time.Since(startTime))) loggedWaiting = true case <-dump.Done(key): logFn := s.logger.Debugw @@ -56,7 +56,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du logFn("Starting sync", zap.String("type", typeName), zap.String("key", key), - zap.Duration("waited", time.Now().Sub(startTime))) + zap.Duration("waited", time.Since(startTime))) return s.Sync(ctx, subject) case <-ctx.Done(): return ctx.Err() diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index bbb160c2..bc5e2c0e 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -84,7 +84,9 @@ func (h Heartbeat) controller() { // Message producer loop g.Go(func() error { // We expect heartbeats every second but only read them every 3 seconds - throttle := time.Tick(time.Second * 3) + throttle := time.NewTicker(time.Second * 3) + defer throttle.Stop() + for { cmd := h.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{"icinga:stats", "$"}, @@ -102,7 +104,7 @@ func (h Heartbeat) controller() { return ctx.Err() } - <-throttle + <-throttle.C } }) diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index aa947a87..eb661724 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -13,7 +13,7 @@ import ( ) func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { - entities := make(chan contracts.Entity, 0) + entities := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -54,7 +54,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc } func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { - entitiesWithChecksum := make(chan contracts.Entity, 0) + entitiesWithChecksum := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -70,7 +70,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu } else { panic("no checksum") // TODO(el): Error is not published - return errors.New("no checksum") + //return errors.New("no checksum") } select { diff --git a/pkg/types/binary.go b/pkg/types/binary.go index 619c4503..12e40692 100644 --- a/pkg/types/binary.go +++ b/pkg/types/binary.go @@ -108,7 +108,7 @@ func (binary *Binary) Scan(src interface{}) error { *binary = b default: - return fmt.Errorf("Unable to scan type %T into Binary", src) + return fmt.Errorf("unable to scan type %T into Binary", src) } return nil diff --git a/version.go b/version.go deleted file mode 100644 index d0f223aa..00000000 --- a/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -const version = "v1.0.0-rc1"