From 41e473b2442a88184b897b652c9f1ffe267a2ebe Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 29 Apr 2021 12:52:56 +0200 Subject: [PATCH 01/32] Remove unused functions --- pkg/utils/utils.go | 76 ---------------------------------------------- 1 file changed, 76 deletions(-) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 7a4b878a..8c25adf5 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -5,17 +5,11 @@ import ( "crypto/sha1" "fmt" "github.com/go-sql-driver/mysql" - "github.com/google/uuid" "github.com/icinga/icingadb/pkg/contracts" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/exp/utf8string" - "io/ioutil" "math" - "math/rand" - "os" "strings" - "sync" "time" "unicode" ) @@ -98,69 +92,10 @@ func BatchSliceOfStrings(ctx context.Context, keys []string, count int) <-chan [ return batches } -func BatchSliceOfInterfaces(ctx context.Context, keys []interface{}, count int) <-chan []interface{} { - batches := make(chan []interface{}) - - go func() { - defer close(batches) - - for i := 0; i < len(keys); i += count { - end := i + count - if end > len(keys) { - end = len(keys) - } - - select { - case batches <- keys[i:end]: - case <-ctx.Done(): - return - } - } - }() - - return batches -} - func IsContextCanceled(err error) bool { return errors.Is(err, context.Canceled) } -func CreateOrRead(name string, callback func() []byte) ([]byte, error) { - info, err := os.Stat(name) - - if os.IsNotExist(err) { - b := callback() - if err := ioutil.WriteFile(name, b, 0660); err != nil { - defer os.Remove(name) - - return nil, errors.Wrap(err, "can't write to file "+name) - } - - return b, nil - } - - if err != nil { - return nil, errors.Wrap(err, "can't read file "+name) - } - - if info.IsDir() { - return nil, errors.Errorf(name + " is a directory") - } - - b, err := ioutil.ReadFile(name) - if err != nil { - return nil, errors.Wrap(err, "can't read file "+name) - } - - return b, nil -} - -func Uuid() []byte { - u := uuid.New() - - return u[:] -} - func Checksum(data interface{}) []byte { var chksm [sha1.Size]byte @@ -194,17 +129,6 @@ func IsDeadlock(err error) bool { return false } -func RandomSleep(sugar *zap.SugaredLogger) { - once := sync.Once{} - once.Do(func() { - rand.Seed(time.Now().UnixNano()) - }) - n := rand.Intn(100) - d := time.Duration(n) * time.Millisecond - sugar.Info("Sleeping for ", d) - time.Sleep(d) -} - var ellipsis = utf8string.NewString("...") // Ellipsize shortens s to <=limit runes and indicates shortening by "...". From f7be60623c344ef417904efcaa69a6ab8c7162d2 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 09:55:46 +0200 Subject: [PATCH 02/32] Use QueryxContext() instead of Query() --- pkg/icingadb/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index c4b75d54..c13bbd95 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -360,7 +360,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF db.logger.Infof("Fetched %d elements of %s in %s", cnt.Val(), utils.Name(v), elapsed) }) - rows, err := db.Queryx(query, args...) + rows, err := db.QueryxContext(ctx, query, args...) if err != nil { return internal.CantPerformQuery(err, query) } From e1d27bd93f1af660110d0034d83faf2323a2ca19 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 13:53:38 +0200 Subject: [PATCH 03/32] Remove unused function PipeError --- pkg/com/com.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/com/com.go b/pkg/com/com.go index 9f163021..9e0a6983 100644 --- a/pkg/com/com.go +++ b/pkg/com/com.go @@ -36,16 +36,6 @@ func ErrgroupReceive(g *errgroup.Group, err <-chan error) { }) } -// PipeError forwards the first non-nil error from in to out -// using a separate goroutine. -func PipeError(in <-chan error, out chan<- error) { - go func() { - if e := <-in; e != nil { - out <- e - } - }() -} - // CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item. func CopyFirst( ctx context.Context, input <-chan contracts.Entity, From d40768ee64292b070e4c4d4bb52b9e8a00999e65 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 13:55:07 +0200 Subject: [PATCH 04/32] Fix different receiver names --- pkg/config/redis.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/config/redis.go b/pkg/config/redis.go index dcd86ca2..717f5624 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -78,23 +78,23 @@ func dialWithLogging(logger *zap.SugaredLogger) func(context.Context, string, st } // UnmarshalYAML implements the yaml.Unmarshaler interface. -func (d *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error { - if err := defaults.Set(d); err != nil { - return errors.Wrapf(err, "can't set defaults %#v", d) +func (r *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(r); err != nil { + return errors.Wrapf(err, "can't set defaults %#v", r) } // Prevent recursion. type self Redis - if err := unmarshal((*self)(d)); err != nil { - return internal.CantUnmarshalYAML(err, d) + if err := unmarshal((*self)(r)); err != nil { + return internal.CantUnmarshalYAML(err, r) } - if d.MaxHMGetConnections < 1 { + if r.MaxHMGetConnections < 1 { return errors.New("max_hmget_connections must be at least 1") } - if d.HMGetCount < 1 { + if r.HMGetCount < 1 { return errors.New("hmget_count must be at least 1") } - if d.HScanCount < 1 { + if r.HScanCount < 1 { return errors.New("hscan_count must be at least 1") } From ec70babc9164d8f7d96cb956ea3080327f2e9e64 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 13:55:56 +0200 Subject: [PATCH 05/32] Fix typo --- pkg/icingadb/objectpacker/objectpacker.go | 2 +- pkg/types/binary.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/objectpacker/objectpacker.go b/pkg/icingadb/objectpacker/objectpacker.go index 2a3ac36a..eb29ec1b 100644 --- a/pkg/icingadb/objectpacker/objectpacker.go +++ b/pkg/icingadb/objectpacker/objectpacker.go @@ -47,7 +47,7 @@ func PackAny(in interface{}, out io.Writer) error { var tByte = reflect.TypeOf(byte(0)) var tBytes = reflect.TypeOf([]uint8(nil)) -// packValue does the actual job of packAny and just exists for recursion w/o unneccessary reflect.ValueOf calls. +// packValue does the actual job of packAny and just exists for recursion w/o unnecessary reflect.ValueOf calls. func packValue(in reflect.Value, out io.Writer) error { switch kind := in.Kind(); kind { case reflect.Invalid: // nil diff --git a/pkg/types/binary.go b/pkg/types/binary.go index 61b4bd09..00a5417f 100644 --- a/pkg/types/binary.go +++ b/pkg/types/binary.go @@ -41,7 +41,7 @@ func (binary Binary) String() string { // MarshalText implements a custom marhsal function to encode // the Binary as hex. MarshalText implements the -// enconding.TextMarshaler interface. +// encoding.TextMarshaler interface. func (binary Binary) MarshalText() ([]byte, error) { return []byte(binary.String()), nil } From 83866f3a70f6e89fc264d123433acc21f0694f0d Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 13:56:17 +0200 Subject: [PATCH 06/32] Remove unused variables Yes and No --- pkg/types/bool.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/types/bool.go b/pkg/types/bool.go index 9fc49b7b..43302640 100644 --- a/pkg/types/bool.go +++ b/pkg/types/bool.go @@ -10,18 +10,6 @@ import ( "strconv" ) -var ( - Yes = Bool{ - Bool: true, - Valid: true, - } - - No = Bool{ - Bool: false, - Valid: true, - } -) - var ( enum = map[bool]string{ true: "y", From 42935ae962b71165d0d5b8a3b7ad038c9e2420ab Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 13:57:25 +0200 Subject: [PATCH 07/32] Fix comments --- pkg/com/counter.go | 2 +- pkg/types/acknowledgement_state.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/com/counter.go b/pkg/com/counter.go index abc332c4..9c6e5264 100644 --- a/pkg/com/counter.go +++ b/pkg/com/counter.go @@ -2,7 +2,7 @@ package com import "sync/atomic" -// Atomic counter. +// Counter implements an atomic counter. type Counter uint64 // Add adds the given delta to the counter. diff --git a/pkg/types/acknowledgement_state.go b/pkg/types/acknowledgement_state.go index 5793efc6..7e41dc9b 100644 --- a/pkg/types/acknowledgement_state.go +++ b/pkg/types/acknowledgement_state.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" ) -// Acknowledgement specifies an acknowledgement state (yes, no, sticky). +// AcknowledgementState specifies an acknowledgement state (yes, no, sticky). type AcknowledgementState uint8 // UnmarshalText implements the encoding.TextUnmarshaler interface. From 63b8d98237e0bae10995a7b9374fed83ef00746f Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 14:02:48 +0200 Subject: [PATCH 08/32] Always use text as paramter name in UnmarshalText() --- pkg/types/acknowledgement_state.go | 4 ++-- pkg/types/comment_type.go | 12 ++++++------ pkg/types/notification_type.go | 12 ++++++------ pkg/types/state_type.go | 4 ++-- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/types/acknowledgement_state.go b/pkg/types/acknowledgement_state.go index 7e41dc9b..5bff6137 100644 --- a/pkg/types/acknowledgement_state.go +++ b/pkg/types/acknowledgement_state.go @@ -12,8 +12,8 @@ import ( type AcknowledgementState uint8 // UnmarshalText implements the encoding.TextUnmarshaler interface. -func (as *AcknowledgementState) UnmarshalText(bytes []byte) error { - return as.UnmarshalJSON(bytes) +func (as *AcknowledgementState) UnmarshalText(text []byte) error { + return as.UnmarshalJSON(text) } // UnmarshalJSON implements the json.Unmarshaler interface. diff --git a/pkg/types/comment_type.go b/pkg/types/comment_type.go index 4eb65caf..5e88e47c 100644 --- a/pkg/types/comment_type.go +++ b/pkg/types/comment_type.go @@ -29,22 +29,22 @@ func (ct *CommentType) UnmarshalJSON(bytes []byte) error { } // UnmarshalText implements the encoding.TextUnmarshaler interface. -func (ct *CommentType) UnmarshalText(bytes []byte) error { - text := string(bytes) +func (ct *CommentType) UnmarshalText(text []byte) error { + s := string(text) - i, err := strconv.ParseUint(text, 10, 64) + i, err := strconv.ParseUint(s, 10, 64) if err != nil { - return internal.CantParseUint64(err, text) + return internal.CantParseUint64(err, s) } c := CommentType(i) if uint64(c) != i { // Truncated due to above cast, obviously too high - return badCommentType(text) + return badCommentType(s) } if _, ok := commentTypes[c]; !ok { - return badCommentType(text) + return badCommentType(s) } *ct = c diff --git a/pkg/types/notification_type.go b/pkg/types/notification_type.go index 872d5a13..f2980f4e 100644 --- a/pkg/types/notification_type.go +++ b/pkg/types/notification_type.go @@ -12,22 +12,22 @@ import ( type NotificationType uint16 // UnmarshalText implements the encoding.TextUnmarshaler interface. -func (nt *NotificationType) UnmarshalText(bytes []byte) error { - text := string(bytes) +func (nt *NotificationType) UnmarshalText(text []byte) error { + s := string(text) - i, err := strconv.ParseUint(text, 10, 64) + i, err := strconv.ParseUint(s, 10, 64) if err != nil { - return internal.CantParseUint64(err, text) + return internal.CantParseUint64(err, s) } n := NotificationType(i) if uint64(n) != i { // Truncated due to above cast, obviously too high - return badNotificationType(text) + return badNotificationType(s) } if _, ok := notificationTypes[n]; !ok { - return badNotificationType(text) + return badNotificationType(s) } *nt = n diff --git a/pkg/types/state_type.go b/pkg/types/state_type.go index c04f7f2e..8a24819e 100644 --- a/pkg/types/state_type.go +++ b/pkg/types/state_type.go @@ -12,8 +12,8 @@ import ( type StateType uint8 // UnmarshalText implements the encoding.TextUnmarshaler interface. -func (st *StateType) UnmarshalText(bytes []byte) error { - return st.UnmarshalJSON(bytes) +func (st *StateType) UnmarshalText(text []byte) error { + return st.UnmarshalJSON(text) } // UnmarshalJSON implements the json.Unmarshaler interface. From f3f07a29cce28c7ebb5281678a55cb884bdaf89b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 14:22:23 +0200 Subject: [PATCH 09/32] Always use data as paramter name in UnmarshalJSON() --- pkg/types/comment_type.go | 6 +++--- pkg/types/notification_states.go | 4 ++-- pkg/types/notification_types.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/types/comment_type.go b/pkg/types/comment_type.go index 5e88e47c..8aed4759 100644 --- a/pkg/types/comment_type.go +++ b/pkg/types/comment_type.go @@ -13,15 +13,15 @@ import ( type CommentType uint8 // UnmarshalJSON implements the json.Unmarshaler interface. -func (ct *CommentType) UnmarshalJSON(bytes []byte) error { +func (ct *CommentType) UnmarshalJSON(data []byte) error { var i uint8 - if err := internal.UnmarshalJSON(bytes, &i); err != nil { + if err := internal.UnmarshalJSON(data, &i); err != nil { return err } c := CommentType(i) if _, ok := commentTypes[c]; !ok { - return badCommentType(bytes) + return badCommentType(data) } *ct = c diff --git a/pkg/types/notification_states.go b/pkg/types/notification_states.go index f6e91119..ff5760a3 100644 --- a/pkg/types/notification_states.go +++ b/pkg/types/notification_states.go @@ -12,9 +12,9 @@ import ( type NotificationStates uint8 // UnmarshalJSON implements the json.Unmarshaler interface. -func (nst *NotificationStates) UnmarshalJSON(bytes []byte) error { +func (nst *NotificationStates) UnmarshalJSON(data []byte) error { var states []string - if err := internal.UnmarshalJSON(bytes, &states); err != nil { + if err := internal.UnmarshalJSON(data, &states); err != nil { return err } diff --git a/pkg/types/notification_types.go b/pkg/types/notification_types.go index 2406dae4..832a515c 100644 --- a/pkg/types/notification_types.go +++ b/pkg/types/notification_types.go @@ -12,9 +12,9 @@ import ( type NotificationTypes uint16 // UnmarshalJSON implements the json.Unmarshaler interface. -func (nt *NotificationTypes) UnmarshalJSON(bytes []byte) error { +func (nt *NotificationTypes) UnmarshalJSON(data []byte) error { var types []string - if err := internal.UnmarshalJSON(bytes, &types); err != nil { + if err := internal.UnmarshalJSON(data, &types); err != nil { return err } From 0b1610c69b626d19ad7e9975a82142dc532a5c23 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 10 Jun 2021 14:29:50 +0200 Subject: [PATCH 10/32] Use cancelCtx() instead of just cancel() --- pkg/icingadb/ha.go | 22 +++++++++++----------- pkg/icingaredis/heartbeat.go | 18 +++++++++--------- pkg/retry/retry.go | 6 +++--- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 2a7d7133..29324776 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -22,7 +22,7 @@ var timeout = 60 * time.Second type HA struct { ctx context.Context - cancel context.CancelFunc + cancelCtx context.CancelFunc instanceId types.Binary db *DB heartbeat *icingaredis.Heartbeat @@ -37,13 +37,13 @@ type HA struct { } func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { - ctx, cancel := context.WithCancel(ctx) + ctx, cancelCtx := context.WithCancel(ctx) instanceId := uuid.New() ha := &HA{ ctx: ctx, - cancel: cancel, + cancelCtx: cancelCtx, instanceId: instanceId[:], db: db, heartbeat: heartbeat, @@ -62,7 +62,7 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger // Close implements the io.Closer interface. func (h *HA) Close() error { // Cancel ctx. - h.cancel() + h.cancelCtx() // Wait until the controller loop ended. <-h.Done() // Remove our instance from the database. @@ -96,7 +96,7 @@ func (h *HA) abort(err error) { h.err = errors.Wrap(err, "HA aborted") h.mu.Unlock() - h.cancel() + h.cancelCtx() }) } @@ -168,18 +168,18 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo sleep := boff(uint64(attempt)) time.Sleep(sleep) - ctx, cancel := context.WithCancel(h.ctx) + ctx, cancelCtx := context.WithCancel(h.ctx) tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelSerializable, }) if err != nil { - cancel() + cancelCtx() return errors.Wrap(err, "can't start transaction") } query := `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?` rows, err := tx.QueryxContext(ctx, query, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) if err != nil { - cancel() + cancelCtx() return internal.CantPerformQuery(err, query) } takeover := true @@ -222,7 +222,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo _, err = tx.NamedExecContext(ctx, stmt, i) if err != nil { - cancel() + cancelCtx() err = internal.CantPerformQuery(err, stmt) if !utils.IsDeadlock(err) { h.logger.Errorw("Can't update or insert instance", zap.Error(err)) @@ -239,14 +239,14 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo } if err := tx.Commit(); err != nil { - cancel() + cancelCtx() return errors.Wrap(err, "can't commit transaction") } if takeover { h.signalTakeover() } - cancel() + cancelCtx() break } diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 888ffffb..9559e6c6 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -22,7 +22,7 @@ var timeout = 60 * time.Second type Heartbeat struct { active bool beat *com.Cond - cancel context.CancelFunc + cancelCtx context.CancelFunc client *Client done chan struct{} errMu sync.Mutex @@ -35,15 +35,15 @@ type Heartbeat struct { // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat { - ctx, cancel := context.WithCancel(ctx) + ctx, cancelCtx := context.WithCancel(ctx) heartbeat := &Heartbeat{ - beat: com.NewCond(ctx), - cancel: cancel, - client: client, - done: make(chan struct{}), - logger: logger, - lost: com.NewCond(ctx), + beat: com.NewCond(ctx), + cancelCtx: cancelCtx, + client: client, + done: make(chan struct{}), + logger: logger, + lost: com.NewCond(ctx), } go heartbeat.controller(ctx) @@ -59,7 +59,7 @@ func (h *Heartbeat) Beat() <-chan struct{} { // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. func (h *Heartbeat) Close() error { - h.cancel() + h.cancelCtx() <-h.Done() return h.Err() diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index eb53dfb3..e052c860 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -20,9 +20,9 @@ func WithBackoff( ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, timeout time.Duration, ) (err error) { if timeout > 0 { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, timeout) - defer cancel() + var cancelCtx context.CancelFunc + ctx, cancelCtx = context.WithTimeout(ctx, timeout) + defer cancelCtx() } for attempt := 0; ; /* true */ attempt++ { From 858dbe7481d61b60079f4ca81812162247a5d88b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 14 Jun 2021 11:44:24 +0200 Subject: [PATCH 11/32] Remove config.ValidateFile() YAML already complains that the file is a directory: "can't parse YAML file pkg: yaml: input error: read pkg: is a directory" --- pkg/config/config.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index e5c90246..bbdd7e5f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -39,20 +39,6 @@ func FromYAMLFile(name string) (*Config, error) { return c, nil } -// ValidateFile checks whether the given file name is a readable file. -func ValidateFile(name string) error { - f, err := os.Stat(name) - if err != nil { - return errors.Wrap(err, "can't read file "+name) - } - - if f.IsDir() { - return errors.New(name + " is a directory") - } - - return nil -} - // ParseFlags parses CLI flags and // returns a Flags value created from them. func ParseFlags() (*Flags, error) { @@ -63,9 +49,5 @@ func ParseFlags() (*Flags, error) { return nil, errors.Wrap(err, "can't parse CLI flags") } - if err := ValidateFile(f.Config); err != nil { - return nil, errors.Wrap(err, "invalid config file "+f.Config) - } - return f, nil } From ee36691f3fbc344deb85310e281a0677c58c5f21 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 14 Jun 2021 12:36:02 +0200 Subject: [PATCH 12/32] Remove --datadir config flag It's currently not used anywhere. --- pkg/config/config.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index bbdd7e5f..c6f619c9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,8 +17,6 @@ type Config struct { type Flags struct { // Config is the path to the config file Config string `short:"c" long:"config" description:"path to config file" required:"true" default:"./config.yml"` - // Datadir is the location of the data directory - Datadir string `long:"datadir" description:"path to the data directory" required:"true" default:"./"` } // FromYAMLFile returns a new Config value created from the given YAML config file. From 7bda89e79d458143f7cafe5d6be8a6cbb03f6c0b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 14 Jun 2021 12:43:53 +0200 Subject: [PATCH 13/32] Return error instead of panicking --- pkg/icingaredis/utils.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index 77dc3b31..bd4817b4 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -87,9 +87,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu if checksumer, ok := checksums[entity.ID().String()]; ok { entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) } else { - panic("no checksum") - // TODO(el): Error is not published - //return errors.New("no checksum") + return errors.Errorf("no checksum for %#v", entity) } select { From 270f1930aad7cefca00b47a44ce8d03080c537b9 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 21 Jun 2021 22:31:59 +0200 Subject: [PATCH 14/32] Remove useless comments --- pkg/retry/retry.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index e052c860..60b756df 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -29,7 +29,6 @@ func WithBackoff( prevErr := err if err = retryableFunc(ctx); err == nil { - // No error. return } @@ -40,7 +39,6 @@ func WithBackoff( } if !isRetryable { - // Not retryable. err = errors.Wrap(err, "can't retry") return @@ -49,7 +47,6 @@ func WithBackoff( sleep := b(uint64(attempt)) select { case <-ctx.Done(): - // Context canceled. Return last known error. if err == nil { err = ctx.Err() } @@ -57,7 +54,6 @@ func WithBackoff( return case <-time.After(sleep): - // Wait for backoff duration and continue. } } } From ffd66c5333f1b04a2d23250bcbb46e9d31a52753 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 21 Jun 2021 22:47:06 +0200 Subject: [PATCH 15/32] Add newline before return --- cmd/icingadb/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 75b5c52f..daf7461b 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -113,6 +113,7 @@ func run() int { case <-dump.InProgress(): logger.Info("Icinga 2 started a new config dump, waiting for it to complete") cancelSynctx() + return nil case <-synctx.Done(): return synctx.Err() @@ -237,14 +238,15 @@ func run() int { // otherwise there is no way to get Icinga DB back into a working state. logger.Fatalf("%+v", errors.New("HA exited without an error but main context isn't cancelled")) } - cancelHactx() + return ExitFailure case <-ctx.Done(): logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) cancelHactx() + return ExitSuccess } } From 4977d8f1f4bbe9dbb081aa1a5cc02e76ec5fb122 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 21 Jun 2021 22:54:52 +0200 Subject: [PATCH 16/32] Add missing doc in command --- internal/command/command.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/command/command.go b/internal/command/command.go index 38718218..2becd2f7 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -9,12 +9,14 @@ import ( "go.uber.org/zap" ) +// Command provides factories for creating Redis and Database connections from Config. type Command struct { Flags *config.Flags Config *config.Config Logger *zap.SugaredLogger } +// New creates and returns a new Command, parses CLI flags and YAML the config, and initializes the logger. func New() *Command { flags, err := config.ParseFlags() if err != nil { @@ -42,6 +44,7 @@ func New() *Command { } } +// Database creates and returns a new icingadb.DB connection from config.Config. func (c Command) Database() *icingadb.DB { db, err := c.Config.Database.Open(c.Logger) if err != nil { @@ -51,6 +54,7 @@ func (c Command) Database() *icingadb.DB { return db } +// Redis creates and returns a new icingaredis.Client connection from config.Config. func (c Command) Redis() *icingaredis.Client { rc, err := c.Config.Redis.NewClient(c.Logger) if err != nil { From d1c20b6946e8086ea01821040d466efb68d5d257 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 08:50:30 +0200 Subject: [PATCH 17/32] Add missing doc in db --- pkg/icingadb/db.go | 70 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index c13bbd95..e471d6dc 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -33,8 +33,14 @@ type DB struct { tableSemaphoresMu sync.Mutex } +// Options define user configurable database options. type Options struct { - MaxConnections int `yaml:"max_connections" default:"16"` + // Maximum number of open connections to the database. + MaxConnections int `yaml:"max_connections" default:"16"` + + // Maximum number of connections per table, + // regardless of what the connection is actually doing, + // e.g. INSERT, UPDATE, DELETE. MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"` // MaxPlaceholdersPerStatement defines the maximum number of placeholders in an @@ -59,6 +65,7 @@ func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB { } } +// BuildColumns returns all columns of the given struct. func (db *DB) BuildColumns(subject interface{}) []string { fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names columns := make([]string, 0, len(fields)) @@ -72,6 +79,7 @@ func (db *DB) BuildColumns(subject interface{}) []string { return columns } +// BuildDeleteStmt returns a DELETE statement for the given struct. func (db *DB) BuildDeleteStmt(from interface{}) string { return fmt.Sprintf( `DELETE FROM %s WHERE id IN (?)`, @@ -79,6 +87,7 @@ func (db *DB) BuildDeleteStmt(from interface{}) string { ) } +// BuildInsertStmt returns an INSERT INTO statement for the given struct. func (db *DB) BuildInsertStmt(into interface{}) (string, int) { columns := db.BuildColumns(into) @@ -90,14 +99,17 @@ func (db *DB) BuildInsertStmt(into interface{}) (string, int) { ), len(columns) } -func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string { +// BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct +// and the column list from the specified columns struct. +func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string { return fmt.Sprintf( `SELECT %s FROM %s`, - strings.Join(db.BuildColumns(into), ", "), - utils.TableName(from), + strings.Join(db.BuildColumns(columns), ", "), + utils.TableName(table), ) } +// BuildUpdateStmt returns an UPDATE statement for the given struct. func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { columns := db.BuildColumns(update) set := make([]string, 0, len(columns)) @@ -113,6 +125,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { ), len(columns) + 1 // +1 because of WHERE id = :id } +// BuildUpsertStmt returns an upsert statement for the given struct. func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { insertColumns := db.BuildColumns(subject) var updateColumns []string @@ -138,6 +151,11 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in ), len(insertColumns) } +// BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. +// Takes in up to the number of arguments specified in count from the arg stream, +// derives and expands a query and executes it with this set of arguments until the arg stream has been processed. +// The derived queries are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -193,6 +211,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return g.Wait() } +// NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely +// in the format INSERT ... VALUES. Takes in up to the number of entities specified in count +// from the arg stream, derives and executes a new query with the VALUES clause expanded to +// this set of arguments, until the arg stream has been processed. +// The queries are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. +// Entities for which the query ran successfully will be streamed on the succeeded channel. func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, @@ -207,11 +232,6 @@ func (db *DB) NamedBulkExec( }) g.Go(func() error { - // stmt, err := db.PrepareNamedContext(ctx, query) - // if err != nil { - // return err - // } - for { select { case b, ok := <-bulk: @@ -265,6 +285,12 @@ func (db *DB) NamedBulkExec( return g.Wait() } +// NamedBulkExecTx bulk executes queries with named placeholders in separate transactions. +// Takes in up to the number of entities specified in count from the arg stream and +// executes a new transaction that runs a new query for each entity in this set of arguments, +// until the arg stream has been processed. +// The transactions are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. func (db *DB) NamedBulkExecTx( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, ) error { @@ -346,6 +372,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int { return 1 } +// YieldAll executes the query with the supplied args, +// scans each resulting row into an entity returned by the factory function, +// and streams them into a returned channel. func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) { var cnt com.Counter entities := make(chan contracts.Entity, 1) @@ -387,6 +416,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF return entities, com.WaitAsync(g) } +// CreateStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { @@ -399,6 +432,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil) } +// UpsertStreamed bulk upserts the specified entities via NamedBulkExec. +// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { @@ -411,6 +448,10 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded) } +// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. +// The update statement is created using BuildUpdateStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxRowsPerTransaction and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { @@ -422,11 +463,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExecTx(ctx, stmt, db.options.MaxRowsPerTransaction, sem, forward) } +// DeleteStreamed bulk deletes the specified ids via BulkExec. +// The delete statement is created using BuildDeleteStmt with the passed entityType. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { sem := db.getSemaphoreForTable(utils.TableName(entityType)) return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.options.MaxPlaceholdersPerStatement, sem, ids) } +// Delete creates a channel from the specified ids and +// bulk deletes them by passing the channel along with the entityType to DeleteStreamed. func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { idsCh := make(chan interface{}, len(ids)) for _, id := range ids { @@ -437,6 +484,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int return db.DeleteStreamed(ctx, entityType, idsCh) } +// IsRetryable checks whether the given error is retryable. func IsRetryable(err error) bool { if errors.Is(err, driver.ErrBadConn) { return true @@ -451,8 +499,8 @@ func IsRetryable(err error) bool { switch e.Number { case 1053, 1205, 1213, 2006: // 1053: Server shutdown in progress - // 1205: - // 1213: + // 1205: Lock wait timeout + // 1213: Deadlock found when trying to get lock // 2006: MySQL server has gone away return true } From 7d59a98f90941167660ec5b5aa32ba2da4e70df7 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 09:02:40 +0200 Subject: [PATCH 18/32] Add missing doc in utils --- pkg/utils/utils.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 8c25adf5..74d33bfe 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -65,10 +65,21 @@ func Key(name string, sep byte) string { return b.String() } +// Timed calls the given callback with the time that has elapsed since the start. +// +// Timed should be installed by defer: +// +// func TimedExample(logger *zap.SugaredLogger) { +// defer utils.Timed(time.Now(), func(elapsed time.Duration) { +// logger.Debugf("Executed job in %s", elapsed) +// }) +// job() +// } func Timed(start time.Time, callback func(elapsed time.Duration)) { callback(time.Since(start)) } +// BatchSliceOfStrings groups the given keys into chunks of size count and streams them into a returned channel. func BatchSliceOfStrings(ctx context.Context, keys []string, count int) <-chan []string { batches := make(chan []string) @@ -92,10 +103,12 @@ func BatchSliceOfStrings(ctx context.Context, keys []string, count int) <-chan [ return batches } +// IsContextCanceled returns whether the given error is context.Canceled. func IsContextCanceled(err error) bool { return errors.Is(err, context.Canceled) } +// Checksum returns the SHA-1 checksum of the data. func Checksum(data interface{}) []byte { var chksm [sha1.Size]byte @@ -111,8 +124,8 @@ func Checksum(data interface{}) []byte { return chksm[:] } +// Fatal panics with the given error. func Fatal(err error) { - // TODO(el): Print stacktrace via some recover() magic? panic(err) } From fee30380d517cfed9e5040aa548e2de40b4e6e78 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 14:53:12 +0200 Subject: [PATCH 19/32] Add missing doc in client --- pkg/icingaredis/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index c6da3573..01a765d7 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -24,6 +24,7 @@ type Client struct { options *Options } +// Options define user configurable Redis options. type Options struct { Timeout time.Duration `yaml:"timeout" default:"30s"` MaxHMGetConnections int `yaml:"max_hmget_connections" default:"4096"` From 92bc1b26c784756373bb2424bc110232b0b56646 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 14:53:33 +0200 Subject: [PATCH 20/32] Add missing doc in redis utils --- pkg/icingaredis/utils.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index bd4817b4..9176dba7 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -31,6 +31,9 @@ func (s Streams) Option() []string { return append(streams, ids...) } +// CreateEntities streams and creates entities from the +// given Redis field value pairs using the specified factory function, +// and streams them on a returned channel. func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { entities := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) @@ -72,6 +75,9 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc return entities, com.WaitAsync(g) } +// SetChecksums concurrently streams from the given entities and +// sets their checksums using the specified map and +// streams the results on a returned channel. func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { entitiesWithChecksum := make(chan contracts.Entity) g, ctx := errgroup.WithContext(ctx) @@ -107,7 +113,8 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu return entitiesWithChecksum, com.WaitAsync(g) } -// WrapCmdErr adds the command itself and the stack of the current goroutine to the command's error if any. +// WrapCmdErr adds the command itself and +// the stack of the current goroutine to the command's error if any. func WrapCmdErr(cmd redis.Cmder) error { err := cmd.Err() if err != nil { From ff88cb73f740ca67bf534a3a1cbb029761725c45 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 14:53:42 +0200 Subject: [PATCH 21/32] Add missing doc in icinga_status --- pkg/icingaredis/v1/icinga_status.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/icingaredis/v1/icinga_status.go b/pkg/icingaredis/v1/icinga_status.go index ccd52526..9243926f 100644 --- a/pkg/icingaredis/v1/icinga_status.go +++ b/pkg/icingaredis/v1/icinga_status.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icingadb/pkg/types" ) +// IcingaStatus defines Icinga status information. type IcingaStatus struct { Environment string `json:"environment"` NodeName string `json:"node_name"` @@ -19,6 +20,7 @@ type IcingaStatus struct { PerformanceDataEnabled types.Bool `json:"enable_perfdata"` } +// EnvironmentID returns the environment ID. func (s *IcingaStatus) EnvironmentID() types.Binary { chksm := sha1.Sum([]byte(s.Environment)) From bf415f2e1cdf9136fa011e88fef9d4872da367ef Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 14:53:52 +0200 Subject: [PATCH 22/32] Add missing doc in stats_message --- pkg/icingaredis/v1/stats_message.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/icingaredis/v1/stats_message.go b/pkg/icingaredis/v1/stats_message.go index 817d0a21..5b046292 100644 --- a/pkg/icingaredis/v1/stats_message.go +++ b/pkg/icingaredis/v1/stats_message.go @@ -9,10 +9,12 @@ import ( // StatsMessage represents a message from the Redis stream icinga:stats. type StatsMessage map[string]interface{} +// Raw returns the key-value pairs of the message. func (m StatsMessage) Raw() map[string]interface{} { return m } +// IcingaStatus extracts Icinga status information from the message into IcingaStatus and returns it. func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) { if s, ok := m["IcingaApplication"].(string); ok { var envelope struct { @@ -33,6 +35,7 @@ func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) { return nil, errors.Errorf(`bad message %#v. "IcingaApplication" missing`, m) } +// Time extracts the timestamp of the message into types.UnixMilli and returns it. func (m StatsMessage) Time() (*types.UnixMilli, error) { if s, ok := m["timestamp"].(string); ok { var t types.UnixMilli From 2c3a58e3659f0a4f90daa667bcfc6490b259289b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 15:27:24 +0200 Subject: [PATCH 23/32] Add missing doc in bulker --- pkg/com/bulker.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index d209deab..8dcfecb4 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -7,12 +7,14 @@ import ( "time" ) +// Bulker reads all values from a channel and streams them in chunks into a Bulk channel. type Bulker struct { ch chan []interface{} ctx context.Context mu sync.Mutex } +// NewBulker returns a new Bulker and starts streaming. func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { b := &Bulker{ ch: make(chan []interface{}), @@ -89,6 +91,7 @@ func (b *Bulker) run(ch <-chan interface{}, count int) { _ = g.Wait() } +// Bulk reads all values from a channel and streams them in chunks into a returned channel. func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} { return NewBulker(ctx, ch, count).Bulk() } From a13788073d2e1b7ad69a16de95729ee4578e0e6b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 15:27:36 +0200 Subject: [PATCH 24/32] Add missing doc entitiy_bulker --- pkg/com/entity_bulker.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/com/entity_bulker.go b/pkg/com/entity_bulker.go index 1377a3c8..b4911c2e 100644 --- a/pkg/com/entity_bulker.go +++ b/pkg/com/entity_bulker.go @@ -8,12 +8,14 @@ import ( "time" ) +// EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel. type EntityBulker struct { ch chan []contracts.Entity ctx context.Context mu sync.Mutex } +// NewEntityBulker returns a new EntityBulker and starts streaming. func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker { b := &EntityBulker{ ch: make(chan []contracts.Entity), @@ -90,6 +92,7 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) { _ = g.Wait() } +// BulkEntities reads all entities from a channel and streams them in chunks into a returned channel. func BulkEntities(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity { return NewEntityBulker(ctx, ch, count).Bulk() } From 1fda4ea6ee77a9399d18899cda2e2e4d2d20dd06 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 22 Jun 2021 18:30:21 +0200 Subject: [PATCH 25/32] Rename start() to run() --- pkg/icingadb/delta.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 0f2ace67..89e1af5e 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -28,7 +28,7 @@ func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subj logger: logger, } - go delta.start(ctx, actual, desired) + go delta.run(ctx, actual, desired) return delta } @@ -37,7 +37,7 @@ func (delta *Delta) Wait() error { return <-delta.done } -func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { +func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { defer close(delta.done) var update EntitiesById From fc5e2882ff1eaf5a2969657fd73973df0585aef1 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:23 +0200 Subject: [PATCH 26/32] Add missing doc in delta --- pkg/icingadb/delta.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 89e1af5e..59c9eaab 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -12,6 +12,7 @@ import ( "time" ) +// Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted. type Delta struct { Create EntitiesById Update EntitiesById @@ -21,6 +22,7 @@ type Delta struct { logger *zap.SugaredLogger } +// NewDelta creates a new Delta and starts calculating it. func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta { delta := &Delta{ Subject: subject, @@ -33,6 +35,7 @@ func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subj return delta } +// Wait waits for the delta calculation to complete and returns an error, if any. func (delta *Delta) Wait() error { return <-delta.done } From 1d361594ee6aceb97411908090ac604abc570e24 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:23 +0200 Subject: [PATCH 27/32] Add missing doc in dump_signals --- pkg/icingadb/dump_signals.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go index 791755f5..cfc82c60 100644 --- a/pkg/icingadb/dump_signals.go +++ b/pkg/icingadb/dump_signals.go @@ -9,6 +9,8 @@ import ( "sync" ) +// DumpSignals reads dump signals from a Redis stream via Listen. +// Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals. type DumpSignals struct { redis *icingaredis.Client logger *zap.SugaredLogger @@ -18,6 +20,7 @@ type DumpSignals struct { inProgressCh chan struct{} } +// NewDumpSignals returns new DumpSignals. func NewDumpSignals(redis *icingaredis.Client, logger *zap.SugaredLogger) *DumpSignals { return &DumpSignals{ redis: redis, From ac0f26e59bba49221e5497f687d3d71d06cc198b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:23 +0200 Subject: [PATCH 28/32] Add missing doc in entitiesbyid --- pkg/icingadb/entitiesbyid.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go index f8f71be7..b40050e1 100644 --- a/pkg/icingadb/entitiesbyid.go +++ b/pkg/icingadb/entitiesbyid.go @@ -5,8 +5,10 @@ import ( "github.com/icinga/icingadb/pkg/contracts" ) +// EntitiesById is a map of key-contracts.Entity pairs. type EntitiesById map[string]contracts.Entity +// Keys returns the keys. func (ebi EntitiesById) Keys() []string { keys := make([]string, 0, len(ebi)) for k := range ebi { @@ -16,6 +18,7 @@ func (ebi EntitiesById) Keys() []string { return keys } +// IDs returns the contracts.ID of the entities. func (ebi EntitiesById) IDs() []interface{} { ids := make([]interface{}, 0, len(ebi)) for _, v := range ebi { @@ -25,6 +28,7 @@ func (ebi EntitiesById) IDs() []interface{} { return ids } +// Entities streams the entities on a returned channel. func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity { entities := make(chan contracts.Entity) From 77ab2753f990cf2c57ba027585e85e38c89e6ef1 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:23 +0200 Subject: [PATCH 29/32] Add missing doc in ha --- pkg/icingadb/ha.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 29324776..c3fb8a84 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -20,6 +20,7 @@ import ( var timeout = 60 * time.Second +// HA provides high availability and indicates whether a Takeover or Handover must be made. type HA struct { ctx context.Context cancelCtx context.CancelFunc @@ -36,6 +37,7 @@ type HA struct { errOnce sync.Once } +// NewHA returns a new HA and starts the controller loop. func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { ctx, cancelCtx := context.WithCancel(ctx) @@ -71,10 +73,12 @@ func (h *HA) Close() error { return h.Err() } +// Done returns a channel that's closed when the HA controller loop ended. func (h *HA) Done() <-chan struct{} { return h.done } +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. func (h *HA) Err() error { h.mu.Lock() defer h.mu.Unlock() @@ -82,10 +86,12 @@ func (h *HA) Err() error { return h.err } +// Handover returns a channel with which handovers are signaled. func (h *HA) Handover() chan struct{} { return h.handover } +// Takeover returns a channel with which takeovers are signaled. func (h *HA) Takeover() chan struct{} { return h.takeover } From a4b77c6a45ea7ac05aff8f39b8dde962fed52d76 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:24 +0200 Subject: [PATCH 30/32] Add missing doc in sync --- pkg/icingadb/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 8682e05e..5101e86e 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -22,6 +22,7 @@ type Sync struct { logger *zap.SugaredLogger } +// NewSync returns a new Sync. func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { return &Sync{ db: db, From 1d5ae198aa951041b03a83955837e6663aa17abd Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:24 +0200 Subject: [PATCH 31/32] Add missing doc in meta --- pkg/icingadb/v1/history/meta.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/icingadb/v1/history/meta.go b/pkg/icingadb/v1/history/meta.go index 39c6d9b0..b2c817f9 100644 --- a/pkg/icingadb/v1/history/meta.go +++ b/pkg/icingadb/v1/history/meta.go @@ -5,11 +5,13 @@ import ( "github.com/icinga/icingadb/pkg/types" ) +// UpserterEntity provides upsert for entities. type UpserterEntity interface { contracts.Upserter contracts.Entity } +// HistoryTableEntity is embedded by every concrete history type that has its own table. type HistoryTableEntity struct { Id types.UUID `json:"id"` } @@ -35,6 +37,7 @@ func (hte HistoryTableEntity) Upsert() interface{} { return hte } +// HistoryEntity is embedded by every concrete history type. type HistoryEntity struct { Id types.UUID `json:"event_id"` } @@ -60,6 +63,7 @@ func (he HistoryEntity) Upsert() interface{} { return he } +// HistoryTableMeta is embedded by every concrete history type that has its own table. type HistoryTableMeta struct { EnvironmentId types.Binary `json:"environment_id"` EndpointId types.Binary `json:"endpoint_id"` @@ -68,6 +72,7 @@ type HistoryTableMeta struct { ServiceId types.Binary `json:"service_id"` } +// HistoryMeta is embedded by every concrete history type that belongs to the history table. type HistoryMeta struct { HistoryEntity `json:",inline"` EnvironmentId types.Binary `json:"environment_id"` From 0a521a6e4a7b15c540d895d460cd7eee55d7512a Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 23 Jun 2021 09:33:24 +0200 Subject: [PATCH 32/32] Add missing doc in meta --- pkg/icingadb/v1/meta.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/icingadb/v1/meta.go b/pkg/icingadb/v1/meta.go index 89c0de40..9266751e 100644 --- a/pkg/icingadb/v1/meta.go +++ b/pkg/icingadb/v1/meta.go @@ -57,12 +57,14 @@ func (n *NameCiMeta) Init() { n.NameCi = &n.Name } +// CustomvarMeta is embedded by every type with custom variables. type CustomvarMeta struct { EntityWithoutChecksum `json:",inline"` EnvironmentMeta `json:",inline"` CustomvarId types.Binary `json:"customvar_id"` } +// GroupMeta is embedded by every type that represents a specific group. type GroupMeta struct { EntityWithChecksum `json:",inline"` EnvironmentMeta `json:",inline"` @@ -71,6 +73,7 @@ type GroupMeta struct { ZoneId types.Binary `json:"zone_id"` } +// MemberMeta is embedded by every type that represents members of a specific group. type MemberMeta struct { EntityWithoutChecksum `json:",inline"` EnvironmentMeta `json:",inline"`