From 8cbf24932edeec6fde83ea5bfb41e6e36101c29a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 29 Apr 2021 13:59:54 +0200 Subject: [PATCH] Add stack of current goroutine to errors --- internal/command/command.go | 3 ++- pkg/config/config.go | 12 +++++------ pkg/config/database.go | 7 ++++--- pkg/config/redis.go | 7 ++++++- pkg/driver/driver.go | 3 +++ pkg/icingadb/db.go | 19 +++++++++--------- pkg/icingadb/ha.go | 30 ++++++++++++++++++---------- pkg/icingaredis/utils.go | 2 +- pkg/icingaredis/v1/stats_message.go | 6 +++--- pkg/structify/structify.go | 31 +++++++++++++++++++++++++---- 10 files changed, 81 insertions(+), 39 deletions(-) diff --git a/internal/command/command.go b/internal/command/command.go index 7f7df9b0..518849dd 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -27,7 +28,7 @@ func New() *Command { logger, err := zap.NewDevelopment() if err != nil { - utils.Fatal(err) + utils.Fatal(errors.Wrap(err, "can't create logger")) } sugar := logger.Sugar() diff --git a/pkg/config/config.go b/pkg/config/config.go index 977e6bf0..a1a4115a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,8 +1,8 @@ package config import ( - "fmt" "github.com/jessevdk/go-flags" + "github.com/pkg/errors" "gopkg.in/yaml.v3" "os" ) @@ -25,7 +25,7 @@ type Flags struct { func FromYAMLFile(name string) (*Config, error) { f, err := os.Open(name) if err != nil { - return nil, err + return nil, errors.Wrap(err, "can't open YAML file") } defer f.Close() @@ -33,7 +33,7 @@ func FromYAMLFile(name string) (*Config, error) { d := yaml.NewDecoder(f) if err := d.Decode(&c); err != nil { - return nil, err + return nil, errors.Wrap(err, "can't parse YAML file "+name) } return c, nil @@ -43,11 +43,11 @@ func FromYAMLFile(name string) (*Config, error) { func ValidateFile(name string) error { f, err := os.Stat(name) if err != nil { - return err + return errors.Wrap(err, "not a readable file") } if f.IsDir() { - return fmt.Errorf("'%s' is a directory", name) + return errors.Errorf("'%s' is a directory", name) } return nil @@ -60,7 +60,7 @@ func ParseFlags() (*Flags, error) { parser := flags.NewParser(f, flags.Default) if _, err := parser.Parse(); err != nil { - return nil, err + return nil, errors.Wrap(err, "can't parse CLI flags") } if err := ValidateFile(f.Config); err != nil { diff --git a/pkg/config/database.go b/pkg/config/database.go index e9bfebb7..78e7ef10 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" + "github.com/pkg/errors" "go.uber.org/zap" "sync" ) @@ -38,7 +39,7 @@ func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) { db, err := sqlx.Open("icingadb-mysql", dsn) if err != nil { - return nil, err + return nil, errors.Wrap(err, "can't open database") } db.SetMaxIdleConns(d.MaxConnections / 3) @@ -54,12 +55,12 @@ func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) { // UnmarshalYAML implements the yaml.Unmarshaler interface. func (d *Database) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(d); err != nil { - return err + return errors.Wrap(err, "can't set default database config") } // Prevent recursion. type self Database if err := unmarshal((*self)(d)); err != nil { - return err + return errors.Wrap(err, "can't parse database config") } if d.MaxConnectionsPerTable < 1 { diff --git a/pkg/config/redis.go b/pkg/config/redis.go index ec429bb5..6661871a 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -2,12 +2,12 @@ package config import ( "context" - "errors" "github.com/creasty/defaults" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/retry" + "github.com/pkg/errors" "go.uber.org/zap" "net" "os" @@ -69,6 +69,11 @@ func dialWithLogging(logger *zap.SugaredLogger) func(context.Context, string, st backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), 5*time.Minute, ) + + if err != nil { + err = errors.Wrap(err, "can't connect to Redis") + } + return } } diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 6ec4c495..658b9080 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -44,6 +44,9 @@ func (d Driver) Open(dsn string) (c driver.Conn, err error) { backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), timeout, ) + if err != nil { + err = errors.Wrap(err, "can't connect to database") + } return } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4c5e698b..85fe64b2 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -151,15 +151,15 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return retry.WithBackoff( ctx, func(context.Context) error { - query, args, err := sqlx.In(query, b) + stmt, args, err := sqlx.In(query, b) if err != nil { - return err + return errors.Wrap(err, "can't build SQL placeholders for "+query) } - query = db.Rebind(query) - _, err = db.ExecContext(ctx, query, args...) + stmt = db.Rebind(stmt) + _, err = db.ExecContext(ctx, stmt, args...) if err != nil { - return err + return errors.Wrap(err, "can't perform "+query) } cnt.Add(uint64(len(b))) @@ -220,6 +220,7 @@ func (db *DB) NamedBulkExec( db.logger.Debugf("Executing %s with %d rows..", query, len(b)) _, err := db.NamedExecContext(ctx, query, b) if err != nil { + err = errors.Wrap(err, "can't perform "+query) fmt.Println(err) return err } @@ -339,7 +340,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF rows, err := db.Queryx(query, args...) if err != nil { - return err + return errors.Wrap(err, "can't perform "+query) } defer rows.Close() @@ -347,7 +348,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF e := factoryFunc() if err := rows.StructScan(e); err != nil { - return err + return errors.Wrap(err, fmt.Sprintf("can't store query result into a %T: %s", e, query)) } select { @@ -381,8 +382,8 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti } // TODO(ak): wait for https://github.com/jmoiron/sqlx/issues/694 - //stmt, placeholders := db.BuildUpsertStmt(first) - //return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded) + // stmt, placeholders := db.BuildUpsertStmt(first) + // return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded) stmt, _ := db.BuildUpsertStmt(first) sem := db.getSemaphoreForTable(utils.TableName(first)) return db.NamedBulkExec(ctx, stmt, 1, sem, forward, succeeded) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 594a93bc..3490bac3 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -11,6 +11,7 @@ import ( icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" "go.uber.org/zap" "sync" "time" @@ -171,12 +172,13 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo }) if err != nil { cancel() - return err + return errors.Wrap(err, "can't start transaction") } - rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) + query := `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?` + rows, err := tx.QueryxContext(ctx, query, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) if err != nil { cancel() - return err + return errors.Wrap(err, "can't perform "+query) } takeover := true if rows.Next() { @@ -218,6 +220,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo _, err = tx.NamedExecContext(ctx, stmt, i) if err != nil { + err = errors.Wrap(err, "can't perform "+stmt) cancel() if !utils.IsDeadlock(err) { h.logger.Errorw("Can't update or insert instance", zap.Error(err)) @@ -235,7 +238,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo if err := tx.Commit(); err != nil { cancel() - return err + return errors.Wrap(err, "can't commit transaction") } if takeover { h.signalTakeover() @@ -251,9 +254,10 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLo func (h *HA) removeInstance() { h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) // Intentionally not using a context here as this is a cleanup task and h.ctx is already cancelled. - _, err := h.db.Exec("DELETE FROM icingadb_instance WHERE id = ?", h.instanceId) + query := "DELETE FROM icingadb_instance WHERE id = ?" + _, err := h.db.Exec(query, h.instanceId) if err != nil { - h.logger.Warnw("Could not remove instance from database", zap.Error(err)) + h.logger.Warnw("Could not remove instance from database", zap.Error(errors.Wrap(err, "can't perform "+query))) } } @@ -262,16 +266,20 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { case <-h.ctx.Done(): return case <-time.After(timeout): - result, err := h.db.ExecContext(h.ctx, "DELETE FROM icingadb_instance "+ - "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", - h.instanceId, s.EnvironmentID(), s.EndpointId, types.UnixMilli(time.Now().Add(-timeout))) + query := "DELETE FROM icingadb_instance " + + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?" + result, err := h.db.ExecContext(h.ctx, query, h.instanceId, s.EnvironmentID(), + s.EndpointId, types.UnixMilli(time.Now().Add(-timeout))) if err != nil { - h.logger.Errorw("Can't remove rows of old instances", zap.Error(err)) + h.logger.Errorw("Can't remove rows of old instances", zap.Error(errors.Wrap(err, "can't perform "+query))) return } affected, err := result.RowsAffected() if err != nil { - h.logger.Errorw("Can't get number of removed old instances", zap.Error(err)) + h.logger.Errorw( + "Can't get number of removed old instances", + zap.Error(errors.Wrap(err, "can't get affected rows")), + ) return } h.logger.Debugf("Removed %d old instances", affected) diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index eb661724..5add78ee 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -32,7 +32,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc e := factoryFunc() if err := json.Unmarshal([]byte(pair.Value), e); err != nil { - return err + return errors.Wrap(err, "can't unJSON entity") } e.SetID(id) diff --git a/pkg/icingaredis/v1/stats_message.go b/pkg/icingaredis/v1/stats_message.go index f711a9ba..f7689333 100644 --- a/pkg/icingaredis/v1/stats_message.go +++ b/pkg/icingaredis/v1/stats_message.go @@ -2,8 +2,8 @@ package v1 import ( "encoding/json" - "errors" "github.com/icinga/icingadb/pkg/types" + "github.com/pkg/errors" ) // StatsMessage represents a message from the Redis stream icinga:stats. @@ -24,7 +24,7 @@ func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) { } if err := json.Unmarshal([]byte(s), &envelope); err != nil { - return nil, err + return nil, errors.Wrap(err, "can't parse Icinga 2 status") } return &envelope.Status.IcingaApplication.IcingaStatus, nil @@ -38,7 +38,7 @@ func (m StatsMessage) Time() (*types.UnixMilli, error) { var t types.UnixMilli if err := json.Unmarshal([]byte(s), &t); err != nil { - return nil, err + return nil, errors.Wrap(err, "can't parse timestamp") } return &t, nil diff --git a/pkg/structify/structify.go b/pkg/structify/structify.go index 2569b815..b2ad1d19 100644 --- a/pkg/structify/structify.go +++ b/pkg/structify/structify.go @@ -4,8 +4,10 @@ import ( "encoding" "fmt" "github.com/icinga/icingadb/pkg/contracts" + "github.com/pkg/errors" "reflect" "strconv" + "strings" "unsafe" ) @@ -35,7 +37,8 @@ func MakeMapStructifier(t reflect.Type, tag string) MapStructifier { initer.Init() } - return ptr, structifyMapByTree(kv, tree, vPtr.Elem()) + vPtrElem := vPtr.Elem() + return ptr, structifyMapByTree(kv, tree, vPtrElem, vPtrElem, new([]int)) } } @@ -65,17 +68,37 @@ func buildStructTree(t reflect.Type, tag string) []structBranch { } // structifyMapByTree parses src's string values into the struct dest according to tree's specification. -func structifyMapByTree(src map[string]interface{}, tree []structBranch, dest reflect.Value) error { +func structifyMapByTree(src map[string]interface{}, tree []structBranch, dest, root reflect.Value, stack *[]int) error { + *stack = append(*stack, 0) + defer func() { + *stack = (*stack)[:len(*stack)-1] + }() + for _, branch := range tree { + (*stack)[len(*stack)-1] = branch.field + if branch.subTree == nil { if v, ok := src[branch.leaf]; ok { if vs, ok := v.(string); ok { if err := parseString(vs, dest.Field(branch.field).Addr().Interface()); err != nil { - return err + rt := root.Type() + typ := rt + var path []string + + for _, i := range *stack { + f := typ.Field(i) + path = append(path, f.Name) + typ = f.Type + } + + return errors.Wrap(err, fmt.Sprintf( + "can't parse %s into the %s %s#%s: %s", branch.leaf, + typ.Name(), rt.Name(), strings.Join(path, "."), vs, + )) } } } - } else if err := structifyMapByTree(src, branch.subTree, dest.Field(branch.field)); err != nil { + } else if err := structifyMapByTree(src, branch.subTree, dest.Field(branch.field), root, stack); err != nil { return err } }