mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-08 16:34:29 -04:00
Merge pull request #278 from Icinga/bugfix/lint-code
Make code quality comply to CI requirements
This commit is contained in:
commit
0b9dd32173
13 changed files with 38 additions and 34 deletions
|
|
@ -70,6 +70,8 @@ func run() int {
|
|||
}
|
||||
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
defer cancelCtx()
|
||||
|
||||
heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger)
|
||||
ha := icingadb.NewHA(ctx, db, heartbeat, logger)
|
||||
// Closing ha on exit ensures that this instance retracts its heartbeat
|
||||
|
|
@ -80,7 +82,7 @@ func run() int {
|
|||
rt := icingadb.NewRuntimeUpdates(db, rc, logger)
|
||||
ods := overdue.NewSync(db, rc, logger)
|
||||
|
||||
sig := make(chan os.Signal)
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
|
||||
|
||||
// Main loop
|
||||
|
|
@ -234,14 +236,18 @@ func run() int {
|
|||
// otherwise there is no way to get Icinga DB back into a working state.
|
||||
panic(errors.New("HA exited without an error but main context isn't cancelled"))
|
||||
}
|
||||
|
||||
cancelHactx()
|
||||
return ExitFailure
|
||||
case <-ctx.Done():
|
||||
panic(errors.New("main context closed unexpectedly"))
|
||||
case s := <-sig:
|
||||
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
|
||||
cancelCtx()
|
||||
cancelHactx()
|
||||
return ExitSuccess
|
||||
}
|
||||
}
|
||||
|
||||
cancelHactx()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ type Bulker struct {
|
|||
ch chan []interface{}
|
||||
ctx context.Context
|
||||
mu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import (
|
|||
"errors"
|
||||
)
|
||||
|
||||
var CondClosed = errors.New("condition closed")
|
||||
var ErrCondClosed = errors.New("condition closed")
|
||||
|
||||
// Cond implements a condition variable, a rendezvous point
|
||||
// for goroutines waiting for or announcing the occurrence
|
||||
|
|
@ -64,7 +64,7 @@ func (c *Cond) Wait() <-chan struct{} {
|
|||
case l := <-c.listeners:
|
||||
return l
|
||||
case <-c.ctx.Done():
|
||||
panic(CondClosed)
|
||||
panic(ErrCondClosed)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -73,7 +73,7 @@ func (c *Cond) Broadcast() {
|
|||
select {
|
||||
case c.broadcast <- struct{}{}:
|
||||
case <-c.ctx.Done():
|
||||
panic(CondClosed)
|
||||
panic(ErrCondClosed)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ type EntityBulker struct {
|
|||
ch chan []contracts.Entity
|
||||
ctx context.Context
|
||||
mu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ type Delta struct {
|
|||
Delete EntitiesById
|
||||
Subject *common.SyncSubject
|
||||
done chan error
|
||||
err error
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ func (ebi EntitiesById) IDs() []interface{} {
|
|||
}
|
||||
|
||||
func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity {
|
||||
entities := make(chan contracts.Entity, 0)
|
||||
entities := make(chan contracts.Entity)
|
||||
|
||||
go func() {
|
||||
defer close(entities)
|
||||
|
|
|
|||
|
|
@ -170,26 +170,26 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo
|
|||
Isolation: sql.LevelSerializable,
|
||||
})
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
takeover := true
|
||||
for rows.Next() {
|
||||
if rows.Next() {
|
||||
instance := &v1.IcingadbInstance{}
|
||||
err := rows.StructScan(instance)
|
||||
if err != nil {
|
||||
h.logger.Errorw("Can't scan currently active instance", zap.Error(err))
|
||||
break
|
||||
} else {
|
||||
if shouldLog {
|
||||
h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
|
||||
}
|
||||
takeover = false
|
||||
}
|
||||
|
||||
if shouldLog {
|
||||
h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time())))
|
||||
}
|
||||
takeover = false
|
||||
break
|
||||
}
|
||||
_ = rows.Close()
|
||||
i := v1.IcingadbInstance{
|
||||
|
|
@ -232,6 +232,9 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo
|
|||
continue
|
||||
}
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/types"
|
||||
"io"
|
||||
"testing"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// limitedWriter allows writing a specific amount of data.
|
||||
|
|
@ -113,7 +112,7 @@ func TestPackAny(t *testing.T) {
|
|||
3, 0x40, 0x45, 0, 0, 0, 0, 0, 0,
|
||||
})
|
||||
|
||||
assertPackAny(t, map[[1]byte]bool{[1]byte{42}: true}, []byte{
|
||||
assertPackAny(t, map[[1]byte]bool{{42}: true}, []byte{
|
||||
6, 0, 0, 0, 0, 0, 0, 0, 1,
|
||||
0, 0, 0, 0, 0, 0, 0, 1, 42,
|
||||
2,
|
||||
|
|
@ -147,10 +146,10 @@ func TestPackAny(t *testing.T) {
|
|||
|
||||
assertPackAnyPanic(t, complex64(0+0i), 0)
|
||||
assertPackAnyPanic(t, 0+0i, 0)
|
||||
assertPackAnyPanic(t, make(chan struct{}, 0), 0)
|
||||
assertPackAnyPanic(t, make(chan struct{}), 0)
|
||||
assertPackAnyPanic(t, func() {}, 0)
|
||||
assertPackAnyPanic(t, struct{}{}, 0)
|
||||
assertPackAnyPanic(t, unsafe.Pointer(uintptr(0)), 0)
|
||||
assertPackAnyPanic(t, uintptr(0), 0)
|
||||
}
|
||||
|
||||
func assertPackAny(t *testing.T, in interface{}, out []byte) {
|
||||
|
|
@ -159,8 +158,8 @@ func assertPackAny(t *testing.T, in interface{}, out []byte) {
|
|||
{
|
||||
buf := &bytes.Buffer{}
|
||||
if err := PackAny(in, buf); err == nil {
|
||||
if bytes.Compare(buf.Bytes(), out) != 0 {
|
||||
t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); bytes.Compare(buf.Bytes(), %#v) != 0", in, out)
|
||||
if !bytes.Equal(buf.Bytes(), out) {
|
||||
t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); !bytes.Equal(buf.Bytes(), %#v)", in, out)
|
||||
}
|
||||
} else {
|
||||
t.Errorf("packAny(%#v, &bytes.Buffer{}) != nil", in)
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du
|
|||
s.logger.Infow("Waiting for dump done signal",
|
||||
zap.String("type", typeName),
|
||||
zap.String("key", key),
|
||||
zap.Duration("duration", time.Now().Sub(startTime)))
|
||||
zap.Duration("duration", time.Since(startTime)))
|
||||
loggedWaiting = true
|
||||
case <-dump.Done(key):
|
||||
logFn := s.logger.Debugw
|
||||
|
|
@ -56,7 +56,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du
|
|||
logFn("Starting sync",
|
||||
zap.String("type", typeName),
|
||||
zap.String("key", key),
|
||||
zap.Duration("waited", time.Now().Sub(startTime)))
|
||||
zap.Duration("waited", time.Since(startTime)))
|
||||
return s.Sync(ctx, subject)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
|
|
|||
|
|
@ -84,7 +84,9 @@ func (h Heartbeat) controller() {
|
|||
// Message producer loop
|
||||
g.Go(func() error {
|
||||
// We expect heartbeats every second but only read them every 3 seconds
|
||||
throttle := time.Tick(time.Second * 3)
|
||||
throttle := time.NewTicker(time.Second * 3)
|
||||
defer throttle.Stop()
|
||||
|
||||
for {
|
||||
cmd := h.client.XRead(ctx, &redis.XReadArgs{
|
||||
Streams: []string{"icinga:stats", "$"},
|
||||
|
|
@ -102,7 +104,7 @@ func (h Heartbeat) controller() {
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
<-throttle
|
||||
<-throttle.C
|
||||
}
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import (
|
|||
)
|
||||
|
||||
func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
entities := make(chan contracts.Entity, 0)
|
||||
entities := make(chan contracts.Entity)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
@ -54,7 +54,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc
|
|||
}
|
||||
|
||||
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
entitiesWithChecksum := make(chan contracts.Entity, 0)
|
||||
entitiesWithChecksum := make(chan contracts.Entity)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
@ -70,7 +70,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu
|
|||
} else {
|
||||
panic("no checksum")
|
||||
// TODO(el): Error is not published
|
||||
return errors.New("no checksum")
|
||||
//return errors.New("no checksum")
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ func (binary *Binary) Scan(src interface{}) error {
|
|||
*binary = b
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unable to scan type %T into Binary", src)
|
||||
return fmt.Errorf("unable to scan type %T into Binary", src)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -1,3 +0,0 @@
|
|||
package main
|
||||
|
||||
const version = "v1.0.0-rc1"
|
||||
Loading…
Reference in a new issue