mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-06 15:22:08 -04:00
Sync history
This commit is contained in:
parent
9aa1070db0
commit
7ccf627df6
9 changed files with 881 additions and 0 deletions
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/flatten"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/history"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
|
|
@ -40,6 +41,7 @@ func main() {
|
|||
heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger)
|
||||
ha := icingadb.NewHA(ctx, db, heartbeat, logger)
|
||||
s := icingadb.NewSync(db, rc, logger)
|
||||
hs := history.NewSync(db, rc, logger)
|
||||
|
||||
// For temporary exit after sync
|
||||
done := make(chan struct{}, 0)
|
||||
|
|
@ -186,6 +188,10 @@ func main() {
|
|||
})
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
return hs.Sync(synctx)
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
// TODO(el): This panics here even if a ctx gets cancelled.
|
||||
// That is intentional for the moment for testing.
|
||||
|
|
|
|||
300
pkg/icingadb/history/sync.go
Normal file
300
pkg/icingadb/history/sync.go
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/structify"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Sync specifies the source and destination of a history sync.
|
||||
type Sync struct {
|
||||
db *icingadb.DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
// NewSync creates a new Sync.
|
||||
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
|
||||
return &Sync{
|
||||
db: db,
|
||||
redis: redis,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// insertedMessage represents a just inserted row.
|
||||
type insertedMessage struct {
|
||||
// redisId specifies the origin Redis message.
|
||||
redisId string
|
||||
// structType represents the table the row was inserted into.
|
||||
structType reflect.Type
|
||||
}
|
||||
|
||||
const bulkSize = 1 << 14
|
||||
|
||||
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
|
||||
func (s Sync) Sync(ctx context.Context) error {
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for _, hs := range historyStreams {
|
||||
var redis2structs []chan<- redis.XMessage
|
||||
insertedMessages := make(chan insertedMessage, bulkSize)
|
||||
|
||||
// messageProgress are the tables (represented by struct types)
|
||||
// with successfully inserted rows by Redis message ID.
|
||||
messageProgress := map[string]map[reflect.Type]struct{}{}
|
||||
messageProgressMtx := &sync.Mutex{}
|
||||
|
||||
stream := "icinga:history:stream:" + hs.kind
|
||||
s.logger.Infof("Syncing %s history", hs.kind)
|
||||
|
||||
for _, structifier := range hs.structifiers {
|
||||
redis2struct := make(chan redis.XMessage, bulkSize)
|
||||
struct2db := make(chan contracts.Entity, bulkSize)
|
||||
succeeded := make(chan contracts.Entity, bulkSize)
|
||||
|
||||
// rowIds are IDs of to be synced Redis messages by database row.
|
||||
rowIds := map[contracts.Entity]string{}
|
||||
rowIdsMtx := &sync.Mutex{}
|
||||
|
||||
redis2structs = append(redis2structs, redis2struct)
|
||||
|
||||
g.Go(structifyStream(ctx, structifier, redis2struct, struct2db, rowIds, rowIdsMtx))
|
||||
g.Go(fwdSucceeded(ctx, insertedMessages, succeeded, rowIds, rowIdsMtx))
|
||||
|
||||
// Upserts from struct2db.
|
||||
g.Go(func() error {
|
||||
defer close(succeeded)
|
||||
return s.db.Upsert(ctx, struct2db, succeeded)
|
||||
})
|
||||
}
|
||||
|
||||
g.Go(s.xRead(ctx, redis2structs, stream))
|
||||
g.Go(s.cleanup(ctx, hs, insertedMessages, messageProgress, messageProgressMtx, stream))
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
// xRead reads from the Redis stream and broadcasts the data to redis2structs.
|
||||
func (s Sync) xRead(ctx context.Context, redis2structs []chan<- redis.XMessage, stream string) func() error {
|
||||
return func() error {
|
||||
defer func() {
|
||||
for _, r2s := range redis2structs {
|
||||
close(r2s)
|
||||
}
|
||||
}()
|
||||
|
||||
xra := &redis.XReadArgs{
|
||||
Streams: []string{stream, "0-0"},
|
||||
Count: bulkSize,
|
||||
Block: 10 * time.Second,
|
||||
}
|
||||
|
||||
for {
|
||||
streams, err := s.redis.XRead(ctx, xra).Result()
|
||||
if err != nil && err != redis.Nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, message := range stream.Messages {
|
||||
xra.Streams[1] = message.ID
|
||||
|
||||
for _, r2s := range redis2structs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case r2s <- message:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// structifyStream structifies from redis2struct to struct2db.
|
||||
func structifyStream(
|
||||
ctx context.Context, structifier structify.MapStructifier, redis2struct <-chan redis.XMessage,
|
||||
struct2db chan<- contracts.Entity, rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex,
|
||||
) func() error {
|
||||
return func() error {
|
||||
defer close(struct2db)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case message, ok := <-redis2struct:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ptr, err := structifier(message.Values)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ue := ptr.(v1.UpserterEntity)
|
||||
|
||||
rowIdsMtx.Lock()
|
||||
rowIds[ue] = message.ID
|
||||
rowIdsMtx.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case struct2db <- ue:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fwdSucceeded informs insertedMessages about successfully inserted rows according to succeeded.
|
||||
func fwdSucceeded(
|
||||
ctx context.Context, insertedMessages chan<- insertedMessage, succeeded <-chan contracts.Entity,
|
||||
rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex,
|
||||
) func() error {
|
||||
return func() error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case row, ok := <-succeeded:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
rowIdsMtx.Lock()
|
||||
|
||||
id, ok := rowIds[row]
|
||||
if ok {
|
||||
delete(rowIds, row)
|
||||
}
|
||||
|
||||
rowIdsMtx.Unlock()
|
||||
|
||||
if ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case insertedMessages <- insertedMessage{id, reflect.TypeOf(row).Elem()}:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup collects completely inserted messages from insertedMessages and deletes them from Redis.
|
||||
func (s Sync) cleanup(
|
||||
ctx context.Context, hs historyStream, insertedMessages <-chan insertedMessage,
|
||||
messageProgress map[string]map[reflect.Type]struct{}, messageProgressMtx *sync.Mutex, stream string,
|
||||
) func() error {
|
||||
return func() error {
|
||||
var ids []string
|
||||
var count uint64
|
||||
var timeout <-chan time.Time
|
||||
|
||||
const period = 20 * time.Second
|
||||
periodically := time.NewTicker(period)
|
||||
defer periodically.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-periodically.C:
|
||||
if count > 0 {
|
||||
s.logger.Infof("Inserted %d %s history entries in the last %s", count, hs.kind, period)
|
||||
count = 0
|
||||
}
|
||||
case msg := <-insertedMessages:
|
||||
messageProgressMtx.Lock()
|
||||
|
||||
mp, ok := messageProgress[msg.redisId]
|
||||
if !ok {
|
||||
mp = map[reflect.Type]struct{}{}
|
||||
messageProgress[msg.redisId] = mp
|
||||
}
|
||||
|
||||
mp[msg.structType] = struct{}{}
|
||||
|
||||
if ok = len(mp) == len(hs.structifiers); ok {
|
||||
delete(messageProgress, msg.redisId)
|
||||
}
|
||||
|
||||
messageProgressMtx.Unlock()
|
||||
|
||||
if ok {
|
||||
ids = append(ids, msg.redisId)
|
||||
count++
|
||||
|
||||
switch len(ids) {
|
||||
case 1:
|
||||
timeout = time.After(time.Second / 4)
|
||||
case bulkSize:
|
||||
if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ids = nil
|
||||
timeout = nil
|
||||
}
|
||||
}
|
||||
case <-timeout:
|
||||
if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ids = nil
|
||||
timeout = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// historyStream represents a Redis history stream.
|
||||
type historyStream struct {
|
||||
// kind specifies the stream's purpose.
|
||||
kind string
|
||||
// structifiers lists the factories of the model structs the stream data shall be copied to.
|
||||
structifiers []structify.MapStructifier
|
||||
}
|
||||
|
||||
// historyStreams contains all Redis history streams to sync.
|
||||
var historyStreams = func() []historyStream {
|
||||
var streams []historyStream
|
||||
for _, rhs := range []struct {
|
||||
kind string
|
||||
structPtrs []v1.UpserterEntity
|
||||
}{
|
||||
{"notification", []v1.UpserterEntity{(*v1.NotificationHistory)(nil), (*v1.HistoryNotification)(nil)}},
|
||||
{"usernotification", []v1.UpserterEntity{(*v1.UserNotificationHistory)(nil)}},
|
||||
{"state", []v1.UpserterEntity{(*v1.StateHistory)(nil), (*v1.HistoryState)(nil)}},
|
||||
{"downtime", []v1.UpserterEntity{(*v1.DowntimeHistory)(nil), (*v1.HistoryDowntime)(nil)}},
|
||||
{"comment", []v1.UpserterEntity{(*v1.CommentHistory)(nil), (*v1.HistoryComment)(nil)}},
|
||||
{"flapping", []v1.UpserterEntity{(*v1.FlappingHistory)(nil), (*v1.HistoryFlapping)(nil)}},
|
||||
{"acknowledgement", []v1.UpserterEntity{(*v1.AcknowledgementHistory)(nil), (*v1.HistoryAck)(nil)}},
|
||||
} {
|
||||
var structifiers []structify.MapStructifier
|
||||
for _, structPtr := range rhs.structPtrs {
|
||||
structifiers = append(structifiers, structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json"))
|
||||
}
|
||||
|
||||
streams = append(streams, historyStream{rhs.kind, structifiers})
|
||||
}
|
||||
|
||||
return streams
|
||||
}()
|
||||
82
pkg/icingadb/v1/history/ack.go
Normal file
82
pkg/icingadb/v1/history/ack.go
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type AckHistoryUpserter struct {
|
||||
ClearTime types.UnixMilli `json:"clear_time"`
|
||||
ClearedBy types.String `json:"cleared_by"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (ahu *AckHistoryUpserter) Upsert() interface{} {
|
||||
return ahu
|
||||
}
|
||||
|
||||
type AcknowledgementHistory struct {
|
||||
v1.EntityWithoutChecksum `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
AckHistoryUpserter `json:",inline"`
|
||||
SetTime types.UnixMilli `json:"set_time"`
|
||||
Author string `json:"author"`
|
||||
Comment types.String `json:"comment"`
|
||||
ExpireTime types.UnixMilli `json:"expire_time"`
|
||||
IsPersistent types.Bool `json:"is_persistent"`
|
||||
IsSticky types.Bool `json:"is_sticky"`
|
||||
}
|
||||
|
||||
type HistoryAck struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
AcknowledgementHistoryId types.Binary `json:"id"`
|
||||
|
||||
// Idea: read SetTime and ClearTime from Redis and let EventTime decide which of them to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
SetTime types.UnixMilli `json:"set_time" db:"-"`
|
||||
ClearTime types.UnixMilli `json:"clear_time" db:"-"`
|
||||
EventTime AckEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryAck) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryAck) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type AckEventTime struct {
|
||||
History *HistoryAck `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et AckEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "ack_set":
|
||||
return et.History.SetTime.Value()
|
||||
case "ack_clear":
|
||||
return et.History.ClearTime.Value()
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*AcknowledgementHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryAck)(nil)
|
||||
_ contracts.TableNamer = (*HistoryAck)(nil)
|
||||
_ UpserterEntity = (*HistoryAck)(nil)
|
||||
_ driver.Valuer = AckEventTime{}
|
||||
)
|
||||
120
pkg/icingadb/v1/history/comment.go
Normal file
120
pkg/icingadb/v1/history/comment.go
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type CommentHistoryEntity struct {
|
||||
CommentId types.Binary `json:"comment_id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (che CommentHistoryEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return che
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (che CommentHistoryEntity) ID() contracts.ID {
|
||||
return che.CommentId
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (che *CommentHistoryEntity) SetID(id contracts.ID) {
|
||||
che.CommentId = id.(types.Binary)
|
||||
}
|
||||
|
||||
type CommentHistoryUpserter struct {
|
||||
RemovedBy types.String `json:"removed_by"`
|
||||
RemoveTime types.UnixMilli `json:"remove_time"`
|
||||
HasBeenRemoved types.Bool `json:"has_been_removed"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (chu *CommentHistoryUpserter) Upsert() interface{} {
|
||||
return chu
|
||||
}
|
||||
|
||||
type CommentHistory struct {
|
||||
CommentHistoryEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
CommentHistoryUpserter `json:",inline"`
|
||||
EntryTime types.UnixMilli `json:"entry_time"`
|
||||
Author string `json:"author"`
|
||||
Comment string `json:"comment"`
|
||||
EntryType types.CommentType `json:"entry_type"`
|
||||
IsPersistent types.Bool `json:"is_persistent"`
|
||||
IsSticky types.Bool `json:"is_sticky"`
|
||||
ExpireTime types.UnixMilli `json:"expire_time"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (ch *CommentHistory) Init() {
|
||||
ch.HasBeenRemoved = types.Bool{
|
||||
Bool: false,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
type HistoryComment struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
CommentHistoryId types.Binary `json:"comment_id"`
|
||||
|
||||
// Idea: read EntryTime, RemoveTime and ExpireTime from Redis
|
||||
// and let EventTime decide which of them to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
EntryTime types.UnixMilli `json:"entry_time" db:"-"`
|
||||
RemoveTime types.UnixMilli `json:"remove_time" db:"-"`
|
||||
ExpireTime types.UnixMilli `json:"expire_time" db:"-"`
|
||||
EventTime CommentEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryComment) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryComment) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type CommentEventTime struct {
|
||||
History *HistoryComment `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et CommentEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "comment_add":
|
||||
return et.History.EntryTime.Value()
|
||||
case "comment_remove":
|
||||
v, err := et.History.RemoveTime.Value()
|
||||
if err == nil && v == nil {
|
||||
return et.History.ExpireTime.Value()
|
||||
}
|
||||
|
||||
return v, err
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ contracts.Entity = (*CommentHistoryEntity)(nil)
|
||||
_ contracts.Upserter = (*CommentHistoryUpserter)(nil)
|
||||
_ contracts.Initer = (*CommentHistory)(nil)
|
||||
_ UpserterEntity = (*CommentHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryComment)(nil)
|
||||
_ contracts.TableNamer = (*HistoryComment)(nil)
|
||||
_ UpserterEntity = (*HistoryComment)(nil)
|
||||
_ driver.Valuer = CommentEventTime{}
|
||||
)
|
||||
119
pkg/icingadb/v1/history/downtime.go
Normal file
119
pkg/icingadb/v1/history/downtime.go
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type DowntimeHistoryEntity struct {
|
||||
DowntimeId types.Binary `json:"downtime_id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (dhe DowntimeHistoryEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return dhe
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (dhe DowntimeHistoryEntity) ID() contracts.ID {
|
||||
return dhe.DowntimeId
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (dhe *DowntimeHistoryEntity) SetID(id contracts.ID) {
|
||||
dhe.DowntimeId = id.(types.Binary)
|
||||
}
|
||||
|
||||
type DowntimeHistoryUpserter struct {
|
||||
CancelledBy types.String `json:"cancelled_by"`
|
||||
HasBeenCancelled types.Bool `json:"has_been_cancelled"`
|
||||
CancelTime types.UnixMilli `json:"cancel_time"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (dhu *DowntimeHistoryUpserter) Upsert() interface{} {
|
||||
return dhu
|
||||
}
|
||||
|
||||
type DowntimeHistory struct {
|
||||
DowntimeHistoryEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
DowntimeHistoryUpserter `json:",inline"`
|
||||
TriggeredById types.Binary `json:"triggered_by_id"`
|
||||
EntryTime types.UnixMilli `json:"entry_time"`
|
||||
Author string `json:"author"`
|
||||
Comment string `json:"comment"`
|
||||
IsFlexible types.Bool `json:"is_flexible"`
|
||||
FlexibleDuration uint64 `json:"flexible_duration"`
|
||||
ScheduledStartTime types.UnixMilli `json:"scheduled_start_time"`
|
||||
ScheduledEndTime types.UnixMilli `json:"scheduled_end_time"`
|
||||
StartTime types.UnixMilli `json:"start_time"`
|
||||
EndTime types.UnixMilli `json:"end_time"`
|
||||
TriggerTime types.UnixMilli `json:"trigger_time"`
|
||||
}
|
||||
|
||||
type HistoryDowntime struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
DowntimeHistoryId types.Binary `json:"downtime_id"`
|
||||
|
||||
// Idea: read StartTime, CancelTime, EndTime and HasBeenCancelled from Redis
|
||||
// and let EventTime decide based on HasBeenCancelled which of the others to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
StartTime types.UnixMilli `json:"start_time" db:"-"`
|
||||
CancelTime types.UnixMilli `json:"cancel_time" db:"-"`
|
||||
EndTime types.UnixMilli `json:"end_time" db:"-"`
|
||||
HasBeenCancelled types.Bool `json:"has_been_cancelled" db:"-"`
|
||||
EventTime DowntimeEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryDowntime) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryDowntime) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type DowntimeEventTime struct {
|
||||
History *HistoryDowntime `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et DowntimeEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "downtime_start":
|
||||
return et.History.StartTime.Value()
|
||||
case "downtime_end":
|
||||
if !et.History.HasBeenCancelled.Valid {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if et.History.HasBeenCancelled.Bool {
|
||||
return et.History.CancelTime.Value()
|
||||
} else {
|
||||
return et.History.EndTime.Value()
|
||||
}
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ contracts.Entity = (*DowntimeHistoryEntity)(nil)
|
||||
_ contracts.Upserter = (*DowntimeHistoryUpserter)(nil)
|
||||
_ UpserterEntity = (*DowntimeHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryDowntime)(nil)
|
||||
_ contracts.TableNamer = (*HistoryDowntime)(nil)
|
||||
_ UpserterEntity = (*HistoryDowntime)(nil)
|
||||
_ driver.Valuer = DowntimeEventTime{}
|
||||
)
|
||||
80
pkg/icingadb/v1/history/flapping.go
Normal file
80
pkg/icingadb/v1/history/flapping.go
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type FlappingHistoryUpserter struct {
|
||||
EndTime types.UnixMilli `json:"end_time"`
|
||||
PercentStateChangeEnd types.Float `json:"percent_state_change_end"`
|
||||
FlappingThresholdLow float32 `json:"flapping_threshold_low"`
|
||||
FlappingThresholdHigh float32 `json:"flapping_threshold_high"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (fhu *FlappingHistoryUpserter) Upsert() interface{} {
|
||||
return fhu
|
||||
}
|
||||
|
||||
type FlappingHistory struct {
|
||||
v1.EntityWithoutChecksum `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
FlappingHistoryUpserter `json:",inline"`
|
||||
StartTime types.UnixMilli `json:"start_time"`
|
||||
PercentStateChangeStart types.Float `json:"percent_state_change_start"`
|
||||
}
|
||||
|
||||
type HistoryFlapping struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
FlappingHistoryId types.Binary `json:"id"`
|
||||
|
||||
// Idea: read StartTime and EndTime from Redis and let EventTime decide which of them to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
StartTime types.UnixMilli `json:"start_time" db:"-"`
|
||||
EndTime types.UnixMilli `json:"end_time" db:"-"`
|
||||
EventTime FlappingEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryFlapping) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryFlapping) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type FlappingEventTime struct {
|
||||
History *HistoryFlapping `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et FlappingEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "flapping_start":
|
||||
return et.History.StartTime.Value()
|
||||
case "flapping_end":
|
||||
return et.History.EndTime.Value()
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*FlappingHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryFlapping)(nil)
|
||||
_ contracts.TableNamer = (*HistoryFlapping)(nil)
|
||||
_ UpserterEntity = (*HistoryFlapping)(nil)
|
||||
_ driver.Valuer = FlappingEventTime{}
|
||||
)
|
||||
89
pkg/icingadb/v1/history/meta.go
Normal file
89
pkg/icingadb/v1/history/meta.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type UpserterEntity interface {
|
||||
contracts.Upserter
|
||||
contracts.Entity
|
||||
}
|
||||
|
||||
type HistoryTableEntity struct {
|
||||
Id types.UUID `json:"id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (hte HistoryTableEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return hte
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (hte HistoryTableEntity) ID() contracts.ID {
|
||||
return hte.Id
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (hte *HistoryTableEntity) SetID(id contracts.ID) {
|
||||
hte.Id = id.(types.UUID)
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
// Update only the Id (effectively nothing).
|
||||
func (hte HistoryTableEntity) Upsert() interface{} {
|
||||
return hte
|
||||
}
|
||||
|
||||
type HistoryEntity struct {
|
||||
Id types.UUID `json:"event_id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (he HistoryEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return he
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (he HistoryEntity) ID() contracts.ID {
|
||||
return he.Id
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (he *HistoryEntity) SetID(id contracts.ID) {
|
||||
he.Id = id.(types.UUID)
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
// Update only the Id (effectively nothing).
|
||||
func (he HistoryEntity) Upsert() interface{} {
|
||||
return he
|
||||
}
|
||||
|
||||
type HistoryTableMeta struct {
|
||||
EnvironmentId types.Binary `json:"environment_id"`
|
||||
EndpointId types.Binary `json:"endpoint_id"`
|
||||
ObjectType string `json:"object_type"`
|
||||
HostId types.Binary `json:"host_id"`
|
||||
ServiceId types.Binary `json:"service_id"`
|
||||
}
|
||||
|
||||
type HistoryMeta struct {
|
||||
HistoryEntity `json:",inline"`
|
||||
EnvironmentId types.Binary `json:"environment_id"`
|
||||
EndpointId types.Binary `json:"endpoint_id"`
|
||||
ObjectType string `json:"object_type"`
|
||||
HostId types.Binary `json:"host_id"`
|
||||
ServiceId types.Binary `json:"service_id"`
|
||||
EventType string `json:"event_type"`
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ contracts.Entity = (*HistoryTableEntity)(nil)
|
||||
_ contracts.Upserter = HistoryTableEntity{}
|
||||
_ contracts.Entity = (*HistoryEntity)(nil)
|
||||
_ contracts.Upserter = HistoryEntity{}
|
||||
_ contracts.Entity = (*HistoryMeta)(nil)
|
||||
_ contracts.Upserter = (*HistoryMeta)(nil)
|
||||
)
|
||||
45
pkg/icingadb/v1/history/notification.go
Normal file
45
pkg/icingadb/v1/history/notification.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type NotificationHistory struct {
|
||||
HistoryTableEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
NotificationId types.Binary `json:"notification_id"`
|
||||
Type types.NotificationType `json:"type"`
|
||||
SendTime types.UnixMilli `json:"send_time"`
|
||||
State uint8 `json:"state"`
|
||||
PreviousHardState uint8 `json:"previous_hard_state"`
|
||||
Author string `json:"author"`
|
||||
Text string `json:"text"`
|
||||
UsersNotified uint16 `json:"users_notified"`
|
||||
}
|
||||
|
||||
type UserNotificationHistory struct {
|
||||
HistoryTableEntity `json:",inline"`
|
||||
EnvironmentId types.Binary `json:"environment_id"`
|
||||
NotificationHistoryId types.UUID `json:"notification_history_id"`
|
||||
UserId types.Binary `json:"user_id"`
|
||||
}
|
||||
|
||||
type HistoryNotification struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
NotificationHistoryId types.UUID `json:"id"`
|
||||
EventTime types.UnixMilli `json:"send_time"`
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryNotification) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*NotificationHistory)(nil)
|
||||
_ UpserterEntity = (*UserNotificationHistory)(nil)
|
||||
_ contracts.TableNamer = (*HistoryNotification)(nil)
|
||||
_ UpserterEntity = (*HistoryNotification)(nil)
|
||||
)
|
||||
40
pkg/icingadb/v1/history/state.go
Normal file
40
pkg/icingadb/v1/history/state.go
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type StateHistory struct {
|
||||
HistoryTableEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
EventTime types.UnixMilli `json:"event_time"`
|
||||
StateType types.StateType `json:"state_type"`
|
||||
SoftState uint8 `json:"soft_state"`
|
||||
HardState uint8 `json:"hard_state"`
|
||||
PreviousSoftState uint8 `json:"previous_soft_state"`
|
||||
PreviousHardState uint8 `json:"previous_hard_state"`
|
||||
Attempt uint8 `json:"attempt"`
|
||||
Output types.String `json:"output"`
|
||||
LongOutput types.String `json:"long_output"`
|
||||
MaxCheckAttempts uint32 `json:"max_check_attempts"`
|
||||
CheckSource types.String `json:"check_source"`
|
||||
}
|
||||
|
||||
type HistoryState struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
StateHistoryId types.UUID `json:"id"`
|
||||
EventTime types.UnixMilli `json:"event_time"`
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryState) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*StateHistory)(nil)
|
||||
_ contracts.TableNamer = (*HistoryState)(nil)
|
||||
_ UpserterEntity = (*HistoryState)(nil)
|
||||
)
|
||||
Loading…
Reference in a new issue