From ee023b1202022872e1012c37690171c78dbff59b Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 7 Apr 2021 11:42:21 +0200 Subject: [PATCH 01/17] Introduce utils.MakeMapStructifier() --- pkg/structify/structify.go | 157 +++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 pkg/structify/structify.go diff --git a/pkg/structify/structify.go b/pkg/structify/structify.go new file mode 100644 index 00000000..d04b3f4c --- /dev/null +++ b/pkg/structify/structify.go @@ -0,0 +1,157 @@ +package structify + +import ( + "encoding" + "fmt" + "github.com/icinga/icingadb/pkg/contracts" + "reflect" + "strconv" + "unsafe" +) + +// structBranch represents either a leaf or a subTree. +type structBranch struct { + // field specifies the struct field index. + field int + // leaf specifies the map key to parse the struct field from. + leaf string + // subTree specifies the struct field's inner tree. + subTree []structBranch +} + +type MapStructifier = func(map[string]interface{}) (interface{}, error) + +// MakeMapStructifier builds a function which parses a map's string values into a new struct of type t +// and returns a pointer to it. tag specifies which tag connects struct fields to map keys. +// MakeMapStructifier panics if it detects an unsupported type (suitable for usage in init() or global vars). +func MakeMapStructifier(t reflect.Type, tag string) MapStructifier { + tree := buildStructTree(t, tag) + + return func(kv map[string]interface{}) (interface{}, error) { + vPtr := reflect.New(t) + ptr := vPtr.Interface() + + if initer, ok := ptr.(contracts.Initer); ok { + initer.Init() + } + + return ptr, structifyMapByTree(kv, tree, vPtr.Elem()) + } +} + +// buildStructTree assembles a tree which represents the struct t based on tag. +func buildStructTree(t reflect.Type, tag string) []structBranch { + var tree []structBranch + numFields := t.NumField() + + for i := 0; i < numFields; i++ { + if field := t.Field(i); field.PkgPath == "" { + switch tagValue := field.Tag.Get(tag); tagValue { + case "", "-": + case ",inline": + if subTree := buildStructTree(field.Type, tag); subTree != nil { + tree = append(tree, structBranch{i, "", subTree}) + } + default: + // If parseString doesn't support *T, it'll panic. + _ = parseString("", reflect.New(field.Type).Interface()) + + tree = append(tree, structBranch{i, tagValue, nil}) + } + } + } + + return tree +} + +// structifyMapByTree parses src's string values into the struct dest according to tree's specification. +func structifyMapByTree(src map[string]interface{}, tree []structBranch, dest reflect.Value) error { + for _, branch := range tree { + if branch.subTree == nil { + if v, ok := src[branch.leaf]; ok { + if vs, ok := v.(string); ok { + if err := parseString(vs, dest.Field(branch.field).Addr().Interface()); err != nil { + return err + } + } + } + } else if err := structifyMapByTree(src, branch.subTree, dest.Field(branch.field)); err != nil { + return err + } + } + + return nil +} + +// parseString parses src into *dest. +func parseString(src string, dest interface{}) error { + switch ptr := dest.(type) { + case encoding.TextUnmarshaler: + return ptr.UnmarshalText([]byte(src)) + case *string: + *ptr = src + return nil + case *uint8: + i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = uint8(i) + } + return err + case *uint16: + i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = uint16(i) + } + return err + case *uint32: + i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = uint32(i) + } + return err + case *uint64: + i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = i + } + return err + case *int8: + i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = int8(i) + } + return err + case *int16: + i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = int16(i) + } + return err + case *int32: + i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = int32(i) + } + return err + case *int64: + i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = i + } + return err + case *float32: + f, err := strconv.ParseFloat(src, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = float32(f) + } + return err + case *float64: + f, err := strconv.ParseFloat(src, int(unsafe.Sizeof(*ptr)*8)) + if err == nil { + *ptr = f + } + return err + default: + panic(fmt.Sprintf("unsupported type: %T", dest)) + } +} From f2adf2e8c1968107d4a3645f6879d4b18e87321e Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 1 Mar 2021 13:55:01 +0100 Subject: [PATCH 02/17] Introduce types.UUID --- pkg/types/uuid.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 pkg/types/uuid.go diff --git a/pkg/types/uuid.go b/pkg/types/uuid.go new file mode 100644 index 00000000..02acbcdb --- /dev/null +++ b/pkg/types/uuid.go @@ -0,0 +1,24 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "github.com/google/uuid" +) + +// UUID is like uuid.UUID, but marshals itself binarily (not like xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) in SQL context. +type UUID struct { + uuid.UUID +} + +// Value implements driver.Valuer. +func (uuid UUID) Value() (driver.Value, error) { + return uuid.UUID[:], nil +} + +// Assert interface compliance. +var ( + _ encoding.TextUnmarshaler = (*UUID)(nil) + _ driver.Valuer = UUID{} + _ driver.Valuer = (*UUID)(nil) +) From c86a8df20b153a8069e6af599b9037215cbcc0e7 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 23 Mar 2021 19:19:21 +0100 Subject: [PATCH 03/17] Introduce types.Float --- pkg/types/float.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 pkg/types/float.go diff --git a/pkg/types/float.go b/pkg/types/float.go new file mode 100644 index 00000000..eaa72c63 --- /dev/null +++ b/pkg/types/float.go @@ -0,0 +1,66 @@ +package types + +import ( + "bytes" + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "strconv" +) + +// Float adds JSON support to sql.NullFloat64. +type Float struct { + sql.NullFloat64 +} + +// MarshalJSON implements the json.Marshaler interface. +// Supports JSON null. +func (f Float) MarshalJSON() ([]byte, error) { + var v interface{} + if f.Valid { + v = f.Float64 + } + + return json.Marshal(v) +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (f *Float) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseFloat(string(text), 64) + if err != nil { + return err + } + + *f = Float{sql.NullFloat64{ + Float64: parsed, + Valid: true, + }} + + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// Supports JSON null. +func (f *Float) UnmarshalJSON(data []byte) error { + // Ignore null, like in the main JSON package. + if bytes.HasPrefix(data, []byte{'n'}) { + return nil + } + + err := json.Unmarshal(data, &f.Float64) + if err == nil { + f.Valid = true + } + + return err +} + +// Assert interface compliance. +var ( + _ json.Marshaler = Float{} + _ encoding.TextUnmarshaler = (*Float)(nil) + _ json.Unmarshaler = (*Float)(nil) + _ driver.Valuer = Float{} + _ sql.Scanner = (*Float)(nil) +) From ba69cef6cb7d15162ec8b3375ad0646394116708 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 16 Mar 2021 17:07:13 +0100 Subject: [PATCH 04/17] Introduce types.NotificationType --- pkg/types/notification_type.go | 73 ++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 pkg/types/notification_type.go diff --git a/pkg/types/notification_type.go b/pkg/types/notification_type.go new file mode 100644 index 00000000..4d864b24 --- /dev/null +++ b/pkg/types/notification_type.go @@ -0,0 +1,73 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "fmt" + "strconv" +) + +// NotificationType specifies the reason of a sent notification. +type NotificationType uint16 + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (nt *NotificationType) UnmarshalText(bytes []byte) error { + text := string(bytes) + + i, err := strconv.ParseUint(text, 10, 64) + if err != nil { + return err + } + + n := NotificationType(i) + if uint64(n) != i { + // Truncated due to above cast, obviously too high + return BadNotificationType{text} + } + + if _, ok := notificationTypes[n]; !ok { + return BadNotificationType{text} + } + + *nt = n + return nil +} + +// Value implements the driver.Valuer interface. +func (nt NotificationType) Value() (driver.Value, error) { + if v, ok := notificationTypes[nt]; ok { + return v, nil + } else { + return nil, BadNotificationType{nt} + } +} + +// BadNotificationType complains about a syntactically, but not semantically valid NotificationType. +type BadNotificationType struct { + Type interface{} +} + +// Error implements the error interface. +func (bnt BadNotificationType) Error() string { + return fmt.Sprintf("bad notification type: %#v", bnt.Type) +} + +// notificationTypes maps all valid NotificationType values to their SQL representation. +var notificationTypes = map[NotificationType]string{ + 1: "downtime_start", + 2: "downtime_end", + 4: "downtime_removed", + 8: "custom", + 16: "acknowledgement", + 32: "problem", + 64: "recovery", + 128: "flapping_start", + 256: "flapping_end", +} + +// Assert interface compliance. +var ( + _ error = BadNotificationType{} + _ encoding.TextUnmarshaler = (*NotificationType)(nil) + _ driver.Valuer = NotificationType(0) +) From fb4fd4c9645570fe880f9a875b974f5032292372 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 23 Mar 2021 13:44:34 +0100 Subject: [PATCH 05/17] Introduce types.StateType --- pkg/types/state_type.go | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 pkg/types/state_type.go diff --git a/pkg/types/state_type.go b/pkg/types/state_type.go new file mode 100644 index 00000000..e2539ba9 --- /dev/null +++ b/pkg/types/state_type.go @@ -0,0 +1,66 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "fmt" + "strconv" +) + +// StateType specifies a state's hardness. +type StateType uint8 + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (st *StateType) UnmarshalText(bytes []byte) error { + text := string(bytes) + + i, err := strconv.ParseUint(text, 10, 64) + if err != nil { + return err + } + + s := StateType(i) + if uint64(s) != i { + // Truncated due to above cast, obviously too high + return BadStateType{text} + } + + if _, ok := stateTypes[s]; !ok { + return BadStateType{text} + } + + *st = s + return nil +} + +// Value implements the driver.Valuer interface. +func (st StateType) Value() (driver.Value, error) { + if v, ok := stateTypes[st]; ok { + return v, nil + } else { + return nil, BadStateType{st} + } +} + +// BadStateType complains about a syntactically, but not semantically valid StateType. +type BadStateType struct { + Type interface{} +} + +// Error implements the error interface. +func (bst BadStateType) Error() string { + return fmt.Sprintf("bad state type: %#v", bst.Type) +} + +// stateTypes maps all valid StateType values to their SQL representation. +var stateTypes = map[StateType]string{ + 0: "soft", + 1: "hard", +} + +// Assert interface compliance. +var ( + _ error = BadStateType{} + _ encoding.TextUnmarshaler = (*StateType)(nil) + _ driver.Valuer = StateType(0) +) From bfd22b1f3914113d5fa850aef03efdb227af6cb2 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 7 Apr 2021 11:56:39 +0200 Subject: [PATCH 06/17] Make types.Bool an encoding.TextUnmarshaler --- pkg/types/bool.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/types/bool.go b/pkg/types/bool.go index 9f8202c2..f711fd08 100644 --- a/pkg/types/bool.go +++ b/pkg/types/bool.go @@ -3,8 +3,10 @@ package types import ( "database/sql" "database/sql/driver" + "encoding" "encoding/json" "errors" + "strconv" ) var ( @@ -41,6 +43,17 @@ func (b Bool) MarshalJSON() ([]byte, error) { return json.Marshal(b.Bool) } +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (b *Bool) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseUint(string(text), 10, 64) + if err != nil { + return err + } + + *b = Bool{parsed != 0, true} + return nil +} + // UnmarshalJSON implements the json.Unmarshaler interface. func (b *Bool) UnmarshalJSON(data []byte) error { if string(data) == "null" || len(data) == 0 { @@ -95,8 +108,9 @@ func (b Bool) Value() (driver.Value, error) { // Assert interface compliance. var ( - _ json.Marshaler = (*Bool)(nil) - _ json.Unmarshaler = (*Bool)(nil) - _ sql.Scanner = (*Bool)(nil) - _ driver.Valuer = (*Bool)(nil) + _ json.Marshaler = (*Bool)(nil) + _ encoding.TextUnmarshaler = (*Bool)(nil) + _ json.Unmarshaler = (*Bool)(nil) + _ sql.Scanner = (*Bool)(nil) + _ driver.Valuer = (*Bool)(nil) ) From 270dfa6159430bc5e57cdb003755159af0f30645 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 7 Apr 2021 11:59:29 +0200 Subject: [PATCH 07/17] Make types.String an encoding.TextUnmarshaler --- pkg/types/string.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/types/string.go b/pkg/types/string.go index 9b2643ce..ca2432b4 100644 --- a/pkg/types/string.go +++ b/pkg/types/string.go @@ -4,6 +4,7 @@ import ( "bytes" "database/sql" "database/sql/driver" + "encoding" "encoding/json" ) @@ -23,6 +24,16 @@ func (s String) MarshalJSON() ([]byte, error) { return json.Marshal(v) } +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (s *String) UnmarshalText(text []byte) error { + *s = String{sql.NullString{ + String: string(text), + Valid: true, + }} + + return nil +} + // UnmarshalJSON implements the json.Unmarshaler interface. // Supports JSON null. func (s *String) UnmarshalJSON(data []byte) error { @@ -41,8 +52,9 @@ func (s *String) UnmarshalJSON(data []byte) error { // Assert interface compliance. var ( - _ json.Marshaler = String{} - _ json.Unmarshaler = (*String)(nil) - _ driver.Valuer = String{} - _ sql.Scanner = (*String)(nil) + _ json.Marshaler = String{} + _ encoding.TextUnmarshaler = (*String)(nil) + _ json.Unmarshaler = (*String)(nil) + _ driver.Valuer = String{} + _ sql.Scanner = (*String)(nil) ) From 581cac27aad06f940e936837b3f4e32dbb73d3ed Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 7 Apr 2021 12:03:13 +0200 Subject: [PATCH 08/17] Make types.UnixMilli an encoding.TextUnmarshaler --- pkg/types/unix_milli.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pkg/types/unix_milli.go b/pkg/types/unix_milli.go index 5b38335b..f3d24e4f 100644 --- a/pkg/types/unix_milli.go +++ b/pkg/types/unix_milli.go @@ -3,6 +3,7 @@ package types import ( "database/sql" "database/sql/driver" + "encoding" "encoding/json" "errors" "github.com/icinga/icingadb/pkg/utils" @@ -28,6 +29,17 @@ func (t UnixMilli) MarshalJSON() ([]byte, error) { return []byte(strconv.FormatInt(utils.UnixMilli(time.Time(t)), 10)), nil } +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (t *UnixMilli) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseFloat(string(text), 64) + if err != nil { + return err + } + + *t = UnixMilli(utils.FromUnixMilli(int64(parsed))) + return nil +} + // UnmarshalJSON implements the json.Unmarshaler interface. // Unmarshals from milliseconds. Supports JSON null. func (t *UnixMilli) UnmarshalJSON(data []byte) error { @@ -74,8 +86,9 @@ func (t UnixMilli) Value() (driver.Value, error) { // Assert interface compliance. var ( - _ json.Marshaler = (*UnixMilli)(nil) - _ json.Unmarshaler = (*UnixMilli)(nil) - _ sql.Scanner = (*UnixMilli)(nil) - _ driver.Valuer = (*UnixMilli)(nil) + _ json.Marshaler = (*UnixMilli)(nil) + _ encoding.TextUnmarshaler = (*UnixMilli)(nil) + _ json.Unmarshaler = (*UnixMilli)(nil) + _ sql.Scanner = (*UnixMilli)(nil) + _ driver.Valuer = (*UnixMilli)(nil) ) From b13d2c3cd778f1db2cb95052ea70c8e1e705b947 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 18 Mar 2021 14:06:25 +0100 Subject: [PATCH 09/17] Restrict Bulker to contracts.Entity --- pkg/com/bulker.go | 17 +++++++++-------- pkg/icingadb/db.go | 16 ++++++++++------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go index b13e680e..3118e80a 100644 --- a/pkg/com/bulker.go +++ b/pkg/com/bulker.go @@ -2,21 +2,22 @@ package com import ( "context" + "github.com/icinga/icingadb/pkg/contracts" "golang.org/x/sync/errgroup" "sync" "time" ) type Bulker struct { - ch chan []interface{} + ch chan []contracts.Entity ctx context.Context mu sync.Mutex err error } -func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { +func NewBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *Bulker { b := &Bulker{ - ch: make(chan []interface{}), + ch: make(chan []contracts.Entity), ctx: ctx, mu: sync.Mutex{}, } @@ -27,14 +28,14 @@ func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker { } // Bulk returns the channel on which the bulks are delivered. -func (b *Bulker) Bulk() <-chan []interface{} { +func (b *Bulker) Bulk() <-chan []contracts.Entity { return b.ch } -func (b *Bulker) run(ch <-chan interface{}, count int) { +func (b *Bulker) run(ch <-chan contracts.Entity, count int) { defer close(b.ch) - bufCh := make(chan interface{}, count) + bufCh := make(chan contracts.Entity, count) g, ctx := errgroup.WithContext(b.ctx) g.Go(func() error { @@ -56,7 +57,7 @@ func (b *Bulker) run(ch <-chan interface{}, count int) { g.Go(func() error { for done := false; !done; { - buf := make([]interface{}, 0, count) + buf := make([]contracts.Entity, 0, count) timeout := time.After(256 * time.Millisecond) for drain := true; drain && len(buf) < count; { @@ -90,6 +91,6 @@ func (b *Bulker) run(ch <-chan interface{}, count int) { _ = g.Wait() } -func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} { +func Bulk(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity { return NewBulker(ctx, ch, count).Bulk() } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 6e542a1c..c6306a41 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -158,7 +158,9 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i return g.Wait() } -func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error { +func (db DB) NamedBulkExec( + ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, +) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) bulk := com.Bulk(ctx, arg, count) @@ -186,7 +188,7 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr return err } - g.Go(func(b []interface{}) func() error { + g.Go(func(b []contracts.Entity) func() error { return func() error { defer sem.Release(1) @@ -219,7 +221,9 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr return g.Wait() } -func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error { +func (db DB) NamedBulkExecTx( + ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, +) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) bulk := com.Bulk(ctx, arg, count) @@ -243,7 +247,7 @@ func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concu return err } - g.Go(func(b []interface{}) func() error { + g.Go(func(b []contracts.Entity) func() error { return func() error { defer sem.Release(1) @@ -335,7 +339,7 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error return nil } // Buffer of one because we receive an entity and send it back immediately. - inserts := make(chan interface{}, 1) + inserts := make(chan contracts.Entity, 1) inserts <- entity go func() { @@ -360,7 +364,7 @@ func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error return nil } // Buffer of one because we receive an entity and send it back immediately. - updates := make(chan interface{}, 1) + updates := make(chan contracts.Entity, 1) updates <- entity go func() { From df0124de09a549237ef74402faf1dd6897cb16fe Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 2 Mar 2021 17:00:42 +0100 Subject: [PATCH 10/17] DB#NamedBulkExec(): optionally inform about successfully inserted rows --- pkg/icingadb/db.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index c6306a41..b2ca4ce8 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -159,7 +159,8 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i } func (db DB) NamedBulkExec( - ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, + ctx context.Context, query string, count int, concurrent int, + arg chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -206,6 +207,16 @@ func (db DB) NamedBulkExec( cnt.Add(uint64(len(b))) + if succeeded != nil { + for _, row := range b { + select { + case <-ctx.Done(): + return ctx.Err() + case succeeded <- row: + } + } + } + return nil }, IsRetryable, @@ -354,7 +365,7 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error } }() - return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts) + return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts, nil) } func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error { From a9a4450068b7f1d7880d797ab9d8f9f133438212 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 18 Mar 2021 15:55:06 +0100 Subject: [PATCH 11/17] DB#NamedBulkExec*(): reduce channel requirements --- pkg/icingadb/db.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index b2ca4ce8..471139bc 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -160,7 +160,7 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i func (db DB) NamedBulkExec( ctx context.Context, query string, count int, concurrent int, - arg chan contracts.Entity, succeeded chan<- contracts.Entity, + arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) @@ -233,7 +233,7 @@ func (db DB) NamedBulkExec( } func (db DB) NamedBulkExecTx( - ctx context.Context, query string, count int, concurrent int, arg chan contracts.Entity, + ctx context.Context, query string, count int, concurrent int, arg <-chan contracts.Entity, ) error { var cnt com.Counter g, ctx := errgroup.WithContext(ctx) From 500dbac66a3c2da2a63c71513315d22870c5b213 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 9 Apr 2021 11:24:38 +0200 Subject: [PATCH 12/17] Deduplicate DB#Create() and DB#Update() --- pkg/com/com.go | 46 ++++++++++++++++++++++++++++++++++++++++++++ pkg/icingadb/db.go | 48 ++++++++-------------------------------------- 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/pkg/com/com.go b/pkg/com/com.go index 25e84934..9f163021 100644 --- a/pkg/com/com.go +++ b/pkg/com/com.go @@ -1,6 +1,7 @@ package com import ( + "context" "github.com/icinga/icingadb/pkg/contracts" "golang.org/x/sync/errgroup" ) @@ -44,3 +45,48 @@ func PipeError(in <-chan error, out chan<- error) { } }() } + +// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item. +func CopyFirst( + ctx context.Context, input <-chan contracts.Entity, +) (first contracts.Entity, forward <-chan contracts.Entity, err error) { + var ok bool + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case first, ok = <-input: + } + + if !ok { + return + } + + // Buffer of one because we receive an entity and send it back immediately. + fwd := make(chan contracts.Entity, 1) + fwd <- first + + forward = fwd + + go func() { + defer close(fwd) + + for { + select { + case <-ctx.Done(): + return + case e, ok := <-input: + if !ok { + return + } + + select { + case <-ctx.Done(): + return + case fwd <- e: + } + } + } + }() + + return +} diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 471139bc..a1d49779 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -344,53 +344,21 @@ func (db DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFu } func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error { - // TODO(el): Check ctx.Done()? - entity := <-entities - if entity == nil { - return nil + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return err } - // Buffer of one because we receive an entity and send it back immediately. - inserts := make(chan contracts.Entity, 1) - inserts <- entity - go func() { - defer close(inserts) - - for e := range entities { - select { - case inserts <- e: - case <-ctx.Done(): - return - } - } - }() - - return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts, nil) + return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), 1<<3, forward, nil) } func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error { - // TODO(el): Check ctx.Done()? - entity := <-entities - if entity == nil { - return nil + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return err } - // Buffer of one because we receive an entity and send it back immediately. - updates := make(chan contracts.Entity, 1) - updates <- entity - go func() { - defer close(updates) - - for e := range entities { - select { - case updates <- e: - case <-ctx.Done(): - return - } - } - }() - - return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(entity), 1<<15, 1<<3, updates) + return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, 1<<3, forward) } func IsRetryable(err error) bool { From ebfabaffc2cf774d725e543a136a6575c8e38922 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 18 Mar 2021 16:03:09 +0100 Subject: [PATCH 13/17] Introduce DB#Upsert() --- pkg/icingadb/db.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index a1d49779..84a6e6d7 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -352,6 +352,17 @@ func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), 1<<3, forward, nil) } +func (db DB) Upsert(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return err + } + + return db.NamedBulkExec( + ctx, db.BuildUpsertStmt(first), 1<<15/len(db.BuildColumns(first))/2, 1<<3, forward, succeeded, + ) +} + func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { From 4e87ca6de3bfc9d34cf75e596f8b7f78016f4432 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 24 Mar 2021 12:59:26 +0100 Subject: [PATCH 14/17] DB#Upsert(): allow to update not all columns --- pkg/contracts/contracts.go | 6 ++++++ pkg/icingadb/db.go | 27 +++++++++++++++++---------- pkg/icingadb/ha.go | 5 ++++- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go index 44298677..f08a752e 100644 --- a/pkg/contracts/contracts.go +++ b/pkg/contracts/contracts.go @@ -60,3 +60,9 @@ type Waiter interface { type Initer interface { Init() // Init initializes the object. } + +// Upserter implements the Upsert method, +// which returns a part of the object for ON DUPLICATE KEY UPDATE. +type Upserter interface { + Upsert() interface{} // Upsert partitions the object. +} diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 84a6e6d7..b6a2ada6 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -86,21 +86,29 @@ func (db DB) BuildUpdateStmt(update interface{}) string { ) } -func (db DB) BuildUpsertStmt(subject interface{}) string { - columns := db.BuildColumns(subject) - set := make([]string, 0, len(columns)) +func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { + insertColumns := db.BuildColumns(subject) + var updateColumns []string - for _, col := range columns { + if upserter, ok := subject.(contracts.Upserter); ok { + updateColumns = db.BuildColumns(upserter.Upsert()) + } else { + updateColumns = insertColumns + } + + set := make([]string, 0, len(updateColumns)) + + for _, col := range updateColumns { set = append(set, fmt.Sprintf("%s = :%s", col, col)) } return fmt.Sprintf( `INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`, utils.Key(utils.Name(subject), '_'), - strings.Join(columns, ","), - fmt.Sprintf(":%s", strings.Join(columns, ",:")), + strings.Join(insertColumns, ","), + fmt.Sprintf(":%s", strings.Join(insertColumns, ",:")), strings.Join(set, ","), - ) + ), len(insertColumns) + len(updateColumns) } func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent int, args []interface{}) error { @@ -358,9 +366,8 @@ func (db DB) Upsert(ctx context.Context, entities <-chan contracts.Entity, succe return err } - return db.NamedBulkExec( - ctx, db.BuildUpsertStmt(first), 1<<15/len(db.BuildColumns(first))/2, 1<<3, forward, succeeded, - ) + stmt, placeholders := db.BuildUpsertStmt(first) + return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded) } func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error { diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 1d31a6c5..bb119480 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -186,7 +186,10 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, } - _, err = tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i) + + stmt, _ := h.db.BuildUpsertStmt(i) + _, err = tx.NamedExecContext(ctx, stmt, i) + if err != nil { cancel() if !utils.IsDeadlock(err) { From 3fbc9fa25fad2f492ebac370738cee4e0a7683a4 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 25 Mar 2021 16:51:02 +0100 Subject: [PATCH 15/17] DB#Upsert(): workaround jmoiron/sqlx#694 --- pkg/icingadb/db.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index b6a2ada6..f83e368f 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -366,8 +366,11 @@ func (db DB) Upsert(ctx context.Context, entities <-chan contracts.Entity, succe return err } - stmt, placeholders := db.BuildUpsertStmt(first) - return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded) + // TODO(ak): wait for https://github.com/jmoiron/sqlx/issues/694 + //stmt, placeholders := db.BuildUpsertStmt(first) + //return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded) + stmt, _ := db.BuildUpsertStmt(first) + return db.NamedBulkExec(ctx, stmt, 1, 1<<3, forward, succeeded) } func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error { From 9aa1070db0714819fd01674122779274b863c6af Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 6 Apr 2021 16:15:37 +0200 Subject: [PATCH 16/17] Introduce contracts.TableNamer --- pkg/contracts/contracts.go | 6 ++++++ pkg/icingadb/db.go | 10 +++++----- pkg/utils/utils.go | 9 +++++++++ 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go index f08a752e..037f82bd 100644 --- a/pkg/contracts/contracts.go +++ b/pkg/contracts/contracts.go @@ -66,3 +66,9 @@ type Initer interface { type Upserter interface { Upsert() interface{} // Upsert partitions the object. } + +// TableNamer implements the TableName method, +// which returns the table of the object. +type TableNamer interface { + TableName() string // TableName tells the table. +} diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index f83e368f..80a4592d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -48,7 +48,7 @@ func (db DB) BuildColumns(subject interface{}) []string { func (db DB) BuildDeleteStmt(from interface{}) string { return fmt.Sprintf( `DELETE FROM %s WHERE id IN (?)`, - utils.Key(utils.Name(from), '_'), + utils.TableName(from), ) } @@ -57,7 +57,7 @@ func (db DB) BuildInsertStmt(into interface{}) string { return fmt.Sprintf( `INSERT INTO %s (%s) VALUES (%s)`, - utils.Key(utils.Name(into), '_'), + utils.TableName(into), strings.Join(columns, ", "), fmt.Sprintf(":%s", strings.Join(columns, ", :")), ) @@ -67,7 +67,7 @@ func (db DB) BuildSelectStmt(from interface{}, into interface{}) string { return fmt.Sprintf( `SELECT %s FROM %s`, strings.Join(db.BuildColumns(into), ", "), - utils.Key(utils.Name(from), '_'), + utils.TableName(from), ) } @@ -81,7 +81,7 @@ func (db DB) BuildUpdateStmt(update interface{}) string { return fmt.Sprintf( `UPDATE %s SET %s WHERE id = :id`, - utils.Key(utils.Name(update), '_'), + utils.TableName(update), strings.Join(set, ", "), ) } @@ -104,7 +104,7 @@ func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int return fmt.Sprintf( `INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`, - utils.Key(utils.Name(subject), '_'), + utils.TableName(subject), strings.Join(insertColumns, ","), fmt.Sprintf(":%s", strings.Join(insertColumns, ",:")), strings.Join(set, ","), diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index ae8e537b..e0752043 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -42,6 +42,15 @@ func Name(t interface{}) string { return s[strings.LastIndex(s, ".")+1:] } +// TableName returns the table of t. +func TableName(t interface{}) string { + if tn, ok := t.(contracts.TableNamer); ok { + return tn.TableName() + } else { + return Key(Name(t), '_') + } +} + // Key returns the name with all Unicode letters mapped to lower case letters, // with an additional separator in front of each original upper case letter. func Key(name string, sep byte) string { From 7ccf627df6ec2a902af1eb014645a05bacc2542e Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 9 Apr 2021 11:17:37 +0200 Subject: [PATCH 17/17] Sync history --- cmd/icingadb/main.go | 6 + pkg/icingadb/history/sync.go | 300 ++++++++++++++++++++++++ pkg/icingadb/v1/history/ack.go | 82 +++++++ pkg/icingadb/v1/history/comment.go | 120 ++++++++++ pkg/icingadb/v1/history/downtime.go | 119 ++++++++++ pkg/icingadb/v1/history/flapping.go | 80 +++++++ pkg/icingadb/v1/history/meta.go | 89 +++++++ pkg/icingadb/v1/history/notification.go | 45 ++++ pkg/icingadb/v1/history/state.go | 40 ++++ 9 files changed, 881 insertions(+) create mode 100644 pkg/icingadb/history/sync.go create mode 100644 pkg/icingadb/v1/history/ack.go create mode 100644 pkg/icingadb/v1/history/comment.go create mode 100644 pkg/icingadb/v1/history/downtime.go create mode 100644 pkg/icingadb/v1/history/flapping.go create mode 100644 pkg/icingadb/v1/history/meta.go create mode 100644 pkg/icingadb/v1/history/notification.go create mode 100644 pkg/icingadb/v1/history/state.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 7f2ded72..32101737 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/flatten" "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/icingadb/history" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/utils" @@ -40,6 +41,7 @@ func main() { heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) ha := icingadb.NewHA(ctx, db, heartbeat, logger) s := icingadb.NewSync(db, rc, logger) + hs := history.NewSync(db, rc, logger) // For temporary exit after sync done := make(chan struct{}, 0) @@ -186,6 +188,10 @@ func main() { }) } + g.Go(func() error { + return hs.Sync(synctx) + }) + if err := g.Wait(); err != nil { // TODO(el): This panics here even if a ctx gets cancelled. // That is intentional for the moment for testing. diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go new file mode 100644 index 00000000..3673ad69 --- /dev/null +++ b/pkg/icingadb/history/sync.go @@ -0,0 +1,300 @@ +package history + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/structify" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "reflect" + "sync" + "time" +) + +// Sync specifies the source and destination of a history sync. +type Sync struct { + db *icingadb.DB + redis *icingaredis.Client + logger *zap.SugaredLogger +} + +// NewSync creates a new Sync. +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +// insertedMessage represents a just inserted row. +type insertedMessage struct { + // redisId specifies the origin Redis message. + redisId string + // structType represents the table the row was inserted into. + structType reflect.Type +} + +const bulkSize = 1 << 14 + +// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. +func (s Sync) Sync(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + for _, hs := range historyStreams { + var redis2structs []chan<- redis.XMessage + insertedMessages := make(chan insertedMessage, bulkSize) + + // messageProgress are the tables (represented by struct types) + // with successfully inserted rows by Redis message ID. + messageProgress := map[string]map[reflect.Type]struct{}{} + messageProgressMtx := &sync.Mutex{} + + stream := "icinga:history:stream:" + hs.kind + s.logger.Infof("Syncing %s history", hs.kind) + + for _, structifier := range hs.structifiers { + redis2struct := make(chan redis.XMessage, bulkSize) + struct2db := make(chan contracts.Entity, bulkSize) + succeeded := make(chan contracts.Entity, bulkSize) + + // rowIds are IDs of to be synced Redis messages by database row. + rowIds := map[contracts.Entity]string{} + rowIdsMtx := &sync.Mutex{} + + redis2structs = append(redis2structs, redis2struct) + + g.Go(structifyStream(ctx, structifier, redis2struct, struct2db, rowIds, rowIdsMtx)) + g.Go(fwdSucceeded(ctx, insertedMessages, succeeded, rowIds, rowIdsMtx)) + + // Upserts from struct2db. + g.Go(func() error { + defer close(succeeded) + return s.db.Upsert(ctx, struct2db, succeeded) + }) + } + + g.Go(s.xRead(ctx, redis2structs, stream)) + g.Go(s.cleanup(ctx, hs, insertedMessages, messageProgress, messageProgressMtx, stream)) + } + + return g.Wait() +} + +// xRead reads from the Redis stream and broadcasts the data to redis2structs. +func (s Sync) xRead(ctx context.Context, redis2structs []chan<- redis.XMessage, stream string) func() error { + return func() error { + defer func() { + for _, r2s := range redis2structs { + close(r2s) + } + }() + + xra := &redis.XReadArgs{ + Streams: []string{stream, "0-0"}, + Count: bulkSize, + Block: 10 * time.Second, + } + + for { + streams, err := s.redis.XRead(ctx, xra).Result() + if err != nil && err != redis.Nil { + return err + } + + for _, stream := range streams { + for _, message := range stream.Messages { + xra.Streams[1] = message.ID + + for _, r2s := range redis2structs { + select { + case <-ctx.Done(): + return ctx.Err() + case r2s <- message: + } + } + } + } + } + } +} + +// structifyStream structifies from redis2struct to struct2db. +func structifyStream( + ctx context.Context, structifier structify.MapStructifier, redis2struct <-chan redis.XMessage, + struct2db chan<- contracts.Entity, rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex, +) func() error { + return func() error { + defer close(struct2db) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case message, ok := <-redis2struct: + if !ok { + return nil + } + + ptr, err := structifier(message.Values) + if err != nil { + return err + } + + ue := ptr.(v1.UpserterEntity) + + rowIdsMtx.Lock() + rowIds[ue] = message.ID + rowIdsMtx.Unlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case struct2db <- ue: + } + } + } + } +} + +// fwdSucceeded informs insertedMessages about successfully inserted rows according to succeeded. +func fwdSucceeded( + ctx context.Context, insertedMessages chan<- insertedMessage, succeeded <-chan contracts.Entity, + rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex, +) func() error { + return func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case row, ok := <-succeeded: + if !ok { + return nil + } + + rowIdsMtx.Lock() + + id, ok := rowIds[row] + if ok { + delete(rowIds, row) + } + + rowIdsMtx.Unlock() + + if ok { + select { + case <-ctx.Done(): + return ctx.Err() + case insertedMessages <- insertedMessage{id, reflect.TypeOf(row).Elem()}: + } + } + } + } + } +} + +// cleanup collects completely inserted messages from insertedMessages and deletes them from Redis. +func (s Sync) cleanup( + ctx context.Context, hs historyStream, insertedMessages <-chan insertedMessage, + messageProgress map[string]map[reflect.Type]struct{}, messageProgressMtx *sync.Mutex, stream string, +) func() error { + return func() error { + var ids []string + var count uint64 + var timeout <-chan time.Time + + const period = 20 * time.Second + periodically := time.NewTicker(period) + defer periodically.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-periodically.C: + if count > 0 { + s.logger.Infof("Inserted %d %s history entries in the last %s", count, hs.kind, period) + count = 0 + } + case msg := <-insertedMessages: + messageProgressMtx.Lock() + + mp, ok := messageProgress[msg.redisId] + if !ok { + mp = map[reflect.Type]struct{}{} + messageProgress[msg.redisId] = mp + } + + mp[msg.structType] = struct{}{} + + if ok = len(mp) == len(hs.structifiers); ok { + delete(messageProgress, msg.redisId) + } + + messageProgressMtx.Unlock() + + if ok { + ids = append(ids, msg.redisId) + count++ + + switch len(ids) { + case 1: + timeout = time.After(time.Second / 4) + case bulkSize: + if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil { + return err + } + + ids = nil + timeout = nil + } + } + case <-timeout: + if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil { + return err + } + + ids = nil + timeout = nil + } + } + } +} + +// historyStream represents a Redis history stream. +type historyStream struct { + // kind specifies the stream's purpose. + kind string + // structifiers lists the factories of the model structs the stream data shall be copied to. + structifiers []structify.MapStructifier +} + +// historyStreams contains all Redis history streams to sync. +var historyStreams = func() []historyStream { + var streams []historyStream + for _, rhs := range []struct { + kind string + structPtrs []v1.UpserterEntity + }{ + {"notification", []v1.UpserterEntity{(*v1.NotificationHistory)(nil), (*v1.HistoryNotification)(nil)}}, + {"usernotification", []v1.UpserterEntity{(*v1.UserNotificationHistory)(nil)}}, + {"state", []v1.UpserterEntity{(*v1.StateHistory)(nil), (*v1.HistoryState)(nil)}}, + {"downtime", []v1.UpserterEntity{(*v1.DowntimeHistory)(nil), (*v1.HistoryDowntime)(nil)}}, + {"comment", []v1.UpserterEntity{(*v1.CommentHistory)(nil), (*v1.HistoryComment)(nil)}}, + {"flapping", []v1.UpserterEntity{(*v1.FlappingHistory)(nil), (*v1.HistoryFlapping)(nil)}}, + {"acknowledgement", []v1.UpserterEntity{(*v1.AcknowledgementHistory)(nil), (*v1.HistoryAck)(nil)}}, + } { + var structifiers []structify.MapStructifier + for _, structPtr := range rhs.structPtrs { + structifiers = append(structifiers, structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json")) + } + + streams = append(streams, historyStream{rhs.kind, structifiers}) + } + + return streams +}() diff --git a/pkg/icingadb/v1/history/ack.go b/pkg/icingadb/v1/history/ack.go new file mode 100644 index 00000000..ba9a373c --- /dev/null +++ b/pkg/icingadb/v1/history/ack.go @@ -0,0 +1,82 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type AckHistoryUpserter struct { + ClearTime types.UnixMilli `json:"clear_time"` + ClearedBy types.String `json:"cleared_by"` +} + +// Upsert implements the contracts.Upserter interface. +func (ahu *AckHistoryUpserter) Upsert() interface{} { + return ahu +} + +type AcknowledgementHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + HistoryTableMeta `json:",inline"` + AckHistoryUpserter `json:",inline"` + SetTime types.UnixMilli `json:"set_time"` + Author string `json:"author"` + Comment types.String `json:"comment"` + ExpireTime types.UnixMilli `json:"expire_time"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` +} + +type HistoryAck struct { + HistoryMeta `json:",inline"` + AcknowledgementHistoryId types.Binary `json:"id"` + + // Idea: read SetTime and ClearTime from Redis and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + SetTime types.UnixMilli `json:"set_time" db:"-"` + ClearTime types.UnixMilli `json:"clear_time" db:"-"` + EventTime AckEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryAck) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryAck) TableName() string { + return "history" +} + +type AckEventTime struct { + History *HistoryAck `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et AckEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "ack_set": + return et.History.SetTime.Value() + case "ack_clear": + return et.History.ClearTime.Value() + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*AcknowledgementHistory)(nil) + _ contracts.Initer = (*HistoryAck)(nil) + _ contracts.TableNamer = (*HistoryAck)(nil) + _ UpserterEntity = (*HistoryAck)(nil) + _ driver.Valuer = AckEventTime{} +) diff --git a/pkg/icingadb/v1/history/comment.go b/pkg/icingadb/v1/history/comment.go new file mode 100644 index 00000000..d3a5743a --- /dev/null +++ b/pkg/icingadb/v1/history/comment.go @@ -0,0 +1,120 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type CommentHistoryEntity struct { + CommentId types.Binary `json:"comment_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (che CommentHistoryEntity) Fingerprint() contracts.Fingerprinter { + return che +} + +// ID implements part of the contracts.Entity interface. +func (che CommentHistoryEntity) ID() contracts.ID { + return che.CommentId +} + +// SetID implements part of the contracts.Entity interface. +func (che *CommentHistoryEntity) SetID(id contracts.ID) { + che.CommentId = id.(types.Binary) +} + +type CommentHistoryUpserter struct { + RemovedBy types.String `json:"removed_by"` + RemoveTime types.UnixMilli `json:"remove_time"` + HasBeenRemoved types.Bool `json:"has_been_removed"` +} + +// Upsert implements the contracts.Upserter interface. +func (chu *CommentHistoryUpserter) Upsert() interface{} { + return chu +} + +type CommentHistory struct { + CommentHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + CommentHistoryUpserter `json:",inline"` + EntryTime types.UnixMilli `json:"entry_time"` + Author string `json:"author"` + Comment string `json:"comment"` + EntryType types.CommentType `json:"entry_type"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` + ExpireTime types.UnixMilli `json:"expire_time"` +} + +// Init implements the contracts.Initer interface. +func (ch *CommentHistory) Init() { + ch.HasBeenRemoved = types.Bool{ + Bool: false, + Valid: true, + } +} + +type HistoryComment struct { + HistoryMeta `json:",inline"` + CommentHistoryId types.Binary `json:"comment_id"` + + // Idea: read EntryTime, RemoveTime and ExpireTime from Redis + // and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + EntryTime types.UnixMilli `json:"entry_time" db:"-"` + RemoveTime types.UnixMilli `json:"remove_time" db:"-"` + ExpireTime types.UnixMilli `json:"expire_time" db:"-"` + EventTime CommentEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryComment) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryComment) TableName() string { + return "history" +} + +type CommentEventTime struct { + History *HistoryComment `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et CommentEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "comment_add": + return et.History.EntryTime.Value() + case "comment_remove": + v, err := et.History.RemoveTime.Value() + if err == nil && v == nil { + return et.History.ExpireTime.Value() + } + + return v, err + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*CommentHistoryEntity)(nil) + _ contracts.Upserter = (*CommentHistoryUpserter)(nil) + _ contracts.Initer = (*CommentHistory)(nil) + _ UpserterEntity = (*CommentHistory)(nil) + _ contracts.Initer = (*HistoryComment)(nil) + _ contracts.TableNamer = (*HistoryComment)(nil) + _ UpserterEntity = (*HistoryComment)(nil) + _ driver.Valuer = CommentEventTime{} +) diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go new file mode 100644 index 00000000..aa645be2 --- /dev/null +++ b/pkg/icingadb/v1/history/downtime.go @@ -0,0 +1,119 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type DowntimeHistoryEntity struct { + DowntimeId types.Binary `json:"downtime_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (dhe DowntimeHistoryEntity) Fingerprint() contracts.Fingerprinter { + return dhe +} + +// ID implements part of the contracts.Entity interface. +func (dhe DowntimeHistoryEntity) ID() contracts.ID { + return dhe.DowntimeId +} + +// SetID implements part of the contracts.Entity interface. +func (dhe *DowntimeHistoryEntity) SetID(id contracts.ID) { + dhe.DowntimeId = id.(types.Binary) +} + +type DowntimeHistoryUpserter struct { + CancelledBy types.String `json:"cancelled_by"` + HasBeenCancelled types.Bool `json:"has_been_cancelled"` + CancelTime types.UnixMilli `json:"cancel_time"` +} + +// Upsert implements the contracts.Upserter interface. +func (dhu *DowntimeHistoryUpserter) Upsert() interface{} { + return dhu +} + +type DowntimeHistory struct { + DowntimeHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + DowntimeHistoryUpserter `json:",inline"` + TriggeredById types.Binary `json:"triggered_by_id"` + EntryTime types.UnixMilli `json:"entry_time"` + Author string `json:"author"` + Comment string `json:"comment"` + IsFlexible types.Bool `json:"is_flexible"` + FlexibleDuration uint64 `json:"flexible_duration"` + ScheduledStartTime types.UnixMilli `json:"scheduled_start_time"` + ScheduledEndTime types.UnixMilli `json:"scheduled_end_time"` + StartTime types.UnixMilli `json:"start_time"` + EndTime types.UnixMilli `json:"end_time"` + TriggerTime types.UnixMilli `json:"trigger_time"` +} + +type HistoryDowntime struct { + HistoryMeta `json:",inline"` + DowntimeHistoryId types.Binary `json:"downtime_id"` + + // Idea: read StartTime, CancelTime, EndTime and HasBeenCancelled from Redis + // and let EventTime decide based on HasBeenCancelled which of the others to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + StartTime types.UnixMilli `json:"start_time" db:"-"` + CancelTime types.UnixMilli `json:"cancel_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` + HasBeenCancelled types.Bool `json:"has_been_cancelled" db:"-"` + EventTime DowntimeEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryDowntime) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryDowntime) TableName() string { + return "history" +} + +type DowntimeEventTime struct { + History *HistoryDowntime `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et DowntimeEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "downtime_start": + return et.History.StartTime.Value() + case "downtime_end": + if !et.History.HasBeenCancelled.Valid { + return nil, nil + } + + if et.History.HasBeenCancelled.Bool { + return et.History.CancelTime.Value() + } else { + return et.History.EndTime.Value() + } + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*DowntimeHistoryEntity)(nil) + _ contracts.Upserter = (*DowntimeHistoryUpserter)(nil) + _ UpserterEntity = (*DowntimeHistory)(nil) + _ contracts.Initer = (*HistoryDowntime)(nil) + _ contracts.TableNamer = (*HistoryDowntime)(nil) + _ UpserterEntity = (*HistoryDowntime)(nil) + _ driver.Valuer = DowntimeEventTime{} +) diff --git a/pkg/icingadb/v1/history/flapping.go b/pkg/icingadb/v1/history/flapping.go new file mode 100644 index 00000000..9280b27a --- /dev/null +++ b/pkg/icingadb/v1/history/flapping.go @@ -0,0 +1,80 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type FlappingHistoryUpserter struct { + EndTime types.UnixMilli `json:"end_time"` + PercentStateChangeEnd types.Float `json:"percent_state_change_end"` + FlappingThresholdLow float32 `json:"flapping_threshold_low"` + FlappingThresholdHigh float32 `json:"flapping_threshold_high"` +} + +// Upsert implements the contracts.Upserter interface. +func (fhu *FlappingHistoryUpserter) Upsert() interface{} { + return fhu +} + +type FlappingHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + HistoryTableMeta `json:",inline"` + FlappingHistoryUpserter `json:",inline"` + StartTime types.UnixMilli `json:"start_time"` + PercentStateChangeStart types.Float `json:"percent_state_change_start"` +} + +type HistoryFlapping struct { + HistoryMeta `json:",inline"` + FlappingHistoryId types.Binary `json:"id"` + + // Idea: read StartTime and EndTime from Redis and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + StartTime types.UnixMilli `json:"start_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` + EventTime FlappingEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryFlapping) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryFlapping) TableName() string { + return "history" +} + +type FlappingEventTime struct { + History *HistoryFlapping `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et FlappingEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "flapping_start": + return et.History.StartTime.Value() + case "flapping_end": + return et.History.EndTime.Value() + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*FlappingHistory)(nil) + _ contracts.Initer = (*HistoryFlapping)(nil) + _ contracts.TableNamer = (*HistoryFlapping)(nil) + _ UpserterEntity = (*HistoryFlapping)(nil) + _ driver.Valuer = FlappingEventTime{} +) diff --git a/pkg/icingadb/v1/history/meta.go b/pkg/icingadb/v1/history/meta.go new file mode 100644 index 00000000..39c6d9b0 --- /dev/null +++ b/pkg/icingadb/v1/history/meta.go @@ -0,0 +1,89 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type UpserterEntity interface { + contracts.Upserter + contracts.Entity +} + +type HistoryTableEntity struct { + Id types.UUID `json:"id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (hte HistoryTableEntity) Fingerprint() contracts.Fingerprinter { + return hte +} + +// ID implements part of the contracts.Entity interface. +func (hte HistoryTableEntity) ID() contracts.ID { + return hte.Id +} + +// SetID implements part of the contracts.Entity interface. +func (hte *HistoryTableEntity) SetID(id contracts.ID) { + hte.Id = id.(types.UUID) +} + +// Upsert implements the contracts.Upserter interface. +// Update only the Id (effectively nothing). +func (hte HistoryTableEntity) Upsert() interface{} { + return hte +} + +type HistoryEntity struct { + Id types.UUID `json:"event_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (he HistoryEntity) Fingerprint() contracts.Fingerprinter { + return he +} + +// ID implements part of the contracts.Entity interface. +func (he HistoryEntity) ID() contracts.ID { + return he.Id +} + +// SetID implements part of the contracts.Entity interface. +func (he *HistoryEntity) SetID(id contracts.ID) { + he.Id = id.(types.UUID) +} + +// Upsert implements the contracts.Upserter interface. +// Update only the Id (effectively nothing). +func (he HistoryEntity) Upsert() interface{} { + return he +} + +type HistoryTableMeta struct { + EnvironmentId types.Binary `json:"environment_id"` + EndpointId types.Binary `json:"endpoint_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` +} + +type HistoryMeta struct { + HistoryEntity `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + EndpointId types.Binary `json:"endpoint_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + EventType string `json:"event_type"` +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*HistoryTableEntity)(nil) + _ contracts.Upserter = HistoryTableEntity{} + _ contracts.Entity = (*HistoryEntity)(nil) + _ contracts.Upserter = HistoryEntity{} + _ contracts.Entity = (*HistoryMeta)(nil) + _ contracts.Upserter = (*HistoryMeta)(nil) +) diff --git a/pkg/icingadb/v1/history/notification.go b/pkg/icingadb/v1/history/notification.go new file mode 100644 index 00000000..367ea881 --- /dev/null +++ b/pkg/icingadb/v1/history/notification.go @@ -0,0 +1,45 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type NotificationHistory struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` + Type types.NotificationType `json:"type"` + SendTime types.UnixMilli `json:"send_time"` + State uint8 `json:"state"` + PreviousHardState uint8 `json:"previous_hard_state"` + Author string `json:"author"` + Text string `json:"text"` + UsersNotified uint16 `json:"users_notified"` +} + +type UserNotificationHistory struct { + HistoryTableEntity `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + NotificationHistoryId types.UUID `json:"notification_history_id"` + UserId types.Binary `json:"user_id"` +} + +type HistoryNotification struct { + HistoryMeta `json:",inline"` + NotificationHistoryId types.UUID `json:"id"` + EventTime types.UnixMilli `json:"send_time"` +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryNotification) TableName() string { + return "history" +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*NotificationHistory)(nil) + _ UpserterEntity = (*UserNotificationHistory)(nil) + _ contracts.TableNamer = (*HistoryNotification)(nil) + _ UpserterEntity = (*HistoryNotification)(nil) +) diff --git a/pkg/icingadb/v1/history/state.go b/pkg/icingadb/v1/history/state.go new file mode 100644 index 00000000..318d1f49 --- /dev/null +++ b/pkg/icingadb/v1/history/state.go @@ -0,0 +1,40 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type StateHistory struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + EventTime types.UnixMilli `json:"event_time"` + StateType types.StateType `json:"state_type"` + SoftState uint8 `json:"soft_state"` + HardState uint8 `json:"hard_state"` + PreviousSoftState uint8 `json:"previous_soft_state"` + PreviousHardState uint8 `json:"previous_hard_state"` + Attempt uint8 `json:"attempt"` + Output types.String `json:"output"` + LongOutput types.String `json:"long_output"` + MaxCheckAttempts uint32 `json:"max_check_attempts"` + CheckSource types.String `json:"check_source"` +} + +type HistoryState struct { + HistoryMeta `json:",inline"` + StateHistoryId types.UUID `json:"id"` + EventTime types.UnixMilli `json:"event_time"` +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryState) TableName() string { + return "history" +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*StateHistory)(nil) + _ contracts.TableNamer = (*HistoryState)(nil) + _ UpserterEntity = (*HistoryState)(nil) +)