Merge pull request #299 from Icinga/code-quality

Code quality
This commit is contained in:
Eric Lippmann 2021-08-09 10:41:20 +02:00 committed by GitHub
commit b1ec497c2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 187 additions and 200 deletions

View file

@ -113,6 +113,7 @@ func run() int {
case <-dump.InProgress():
logger.Info("Icinga 2 started a new config dump, waiting for it to complete")
cancelSynctx()
return nil
case <-synctx.Done():
return synctx.Err()
@ -237,14 +238,15 @@ func run() int {
// otherwise there is no way to get Icinga DB back into a working state.
logger.Fatalf("%+v", errors.New("HA exited without an error but main context isn't cancelled"))
}
cancelHactx()
return ExitFailure
case <-ctx.Done():
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
case s := <-sig:
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
cancelHactx()
return ExitSuccess
}
}

View file

@ -9,12 +9,14 @@ import (
"go.uber.org/zap"
)
// Command provides factories for creating Redis and Database connections from Config.
type Command struct {
Flags *config.Flags
Config *config.Config
Logger *zap.SugaredLogger
}
// New creates and returns a new Command, parses CLI flags and YAML the config, and initializes the logger.
func New() *Command {
flags, err := config.ParseFlags()
if err != nil {
@ -42,6 +44,7 @@ func New() *Command {
}
}
// Database creates and returns a new icingadb.DB connection from config.Config.
func (c Command) Database() *icingadb.DB {
db, err := c.Config.Database.Open(c.Logger)
if err != nil {
@ -51,6 +54,7 @@ func (c Command) Database() *icingadb.DB {
return db
}
// Redis creates and returns a new icingaredis.Client connection from config.Config.
func (c Command) Redis() *icingaredis.Client {
rc, err := c.Config.Redis.NewClient(c.Logger)
if err != nil {

View file

@ -7,12 +7,14 @@ import (
"time"
)
// Bulker reads all values from a channel and streams them in chunks into a Bulk channel.
type Bulker struct {
ch chan []interface{}
ctx context.Context
mu sync.Mutex
}
// NewBulker returns a new Bulker and starts streaming.
func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker {
b := &Bulker{
ch: make(chan []interface{}),
@ -89,6 +91,7 @@ func (b *Bulker) run(ch <-chan interface{}, count int) {
_ = g.Wait()
}
// Bulk reads all values from a channel and streams them in chunks into a returned channel.
func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} {
return NewBulker(ctx, ch, count).Bulk()
}

View file

@ -36,16 +36,6 @@ func ErrgroupReceive(g *errgroup.Group, err <-chan error) {
})
}
// PipeError forwards the first non-nil error from in to out
// using a separate goroutine.
func PipeError(in <-chan error, out chan<- error) {
go func() {
if e := <-in; e != nil {
out <- e
}
}()
}
// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.
func CopyFirst(
ctx context.Context, input <-chan contracts.Entity,

View file

@ -2,7 +2,7 @@ package com
import "sync/atomic"
// Atomic counter.
// Counter implements an atomic counter.
type Counter uint64
// Add adds the given delta to the counter.

View file

@ -8,12 +8,14 @@ import (
"time"
)
// EntityBulker reads all entities from a channel and streams them in chunks into a Bulk channel.
type EntityBulker struct {
ch chan []contracts.Entity
ctx context.Context
mu sync.Mutex
}
// NewEntityBulker returns a new EntityBulker and starts streaming.
func NewEntityBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *EntityBulker {
b := &EntityBulker{
ch: make(chan []contracts.Entity),
@ -90,6 +92,7 @@ func (b *EntityBulker) run(ch <-chan contracts.Entity, count int) {
_ = g.Wait()
}
// BulkEntities reads all entities from a channel and streams them in chunks into a returned channel.
func BulkEntities(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity {
return NewEntityBulker(ctx, ch, count).Bulk()
}

View file

@ -17,8 +17,6 @@ type Config struct {
type Flags struct {
// Config is the path to the config file
Config string `short:"c" long:"config" description:"path to config file" required:"true" default:"./config.yml"`
// Datadir is the location of the data directory
Datadir string `long:"datadir" description:"path to the data directory" required:"true" default:"./"`
}
// FromYAMLFile returns a new Config value created from the given YAML config file.
@ -39,20 +37,6 @@ func FromYAMLFile(name string) (*Config, error) {
return c, nil
}
// ValidateFile checks whether the given file name is a readable file.
func ValidateFile(name string) error {
f, err := os.Stat(name)
if err != nil {
return errors.Wrap(err, "can't read file "+name)
}
if f.IsDir() {
return errors.New(name + " is a directory")
}
return nil
}
// ParseFlags parses CLI flags and
// returns a Flags value created from them.
func ParseFlags() (*Flags, error) {
@ -63,9 +47,5 @@ func ParseFlags() (*Flags, error) {
return nil, errors.Wrap(err, "can't parse CLI flags")
}
if err := ValidateFile(f.Config); err != nil {
return nil, errors.Wrap(err, "invalid config file "+f.Config)
}
return f, nil
}

View file

@ -78,23 +78,23 @@ func dialWithLogging(logger *zap.SugaredLogger) func(context.Context, string, st
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (d *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := defaults.Set(d); err != nil {
return errors.Wrapf(err, "can't set defaults %#v", d)
func (r *Redis) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := defaults.Set(r); err != nil {
return errors.Wrapf(err, "can't set defaults %#v", r)
}
// Prevent recursion.
type self Redis
if err := unmarshal((*self)(d)); err != nil {
return internal.CantUnmarshalYAML(err, d)
if err := unmarshal((*self)(r)); err != nil {
return internal.CantUnmarshalYAML(err, r)
}
if d.MaxHMGetConnections < 1 {
if r.MaxHMGetConnections < 1 {
return errors.New("max_hmget_connections must be at least 1")
}
if d.HMGetCount < 1 {
if r.HMGetCount < 1 {
return errors.New("hmget_count must be at least 1")
}
if d.HScanCount < 1 {
if r.HScanCount < 1 {
return errors.New("hscan_count must be at least 1")
}

View file

@ -33,8 +33,14 @@ type DB struct {
tableSemaphoresMu sync.Mutex
}
// Options define user configurable database options.
type Options struct {
MaxConnections int `yaml:"max_connections" default:"16"`
// Maximum number of open connections to the database.
MaxConnections int `yaml:"max_connections" default:"16"`
// Maximum number of connections per table,
// regardless of what the connection is actually doing,
// e.g. INSERT, UPDATE, DELETE.
MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`
// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
@ -59,6 +65,7 @@ func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB {
}
}
// BuildColumns returns all columns of the given struct.
func (db *DB) BuildColumns(subject interface{}) []string {
fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names
columns := make([]string, 0, len(fields))
@ -72,6 +79,7 @@ func (db *DB) BuildColumns(subject interface{}) []string {
return columns
}
// BuildDeleteStmt returns a DELETE statement for the given struct.
func (db *DB) BuildDeleteStmt(from interface{}) string {
return fmt.Sprintf(
`DELETE FROM %s WHERE id IN (?)`,
@ -79,6 +87,7 @@ func (db *DB) BuildDeleteStmt(from interface{}) string {
)
}
// BuildInsertStmt returns an INSERT INTO statement for the given struct.
func (db *DB) BuildInsertStmt(into interface{}) (string, int) {
columns := db.BuildColumns(into)
@ -90,14 +99,17 @@ func (db *DB) BuildInsertStmt(into interface{}) (string, int) {
), len(columns)
}
func (db *DB) BuildSelectStmt(from interface{}, into interface{}) string {
// BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct
// and the column list from the specified columns struct.
func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string {
return fmt.Sprintf(
`SELECT %s FROM %s`,
strings.Join(db.BuildColumns(into), ", "),
utils.TableName(from),
strings.Join(db.BuildColumns(columns), ", "),
utils.TableName(table),
)
}
// BuildUpdateStmt returns an UPDATE statement for the given struct.
func (db *DB) BuildUpdateStmt(update interface{}) (string, int) {
columns := db.BuildColumns(update)
set := make([]string, 0, len(columns))
@ -113,6 +125,7 @@ func (db *DB) BuildUpdateStmt(update interface{}) (string, int) {
), len(columns) + 1 // +1 because of WHERE id = :id
}
// BuildUpsertStmt returns an upsert statement for the given struct.
func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) {
insertColumns := db.BuildColumns(subject)
var updateColumns []string
@ -138,6 +151,11 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in
), len(insertColumns)
}
// BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`.
// Takes in up to the number of arguments specified in count from the arg stream,
// derives and expands a query and executes it with this set of arguments until the arg stream has been processed.
// The derived queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error {
var cnt com.Counter
g, ctx := errgroup.WithContext(ctx)
@ -193,6 +211,13 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
return g.Wait()
}
// NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely
// in the format INSERT ... VALUES. Takes in up to the number of entities specified in count
// from the arg stream, derives and executes a new query with the VALUES clause expanded to
// this set of arguments, until the arg stream has been processed.
// The queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
// Entities for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
@ -207,11 +232,6 @@ func (db *DB) NamedBulkExec(
})
g.Go(func() error {
// stmt, err := db.PrepareNamedContext(ctx, query)
// if err != nil {
// return err
// }
for {
select {
case b, ok := <-bulk:
@ -265,6 +285,12 @@ func (db *DB) NamedBulkExec(
return g.Wait()
}
// NamedBulkExecTx bulk executes queries with named placeholders in separate transactions.
// Takes in up to the number of entities specified in count from the arg stream and
// executes a new transaction that runs a new query for each entity in this set of arguments,
// until the arg stream has been processed.
// The transactions are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) NamedBulkExecTx(
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
) error {
@ -346,6 +372,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
return 1
}
// YieldAll executes the query with the supplied args,
// scans each resulting row into an entity returned by the factory function,
// and streams them into a returned channel.
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, args ...interface{}) (<-chan contracts.Entity, <-chan error) {
var cnt com.Counter
entities := make(chan contracts.Entity, 1)
@ -360,7 +389,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
db.logger.Infof("Fetched %d elements of %s in %s", cnt.Val(), utils.Name(v), elapsed)
})
rows, err := db.Queryx(query, args...)
rows, err := db.QueryxContext(ctx, query, args...)
if err != nil {
return internal.CantPerformQuery(err, query)
}
@ -387,6 +416,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
return entities, com.WaitAsync(g)
}
// CreateStreamed bulk creates the specified entities via NamedBulkExec.
// The insert statement is created using BuildInsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
@ -399,6 +432,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil)
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
@ -411,6 +448,10 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExec(ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, succeeded)
}
// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx.
// The update statement is created using BuildUpdateStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxRowsPerTransaction and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
@ -422,11 +463,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExecTx(ctx, stmt, db.options.MaxRowsPerTransaction, sem, forward)
}
// DeleteStreamed bulk deletes the specified ids via BulkExec.
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
sem := db.getSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.options.MaxPlaceholdersPerStatement, sem, ids)
}
// Delete creates a channel from the specified ids and
// bulk deletes them by passing the channel along with the entityType to DeleteStreamed.
func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error {
idsCh := make(chan interface{}, len(ids))
for _, id := range ids {
@ -437,6 +484,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int
return db.DeleteStreamed(ctx, entityType, idsCh)
}
// IsRetryable checks whether the given error is retryable.
func IsRetryable(err error) bool {
if errors.Is(err, driver.ErrBadConn) {
return true
@ -451,8 +499,8 @@ func IsRetryable(err error) bool {
switch e.Number {
case 1053, 1205, 1213, 2006:
// 1053: Server shutdown in progress
// 1205:
// 1213:
// 1205: Lock wait timeout
// 1213: Deadlock found when trying to get lock
// 2006: MySQL server has gone away
return true
}

View file

@ -12,6 +12,7 @@ import (
"time"
)
// Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted.
type Delta struct {
Create EntitiesById
Update EntitiesById
@ -21,6 +22,7 @@ type Delta struct {
logger *zap.SugaredLogger
}
// NewDelta creates a new Delta and starts calculating it.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
delta := &Delta{
Subject: subject,
@ -28,16 +30,17 @@ func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subj
logger: logger,
}
go delta.start(ctx, actual, desired)
go delta.run(ctx, actual, desired)
return delta
}
// Wait waits for the delta calculation to complete and returns an error, if any.
func (delta *Delta) Wait() error {
return <-delta.done
}
func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
defer close(delta.done)
var update EntitiesById

View file

@ -9,6 +9,8 @@ import (
"sync"
)
// DumpSignals reads dump signals from a Redis stream via Listen.
// Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals.
type DumpSignals struct {
redis *icingaredis.Client
logger *zap.SugaredLogger
@ -18,6 +20,7 @@ type DumpSignals struct {
inProgressCh chan struct{}
}
// NewDumpSignals returns new DumpSignals.
func NewDumpSignals(redis *icingaredis.Client, logger *zap.SugaredLogger) *DumpSignals {
return &DumpSignals{
redis: redis,

View file

@ -5,8 +5,10 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
)
// EntitiesById is a map of key-contracts.Entity pairs.
type EntitiesById map[string]contracts.Entity
// Keys returns the keys.
func (ebi EntitiesById) Keys() []string {
keys := make([]string, 0, len(ebi))
for k := range ebi {
@ -16,6 +18,7 @@ func (ebi EntitiesById) Keys() []string {
return keys
}
// IDs returns the contracts.ID of the entities.
func (ebi EntitiesById) IDs() []interface{} {
ids := make([]interface{}, 0, len(ebi))
for _, v := range ebi {
@ -25,6 +28,7 @@ func (ebi EntitiesById) IDs() []interface{} {
return ids
}
// Entities streams the entities on a returned channel.
func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity {
entities := make(chan contracts.Entity)

View file

@ -20,9 +20,10 @@ import (
var timeout = 60 * time.Second
// HA provides high availability and indicates whether a Takeover or Handover must be made.
type HA struct {
ctx context.Context
cancel context.CancelFunc
cancelCtx context.CancelFunc
instanceId types.Binary
db *DB
heartbeat *icingaredis.Heartbeat
@ -36,14 +37,15 @@ type HA struct {
errOnce sync.Once
}
// NewHA returns a new HA and starts the controller loop.
func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA {
ctx, cancel := context.WithCancel(ctx)
ctx, cancelCtx := context.WithCancel(ctx)
instanceId := uuid.New()
ha := &HA{
ctx: ctx,
cancel: cancel,
cancelCtx: cancelCtx,
instanceId: instanceId[:],
db: db,
heartbeat: heartbeat,
@ -62,7 +64,7 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
// Close implements the io.Closer interface.
func (h *HA) Close() error {
// Cancel ctx.
h.cancel()
h.cancelCtx()
// Wait until the controller loop ended.
<-h.Done()
// Remove our instance from the database.
@ -71,10 +73,12 @@ func (h *HA) Close() error {
return h.Err()
}
// Done returns a channel that's closed when the HA controller loop ended.
func (h *HA) Done() <-chan struct{} {
return h.done
}
// Err returns an error if Done has been closed and there is an error. Otherwise returns nil.
func (h *HA) Err() error {
h.mu.Lock()
defer h.mu.Unlock()
@ -82,10 +86,12 @@ func (h *HA) Err() error {
return h.err
}
// Handover returns a channel with which handovers are signaled.
func (h *HA) Handover() chan struct{} {
return h.handover
}
// Takeover returns a channel with which takeovers are signaled.
func (h *HA) Takeover() chan struct{} {
return h.takeover
}
@ -96,7 +102,7 @@ func (h *HA) abort(err error) {
h.err = errors.Wrap(err, "HA aborted")
h.mu.Unlock()
h.cancel()
h.cancelCtx()
})
}
@ -168,18 +174,18 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo
sleep := boff(uint64(attempt))
time.Sleep(sleep)
ctx, cancel := context.WithCancel(h.ctx)
ctx, cancelCtx := context.WithCancel(h.ctx)
tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
cancel()
cancelCtx()
return errors.Wrap(err, "can't start transaction")
}
query := `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`
rows, err := tx.QueryxContext(ctx, query, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
if err != nil {
cancel()
cancelCtx()
return internal.CantPerformQuery(err, query)
}
takeover := true
@ -222,7 +228,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo
_, err = tx.NamedExecContext(ctx, stmt, i)
if err != nil {
cancel()
cancelCtx()
err = internal.CantPerformQuery(err, stmt)
if !utils.IsDeadlock(err) {
h.logger.Errorw("Can't update or insert instance", zap.Error(err))
@ -239,14 +245,14 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo
}
if err := tx.Commit(); err != nil {
cancel()
cancelCtx()
return errors.Wrap(err, "can't commit transaction")
}
if takeover {
h.signalTakeover()
}
cancel()
cancelCtx()
break
}

View file

@ -47,7 +47,7 @@ func PackAny(in interface{}, out io.Writer) error {
var tByte = reflect.TypeOf(byte(0))
var tBytes = reflect.TypeOf([]uint8(nil))
// packValue does the actual job of packAny and just exists for recursion w/o unneccessary reflect.ValueOf calls.
// packValue does the actual job of packAny and just exists for recursion w/o unnecessary reflect.ValueOf calls.
func packValue(in reflect.Value, out io.Writer) error {
switch kind := in.Kind(); kind {
case reflect.Invalid: // nil

View file

@ -22,6 +22,7 @@ type Sync struct {
logger *zap.SugaredLogger
}
// NewSync returns a new Sync.
func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
return &Sync{
db: db,

View file

@ -5,11 +5,13 @@ import (
"github.com/icinga/icingadb/pkg/types"
)
// UpserterEntity provides upsert for entities.
type UpserterEntity interface {
contracts.Upserter
contracts.Entity
}
// HistoryTableEntity is embedded by every concrete history type that has its own table.
type HistoryTableEntity struct {
Id types.UUID `json:"id"`
}
@ -35,6 +37,7 @@ func (hte HistoryTableEntity) Upsert() interface{} {
return hte
}
// HistoryEntity is embedded by every concrete history type.
type HistoryEntity struct {
Id types.UUID `json:"event_id"`
}
@ -60,6 +63,7 @@ func (he HistoryEntity) Upsert() interface{} {
return he
}
// HistoryTableMeta is embedded by every concrete history type that has its own table.
type HistoryTableMeta struct {
EnvironmentId types.Binary `json:"environment_id"`
EndpointId types.Binary `json:"endpoint_id"`
@ -68,6 +72,7 @@ type HistoryTableMeta struct {
ServiceId types.Binary `json:"service_id"`
}
// HistoryMeta is embedded by every concrete history type that belongs to the history table.
type HistoryMeta struct {
HistoryEntity `json:",inline"`
EnvironmentId types.Binary `json:"environment_id"`

View file

@ -57,12 +57,14 @@ func (n *NameCiMeta) Init() {
n.NameCi = &n.Name
}
// CustomvarMeta is embedded by every type with custom variables.
type CustomvarMeta struct {
EntityWithoutChecksum `json:",inline"`
EnvironmentMeta `json:",inline"`
CustomvarId types.Binary `json:"customvar_id"`
}
// GroupMeta is embedded by every type that represents a specific group.
type GroupMeta struct {
EntityWithChecksum `json:",inline"`
EnvironmentMeta `json:",inline"`
@ -71,6 +73,7 @@ type GroupMeta struct {
ZoneId types.Binary `json:"zone_id"`
}
// MemberMeta is embedded by every type that represents members of a specific group.
type MemberMeta struct {
EntityWithoutChecksum `json:",inline"`
EnvironmentMeta `json:",inline"`

View file

@ -24,6 +24,7 @@ type Client struct {
options *Options
}
// Options define user configurable Redis options.
type Options struct {
Timeout time.Duration `yaml:"timeout" default:"30s"`
MaxHMGetConnections int `yaml:"max_hmget_connections" default:"4096"`

View file

@ -22,7 +22,7 @@ var timeout = 60 * time.Second
type Heartbeat struct {
active bool
beat *com.Cond
cancel context.CancelFunc
cancelCtx context.CancelFunc
client *Client
done chan struct{}
errMu sync.Mutex
@ -35,15 +35,15 @@ type Heartbeat struct {
// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat {
ctx, cancel := context.WithCancel(ctx)
ctx, cancelCtx := context.WithCancel(ctx)
heartbeat := &Heartbeat{
beat: com.NewCond(ctx),
cancel: cancel,
client: client,
done: make(chan struct{}),
logger: logger,
lost: com.NewCond(ctx),
beat: com.NewCond(ctx),
cancelCtx: cancelCtx,
client: client,
done: make(chan struct{}),
logger: logger,
lost: com.NewCond(ctx),
}
go heartbeat.controller(ctx)
@ -59,7 +59,7 @@ func (h *Heartbeat) Beat() <-chan struct{} {
// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
h.cancel()
h.cancelCtx()
<-h.Done()
return h.Err()

View file

@ -31,6 +31,9 @@ func (s Streams) Option() []string {
return append(streams, ids...)
}
// CreateEntities streams and creates entities from the
// given Redis field value pairs using the specified factory function,
// and streams them on a returned channel.
func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) {
entities := make(chan contracts.Entity)
g, ctx := errgroup.WithContext(ctx)
@ -72,6 +75,9 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc
return entities, com.WaitAsync(g)
}
// SetChecksums concurrently streams from the given entities and
// sets their checksums using the specified map and
// streams the results on a returned channel.
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) {
entitiesWithChecksum := make(chan contracts.Entity)
g, ctx := errgroup.WithContext(ctx)
@ -87,9 +93,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu
if checksumer, ok := checksums[entity.ID().String()]; ok {
entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum())
} else {
panic("no checksum")
// TODO(el): Error is not published
//return errors.New("no checksum")
return errors.Errorf("no checksum for %#v", entity)
}
select {
@ -109,7 +113,8 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu
return entitiesWithChecksum, com.WaitAsync(g)
}
// WrapCmdErr adds the command itself and the stack of the current goroutine to the command's error if any.
// WrapCmdErr adds the command itself and
// the stack of the current goroutine to the command's error if any.
func WrapCmdErr(cmd redis.Cmder) error {
err := cmd.Err()
if err != nil {

View file

@ -5,6 +5,7 @@ import (
"github.com/icinga/icingadb/pkg/types"
)
// IcingaStatus defines Icinga status information.
type IcingaStatus struct {
Environment string `json:"environment"`
NodeName string `json:"node_name"`
@ -19,6 +20,7 @@ type IcingaStatus struct {
PerformanceDataEnabled types.Bool `json:"enable_perfdata"`
}
// EnvironmentID returns the environment ID.
func (s *IcingaStatus) EnvironmentID() types.Binary {
chksm := sha1.Sum([]byte(s.Environment))

View file

@ -9,10 +9,12 @@ import (
// StatsMessage represents a message from the Redis stream icinga:stats.
type StatsMessage map[string]interface{}
// Raw returns the key-value pairs of the message.
func (m StatsMessage) Raw() map[string]interface{} {
return m
}
// IcingaStatus extracts Icinga status information from the message into IcingaStatus and returns it.
func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) {
if s, ok := m["IcingaApplication"].(string); ok {
var envelope struct {
@ -33,6 +35,7 @@ func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) {
return nil, errors.Errorf(`bad message %#v. "IcingaApplication" missing`, m)
}
// Time extracts the timestamp of the message into types.UnixMilli and returns it.
func (m StatsMessage) Time() (*types.UnixMilli, error) {
if s, ok := m["timestamp"].(string); ok {
var t types.UnixMilli

View file

@ -20,16 +20,15 @@ func WithBackoff(
ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, timeout time.Duration,
) (err error) {
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithTimeout(ctx, timeout)
defer cancelCtx()
}
for attempt := 0; ; /* true */ attempt++ {
prevErr := err
if err = retryableFunc(ctx); err == nil {
// No error.
return
}
@ -40,7 +39,6 @@ func WithBackoff(
}
if !isRetryable {
// Not retryable.
err = errors.Wrap(err, "can't retry")
return
@ -49,7 +47,6 @@ func WithBackoff(
sleep := b(uint64(attempt))
select {
case <-ctx.Done():
// Context canceled. Return last known error.
if err == nil {
err = ctx.Err()
}
@ -57,7 +54,6 @@ func WithBackoff(
return
case <-time.After(sleep):
// Wait for backoff duration and continue.
}
}
}

View file

@ -8,12 +8,12 @@ import (
"github.com/pkg/errors"
)
// Acknowledgement specifies an acknowledgement state (yes, no, sticky).
// AcknowledgementState specifies an acknowledgement state (yes, no, sticky).
type AcknowledgementState uint8
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (as *AcknowledgementState) UnmarshalText(bytes []byte) error {
return as.UnmarshalJSON(bytes)
func (as *AcknowledgementState) UnmarshalText(text []byte) error {
return as.UnmarshalJSON(text)
}
// UnmarshalJSON implements the json.Unmarshaler interface.

View file

@ -41,7 +41,7 @@ func (binary Binary) String() string {
// MarshalText implements a custom marhsal function to encode
// the Binary as hex. MarshalText implements the
// enconding.TextMarshaler interface.
// encoding.TextMarshaler interface.
func (binary Binary) MarshalText() ([]byte, error) {
return []byte(binary.String()), nil
}

View file

@ -10,18 +10,6 @@ import (
"strconv"
)
var (
Yes = Bool{
Bool: true,
Valid: true,
}
No = Bool{
Bool: false,
Valid: true,
}
)
var (
enum = map[bool]string{
true: "y",

View file

@ -13,15 +13,15 @@ import (
type CommentType uint8
// UnmarshalJSON implements the json.Unmarshaler interface.
func (ct *CommentType) UnmarshalJSON(bytes []byte) error {
func (ct *CommentType) UnmarshalJSON(data []byte) error {
var i uint8
if err := internal.UnmarshalJSON(bytes, &i); err != nil {
if err := internal.UnmarshalJSON(data, &i); err != nil {
return err
}
c := CommentType(i)
if _, ok := commentTypes[c]; !ok {
return badCommentType(bytes)
return badCommentType(data)
}
*ct = c
@ -29,22 +29,22 @@ func (ct *CommentType) UnmarshalJSON(bytes []byte) error {
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (ct *CommentType) UnmarshalText(bytes []byte) error {
text := string(bytes)
func (ct *CommentType) UnmarshalText(text []byte) error {
s := string(text)
i, err := strconv.ParseUint(text, 10, 64)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return internal.CantParseUint64(err, text)
return internal.CantParseUint64(err, s)
}
c := CommentType(i)
if uint64(c) != i {
// Truncated due to above cast, obviously too high
return badCommentType(text)
return badCommentType(s)
}
if _, ok := commentTypes[c]; !ok {
return badCommentType(text)
return badCommentType(s)
}
*ct = c

View file

@ -12,9 +12,9 @@ import (
type NotificationStates uint8
// UnmarshalJSON implements the json.Unmarshaler interface.
func (nst *NotificationStates) UnmarshalJSON(bytes []byte) error {
func (nst *NotificationStates) UnmarshalJSON(data []byte) error {
var states []string
if err := internal.UnmarshalJSON(bytes, &states); err != nil {
if err := internal.UnmarshalJSON(data, &states); err != nil {
return err
}

View file

@ -12,22 +12,22 @@ import (
type NotificationType uint16
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (nt *NotificationType) UnmarshalText(bytes []byte) error {
text := string(bytes)
func (nt *NotificationType) UnmarshalText(text []byte) error {
s := string(text)
i, err := strconv.ParseUint(text, 10, 64)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return internal.CantParseUint64(err, text)
return internal.CantParseUint64(err, s)
}
n := NotificationType(i)
if uint64(n) != i {
// Truncated due to above cast, obviously too high
return badNotificationType(text)
return badNotificationType(s)
}
if _, ok := notificationTypes[n]; !ok {
return badNotificationType(text)
return badNotificationType(s)
}
*nt = n

View file

@ -12,9 +12,9 @@ import (
type NotificationTypes uint16
// UnmarshalJSON implements the json.Unmarshaler interface.
func (nt *NotificationTypes) UnmarshalJSON(bytes []byte) error {
func (nt *NotificationTypes) UnmarshalJSON(data []byte) error {
var types []string
if err := internal.UnmarshalJSON(bytes, &types); err != nil {
if err := internal.UnmarshalJSON(data, &types); err != nil {
return err
}

View file

@ -12,8 +12,8 @@ import (
type StateType uint8
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (st *StateType) UnmarshalText(bytes []byte) error {
return st.UnmarshalJSON(bytes)
func (st *StateType) UnmarshalText(text []byte) error {
return st.UnmarshalJSON(text)
}
// UnmarshalJSON implements the json.Unmarshaler interface.

View file

@ -5,17 +5,11 @@ import (
"crypto/sha1"
"fmt"
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/utf8string"
"io/ioutil"
"math"
"math/rand"
"os"
"strings"
"sync"
"time"
"unicode"
)
@ -71,10 +65,21 @@ func Key(name string, sep byte) string {
return b.String()
}
// Timed calls the given callback with the time that has elapsed since the start.
//
// Timed should be installed by defer:
//
// func TimedExample(logger *zap.SugaredLogger) {
// defer utils.Timed(time.Now(), func(elapsed time.Duration) {
// logger.Debugf("Executed job in %s", elapsed)
// })
// job()
// }
func Timed(start time.Time, callback func(elapsed time.Duration)) {
callback(time.Since(start))
}
// BatchSliceOfStrings groups the given keys into chunks of size count and streams them into a returned channel.
func BatchSliceOfStrings(ctx context.Context, keys []string, count int) <-chan []string {
batches := make(chan []string)
@ -98,69 +103,12 @@ func BatchSliceOfStrings(ctx context.Context, keys []string, count int) <-chan [
return batches
}
func BatchSliceOfInterfaces(ctx context.Context, keys []interface{}, count int) <-chan []interface{} {
batches := make(chan []interface{})
go func() {
defer close(batches)
for i := 0; i < len(keys); i += count {
end := i + count
if end > len(keys) {
end = len(keys)
}
select {
case batches <- keys[i:end]:
case <-ctx.Done():
return
}
}
}()
return batches
}
// IsContextCanceled returns whether the given error is context.Canceled.
func IsContextCanceled(err error) bool {
return errors.Is(err, context.Canceled)
}
func CreateOrRead(name string, callback func() []byte) ([]byte, error) {
info, err := os.Stat(name)
if os.IsNotExist(err) {
b := callback()
if err := ioutil.WriteFile(name, b, 0660); err != nil {
defer os.Remove(name)
return nil, errors.Wrap(err, "can't write to file "+name)
}
return b, nil
}
if err != nil {
return nil, errors.Wrap(err, "can't read file "+name)
}
if info.IsDir() {
return nil, errors.Errorf(name + " is a directory")
}
b, err := ioutil.ReadFile(name)
if err != nil {
return nil, errors.Wrap(err, "can't read file "+name)
}
return b, nil
}
func Uuid() []byte {
u := uuid.New()
return u[:]
}
// Checksum returns the SHA-1 checksum of the data.
func Checksum(data interface{}) []byte {
var chksm [sha1.Size]byte
@ -176,8 +124,8 @@ func Checksum(data interface{}) []byte {
return chksm[:]
}
// Fatal panics with the given error.
func Fatal(err error) {
// TODO(el): Print stacktrace via some recover() magic?
panic(err)
}
@ -194,17 +142,6 @@ func IsDeadlock(err error) bool {
return false
}
func RandomSleep(sugar *zap.SugaredLogger) {
once := sync.Once{}
once.Do(func() {
rand.Seed(time.Now().UnixNano())
})
n := rand.Intn(100)
d := time.Duration(n) * time.Millisecond
sugar.Info("Sleeping for ", d)
time.Sleep(d)
}
var ellipsis = utf8string.NewString("...")
// Ellipsize shortens s to <=limit runes and indicates shortening by "...".