mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
commit
3ea98313c3
23 changed files with 1456 additions and 75 deletions
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/flatten"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/history"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
|
|
@ -40,6 +41,7 @@ func main() {
|
|||
heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger)
|
||||
ha := icingadb.NewHA(ctx, db, heartbeat, logger)
|
||||
s := icingadb.NewSync(db, rc, logger)
|
||||
hs := history.NewSync(db, rc, logger)
|
||||
|
||||
// For temporary exit after sync
|
||||
done := make(chan struct{}, 0)
|
||||
|
|
@ -186,6 +188,10 @@ func main() {
|
|||
})
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
return hs.Sync(synctx)
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
// TODO(el): This panics here even if a ctx gets cancelled.
|
||||
// That is intentional for the moment for testing.
|
||||
|
|
|
|||
|
|
@ -2,21 +2,22 @@ package com
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Bulker struct {
|
||||
ch chan []interface{}
|
||||
ch chan []contracts.Entity
|
||||
ctx context.Context
|
||||
mu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker {
|
||||
func NewBulker(ctx context.Context, ch <-chan contracts.Entity, count int) *Bulker {
|
||||
b := &Bulker{
|
||||
ch: make(chan []interface{}),
|
||||
ch: make(chan []contracts.Entity),
|
||||
ctx: ctx,
|
||||
mu: sync.Mutex{},
|
||||
}
|
||||
|
|
@ -27,14 +28,14 @@ func NewBulker(ctx context.Context, ch <-chan interface{}, count int) *Bulker {
|
|||
}
|
||||
|
||||
// Bulk returns the channel on which the bulks are delivered.
|
||||
func (b *Bulker) Bulk() <-chan []interface{} {
|
||||
func (b *Bulker) Bulk() <-chan []contracts.Entity {
|
||||
return b.ch
|
||||
}
|
||||
|
||||
func (b *Bulker) run(ch <-chan interface{}, count int) {
|
||||
func (b *Bulker) run(ch <-chan contracts.Entity, count int) {
|
||||
defer close(b.ch)
|
||||
|
||||
bufCh := make(chan interface{}, count)
|
||||
bufCh := make(chan contracts.Entity, count)
|
||||
g, ctx := errgroup.WithContext(b.ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
@ -56,7 +57,7 @@ func (b *Bulker) run(ch <-chan interface{}, count int) {
|
|||
|
||||
g.Go(func() error {
|
||||
for done := false; !done; {
|
||||
buf := make([]interface{}, 0, count)
|
||||
buf := make([]contracts.Entity, 0, count)
|
||||
timeout := time.After(256 * time.Millisecond)
|
||||
|
||||
for drain := true; drain && len(buf) < count; {
|
||||
|
|
@ -90,6 +91,6 @@ func (b *Bulker) run(ch <-chan interface{}, count int) {
|
|||
_ = g.Wait()
|
||||
}
|
||||
|
||||
func Bulk(ctx context.Context, ch <-chan interface{}, count int) <-chan []interface{} {
|
||||
func Bulk(ctx context.Context, ch <-chan contracts.Entity, count int) <-chan []contracts.Entity {
|
||||
return NewBulker(ctx, ch, count).Bulk()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package com
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
|
@ -44,3 +45,48 @@ func PipeError(in <-chan error, out chan<- error) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.
|
||||
func CopyFirst(
|
||||
ctx context.Context, input <-chan contracts.Entity,
|
||||
) (first contracts.Entity, forward <-chan contracts.Entity, err error) {
|
||||
var ok bool
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, nil, ctx.Err()
|
||||
case first, ok = <-input:
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Buffer of one because we receive an entity and send it back immediately.
|
||||
fwd := make(chan contracts.Entity, 1)
|
||||
fwd <- first
|
||||
|
||||
forward = fwd
|
||||
|
||||
go func() {
|
||||
defer close(fwd)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e, ok := <-input:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case fwd <- e:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,3 +60,15 @@ type Waiter interface {
|
|||
type Initer interface {
|
||||
Init() // Init initializes the object.
|
||||
}
|
||||
|
||||
// Upserter implements the Upsert method,
|
||||
// which returns a part of the object for ON DUPLICATE KEY UPDATE.
|
||||
type Upserter interface {
|
||||
Upsert() interface{} // Upsert partitions the object.
|
||||
}
|
||||
|
||||
// TableNamer implements the TableName method,
|
||||
// which returns the table of the object.
|
||||
type TableNamer interface {
|
||||
TableName() string // TableName tells the table.
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ func (db DB) BuildColumns(subject interface{}) []string {
|
|||
func (db DB) BuildDeleteStmt(from interface{}) string {
|
||||
return fmt.Sprintf(
|
||||
`DELETE FROM %s WHERE id IN (?)`,
|
||||
utils.Key(utils.Name(from), '_'),
|
||||
utils.TableName(from),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -57,7 +57,7 @@ func (db DB) BuildInsertStmt(into interface{}) string {
|
|||
|
||||
return fmt.Sprintf(
|
||||
`INSERT INTO %s (%s) VALUES (%s)`,
|
||||
utils.Key(utils.Name(into), '_'),
|
||||
utils.TableName(into),
|
||||
strings.Join(columns, ", "),
|
||||
fmt.Sprintf(":%s", strings.Join(columns, ", :")),
|
||||
)
|
||||
|
|
@ -67,7 +67,7 @@ func (db DB) BuildSelectStmt(from interface{}, into interface{}) string {
|
|||
return fmt.Sprintf(
|
||||
`SELECT %s FROM %s`,
|
||||
strings.Join(db.BuildColumns(into), ", "),
|
||||
utils.Key(utils.Name(from), '_'),
|
||||
utils.TableName(from),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -81,26 +81,34 @@ func (db DB) BuildUpdateStmt(update interface{}) string {
|
|||
|
||||
return fmt.Sprintf(
|
||||
`UPDATE %s SET %s WHERE id = :id`,
|
||||
utils.Key(utils.Name(update), '_'),
|
||||
utils.TableName(update),
|
||||
strings.Join(set, ", "),
|
||||
)
|
||||
}
|
||||
|
||||
func (db DB) BuildUpsertStmt(subject interface{}) string {
|
||||
columns := db.BuildColumns(subject)
|
||||
set := make([]string, 0, len(columns))
|
||||
func (db DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) {
|
||||
insertColumns := db.BuildColumns(subject)
|
||||
var updateColumns []string
|
||||
|
||||
for _, col := range columns {
|
||||
if upserter, ok := subject.(contracts.Upserter); ok {
|
||||
updateColumns = db.BuildColumns(upserter.Upsert())
|
||||
} else {
|
||||
updateColumns = insertColumns
|
||||
}
|
||||
|
||||
set := make([]string, 0, len(updateColumns))
|
||||
|
||||
for _, col := range updateColumns {
|
||||
set = append(set, fmt.Sprintf("%s = :%s", col, col))
|
||||
}
|
||||
|
||||
return fmt.Sprintf(
|
||||
`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`,
|
||||
utils.Key(utils.Name(subject), '_'),
|
||||
strings.Join(columns, ","),
|
||||
fmt.Sprintf(":%s", strings.Join(columns, ",:")),
|
||||
utils.TableName(subject),
|
||||
strings.Join(insertColumns, ","),
|
||||
fmt.Sprintf(":%s", strings.Join(insertColumns, ",:")),
|
||||
strings.Join(set, ","),
|
||||
)
|
||||
), len(insertColumns) + len(updateColumns)
|
||||
}
|
||||
|
||||
func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent int, args []interface{}) error {
|
||||
|
|
@ -158,7 +166,10 @@ func (db DB) BulkExec(ctx context.Context, query string, count int, concurrent i
|
|||
return g.Wait()
|
||||
}
|
||||
|
||||
func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error {
|
||||
func (db DB) NamedBulkExec(
|
||||
ctx context.Context, query string, count int, concurrent int,
|
||||
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
|
||||
) error {
|
||||
var cnt com.Counter
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
bulk := com.Bulk(ctx, arg, count)
|
||||
|
|
@ -186,7 +197,7 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr
|
|||
return err
|
||||
}
|
||||
|
||||
g.Go(func(b []interface{}) func() error {
|
||||
g.Go(func(b []contracts.Entity) func() error {
|
||||
return func() error {
|
||||
defer sem.Release(1)
|
||||
|
||||
|
|
@ -204,6 +215,16 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr
|
|||
|
||||
cnt.Add(uint64(len(b)))
|
||||
|
||||
if succeeded != nil {
|
||||
for _, row := range b {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case succeeded <- row:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
IsRetryable,
|
||||
|
|
@ -219,7 +240,9 @@ func (db DB) NamedBulkExec(ctx context.Context, query string, count int, concurr
|
|||
return g.Wait()
|
||||
}
|
||||
|
||||
func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concurrent int, arg chan interface{}) error {
|
||||
func (db DB) NamedBulkExecTx(
|
||||
ctx context.Context, query string, count int, concurrent int, arg <-chan contracts.Entity,
|
||||
) error {
|
||||
var cnt com.Counter
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
bulk := com.Bulk(ctx, arg, count)
|
||||
|
|
@ -243,7 +266,7 @@ func (db DB) NamedBulkExecTx(ctx context.Context, query string, count int, concu
|
|||
return err
|
||||
}
|
||||
|
||||
g.Go(func(b []interface{}) func() error {
|
||||
g.Go(func(b []contracts.Entity) func() error {
|
||||
return func() error {
|
||||
defer sem.Release(1)
|
||||
|
||||
|
|
@ -329,53 +352,34 @@ func (db DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFu
|
|||
}
|
||||
|
||||
func (db DB) Create(ctx context.Context, entities <-chan contracts.Entity) error {
|
||||
// TODO(el): Check ctx.Done()?
|
||||
entity := <-entities
|
||||
if entity == nil {
|
||||
return nil
|
||||
first, forward, err := com.CopyFirst(ctx, entities)
|
||||
if first == nil {
|
||||
return err
|
||||
}
|
||||
// Buffer of one because we receive an entity and send it back immediately.
|
||||
inserts := make(chan interface{}, 1)
|
||||
inserts <- entity
|
||||
|
||||
go func() {
|
||||
defer close(inserts)
|
||||
return db.NamedBulkExec(ctx, db.BuildInsertStmt(first), 1<<15/len(db.BuildColumns(first)), 1<<3, forward, nil)
|
||||
}
|
||||
|
||||
for e := range entities {
|
||||
select {
|
||||
case inserts <- e:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
func (db DB) Upsert(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
|
||||
first, forward, err := com.CopyFirst(ctx, entities)
|
||||
if first == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return db.NamedBulkExec(ctx, db.BuildInsertStmt(entity), 1<<15/len(db.BuildColumns(entity)), 1<<3, inserts)
|
||||
// TODO(ak): wait for https://github.com/jmoiron/sqlx/issues/694
|
||||
//stmt, placeholders := db.BuildUpsertStmt(first)
|
||||
//return db.NamedBulkExec(ctx, stmt, 1<<15/placeholders, 1<<3, forward, succeeded)
|
||||
stmt, _ := db.BuildUpsertStmt(first)
|
||||
return db.NamedBulkExec(ctx, stmt, 1, 1<<3, forward, succeeded)
|
||||
}
|
||||
|
||||
func (db DB) Update(ctx context.Context, entities <-chan contracts.Entity) error {
|
||||
// TODO(el): Check ctx.Done()?
|
||||
entity := <-entities
|
||||
if entity == nil {
|
||||
return nil
|
||||
first, forward, err := com.CopyFirst(ctx, entities)
|
||||
if first == nil {
|
||||
return err
|
||||
}
|
||||
// Buffer of one because we receive an entity and send it back immediately.
|
||||
updates := make(chan interface{}, 1)
|
||||
updates <- entity
|
||||
|
||||
go func() {
|
||||
defer close(updates)
|
||||
|
||||
for e := range entities {
|
||||
select {
|
||||
case updates <- e:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(entity), 1<<15, 1<<3, updates)
|
||||
return db.NamedBulkExecTx(ctx, db.BuildUpdateStmt(first), 1<<15, 1<<3, forward)
|
||||
}
|
||||
|
||||
func IsRetryable(err error) bool {
|
||||
|
|
|
|||
|
|
@ -186,7 +186,10 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
|||
Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled,
|
||||
Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled,
|
||||
}
|
||||
_, err = tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i)
|
||||
|
||||
stmt, _ := h.db.BuildUpsertStmt(i)
|
||||
_, err = tx.NamedExecContext(ctx, stmt, i)
|
||||
|
||||
if err != nil {
|
||||
cancel()
|
||||
if !utils.IsDeadlock(err) {
|
||||
|
|
|
|||
300
pkg/icingadb/history/sync.go
Normal file
300
pkg/icingadb/history/sync.go
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/structify"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Sync specifies the source and destination of a history sync.
|
||||
type Sync struct {
|
||||
db *icingadb.DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
// NewSync creates a new Sync.
|
||||
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
|
||||
return &Sync{
|
||||
db: db,
|
||||
redis: redis,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// insertedMessage represents a just inserted row.
|
||||
type insertedMessage struct {
|
||||
// redisId specifies the origin Redis message.
|
||||
redisId string
|
||||
// structType represents the table the row was inserted into.
|
||||
structType reflect.Type
|
||||
}
|
||||
|
||||
const bulkSize = 1 << 14
|
||||
|
||||
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
|
||||
func (s Sync) Sync(ctx context.Context) error {
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for _, hs := range historyStreams {
|
||||
var redis2structs []chan<- redis.XMessage
|
||||
insertedMessages := make(chan insertedMessage, bulkSize)
|
||||
|
||||
// messageProgress are the tables (represented by struct types)
|
||||
// with successfully inserted rows by Redis message ID.
|
||||
messageProgress := map[string]map[reflect.Type]struct{}{}
|
||||
messageProgressMtx := &sync.Mutex{}
|
||||
|
||||
stream := "icinga:history:stream:" + hs.kind
|
||||
s.logger.Infof("Syncing %s history", hs.kind)
|
||||
|
||||
for _, structifier := range hs.structifiers {
|
||||
redis2struct := make(chan redis.XMessage, bulkSize)
|
||||
struct2db := make(chan contracts.Entity, bulkSize)
|
||||
succeeded := make(chan contracts.Entity, bulkSize)
|
||||
|
||||
// rowIds are IDs of to be synced Redis messages by database row.
|
||||
rowIds := map[contracts.Entity]string{}
|
||||
rowIdsMtx := &sync.Mutex{}
|
||||
|
||||
redis2structs = append(redis2structs, redis2struct)
|
||||
|
||||
g.Go(structifyStream(ctx, structifier, redis2struct, struct2db, rowIds, rowIdsMtx))
|
||||
g.Go(fwdSucceeded(ctx, insertedMessages, succeeded, rowIds, rowIdsMtx))
|
||||
|
||||
// Upserts from struct2db.
|
||||
g.Go(func() error {
|
||||
defer close(succeeded)
|
||||
return s.db.Upsert(ctx, struct2db, succeeded)
|
||||
})
|
||||
}
|
||||
|
||||
g.Go(s.xRead(ctx, redis2structs, stream))
|
||||
g.Go(s.cleanup(ctx, hs, insertedMessages, messageProgress, messageProgressMtx, stream))
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
// xRead reads from the Redis stream and broadcasts the data to redis2structs.
|
||||
func (s Sync) xRead(ctx context.Context, redis2structs []chan<- redis.XMessage, stream string) func() error {
|
||||
return func() error {
|
||||
defer func() {
|
||||
for _, r2s := range redis2structs {
|
||||
close(r2s)
|
||||
}
|
||||
}()
|
||||
|
||||
xra := &redis.XReadArgs{
|
||||
Streams: []string{stream, "0-0"},
|
||||
Count: bulkSize,
|
||||
Block: 10 * time.Second,
|
||||
}
|
||||
|
||||
for {
|
||||
streams, err := s.redis.XRead(ctx, xra).Result()
|
||||
if err != nil && err != redis.Nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, message := range stream.Messages {
|
||||
xra.Streams[1] = message.ID
|
||||
|
||||
for _, r2s := range redis2structs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case r2s <- message:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// structifyStream structifies from redis2struct to struct2db.
|
||||
func structifyStream(
|
||||
ctx context.Context, structifier structify.MapStructifier, redis2struct <-chan redis.XMessage,
|
||||
struct2db chan<- contracts.Entity, rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex,
|
||||
) func() error {
|
||||
return func() error {
|
||||
defer close(struct2db)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case message, ok := <-redis2struct:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ptr, err := structifier(message.Values)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ue := ptr.(v1.UpserterEntity)
|
||||
|
||||
rowIdsMtx.Lock()
|
||||
rowIds[ue] = message.ID
|
||||
rowIdsMtx.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case struct2db <- ue:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fwdSucceeded informs insertedMessages about successfully inserted rows according to succeeded.
|
||||
func fwdSucceeded(
|
||||
ctx context.Context, insertedMessages chan<- insertedMessage, succeeded <-chan contracts.Entity,
|
||||
rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex,
|
||||
) func() error {
|
||||
return func() error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case row, ok := <-succeeded:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
rowIdsMtx.Lock()
|
||||
|
||||
id, ok := rowIds[row]
|
||||
if ok {
|
||||
delete(rowIds, row)
|
||||
}
|
||||
|
||||
rowIdsMtx.Unlock()
|
||||
|
||||
if ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case insertedMessages <- insertedMessage{id, reflect.TypeOf(row).Elem()}:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup collects completely inserted messages from insertedMessages and deletes them from Redis.
|
||||
func (s Sync) cleanup(
|
||||
ctx context.Context, hs historyStream, insertedMessages <-chan insertedMessage,
|
||||
messageProgress map[string]map[reflect.Type]struct{}, messageProgressMtx *sync.Mutex, stream string,
|
||||
) func() error {
|
||||
return func() error {
|
||||
var ids []string
|
||||
var count uint64
|
||||
var timeout <-chan time.Time
|
||||
|
||||
const period = 20 * time.Second
|
||||
periodically := time.NewTicker(period)
|
||||
defer periodically.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-periodically.C:
|
||||
if count > 0 {
|
||||
s.logger.Infof("Inserted %d %s history entries in the last %s", count, hs.kind, period)
|
||||
count = 0
|
||||
}
|
||||
case msg := <-insertedMessages:
|
||||
messageProgressMtx.Lock()
|
||||
|
||||
mp, ok := messageProgress[msg.redisId]
|
||||
if !ok {
|
||||
mp = map[reflect.Type]struct{}{}
|
||||
messageProgress[msg.redisId] = mp
|
||||
}
|
||||
|
||||
mp[msg.structType] = struct{}{}
|
||||
|
||||
if ok = len(mp) == len(hs.structifiers); ok {
|
||||
delete(messageProgress, msg.redisId)
|
||||
}
|
||||
|
||||
messageProgressMtx.Unlock()
|
||||
|
||||
if ok {
|
||||
ids = append(ids, msg.redisId)
|
||||
count++
|
||||
|
||||
switch len(ids) {
|
||||
case 1:
|
||||
timeout = time.After(time.Second / 4)
|
||||
case bulkSize:
|
||||
if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ids = nil
|
||||
timeout = nil
|
||||
}
|
||||
}
|
||||
case <-timeout:
|
||||
if _, err := s.redis.XDel(ctx, stream, ids...).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ids = nil
|
||||
timeout = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// historyStream represents a Redis history stream.
|
||||
type historyStream struct {
|
||||
// kind specifies the stream's purpose.
|
||||
kind string
|
||||
// structifiers lists the factories of the model structs the stream data shall be copied to.
|
||||
structifiers []structify.MapStructifier
|
||||
}
|
||||
|
||||
// historyStreams contains all Redis history streams to sync.
|
||||
var historyStreams = func() []historyStream {
|
||||
var streams []historyStream
|
||||
for _, rhs := range []struct {
|
||||
kind string
|
||||
structPtrs []v1.UpserterEntity
|
||||
}{
|
||||
{"notification", []v1.UpserterEntity{(*v1.NotificationHistory)(nil), (*v1.HistoryNotification)(nil)}},
|
||||
{"usernotification", []v1.UpserterEntity{(*v1.UserNotificationHistory)(nil)}},
|
||||
{"state", []v1.UpserterEntity{(*v1.StateHistory)(nil), (*v1.HistoryState)(nil)}},
|
||||
{"downtime", []v1.UpserterEntity{(*v1.DowntimeHistory)(nil), (*v1.HistoryDowntime)(nil)}},
|
||||
{"comment", []v1.UpserterEntity{(*v1.CommentHistory)(nil), (*v1.HistoryComment)(nil)}},
|
||||
{"flapping", []v1.UpserterEntity{(*v1.FlappingHistory)(nil), (*v1.HistoryFlapping)(nil)}},
|
||||
{"acknowledgement", []v1.UpserterEntity{(*v1.AcknowledgementHistory)(nil), (*v1.HistoryAck)(nil)}},
|
||||
} {
|
||||
var structifiers []structify.MapStructifier
|
||||
for _, structPtr := range rhs.structPtrs {
|
||||
structifiers = append(structifiers, structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json"))
|
||||
}
|
||||
|
||||
streams = append(streams, historyStream{rhs.kind, structifiers})
|
||||
}
|
||||
|
||||
return streams
|
||||
}()
|
||||
82
pkg/icingadb/v1/history/ack.go
Normal file
82
pkg/icingadb/v1/history/ack.go
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type AckHistoryUpserter struct {
|
||||
ClearTime types.UnixMilli `json:"clear_time"`
|
||||
ClearedBy types.String `json:"cleared_by"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (ahu *AckHistoryUpserter) Upsert() interface{} {
|
||||
return ahu
|
||||
}
|
||||
|
||||
type AcknowledgementHistory struct {
|
||||
v1.EntityWithoutChecksum `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
AckHistoryUpserter `json:",inline"`
|
||||
SetTime types.UnixMilli `json:"set_time"`
|
||||
Author string `json:"author"`
|
||||
Comment types.String `json:"comment"`
|
||||
ExpireTime types.UnixMilli `json:"expire_time"`
|
||||
IsPersistent types.Bool `json:"is_persistent"`
|
||||
IsSticky types.Bool `json:"is_sticky"`
|
||||
}
|
||||
|
||||
type HistoryAck struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
AcknowledgementHistoryId types.Binary `json:"id"`
|
||||
|
||||
// Idea: read SetTime and ClearTime from Redis and let EventTime decide which of them to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
SetTime types.UnixMilli `json:"set_time" db:"-"`
|
||||
ClearTime types.UnixMilli `json:"clear_time" db:"-"`
|
||||
EventTime AckEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryAck) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryAck) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type AckEventTime struct {
|
||||
History *HistoryAck `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et AckEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "ack_set":
|
||||
return et.History.SetTime.Value()
|
||||
case "ack_clear":
|
||||
return et.History.ClearTime.Value()
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*AcknowledgementHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryAck)(nil)
|
||||
_ contracts.TableNamer = (*HistoryAck)(nil)
|
||||
_ UpserterEntity = (*HistoryAck)(nil)
|
||||
_ driver.Valuer = AckEventTime{}
|
||||
)
|
||||
120
pkg/icingadb/v1/history/comment.go
Normal file
120
pkg/icingadb/v1/history/comment.go
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type CommentHistoryEntity struct {
|
||||
CommentId types.Binary `json:"comment_id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (che CommentHistoryEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return che
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (che CommentHistoryEntity) ID() contracts.ID {
|
||||
return che.CommentId
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (che *CommentHistoryEntity) SetID(id contracts.ID) {
|
||||
che.CommentId = id.(types.Binary)
|
||||
}
|
||||
|
||||
type CommentHistoryUpserter struct {
|
||||
RemovedBy types.String `json:"removed_by"`
|
||||
RemoveTime types.UnixMilli `json:"remove_time"`
|
||||
HasBeenRemoved types.Bool `json:"has_been_removed"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (chu *CommentHistoryUpserter) Upsert() interface{} {
|
||||
return chu
|
||||
}
|
||||
|
||||
type CommentHistory struct {
|
||||
CommentHistoryEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
CommentHistoryUpserter `json:",inline"`
|
||||
EntryTime types.UnixMilli `json:"entry_time"`
|
||||
Author string `json:"author"`
|
||||
Comment string `json:"comment"`
|
||||
EntryType types.CommentType `json:"entry_type"`
|
||||
IsPersistent types.Bool `json:"is_persistent"`
|
||||
IsSticky types.Bool `json:"is_sticky"`
|
||||
ExpireTime types.UnixMilli `json:"expire_time"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (ch *CommentHistory) Init() {
|
||||
ch.HasBeenRemoved = types.Bool{
|
||||
Bool: false,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
type HistoryComment struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
CommentHistoryId types.Binary `json:"comment_id"`
|
||||
|
||||
// Idea: read EntryTime, RemoveTime and ExpireTime from Redis
|
||||
// and let EventTime decide which of them to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
EntryTime types.UnixMilli `json:"entry_time" db:"-"`
|
||||
RemoveTime types.UnixMilli `json:"remove_time" db:"-"`
|
||||
ExpireTime types.UnixMilli `json:"expire_time" db:"-"`
|
||||
EventTime CommentEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryComment) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryComment) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type CommentEventTime struct {
|
||||
History *HistoryComment `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et CommentEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "comment_add":
|
||||
return et.History.EntryTime.Value()
|
||||
case "comment_remove":
|
||||
v, err := et.History.RemoveTime.Value()
|
||||
if err == nil && v == nil {
|
||||
return et.History.ExpireTime.Value()
|
||||
}
|
||||
|
||||
return v, err
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ contracts.Entity = (*CommentHistoryEntity)(nil)
|
||||
_ contracts.Upserter = (*CommentHistoryUpserter)(nil)
|
||||
_ contracts.Initer = (*CommentHistory)(nil)
|
||||
_ UpserterEntity = (*CommentHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryComment)(nil)
|
||||
_ contracts.TableNamer = (*HistoryComment)(nil)
|
||||
_ UpserterEntity = (*HistoryComment)(nil)
|
||||
_ driver.Valuer = CommentEventTime{}
|
||||
)
|
||||
119
pkg/icingadb/v1/history/downtime.go
Normal file
119
pkg/icingadb/v1/history/downtime.go
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type DowntimeHistoryEntity struct {
|
||||
DowntimeId types.Binary `json:"downtime_id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (dhe DowntimeHistoryEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return dhe
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (dhe DowntimeHistoryEntity) ID() contracts.ID {
|
||||
return dhe.DowntimeId
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (dhe *DowntimeHistoryEntity) SetID(id contracts.ID) {
|
||||
dhe.DowntimeId = id.(types.Binary)
|
||||
}
|
||||
|
||||
type DowntimeHistoryUpserter struct {
|
||||
CancelledBy types.String `json:"cancelled_by"`
|
||||
HasBeenCancelled types.Bool `json:"has_been_cancelled"`
|
||||
CancelTime types.UnixMilli `json:"cancel_time"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (dhu *DowntimeHistoryUpserter) Upsert() interface{} {
|
||||
return dhu
|
||||
}
|
||||
|
||||
type DowntimeHistory struct {
|
||||
DowntimeHistoryEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
DowntimeHistoryUpserter `json:",inline"`
|
||||
TriggeredById types.Binary `json:"triggered_by_id"`
|
||||
EntryTime types.UnixMilli `json:"entry_time"`
|
||||
Author string `json:"author"`
|
||||
Comment string `json:"comment"`
|
||||
IsFlexible types.Bool `json:"is_flexible"`
|
||||
FlexibleDuration uint64 `json:"flexible_duration"`
|
||||
ScheduledStartTime types.UnixMilli `json:"scheduled_start_time"`
|
||||
ScheduledEndTime types.UnixMilli `json:"scheduled_end_time"`
|
||||
StartTime types.UnixMilli `json:"start_time"`
|
||||
EndTime types.UnixMilli `json:"end_time"`
|
||||
TriggerTime types.UnixMilli `json:"trigger_time"`
|
||||
}
|
||||
|
||||
type HistoryDowntime struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
DowntimeHistoryId types.Binary `json:"downtime_id"`
|
||||
|
||||
// Idea: read StartTime, CancelTime, EndTime and HasBeenCancelled from Redis
|
||||
// and let EventTime decide based on HasBeenCancelled which of the others to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
StartTime types.UnixMilli `json:"start_time" db:"-"`
|
||||
CancelTime types.UnixMilli `json:"cancel_time" db:"-"`
|
||||
EndTime types.UnixMilli `json:"end_time" db:"-"`
|
||||
HasBeenCancelled types.Bool `json:"has_been_cancelled" db:"-"`
|
||||
EventTime DowntimeEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryDowntime) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryDowntime) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type DowntimeEventTime struct {
|
||||
History *HistoryDowntime `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et DowntimeEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "downtime_start":
|
||||
return et.History.StartTime.Value()
|
||||
case "downtime_end":
|
||||
if !et.History.HasBeenCancelled.Valid {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if et.History.HasBeenCancelled.Bool {
|
||||
return et.History.CancelTime.Value()
|
||||
} else {
|
||||
return et.History.EndTime.Value()
|
||||
}
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ contracts.Entity = (*DowntimeHistoryEntity)(nil)
|
||||
_ contracts.Upserter = (*DowntimeHistoryUpserter)(nil)
|
||||
_ UpserterEntity = (*DowntimeHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryDowntime)(nil)
|
||||
_ contracts.TableNamer = (*HistoryDowntime)(nil)
|
||||
_ UpserterEntity = (*HistoryDowntime)(nil)
|
||||
_ driver.Valuer = DowntimeEventTime{}
|
||||
)
|
||||
80
pkg/icingadb/v1/history/flapping.go
Normal file
80
pkg/icingadb/v1/history/flapping.go
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type FlappingHistoryUpserter struct {
|
||||
EndTime types.UnixMilli `json:"end_time"`
|
||||
PercentStateChangeEnd types.Float `json:"percent_state_change_end"`
|
||||
FlappingThresholdLow float32 `json:"flapping_threshold_low"`
|
||||
FlappingThresholdHigh float32 `json:"flapping_threshold_high"`
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
func (fhu *FlappingHistoryUpserter) Upsert() interface{} {
|
||||
return fhu
|
||||
}
|
||||
|
||||
type FlappingHistory struct {
|
||||
v1.EntityWithoutChecksum `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
FlappingHistoryUpserter `json:",inline"`
|
||||
StartTime types.UnixMilli `json:"start_time"`
|
||||
PercentStateChangeStart types.Float `json:"percent_state_change_start"`
|
||||
}
|
||||
|
||||
type HistoryFlapping struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
FlappingHistoryId types.Binary `json:"id"`
|
||||
|
||||
// Idea: read StartTime and EndTime from Redis and let EventTime decide which of them to write to MySQL.
|
||||
// So EventTime doesn't have to be read from Redis (json:"-")
|
||||
// and the others don't have to be written to MySQL (db:"-").
|
||||
StartTime types.UnixMilli `json:"start_time" db:"-"`
|
||||
EndTime types.UnixMilli `json:"end_time" db:"-"`
|
||||
EventTime FlappingEventTime `json:"-"`
|
||||
}
|
||||
|
||||
// Init implements the contracts.Initer interface.
|
||||
func (h *HistoryFlapping) Init() {
|
||||
h.EventTime.History = h
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryFlapping) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
type FlappingEventTime struct {
|
||||
History *HistoryFlapping `db:"-"`
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
// Supports SQL NULL.
|
||||
func (et FlappingEventTime) Value() (driver.Value, error) {
|
||||
if et.History == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch et.History.EventType {
|
||||
case "flapping_start":
|
||||
return et.History.StartTime.Value()
|
||||
case "flapping_end":
|
||||
return et.History.EndTime.Value()
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*FlappingHistory)(nil)
|
||||
_ contracts.Initer = (*HistoryFlapping)(nil)
|
||||
_ contracts.TableNamer = (*HistoryFlapping)(nil)
|
||||
_ UpserterEntity = (*HistoryFlapping)(nil)
|
||||
_ driver.Valuer = FlappingEventTime{}
|
||||
)
|
||||
89
pkg/icingadb/v1/history/meta.go
Normal file
89
pkg/icingadb/v1/history/meta.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type UpserterEntity interface {
|
||||
contracts.Upserter
|
||||
contracts.Entity
|
||||
}
|
||||
|
||||
type HistoryTableEntity struct {
|
||||
Id types.UUID `json:"id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (hte HistoryTableEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return hte
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (hte HistoryTableEntity) ID() contracts.ID {
|
||||
return hte.Id
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (hte *HistoryTableEntity) SetID(id contracts.ID) {
|
||||
hte.Id = id.(types.UUID)
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
// Update only the Id (effectively nothing).
|
||||
func (hte HistoryTableEntity) Upsert() interface{} {
|
||||
return hte
|
||||
}
|
||||
|
||||
type HistoryEntity struct {
|
||||
Id types.UUID `json:"event_id"`
|
||||
}
|
||||
|
||||
// Fingerprint implements part of the contracts.Entity interface.
|
||||
func (he HistoryEntity) Fingerprint() contracts.Fingerprinter {
|
||||
return he
|
||||
}
|
||||
|
||||
// ID implements part of the contracts.Entity interface.
|
||||
func (he HistoryEntity) ID() contracts.ID {
|
||||
return he.Id
|
||||
}
|
||||
|
||||
// SetID implements part of the contracts.Entity interface.
|
||||
func (he *HistoryEntity) SetID(id contracts.ID) {
|
||||
he.Id = id.(types.UUID)
|
||||
}
|
||||
|
||||
// Upsert implements the contracts.Upserter interface.
|
||||
// Update only the Id (effectively nothing).
|
||||
func (he HistoryEntity) Upsert() interface{} {
|
||||
return he
|
||||
}
|
||||
|
||||
type HistoryTableMeta struct {
|
||||
EnvironmentId types.Binary `json:"environment_id"`
|
||||
EndpointId types.Binary `json:"endpoint_id"`
|
||||
ObjectType string `json:"object_type"`
|
||||
HostId types.Binary `json:"host_id"`
|
||||
ServiceId types.Binary `json:"service_id"`
|
||||
}
|
||||
|
||||
type HistoryMeta struct {
|
||||
HistoryEntity `json:",inline"`
|
||||
EnvironmentId types.Binary `json:"environment_id"`
|
||||
EndpointId types.Binary `json:"endpoint_id"`
|
||||
ObjectType string `json:"object_type"`
|
||||
HostId types.Binary `json:"host_id"`
|
||||
ServiceId types.Binary `json:"service_id"`
|
||||
EventType string `json:"event_type"`
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ contracts.Entity = (*HistoryTableEntity)(nil)
|
||||
_ contracts.Upserter = HistoryTableEntity{}
|
||||
_ contracts.Entity = (*HistoryEntity)(nil)
|
||||
_ contracts.Upserter = HistoryEntity{}
|
||||
_ contracts.Entity = (*HistoryMeta)(nil)
|
||||
_ contracts.Upserter = (*HistoryMeta)(nil)
|
||||
)
|
||||
45
pkg/icingadb/v1/history/notification.go
Normal file
45
pkg/icingadb/v1/history/notification.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type NotificationHistory struct {
|
||||
HistoryTableEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
NotificationId types.Binary `json:"notification_id"`
|
||||
Type types.NotificationType `json:"type"`
|
||||
SendTime types.UnixMilli `json:"send_time"`
|
||||
State uint8 `json:"state"`
|
||||
PreviousHardState uint8 `json:"previous_hard_state"`
|
||||
Author string `json:"author"`
|
||||
Text string `json:"text"`
|
||||
UsersNotified uint16 `json:"users_notified"`
|
||||
}
|
||||
|
||||
type UserNotificationHistory struct {
|
||||
HistoryTableEntity `json:",inline"`
|
||||
EnvironmentId types.Binary `json:"environment_id"`
|
||||
NotificationHistoryId types.UUID `json:"notification_history_id"`
|
||||
UserId types.Binary `json:"user_id"`
|
||||
}
|
||||
|
||||
type HistoryNotification struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
NotificationHistoryId types.UUID `json:"id"`
|
||||
EventTime types.UnixMilli `json:"send_time"`
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryNotification) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*NotificationHistory)(nil)
|
||||
_ UpserterEntity = (*UserNotificationHistory)(nil)
|
||||
_ contracts.TableNamer = (*HistoryNotification)(nil)
|
||||
_ UpserterEntity = (*HistoryNotification)(nil)
|
||||
)
|
||||
40
pkg/icingadb/v1/history/state.go
Normal file
40
pkg/icingadb/v1/history/state.go
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
package history
|
||||
|
||||
import (
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
)
|
||||
|
||||
type StateHistory struct {
|
||||
HistoryTableEntity `json:",inline"`
|
||||
HistoryTableMeta `json:",inline"`
|
||||
EventTime types.UnixMilli `json:"event_time"`
|
||||
StateType types.StateType `json:"state_type"`
|
||||
SoftState uint8 `json:"soft_state"`
|
||||
HardState uint8 `json:"hard_state"`
|
||||
PreviousSoftState uint8 `json:"previous_soft_state"`
|
||||
PreviousHardState uint8 `json:"previous_hard_state"`
|
||||
Attempt uint8 `json:"attempt"`
|
||||
Output types.String `json:"output"`
|
||||
LongOutput types.String `json:"long_output"`
|
||||
MaxCheckAttempts uint32 `json:"max_check_attempts"`
|
||||
CheckSource types.String `json:"check_source"`
|
||||
}
|
||||
|
||||
type HistoryState struct {
|
||||
HistoryMeta `json:",inline"`
|
||||
StateHistoryId types.UUID `json:"id"`
|
||||
EventTime types.UnixMilli `json:"event_time"`
|
||||
}
|
||||
|
||||
// TableName implements the contracts.TableNamer interface.
|
||||
func (*HistoryState) TableName() string {
|
||||
return "history"
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ UpserterEntity = (*StateHistory)(nil)
|
||||
_ contracts.TableNamer = (*HistoryState)(nil)
|
||||
_ UpserterEntity = (*HistoryState)(nil)
|
||||
)
|
||||
157
pkg/structify/structify.go
Normal file
157
pkg/structify/structify.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package structify
|
||||
|
||||
import (
|
||||
"encoding"
|
||||
"fmt"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// structBranch represents either a leaf or a subTree.
|
||||
type structBranch struct {
|
||||
// field specifies the struct field index.
|
||||
field int
|
||||
// leaf specifies the map key to parse the struct field from.
|
||||
leaf string
|
||||
// subTree specifies the struct field's inner tree.
|
||||
subTree []structBranch
|
||||
}
|
||||
|
||||
type MapStructifier = func(map[string]interface{}) (interface{}, error)
|
||||
|
||||
// MakeMapStructifier builds a function which parses a map's string values into a new struct of type t
|
||||
// and returns a pointer to it. tag specifies which tag connects struct fields to map keys.
|
||||
// MakeMapStructifier panics if it detects an unsupported type (suitable for usage in init() or global vars).
|
||||
func MakeMapStructifier(t reflect.Type, tag string) MapStructifier {
|
||||
tree := buildStructTree(t, tag)
|
||||
|
||||
return func(kv map[string]interface{}) (interface{}, error) {
|
||||
vPtr := reflect.New(t)
|
||||
ptr := vPtr.Interface()
|
||||
|
||||
if initer, ok := ptr.(contracts.Initer); ok {
|
||||
initer.Init()
|
||||
}
|
||||
|
||||
return ptr, structifyMapByTree(kv, tree, vPtr.Elem())
|
||||
}
|
||||
}
|
||||
|
||||
// buildStructTree assembles a tree which represents the struct t based on tag.
|
||||
func buildStructTree(t reflect.Type, tag string) []structBranch {
|
||||
var tree []structBranch
|
||||
numFields := t.NumField()
|
||||
|
||||
for i := 0; i < numFields; i++ {
|
||||
if field := t.Field(i); field.PkgPath == "" {
|
||||
switch tagValue := field.Tag.Get(tag); tagValue {
|
||||
case "", "-":
|
||||
case ",inline":
|
||||
if subTree := buildStructTree(field.Type, tag); subTree != nil {
|
||||
tree = append(tree, structBranch{i, "", subTree})
|
||||
}
|
||||
default:
|
||||
// If parseString doesn't support *T, it'll panic.
|
||||
_ = parseString("", reflect.New(field.Type).Interface())
|
||||
|
||||
tree = append(tree, structBranch{i, tagValue, nil})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return tree
|
||||
}
|
||||
|
||||
// structifyMapByTree parses src's string values into the struct dest according to tree's specification.
|
||||
func structifyMapByTree(src map[string]interface{}, tree []structBranch, dest reflect.Value) error {
|
||||
for _, branch := range tree {
|
||||
if branch.subTree == nil {
|
||||
if v, ok := src[branch.leaf]; ok {
|
||||
if vs, ok := v.(string); ok {
|
||||
if err := parseString(vs, dest.Field(branch.field).Addr().Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if err := structifyMapByTree(src, branch.subTree, dest.Field(branch.field)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseString parses src into *dest.
|
||||
func parseString(src string, dest interface{}) error {
|
||||
switch ptr := dest.(type) {
|
||||
case encoding.TextUnmarshaler:
|
||||
return ptr.UnmarshalText([]byte(src))
|
||||
case *string:
|
||||
*ptr = src
|
||||
return nil
|
||||
case *uint8:
|
||||
i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = uint8(i)
|
||||
}
|
||||
return err
|
||||
case *uint16:
|
||||
i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = uint16(i)
|
||||
}
|
||||
return err
|
||||
case *uint32:
|
||||
i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = uint32(i)
|
||||
}
|
||||
return err
|
||||
case *uint64:
|
||||
i, err := strconv.ParseUint(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = i
|
||||
}
|
||||
return err
|
||||
case *int8:
|
||||
i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = int8(i)
|
||||
}
|
||||
return err
|
||||
case *int16:
|
||||
i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = int16(i)
|
||||
}
|
||||
return err
|
||||
case *int32:
|
||||
i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = int32(i)
|
||||
}
|
||||
return err
|
||||
case *int64:
|
||||
i, err := strconv.ParseInt(src, 10, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = i
|
||||
}
|
||||
return err
|
||||
case *float32:
|
||||
f, err := strconv.ParseFloat(src, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = float32(f)
|
||||
}
|
||||
return err
|
||||
case *float64:
|
||||
f, err := strconv.ParseFloat(src, int(unsafe.Sizeof(*ptr)*8))
|
||||
if err == nil {
|
||||
*ptr = f
|
||||
}
|
||||
return err
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported type: %T", dest))
|
||||
}
|
||||
}
|
||||
|
|
@ -3,8 +3,10 @@ package types
|
|||
import (
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -41,6 +43,17 @@ func (b Bool) MarshalJSON() ([]byte, error) {
|
|||
return json.Marshal(b.Bool)
|
||||
}
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
func (b *Bool) UnmarshalText(text []byte) error {
|
||||
parsed, err := strconv.ParseUint(string(text), 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*b = Bool{parsed != 0, true}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (b *Bool) UnmarshalJSON(data []byte) error {
|
||||
if string(data) == "null" || len(data) == 0 {
|
||||
|
|
@ -95,8 +108,9 @@ func (b Bool) Value() (driver.Value, error) {
|
|||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ json.Marshaler = (*Bool)(nil)
|
||||
_ json.Unmarshaler = (*Bool)(nil)
|
||||
_ sql.Scanner = (*Bool)(nil)
|
||||
_ driver.Valuer = (*Bool)(nil)
|
||||
_ json.Marshaler = (*Bool)(nil)
|
||||
_ encoding.TextUnmarshaler = (*Bool)(nil)
|
||||
_ json.Unmarshaler = (*Bool)(nil)
|
||||
_ sql.Scanner = (*Bool)(nil)
|
||||
_ driver.Valuer = (*Bool)(nil)
|
||||
)
|
||||
|
|
|
|||
66
pkg/types/float.go
Normal file
66
pkg/types/float.go
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Float adds JSON support to sql.NullFloat64.
|
||||
type Float struct {
|
||||
sql.NullFloat64
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
// Supports JSON null.
|
||||
func (f Float) MarshalJSON() ([]byte, error) {
|
||||
var v interface{}
|
||||
if f.Valid {
|
||||
v = f.Float64
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
}
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
func (f *Float) UnmarshalText(text []byte) error {
|
||||
parsed, err := strconv.ParseFloat(string(text), 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*f = Float{sql.NullFloat64{
|
||||
Float64: parsed,
|
||||
Valid: true,
|
||||
}}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
// Supports JSON null.
|
||||
func (f *Float) UnmarshalJSON(data []byte) error {
|
||||
// Ignore null, like in the main JSON package.
|
||||
if bytes.HasPrefix(data, []byte{'n'}) {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := json.Unmarshal(data, &f.Float64)
|
||||
if err == nil {
|
||||
f.Valid = true
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ json.Marshaler = Float{}
|
||||
_ encoding.TextUnmarshaler = (*Float)(nil)
|
||||
_ json.Unmarshaler = (*Float)(nil)
|
||||
_ driver.Valuer = Float{}
|
||||
_ sql.Scanner = (*Float)(nil)
|
||||
)
|
||||
73
pkg/types/notification_type.go
Normal file
73
pkg/types/notification_type.go
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// NotificationType specifies the reason of a sent notification.
|
||||
type NotificationType uint16
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
func (nt *NotificationType) UnmarshalText(bytes []byte) error {
|
||||
text := string(bytes)
|
||||
|
||||
i, err := strconv.ParseUint(text, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n := NotificationType(i)
|
||||
if uint64(n) != i {
|
||||
// Truncated due to above cast, obviously too high
|
||||
return BadNotificationType{text}
|
||||
}
|
||||
|
||||
if _, ok := notificationTypes[n]; !ok {
|
||||
return BadNotificationType{text}
|
||||
}
|
||||
|
||||
*nt = n
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
func (nt NotificationType) Value() (driver.Value, error) {
|
||||
if v, ok := notificationTypes[nt]; ok {
|
||||
return v, nil
|
||||
} else {
|
||||
return nil, BadNotificationType{nt}
|
||||
}
|
||||
}
|
||||
|
||||
// BadNotificationType complains about a syntactically, but not semantically valid NotificationType.
|
||||
type BadNotificationType struct {
|
||||
Type interface{}
|
||||
}
|
||||
|
||||
// Error implements the error interface.
|
||||
func (bnt BadNotificationType) Error() string {
|
||||
return fmt.Sprintf("bad notification type: %#v", bnt.Type)
|
||||
}
|
||||
|
||||
// notificationTypes maps all valid NotificationType values to their SQL representation.
|
||||
var notificationTypes = map[NotificationType]string{
|
||||
1: "downtime_start",
|
||||
2: "downtime_end",
|
||||
4: "downtime_removed",
|
||||
8: "custom",
|
||||
16: "acknowledgement",
|
||||
32: "problem",
|
||||
64: "recovery",
|
||||
128: "flapping_start",
|
||||
256: "flapping_end",
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ error = BadNotificationType{}
|
||||
_ encoding.TextUnmarshaler = (*NotificationType)(nil)
|
||||
_ driver.Valuer = NotificationType(0)
|
||||
)
|
||||
66
pkg/types/state_type.go
Normal file
66
pkg/types/state_type.go
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// StateType specifies a state's hardness.
|
||||
type StateType uint8
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
func (st *StateType) UnmarshalText(bytes []byte) error {
|
||||
text := string(bytes)
|
||||
|
||||
i, err := strconv.ParseUint(text, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s := StateType(i)
|
||||
if uint64(s) != i {
|
||||
// Truncated due to above cast, obviously too high
|
||||
return BadStateType{text}
|
||||
}
|
||||
|
||||
if _, ok := stateTypes[s]; !ok {
|
||||
return BadStateType{text}
|
||||
}
|
||||
|
||||
*st = s
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value implements the driver.Valuer interface.
|
||||
func (st StateType) Value() (driver.Value, error) {
|
||||
if v, ok := stateTypes[st]; ok {
|
||||
return v, nil
|
||||
} else {
|
||||
return nil, BadStateType{st}
|
||||
}
|
||||
}
|
||||
|
||||
// BadStateType complains about a syntactically, but not semantically valid StateType.
|
||||
type BadStateType struct {
|
||||
Type interface{}
|
||||
}
|
||||
|
||||
// Error implements the error interface.
|
||||
func (bst BadStateType) Error() string {
|
||||
return fmt.Sprintf("bad state type: %#v", bst.Type)
|
||||
}
|
||||
|
||||
// stateTypes maps all valid StateType values to their SQL representation.
|
||||
var stateTypes = map[StateType]string{
|
||||
0: "soft",
|
||||
1: "hard",
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ error = BadStateType{}
|
||||
_ encoding.TextUnmarshaler = (*StateType)(nil)
|
||||
_ driver.Valuer = StateType(0)
|
||||
)
|
||||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
|
|
@ -23,6 +24,16 @@ func (s String) MarshalJSON() ([]byte, error) {
|
|||
return json.Marshal(v)
|
||||
}
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
func (s *String) UnmarshalText(text []byte) error {
|
||||
*s = String{sql.NullString{
|
||||
String: string(text),
|
||||
Valid: true,
|
||||
}}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
// Supports JSON null.
|
||||
func (s *String) UnmarshalJSON(data []byte) error {
|
||||
|
|
@ -41,8 +52,9 @@ func (s *String) UnmarshalJSON(data []byte) error {
|
|||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ json.Marshaler = String{}
|
||||
_ json.Unmarshaler = (*String)(nil)
|
||||
_ driver.Valuer = String{}
|
||||
_ sql.Scanner = (*String)(nil)
|
||||
_ json.Marshaler = String{}
|
||||
_ encoding.TextUnmarshaler = (*String)(nil)
|
||||
_ json.Unmarshaler = (*String)(nil)
|
||||
_ driver.Valuer = String{}
|
||||
_ sql.Scanner = (*String)(nil)
|
||||
)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package types
|
|||
import (
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
|
|
@ -28,6 +29,17 @@ func (t UnixMilli) MarshalJSON() ([]byte, error) {
|
|||
return []byte(strconv.FormatInt(utils.UnixMilli(time.Time(t)), 10)), nil
|
||||
}
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
func (t *UnixMilli) UnmarshalText(text []byte) error {
|
||||
parsed, err := strconv.ParseFloat(string(text), 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*t = UnixMilli(utils.FromUnixMilli(int64(parsed)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
// Unmarshals from milliseconds. Supports JSON null.
|
||||
func (t *UnixMilli) UnmarshalJSON(data []byte) error {
|
||||
|
|
@ -74,8 +86,9 @@ func (t UnixMilli) Value() (driver.Value, error) {
|
|||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ json.Marshaler = (*UnixMilli)(nil)
|
||||
_ json.Unmarshaler = (*UnixMilli)(nil)
|
||||
_ sql.Scanner = (*UnixMilli)(nil)
|
||||
_ driver.Valuer = (*UnixMilli)(nil)
|
||||
_ json.Marshaler = (*UnixMilli)(nil)
|
||||
_ encoding.TextUnmarshaler = (*UnixMilli)(nil)
|
||||
_ json.Unmarshaler = (*UnixMilli)(nil)
|
||||
_ sql.Scanner = (*UnixMilli)(nil)
|
||||
_ driver.Valuer = (*UnixMilli)(nil)
|
||||
)
|
||||
|
|
|
|||
24
pkg/types/uuid.go
Normal file
24
pkg/types/uuid.go
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// UUID is like uuid.UUID, but marshals itself binarily (not like xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) in SQL context.
|
||||
type UUID struct {
|
||||
uuid.UUID
|
||||
}
|
||||
|
||||
// Value implements driver.Valuer.
|
||||
func (uuid UUID) Value() (driver.Value, error) {
|
||||
return uuid.UUID[:], nil
|
||||
}
|
||||
|
||||
// Assert interface compliance.
|
||||
var (
|
||||
_ encoding.TextUnmarshaler = (*UUID)(nil)
|
||||
_ driver.Valuer = UUID{}
|
||||
_ driver.Valuer = (*UUID)(nil)
|
||||
)
|
||||
|
|
@ -42,6 +42,15 @@ func Name(t interface{}) string {
|
|||
return s[strings.LastIndex(s, ".")+1:]
|
||||
}
|
||||
|
||||
// TableName returns the table of t.
|
||||
func TableName(t interface{}) string {
|
||||
if tn, ok := t.(contracts.TableNamer); ok {
|
||||
return tn.TableName()
|
||||
} else {
|
||||
return Key(Name(t), '_')
|
||||
}
|
||||
}
|
||||
|
||||
// Key returns the name with all Unicode letters mapped to lower case letters,
|
||||
// with an additional separator in front of each original upper case letter.
|
||||
func Key(name string, sep byte) string {
|
||||
|
|
|
|||
Loading…
Reference in a new issue