From bfcc32453581e877bf233d36fb4e23ea5a528a89 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 14 Sep 2021 16:26:00 +0200 Subject: [PATCH] History sync: rewrite to use a sequential pipeline This is in preparation for adding foreign key constraints to the history tables. For this, it is required to insert the rows into the different history tables in a defined order. --- pkg/icingadb/history/sync.go | 415 +++++++++++++++++------------------ pkg/icingaredis/client.go | 1 + 2 files changed, 207 insertions(+), 209 deletions(-) diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 4cec2333..afa1be3a 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -3,6 +3,7 @@ package history import ( "context" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" @@ -32,274 +33,270 @@ func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogg } } -// insertedMessage represents a just inserted row. -type insertedMessage struct { - // redisId specifies the origin Redis message. - redisId string - // structType represents the table the row was inserted into. - structType reflect.Type -} - -const bulkSize = 1 << 14 - // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. func (s Sync) Sync(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) - for _, hs := range historyStreams { - var redis2structs []chan<- redis.XMessage - insertedMessages := make(chan insertedMessage, bulkSize) + for key, pipeline := range syncPipelines { + key := key + pipeline := pipeline - // messageProgress are the tables (represented by struct types) - // with successfully inserted rows by Redis message ID. 
- messageProgress := map[string]map[reflect.Type]struct{}{} - messageProgressMtx := &sync.Mutex{} + s.logger.Debugw("Starting history sync", zap.String("type", key)) - stream := "icinga:history:stream:" + hs.kind - s.logger.Infof("Syncing %s history", hs.kind) + // The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage, + // where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this: + // + // readFromRedis() Reads from redis and sends the history entries to the next stage + // ↓ ch[0] + // pipeline[0]() First actual sync stage, receives history items from the previous stage, syncs them + // and once completed, sends them off to the next stage. + // ↓ ch[1] + // ... There may be a different number of pipeline stages in between. + // ↓ ch[n-1] + // pipeline[n-1]() Last actual sync stage, once it's done, sends the history item to the final stage. + // ↓ ch[n] + // deleteFromRedis() After all stages have processed a message successfully, this final stage deletes + // the history entry from the Redis stream as it is now persisted in the database. + // + // Each history entry is processed by at most one stage at a time. Each stage must forward the entry after + // it has processed it, even if the stage itself does not do anything with this specific entry. It should only + // forward the entry after it has completed its own sync so that later stages can rely on previous stages being + // executed successfully. - for _, structifier := range hs.structifiers { - redis2struct := make(chan redis.XMessage, bulkSize) - struct2db := make(chan contracts.Entity, bulkSize) - succeeded := make(chan contracts.Entity, bulkSize) + ch := make([]chan redis.XMessage, len(pipeline)+1) + for i := range ch { + if i == 0 { + // Make the first channel buffered so that all items of one read iteration fit into the channel. 
+ // This allows starting the next Redis XREAD right after the previous one has finished. + ch[i] = make(chan redis.XMessage, s.redis.Options.XReadCount) + } else { + ch[i] = make(chan redis.XMessage) + } + } - // rowIds are IDs of to be synced Redis messages by database row. - rowIds := map[contracts.Entity]string{} - rowIdsMtx := &sync.Mutex{} + g.Go(func() error { + return s.readFromRedis(ctx, key, ch[0]) + }) - redis2structs = append(redis2structs, redis2struct) + for i, stage := range pipeline { + i := i + stage := stage - g.Go(structifyStream(ctx, structifier, redis2struct, struct2db, rowIds, rowIdsMtx)) - g.Go(fwdSucceeded(ctx, insertedMessages, succeeded, rowIds, rowIdsMtx)) - - // Upserts from struct2db. g.Go(func() error { - defer close(succeeded) - return s.db.UpsertStreamed(ctx, struct2db, succeeded) + return stage(ctx, s, key, ch[i], ch[i+1]) }) } - g.Go(s.xRead(ctx, redis2structs, stream)) - g.Go(s.cleanup(ctx, hs, insertedMessages, messageProgress, messageProgressMtx, stream)) + g.Go(func() error { + return s.deleteFromRedis(ctx, key, ch[len(pipeline)]) + }) } return g.Wait() } -// xRead reads from the Redis stream and broadcasts the data to redis2structs. -func (s Sync) xRead(ctx context.Context, redis2structs []chan<- redis.XMessage, stream string) func() error { - return func() error { - defer func() { - for _, r2s := range redis2structs { - close(r2s) - } - }() +// readFromRedis is the first stage of the history sync pipeline. It reads the history stream from Redis +// and feeds the history entries into the next stage. 
+func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis.XMessage) error { + defer close(output) - xra := &redis.XReadArgs{ - Streams: []string{stream, "0-0"}, - Count: bulkSize, - Block: 10 * time.Second, + xra := &redis.XReadArgs{ + Streams: []string{"icinga:history:stream:" + key, "0-0"}, + Count: int64(s.redis.Options.XReadCount), + Block: 10 * time.Second, + } + + for { + cmd := s.redis.XRead(ctx, xra) + streams, err := cmd.Result() + + if err != nil && err != redis.Nil { + return icingaredis.WrapCmdErr(cmd) } - for { - cmd := s.redis.XRead(ctx, xra) - streams, err := cmd.Result() + for _, stream := range streams { + for _, message := range stream.Messages { + xra.Streams[1] = message.ID - if err != nil && err != redis.Nil { + select { + case output <- message: + case <-ctx.Done(): + return ctx.Err() + } + } + } + } +} + +// deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second to last +// pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry. +func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error { + const logInterval = 20 * time.Second + + var count uint64 // Count of synced entries for periodic logging. + stream := "icinga:history:stream:" + key + + logTicker := time.NewTicker(logInterval) + defer logTicker.Stop() + + bulks := com.BulkXMessages(ctx, input, s.redis.Options.HScanCount) + + for { + select { + case bulk := <-bulks: + ids := make([]string, len(bulk)) + for i := range bulk { + ids[i] = bulk[i].ID + } + + cmd := s.redis.XDel(ctx, stream, ids...) 
+ if _, err := cmd.Result(); err != nil { return icingaredis.WrapCmdErr(cmd) } - for _, stream := range streams { - for _, message := range stream.Messages { - xra.Streams[1] = message.ID + count += uint64(len(ids)) - for _, r2s := range redis2structs { - select { - case <-ctx.Done(): - return ctx.Err() - case r2s <- message: - } - } - } + case <-logTicker.C: + if count > 0 { + s.logger.Infof("Inserted %d %s history entries in the last %s", count, key, logInterval) + count = 0 } + + case <-ctx.Done(): + return ctx.Err() } } } -// structifyStream structifies from redis2struct to struct2db. -func structifyStream( - ctx context.Context, structifier structify.MapStructifier, redis2struct <-chan redis.XMessage, - struct2db chan<- contracts.Entity, rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex, -) func() error { - return func() error { - defer close(struct2db) +// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop +// once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information +// about which pipeline this function is running in, e.g. "notification"), an in channel for the stage to read history +// events from and an out channel to forward history entries to after processing them successfully. A stage function +// is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On +// error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis +// and can be processed at a later time. 
+type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error - for { - select { - case <-ctx.Done(): - return ctx.Err() - case message, ok := <-redis2struct: - if !ok { - return nil - } +func stageFuncForEntity(structPtr interface{}) stageFunc { + structifier := structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json") - ptr, err := structifier(message.Values) - if err != nil { - return errors.Wrapf(err, "can't structify values %#v", message.Values) - } + return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { + type State struct { + Message redis.XMessage // Original event from Redis. + Pending int // Number of pending entities. When reaching 0, the message is forwarded to out. + } - ue := ptr.(v1.UpserterEntity) + bufSize := s.db.Options.MaxPlaceholdersPerStatement + insert := make(chan contracts.Entity, bufSize) // Events sent to the database for insertion. + inserted := make(chan contracts.Entity) // Events returned by the database after successful insertion. + state := make(map[contracts.Entity]*State) // Shared state between all entities created by one event. + var stateMu sync.Mutex // Synchronizes concurrent access to state. - rowIdsMtx.Lock() - rowIds[ue] = message.ID - rowIdsMtx.Unlock() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + defer close(insert) + + for { select { - case <-ctx.Done(): - return ctx.Err() - case struct2db <- ue: - } - } - } - } -} + case e, ok := <-in: + if !ok { + return nil + } -// fwdSucceeded informs insertedMessages about successfully inserted rows according to succeeded. 
-func fwdSucceeded( - ctx context.Context, insertedMessages chan<- insertedMessage, succeeded <-chan contracts.Entity, - rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex, -) func() error { - return func() error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case row, ok := <-succeeded: - if !ok { - return nil - } + ptr, err := structifier(e.Values) + if err != nil { + return errors.Wrapf(err, "can't structify values %#v", e.Values) + } - rowIdsMtx.Lock() + ue := ptr.(v1.UpserterEntity) - id, ok := rowIds[row] - if ok { - delete(rowIds, row) - } + st := &State{ + Message: e, + Pending: 1, + } + stateMu.Lock() + state[ue] = st + stateMu.Unlock() - rowIdsMtx.Unlock() - - if ok { select { + case insert <- ue: case <-ctx.Done(): return ctx.Err() - case insertedMessages <- insertedMessage{id, reflect.TypeOf(row).Elem()}: } + + case <-ctx.Done(): + return ctx.Err() } } - } - } -} + }) -// cleanup collects completely inserted messages from insertedMessages and deletes them from Redis. 
-func (s Sync) cleanup( - ctx context.Context, hs historyStream, insertedMessages <-chan insertedMessage, - messageProgress map[string]map[reflect.Type]struct{}, messageProgressMtx *sync.Mutex, stream string, -) func() error { - return func() error { - var ids []string - var count uint64 - var timeout <-chan time.Time + g.Go(func() error { + defer close(inserted) - const period = 20 * time.Second - periodically := time.NewTicker(period) - defer periodically.Stop() + return s.db.UpsertStreamed(ctx, insert, inserted) + }) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-periodically.C: - if count > 0 { - s.logger.Infof("Inserted %d %s history entries in the last %s", count, hs.kind, period) - count = 0 - } - case msg := <-insertedMessages: - messageProgressMtx.Lock() + g.Go(func() error { + defer close(out) - mp, ok := messageProgress[msg.redisId] - if !ok { - mp = map[reflect.Type]struct{}{} - messageProgress[msg.redisId] = mp - } + for { + select { + case e, ok := <-inserted: + if !ok { + return nil + } - mp[msg.structType] = struct{}{} + stateMu.Lock() + st := state[e] + delete(state, e) + stateMu.Unlock() - if ok = len(mp) == len(hs.structifiers); ok { - delete(messageProgress, msg.redisId) - } - - messageProgressMtx.Unlock() - - if ok { - ids = append(ids, msg.redisId) - count++ - - switch len(ids) { - case 1: - timeout = time.After(time.Second / 4) - case bulkSize: - cmd := s.redis.XDel(ctx, stream, ids...) - if _, err := cmd.Result(); err != nil { - return icingaredis.WrapCmdErr(cmd) + st.Pending-- + if st.Pending == 0 { + select { + case out <- st.Message: + case <-ctx.Done(): + return ctx.Err() } - - ids = nil - timeout = nil } - } - case <-timeout: - cmd := s.redis.XDel(ctx, stream, ids...) - if _, err := cmd.Result(); err != nil { - return icingaredis.WrapCmdErr(cmd) - } - ids = nil - timeout = nil + case <-ctx.Done(): + return ctx.Err() + } } - } + }) + + return g.Wait() } } -// historyStream represents a Redis history stream. 
-type historyStream struct { - // kind specifies the stream's purpose. - kind string - // structifiers lists the factories of the model structs the stream data shall be copied to. - structifiers []structify.MapStructifier +var syncPipelines = map[string][]stageFunc{ + "notification": { + stageFuncForEntity((*v1.NotificationHistory)(nil)), // notification_history + stageFuncForEntity((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + }, + "usernotification": { + stageFuncForEntity((*v1.UserNotificationHistory)(nil)), + }, + "state": { + stageFuncForEntity((*v1.StateHistory)(nil)), // state_history + stageFuncForEntity((*v1.HistoryState)(nil)), // history (depends on state_history) + }, + "downtime": { + stageFuncForEntity((*v1.DowntimeHistory)(nil)), // downtime_history + stageFuncForEntity((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history) + }, + "comment": { + stageFuncForEntity((*v1.CommentHistory)(nil)), // comment_history + stageFuncForEntity((*v1.HistoryComment)(nil)), // history (depends on comment_history) + }, + "flapping": { + stageFuncForEntity((*v1.FlappingHistory)(nil)), // flapping_history + stageFuncForEntity((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history) + }, + "acknowledgement": { + stageFuncForEntity((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history + stageFuncForEntity((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) + }, } - -// historyStreams contains all Redis history streams to sync. 
-var historyStreams = func() []historyStream { - var streams []historyStream - for _, rhs := range []struct { - kind string - structPtrs []v1.UpserterEntity - }{ - {"notification", []v1.UpserterEntity{(*v1.NotificationHistory)(nil), (*v1.HistoryNotification)(nil)}}, - {"usernotification", []v1.UpserterEntity{(*v1.UserNotificationHistory)(nil)}}, - {"state", []v1.UpserterEntity{(*v1.StateHistory)(nil), (*v1.HistoryState)(nil)}}, - {"downtime", []v1.UpserterEntity{(*v1.DowntimeHistory)(nil), (*v1.HistoryDowntime)(nil)}}, - {"comment", []v1.UpserterEntity{(*v1.CommentHistory)(nil), (*v1.HistoryComment)(nil)}}, - {"flapping", []v1.UpserterEntity{(*v1.FlappingHistory)(nil), (*v1.HistoryFlapping)(nil)}}, - {"acknowledgement", []v1.UpserterEntity{(*v1.AcknowledgementHistory)(nil), (*v1.HistoryAck)(nil)}}, - } { - var structifiers []structify.MapStructifier - for _, structPtr := range rhs.structPtrs { - structifiers = append(structifiers, structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json")) - } - - streams = append(streams, historyStream{rhs.kind, structifiers}) - } - - return streams -}() diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 710adfc0..2a293c79 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -31,6 +31,7 @@ type Options struct { MaxHMGetConnections int `yaml:"max_hmget_connections" default:"4096"` HMGetCount int `yaml:"hmget_count" default:"4096"` HScanCount int `yaml:"hscan_count" default:"4096"` + XReadCount int `yaml:"xread_count" default:"4096"` } // Validate checks constraints in the supplied Redis options and returns an error if they are violated.