From cabcd458ffd8f4bd33d06b1543845fe04649b1e5 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 13:20:48 +0200 Subject: [PATCH 01/12] Don't "misuse" unsafe.Pointer --- pkg/icingadb/objectpacker/objectpacker_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/icingadb/objectpacker/objectpacker_test.go b/pkg/icingadb/objectpacker/objectpacker_test.go index 6afb5882..26a1d7ff 100644 --- a/pkg/icingadb/objectpacker/objectpacker_test.go +++ b/pkg/icingadb/objectpacker/objectpacker_test.go @@ -5,7 +5,6 @@ import ( "github.com/icinga/icingadb/pkg/types" "io" "testing" - "unsafe" ) // limitedWriter allows writing a specific amount of data. @@ -150,7 +149,7 @@ func TestPackAny(t *testing.T) { assertPackAnyPanic(t, make(chan struct{}, 0), 0) assertPackAnyPanic(t, func() {}, 0) assertPackAnyPanic(t, struct{}{}, 0) - assertPackAnyPanic(t, unsafe.Pointer(uintptr(0)), 0) + assertPackAnyPanic(t, uintptr(0), 0) } func assertPackAny(t *testing.T, in interface{}, out []byte) { From 621c1b953737a8983ba9701da8f01fc5426000c3 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 13:43:29 +0200 Subject: [PATCH 02/12] Ensure context cancellation --- cmd/icingadb/main.go | 8 +++++++- pkg/icingadb/ha.go | 5 +++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 5c108af9..3c287cee 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -70,6 +70,8 @@ func run() int { } ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) ha := icingadb.NewHA(ctx, db, heartbeat, logger) // Closing ha on exit ensures that this instance retracts its heartbeat @@ -234,14 +236,18 @@ func run() int { // otherwise there is no way to get Icinga DB back into a working state. panic(errors.New("HA exited without an error but main context isn't cancelled")) } + + cancelHactx() return ExitFailure case <-ctx.Done(): panic(errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) - cancelCtx() + cancelHactx() return ExitSuccess } } + + cancelHactx() } } diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index e753eb34..2a6dfacb 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,10 +170,12 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo Isolation: sql.LevelSerializable, }) if err != nil { + cancel() return err } rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) if err != nil { + cancel() return err } takeover := true @@ -232,6 +234,9 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo continue } } + + cancel() + if err := tx.Commit(); err != nil { return err } From c3ea4d949061b71dd14700166deb8e69264fd81a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 13:44:21 +0200 Subject: [PATCH 03/12] Avoid unreachable code --- pkg/icingaredis/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index aa947a87..75c07e51 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -70,7 +70,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu } else { panic("no checksum") // TODO(el): Error is not published - return errors.New("no checksum") + //return errors.New("no checksum") } select { From dac83f2773b131141ba1a5699a867e39b21ea121 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 13:46:40 +0200 Subject: [PATCH 04/12] Drop unused stuff --- pkg/com/bulker.go | 1 - pkg/com/entity_bulker.go | 1 - pkg/icingadb/delta.go | 1 - version.go | 3 --- 4 files changed, 6 deletions(-) delete mode 100644 version.go diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index d52855ea..d209deab 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -11,7 +11,6 @@ type Bulker struct { ch chan []interface{} ctx context.Context mu sync.Mutex - err error } func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go index 443063fd..1377a3c8 100644 --- a/pkg/com/entity_bulker.go +++ b/pkg/com/entity_bulker.go @@ -12,7 +12,6 @@ type EntityBulker struct { ch chan []contracts.Entity ctx context.Context mu sync.Mutex - err error } func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker { diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 7f21a496..0f2ace67 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -18,7 +18,6 @@ type Delta struct { Delete EntitiesById Subject *common.SyncSubject done chan error - err error logger *zap.SugaredLogger } diff --git a/version.go b/version.go deleted file mode 100644 index d0f223aa..00000000 --- a/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -const version = "v1.0.0-rc1" From c636de294a378f89702f34b19d6d6a6376d2c11c Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 13:50:00 +0200 Subject: [PATCH 05/12] Un-capitalize error messages --- pkg/types/binary.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/types/binary.go b/pkg/types/binary.go index 619c4503..12e40692 100644 --- a/pkg/types/binary.go +++ b/pkg/types/binary.go @@ -108,7 +108,7 @@ func (binary *Binary) Scan(src interface{}) error { *binary = b default: - return fmt.Errorf("Unable to scan type %T into Binary", src) + return fmt.Errorf("unable to scan type %T into Binary", src) } return nil From 5a084ba7a9d96e8c87fc793305ab346ecbbd2d1b Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 13:56:21 +0200 Subject: [PATCH 06/12] Simplify code --- pkg/icingadb/entitiesbyid.go | 2 +- pkg/icingadb/objectpacker/objectpacker_test.go | 4 ++-- pkg/icingaredis/utils.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go index 2f7788d6..f8f71be7 100644 --- a/pkg/icingadb/entitiesbyid.go +++ b/pkg/icingadb/entitiesbyid.go @@ -26,7 +26,7 @@ func (ebi EntitiesById) IDs() []interface{} { } func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity { - entities := make(chan contracts.Entity, 0) + entities := make(chan contracts.Entity) go func() { defer close(entities) diff --git a/pkg/icingadb/objectpacker/objectpacker_test.go b/pkg/icingadb/objectpacker/objectpacker_test.go index 26a1d7ff..15731815 100644 --- a/pkg/icingadb/objectpacker/objectpacker_test.go +++ b/pkg/icingadb/objectpacker/objectpacker_test.go @@ -112,7 +112,7 @@ func TestPackAny(t *testing.T) { 3, 0x40, 0x45, 0, 0, 0, 0, 0, 0, }) - assertPackAny(t, map[[1]byte]bool{[1]byte{42}: true}, []byte{ + assertPackAny(t, map[[1]byte]bool{{42}: true}, []byte{ 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 42, 2, @@ -146,7 +146,7 @@ func TestPackAny(t *testing.T) { assertPackAnyPanic(t, complex64(0+0i), 0) assertPackAnyPanic(t, 0+0i, 0) - assertPackAnyPanic(t, make(chan struct{}, 0), 0) + assertPackAnyPanic(t, make(chan struct{}), 0) assertPackAnyPanic(t, func() {}, 0) assertPackAnyPanic(t, struct{}{}, 0) assertPackAnyPanic(t, uintptr(0), 0) diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index 75c07e51..eb661724 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -13,7 +13,7 @@ import ( ) func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { - entities := make(chan contracts.Entity, 0) + entities := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -54,7 +54,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc } func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { - entitiesWithChecksum := make(chan contracts.Entity, 0) + entitiesWithChecksum := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { From 35349262cee830c69c9b3b36dae1db5ef3eb7bbb Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 14:00:23 +0200 Subject: [PATCH 07/12] Use time.NewTicker(), not time.Tick() --- pkg/icingaredis/heartbeat.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index bbb160c2..bc5e2c0e 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -84,7 +84,9 @@ func (h Heartbeat) controller() { // Message producer loop g.Go(func() error { // We expect heartbeats every second but only read them every 3 seconds - throttle := time.Tick(time.Second * 3) + throttle := time.NewTicker(time.Second * 3) + defer throttle.Stop() + for { cmd := h.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{"icinga:stats", "$"}, @@ -102,7 +104,7 @@ func (h Heartbeat) controller() { return ctx.Err() } - <-throttle + <-throttle.C } }) From a4abdce1f09001d13e3ad569545ac2d3fe0bcb9b Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 14:03:04 +0200 Subject: [PATCH 08/12] Use time.Since(x), not time.Now().Sub(x) --- pkg/icingadb/ha.go | 2 +- pkg/icingadb/sync.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 2a6dfacb..a42b7265 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -188,7 +188,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo } if shouldLog { - h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time()))) + h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) } takeover = false break diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 3b0e0eed..6c5dbe0c 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -46,7 +46,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du s.logger.Infow("Waiting for dump done signal", zap.String("type", typeName), zap.String("key", key), - zap.Duration("duration", time.Now().Sub(startTime))) + zap.Duration("duration", time.Since(startTime))) loggedWaiting = true case <-dump.Done(key): logFn := s.logger.Debugw @@ -56,7 +56,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du logFn("Starting sync", zap.String("type", typeName), zap.String("key", key), - zap.Duration("waited", time.Now().Sub(startTime))) + zap.Duration("waited", time.Since(startTime))) return s.Sync(ctx, subject) case <-ctx.Done(): return ctx.Err() From 1329c68cf31e5310f318875ab540a2f6b56bbddf Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 14:06:33 +0200 Subject: [PATCH 09/12] Use !bytes.Equal(x,y), not bytes.Compare(x,y)!=0 --- pkg/icingadb/objectpacker/objectpacker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/objectpacker/objectpacker_test.go b/pkg/icingadb/objectpacker/objectpacker_test.go index 15731815..33d60934 100644 --- a/pkg/icingadb/objectpacker/objectpacker_test.go +++ b/pkg/icingadb/objectpacker/objectpacker_test.go @@ -158,8 +158,8 @@ func assertPackAny(t *testing.T, in interface{}, out []byte) { { buf := &bytes.Buffer{} if err := PackAny(in, buf); err == nil { - if bytes.Compare(buf.Bytes(), out) != 0 { - t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); bytes.Compare(buf.Bytes(), %#v) != 0", in, out) + if !bytes.Equal(buf.Bytes(), out) { + t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); !bytes.Equal(buf.Bytes(), %#v)", in, out) } } else { t.Errorf("packAny(%#v, &bytes.Buffer{}) != nil", in) From 64433ec67450292ba8def5fac9c4204f4e6a86fb Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 14:14:05 +0200 Subject: [PATCH 10/12] Make channel for signal.Notify() buffered --- cmd/icingadb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 3c287cee..00a993d8 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -82,7 +82,7 @@ func run() int { rt := icingadb.NewRuntimeUpdates(db, rc, logger) ods := overdue.NewSync(db, rc, logger) - sig := make(chan os.Signal) + sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) // Main loop From afe0a90487be1a209cd32ac1f72d901fa832a9b7 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 14:18:43 +0200 Subject: [PATCH 11/12] s/CondClosed/ErrCondClosed/ --- pkg/com/cond.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/com/cond.go b/pkg/com/cond.go index a70bf3f1..63ef1950 100644 --- a/pkg/com/cond.go +++ b/pkg/com/cond.go @@ -5,7 +5,7 @@ import ( "errors" ) -var CondClosed = errors.New("condition closed") +var ErrCondClosed = errors.New("condition closed") // Cond implements a condition variable, a rendezvous point // for goroutines waiting for or announcing the occurrence @@ -64,7 +64,7 @@ func (c *Cond) Wait() <-chan struct{} { case l := <-c.listeners: return l case <-c.ctx.Done(): - panic(CondClosed) + panic(ErrCondClosed) } } @@ -73,7 +73,7 @@ func (c *Cond) Broadcast() { select { case c.broadcast <- struct{}{}: case <-c.ctx.Done(): - panic(CondClosed) + panic(ErrCondClosed) } } From b0354e35032692ab4388cbb37e76a5c596c8dcac Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 28 May 2021 14:20:41 +0200 Subject: [PATCH 12/12] Don't misuse loop as if --- pkg/icingadb/ha.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index a42b7265..e6b05ba3 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -179,19 +179,17 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo return err } takeover := true - for rows.Next() { + if rows.Next() { instance := &v1.IcingadbInstance{} err := rows.StructScan(instance) if err != nil { h.logger.Errorw("Can't scan currently active instance", zap.Error(err)) - break + } else { + if shouldLog { + h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) + } + takeover = false } - - if shouldLog { - h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) - } - takeover = false - break } _ = rows.Close() i := v1.IcingadbInstance{