From 7ccf627df6ec2a902af1eb014645a05bacc2542e Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 9 Apr 2021 11:17:37 +0200 Subject: [PATCH] Sync history --- cmd/icingadb/main.go | 6 + pkg/icingadb/history/sync.go | 300 ++++++++++++++++++++++++ pkg/icingadb/v1/history/ack.go | 82 +++++++ pkg/icingadb/v1/history/comment.go | 120 ++++++++++ pkg/icingadb/v1/history/downtime.go | 119 ++++++++++ pkg/icingadb/v1/history/flapping.go | 80 +++++++ pkg/icingadb/v1/history/meta.go | 89 +++++++ pkg/icingadb/v1/history/notification.go | 45 ++++ pkg/icingadb/v1/history/state.go | 40 ++++ 9 files changed, 881 insertions(+) create mode 100644 pkg/icingadb/history/sync.go create mode 100644 pkg/icingadb/v1/history/ack.go create mode 100644 pkg/icingadb/v1/history/comment.go create mode 100644 pkg/icingadb/v1/history/downtime.go create mode 100644 pkg/icingadb/v1/history/flapping.go create mode 100644 pkg/icingadb/v1/history/meta.go create mode 100644 pkg/icingadb/v1/history/notification.go create mode 100644 pkg/icingadb/v1/history/state.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 7f2ded72..32101737 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/flatten" "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/icingadb/history" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/utils" @@ -40,6 +41,7 @@ func main() { heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) ha := icingadb.NewHA(ctx, db, heartbeat, logger) s := icingadb.NewSync(db, rc, logger) + hs := history.NewSync(db, rc, logger) // For temporary exit after sync done := make(chan struct{}, 0) @@ -186,6 +188,10 @@ func main() { }) } + g.Go(func() error { + return hs.Sync(synctx) + }) + if err := g.Wait(); err != nil { // TODO(el): This panics here even if a ctx gets cancelled. // That is intentional for the moment for testing. diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go new file mode 100644 index 00000000..3673ad69 --- /dev/null +++ b/pkg/icingadb/history/sync.go @@ -0,0 +1,300 @@ +package history + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/structify" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "reflect" + "sync" + "time" +) + +// Sync specifies the source and destination of a history sync. +type Sync struct { + db *icingadb.DB + redis *icingaredis.Client + logger *zap.SugaredLogger +} + +// NewSync creates a new Sync. +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +// insertedMessage represents a just inserted row. +type insertedMessage struct { + // redisId specifies the origin Redis message. + redisId string + // structType represents the table the row was inserted into. + structType reflect.Type +} + +const bulkSize = 1 << 14 + +// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. +func (s Sync) Sync(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + for _, hs := range historyStreams { + var redis2structs []chan<- redis.XMessage + insertedMessages := make(chan insertedMessage, bulkSize) + + // messageProgress are the tables (represented by struct types) + // with successfully inserted rows by Redis message ID. + messageProgress := map[string]map[reflect.Type]struct{}{} + messageProgressMtx := &sync.Mutex{} + + stream := "icinga:history:stream:" + hs.kind + s.logger.Infof("Syncing %s history", hs.kind) + + for _, structifier := range hs.structifiers { + redis2struct := make(chan redis.XMessage, bulkSize) + struct2db := make(chan contracts.Entity, bulkSize) + succeeded := make(chan contracts.Entity, bulkSize) + + // rowIds are IDs of to be synced Redis messages by database row. + rowIds := map[contracts.Entity]string{} + rowIdsMtx := &sync.Mutex{} + + redis2structs = append(redis2structs, redis2struct) + + g.Go(structifyStream(ctx, structifier, redis2struct, struct2db, rowIds, rowIdsMtx)) + g.Go(fwdSucceeded(ctx, insertedMessages, succeeded, rowIds, rowIdsMtx)) + + // Upserts from struct2db. + g.Go(func() error { + defer close(succeeded) + return s.db.Upsert(ctx, struct2db, succeeded) + }) + } + + g.Go(s.xRead(ctx, redis2structs, stream)) + g.Go(s.cleanup(ctx, hs, insertedMessages, messageProgress, messageProgressMtx, stream)) + } + + return g.Wait() +} + +// xRead reads from the Redis stream and broadcasts the data to redis2structs. +func (s Sync) xRead(ctx context.Context, redis2structs []chan<- redis.XMessage, stream string) func() error { + return func() error { + defer func() { + for _, r2s := range redis2structs { + close(r2s) + } + }() + + xra := &redis.XReadArgs{ + Streams: []string{stream, "0-0"}, + Count: bulkSize, + Block: 10 * time.Second, + } + + for { + streams, err := s.redis.XRead(ctx, xra).Result() + if err != nil && err != redis.Nil { + return err + } + + for _, stream := range streams { + for _, message := range stream.Messages { + xra.Streams[1] = message.ID + + for _, r2s := range redis2structs { + select { + case <-ctx.Done(): + return ctx.Err() + case r2s <- message: + } + } + } + } + } + } +} + +// structifyStream structifies from redis2struct to struct2db. +func structifyStream( + ctx context.Context, structifier structify.MapStructifier, redis2struct <-chan redis.XMessage, + struct2db chan<- contracts.Entity, rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex, +) func() error { + return func() error { + defer close(struct2db) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case message, ok := <-redis2struct: + if !ok { + return nil + } + + ptr, err := structifier(message.Values) + if err != nil { + return err + } + + ue := ptr.(v1.UpserterEntity) + + rowIdsMtx.Lock() + rowIds[ue] = message.ID + rowIdsMtx.Unlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case struct2db <- ue: + } + } + } + } +} + +// fwdSucceeded informs insertedMessages about successfully inserted rows according to succeeded. +func fwdSucceeded( + ctx context.Context, insertedMessages chan<- insertedMessage, succeeded <-chan contracts.Entity, + rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex, +) func() error { + return func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case row, ok := <-succeeded: + if !ok { + return nil + } + + rowIdsMtx.Lock() + + id, ok := rowIds[row] + if ok { + delete(rowIds, row) + } + + rowIdsMtx.Unlock() + + if ok { + select { + case <-ctx.Done(): + return ctx.Err() + case insertedMessages <- insertedMessage{id, reflect.TypeOf(row).Elem()}: + } + } + } + } + } +} + +// cleanup collects completely inserted messages from insertedMessages and deletes them from Redis. +func (s Sync) cleanup( + ctx context.Context, hs historyStream, insertedMessages <-chan insertedMessage, + messageProgress map[string]map[reflect.Type]struct{}, messageProgressMtx *sync.Mutex, stream string, +) func() error { + return func() error { + var ids []string + var count uint64 + var timeout <-chan time.Time + + const period = 20 * time.Second + periodically := time.NewTicker(period) + defer periodically.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-periodically.C: + if count > 0 { + s.logger.Infof("Inserted %d %s history entries in the last %s", count, hs.kind, period) + count = 0 + } + case msg := <-insertedMessages: + messageProgressMtx.Lock() + + mp, ok := messageProgress[msg.redisId] + if !ok { + mp = map[reflect.Type]struct{}{} + messageProgress[msg.redisId] = mp + } + + mp[msg.structType] = struct{}{} + + if ok = len(mp) == len(hs.structifiers); ok { + delete(messageProgress, msg.redisId) + } + + messageProgressMtx.Unlock() + + if ok { + ids = append(ids, msg.redisId) + count++ + + switch len(ids) { + case 1: + timeout = time.After(time.Second / 4) + case bulkSize: + if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil { + return err + } + + ids = nil + timeout = nil + } + } + case <-timeout: + if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil { + return err + } + + ids = nil + timeout = nil + } + } + } +} + +// historyStream represents a Redis history stream. +type historyStream struct { + // kind specifies the stream's purpose. + kind string + // structifiers lists the factories of the model structs the stream data shall be copied to. + structifiers []structify.MapStructifier +} + +// historyStreams contains all Redis history streams to sync. +var historyStreams = func() []historyStream { + var streams []historyStream + for _, rhs := range []struct { + kind string + structPtrs []v1.UpserterEntity + }{ + {"notification", []v1.UpserterEntity{(*v1.NotificationHistory)(nil), (*v1.HistoryNotification)(nil)}}, + {"usernotification", []v1.UpserterEntity{(*v1.UserNotificationHistory)(nil)}}, + {"state", []v1.UpserterEntity{(*v1.StateHistory)(nil), (*v1.HistoryState)(nil)}}, + {"downtime", []v1.UpserterEntity{(*v1.DowntimeHistory)(nil), (*v1.HistoryDowntime)(nil)}}, + {"comment", []v1.UpserterEntity{(*v1.CommentHistory)(nil), (*v1.HistoryComment)(nil)}}, + {"flapping", []v1.UpserterEntity{(*v1.FlappingHistory)(nil), (*v1.HistoryFlapping)(nil)}}, + {"acknowledgement", []v1.UpserterEntity{(*v1.AcknowledgementHistory)(nil), (*v1.HistoryAck)(nil)}}, + } { + var structifiers []structify.MapStructifier + for _, structPtr := range rhs.structPtrs { + structifiers = append(structifiers, structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json")) + } + + streams = append(streams, historyStream{rhs.kind, structifiers}) + } + + return streams +}() diff --git a/pkg/icingadb/v1/history/ack.go b/pkg/icingadb/v1/history/ack.go new file mode 100644 index 00000000..ba9a373c --- /dev/null +++ b/pkg/icingadb/v1/history/ack.go @@ -0,0 +1,82 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type AckHistoryUpserter struct { + ClearTime types.UnixMilli `json:"clear_time"` + ClearedBy types.String `json:"cleared_by"` +} + +// Upsert implements the contracts.Upserter interface. +func (ahu *AckHistoryUpserter) Upsert() interface{} { + return ahu +} + +type AcknowledgementHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + HistoryTableMeta `json:",inline"` + AckHistoryUpserter `json:",inline"` + SetTime types.UnixMilli `json:"set_time"` + Author string `json:"author"` + Comment types.String `json:"comment"` + ExpireTime types.UnixMilli `json:"expire_time"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` +} + +type HistoryAck struct { + HistoryMeta `json:",inline"` + AcknowledgementHistoryId types.Binary `json:"id"` + + // Idea: read SetTime and ClearTime from Redis and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + SetTime types.UnixMilli `json:"set_time" db:"-"` + ClearTime types.UnixMilli `json:"clear_time" db:"-"` + EventTime AckEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryAck) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryAck) TableName() string { + return "history" +} + +type AckEventTime struct { + History *HistoryAck `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et AckEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "ack_set": + return et.History.SetTime.Value() + case "ack_clear": + return et.History.ClearTime.Value() + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*AcknowledgementHistory)(nil) + _ contracts.Initer = (*HistoryAck)(nil) + _ contracts.TableNamer = (*HistoryAck)(nil) + _ UpserterEntity = (*HistoryAck)(nil) + _ driver.Valuer = AckEventTime{} +) diff --git a/pkg/icingadb/v1/history/comment.go b/pkg/icingadb/v1/history/comment.go new file mode 100644 index 00000000..d3a5743a --- /dev/null +++ b/pkg/icingadb/v1/history/comment.go @@ -0,0 +1,120 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type CommentHistoryEntity struct { + CommentId types.Binary `json:"comment_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (che CommentHistoryEntity) Fingerprint() contracts.Fingerprinter { + return che +} + +// ID implements part of the contracts.Entity interface. +func (che CommentHistoryEntity) ID() contracts.ID { + return che.CommentId +} + +// SetID implements part of the contracts.Entity interface. +func (che *CommentHistoryEntity) SetID(id contracts.ID) { + che.CommentId = id.(types.Binary) +} + +type CommentHistoryUpserter struct { + RemovedBy types.String `json:"removed_by"` + RemoveTime types.UnixMilli `json:"remove_time"` + HasBeenRemoved types.Bool `json:"has_been_removed"` +} + +// Upsert implements the contracts.Upserter interface. +func (chu *CommentHistoryUpserter) Upsert() interface{} { + return chu +} + +type CommentHistory struct { + CommentHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + CommentHistoryUpserter `json:",inline"` + EntryTime types.UnixMilli `json:"entry_time"` + Author string `json:"author"` + Comment string `json:"comment"` + EntryType types.CommentType `json:"entry_type"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` + ExpireTime types.UnixMilli `json:"expire_time"` +} + +// Init implements the contracts.Initer interface. +func (ch *CommentHistory) Init() { + ch.HasBeenRemoved = types.Bool{ + Bool: false, + Valid: true, + } +} + +type HistoryComment struct { + HistoryMeta `json:",inline"` + CommentHistoryId types.Binary `json:"comment_id"` + + // Idea: read EntryTime, RemoveTime and ExpireTime from Redis + // and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + EntryTime types.UnixMilli `json:"entry_time" db:"-"` + RemoveTime types.UnixMilli `json:"remove_time" db:"-"` + ExpireTime types.UnixMilli `json:"expire_time" db:"-"` + EventTime CommentEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryComment) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryComment) TableName() string { + return "history" +} + +type CommentEventTime struct { + History *HistoryComment `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et CommentEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "comment_add": + return et.History.EntryTime.Value() + case "comment_remove": + v, err := et.History.RemoveTime.Value() + if err == nil && v == nil { + return et.History.ExpireTime.Value() + } + + return v, err + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*CommentHistoryEntity)(nil) + _ contracts.Upserter = (*CommentHistoryUpserter)(nil) + _ contracts.Initer = (*CommentHistory)(nil) + _ UpserterEntity = (*CommentHistory)(nil) + _ contracts.Initer = (*HistoryComment)(nil) + _ contracts.TableNamer = (*HistoryComment)(nil) + _ UpserterEntity = (*HistoryComment)(nil) + _ driver.Valuer = CommentEventTime{} +) diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go new file mode 100644 index 00000000..aa645be2 --- /dev/null +++ b/pkg/icingadb/v1/history/downtime.go @@ -0,0 +1,119 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type DowntimeHistoryEntity struct { + DowntimeId types.Binary `json:"downtime_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (dhe DowntimeHistoryEntity) Fingerprint() contracts.Fingerprinter { + return dhe +} + +// ID implements part of the contracts.Entity interface. +func (dhe DowntimeHistoryEntity) ID() contracts.ID { + return dhe.DowntimeId +} + +// SetID implements part of the contracts.Entity interface. +func (dhe *DowntimeHistoryEntity) SetID(id contracts.ID) { + dhe.DowntimeId = id.(types.Binary) +} + +type DowntimeHistoryUpserter struct { + CancelledBy types.String `json:"cancelled_by"` + HasBeenCancelled types.Bool `json:"has_been_cancelled"` + CancelTime types.UnixMilli `json:"cancel_time"` +} + +// Upsert implements the contracts.Upserter interface. +func (dhu *DowntimeHistoryUpserter) Upsert() interface{} { + return dhu +} + +type DowntimeHistory struct { + DowntimeHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + DowntimeHistoryUpserter `json:",inline"` + TriggeredById types.Binary `json:"triggered_by_id"` + EntryTime types.UnixMilli `json:"entry_time"` + Author string `json:"author"` + Comment string `json:"comment"` + IsFlexible types.Bool `json:"is_flexible"` + FlexibleDuration uint64 `json:"flexible_duration"` + ScheduledStartTime types.UnixMilli `json:"scheduled_start_time"` + ScheduledEndTime types.UnixMilli `json:"scheduled_end_time"` + StartTime types.UnixMilli `json:"start_time"` + EndTime types.UnixMilli `json:"end_time"` + TriggerTime types.UnixMilli `json:"trigger_time"` +} + +type HistoryDowntime struct { + HistoryMeta `json:",inline"` + DowntimeHistoryId types.Binary `json:"downtime_id"` + + // Idea: read StartTime, CancelTime, EndTime and HasBeenCancelled from Redis + // and let EventTime decide based on HasBeenCancelled which of the others to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + StartTime types.UnixMilli `json:"start_time" db:"-"` + CancelTime types.UnixMilli `json:"cancel_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` + HasBeenCancelled types.Bool `json:"has_been_cancelled" db:"-"` + EventTime DowntimeEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryDowntime) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryDowntime) TableName() string { + return "history" +} + +type DowntimeEventTime struct { + History *HistoryDowntime `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et DowntimeEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "downtime_start": + return et.History.StartTime.Value() + case "downtime_end": + if !et.History.HasBeenCancelled.Valid { + return nil, nil + } + + if et.History.HasBeenCancelled.Bool { + return et.History.CancelTime.Value() + } else { + return et.History.EndTime.Value() + } + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*DowntimeHistoryEntity)(nil) + _ contracts.Upserter = (*DowntimeHistoryUpserter)(nil) + _ UpserterEntity = (*DowntimeHistory)(nil) + _ contracts.Initer = (*HistoryDowntime)(nil) + _ contracts.TableNamer = (*HistoryDowntime)(nil) + _ UpserterEntity = (*HistoryDowntime)(nil) + _ driver.Valuer = DowntimeEventTime{} +) diff --git a/pkg/icingadb/v1/history/flapping.go b/pkg/icingadb/v1/history/flapping.go new file mode 100644 index 00000000..9280b27a --- /dev/null +++ b/pkg/icingadb/v1/history/flapping.go @@ -0,0 +1,80 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type FlappingHistoryUpserter struct { + EndTime types.UnixMilli `json:"end_time"` + PercentStateChangeEnd types.Float `json:"percent_state_change_end"` + FlappingThresholdLow float32 `json:"flapping_threshold_low"` + FlappingThresholdHigh float32 `json:"flapping_threshold_high"` +} + +// Upsert implements the contracts.Upserter interface. +func (fhu *FlappingHistoryUpserter) Upsert() interface{} { + return fhu +} + +type FlappingHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + HistoryTableMeta `json:",inline"` + FlappingHistoryUpserter `json:",inline"` + StartTime types.UnixMilli `json:"start_time"` + PercentStateChangeStart types.Float `json:"percent_state_change_start"` +} + +type HistoryFlapping struct { + HistoryMeta `json:",inline"` + FlappingHistoryId types.Binary `json:"id"` + + // Idea: read StartTime and EndTime from Redis and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + StartTime types.UnixMilli `json:"start_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` + EventTime FlappingEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryFlapping) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryFlapping) TableName() string { + return "history" +} + +type FlappingEventTime struct { + History *HistoryFlapping `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et FlappingEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "flapping_start": + return et.History.StartTime.Value() + case "flapping_end": + return et.History.EndTime.Value() + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*FlappingHistory)(nil) + _ contracts.Initer = (*HistoryFlapping)(nil) + _ contracts.TableNamer = (*HistoryFlapping)(nil) + _ UpserterEntity = (*HistoryFlapping)(nil) + _ driver.Valuer = FlappingEventTime{} +) diff --git a/pkg/icingadb/v1/history/meta.go b/pkg/icingadb/v1/history/meta.go new file mode 100644 index 00000000..39c6d9b0 --- /dev/null +++ b/pkg/icingadb/v1/history/meta.go @@ -0,0 +1,89 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type UpserterEntity interface { + contracts.Upserter + contracts.Entity +} + +type HistoryTableEntity struct { + Id types.UUID `json:"id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (hte HistoryTableEntity) Fingerprint() contracts.Fingerprinter { + return hte +} + +// ID implements part of the contracts.Entity interface. +func (hte HistoryTableEntity) ID() contracts.ID { + return hte.Id +} + +// SetID implements part of the contracts.Entity interface. +func (hte *HistoryTableEntity) SetID(id contracts.ID) { + hte.Id = id.(types.UUID) +} + +// Upsert implements the contracts.Upserter interface. +// Update only the Id (effectively nothing). +func (hte HistoryTableEntity) Upsert() interface{} { + return hte +} + +type HistoryEntity struct { + Id types.UUID `json:"event_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (he HistoryEntity) Fingerprint() contracts.Fingerprinter { + return he +} + +// ID implements part of the contracts.Entity interface. +func (he HistoryEntity) ID() contracts.ID { + return he.Id +} + +// SetID implements part of the contracts.Entity interface. +func (he *HistoryEntity) SetID(id contracts.ID) { + he.Id = id.(types.UUID) +} + +// Upsert implements the contracts.Upserter interface. +// Update only the Id (effectively nothing). +func (he HistoryEntity) Upsert() interface{} { + return he +} + +type HistoryTableMeta struct { + EnvironmentId types.Binary `json:"environment_id"` + EndpointId types.Binary `json:"endpoint_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` +} + +type HistoryMeta struct { + HistoryEntity `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + EndpointId types.Binary `json:"endpoint_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + EventType string `json:"event_type"` +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*HistoryTableEntity)(nil) + _ contracts.Upserter = HistoryTableEntity{} + _ contracts.Entity = (*HistoryEntity)(nil) + _ contracts.Upserter = HistoryEntity{} + _ contracts.Entity = (*HistoryMeta)(nil) + _ contracts.Upserter = (*HistoryMeta)(nil) +) diff --git a/pkg/icingadb/v1/history/notification.go b/pkg/icingadb/v1/history/notification.go new file mode 100644 index 00000000..367ea881 --- /dev/null +++ b/pkg/icingadb/v1/history/notification.go @@ -0,0 +1,45 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type NotificationHistory struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` + Type types.NotificationType `json:"type"` + SendTime types.UnixMilli `json:"send_time"` + State uint8 `json:"state"` + PreviousHardState uint8 `json:"previous_hard_state"` + Author string `json:"author"` + Text string `json:"text"` + UsersNotified uint16 `json:"users_notified"` +} + +type UserNotificationHistory struct { + HistoryTableEntity `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + NotificationHistoryId types.UUID `json:"notification_history_id"` + UserId types.Binary `json:"user_id"` +} + +type HistoryNotification struct { + HistoryMeta `json:",inline"` + NotificationHistoryId types.UUID `json:"id"` + EventTime types.UnixMilli `json:"send_time"` +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryNotification) TableName() string { + return "history" +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*NotificationHistory)(nil) + _ UpserterEntity = (*UserNotificationHistory)(nil) + _ contracts.TableNamer = (*HistoryNotification)(nil) + _ UpserterEntity = (*HistoryNotification)(nil) +) diff --git a/pkg/icingadb/v1/history/state.go b/pkg/icingadb/v1/history/state.go new file mode 100644 index 00000000..318d1f49 --- /dev/null +++ b/pkg/icingadb/v1/history/state.go @@ -0,0 +1,40 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type StateHistory struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + EventTime types.UnixMilli `json:"event_time"` + StateType types.StateType `json:"state_type"` + SoftState uint8 `json:"soft_state"` + HardState uint8 `json:"hard_state"` + PreviousSoftState uint8 `json:"previous_soft_state"` + PreviousHardState uint8 `json:"previous_hard_state"` + Attempt uint8 `json:"attempt"` + Output types.String `json:"output"` + LongOutput types.String `json:"long_output"` + MaxCheckAttempts uint32 `json:"max_check_attempts"` + CheckSource types.String `json:"check_source"` +} + +type HistoryState struct { + HistoryMeta `json:",inline"` + StateHistoryId types.UUID `json:"id"` + EventTime types.UnixMilli `json:"event_time"` +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryState) TableName() string { + return "history" +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*StateHistory)(nil) + _ contracts.TableNamer = (*HistoryState)(nil) + _ UpserterEntity = (*HistoryState)(nil) +)