History sync: rewrite to use a sequential pipeline

This is in preparation for adding foreign key constraints to the history
tables. For this, it is required to insert the rows into the different history
tables in a defined order.
This commit is contained in:
Julian Brost 2021-09-14 16:26:00 +02:00
parent 82530c771d
commit bfcc324535
2 changed files with 207 additions and 209 deletions

View file

@ -3,6 +3,7 @@ package history
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
@ -32,274 +33,270 @@ func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogg
}
}
// insertedMessage reports that the row built from one Redis message has been inserted into one table.
// fwdSucceeded sends these values and cleanup collects them to decide when a message is fully synced.
// The struct is built with a positional literal elsewhere in this file, so the field order matters.
type insertedMessage struct {
// redisId is the ID of the Redis stream message the inserted row originates from.
redisId string
// structType identifies the table the row was inserted into, represented by the
// struct type of the inserted row (the element type of the entity pointer).
structType reflect.Type
}
// bulkSize (1 << 14 = 16384) is the common batch size of the history sync: it is used as the buffer
// capacity of the channels between the sync goroutines, as the Count of each Redis XREAD and as the
// number of collected message IDs at which cleanup issues an XDEL without waiting for its timeout.
const bulkSize = 1 << 14
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
func (s Sync) Sync(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
for _, hs := range historyStreams {
var redis2structs []chan<- redis.XMessage
insertedMessages := make(chan insertedMessage, bulkSize)
for key, pipeline := range syncPipelines {
key := key
pipeline := pipeline
// messageProgress are the tables (represented by struct types)
// with successfully inserted rows by Redis message ID.
messageProgress := map[string]map[reflect.Type]struct{}{}
messageProgressMtx := &sync.Mutex{}
s.logger.Debugw("Starting history sync", zap.String("type", key))
stream := "icinga:history:stream:" + hs.kind
s.logger.Infof("Syncing %s history", hs.kind)
// The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage,
// where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this:
//
// readFromRedis() Reads from redis and sends the history entries to the next stage
// ↓ ch[0]
// pipeline[0]() First actual sync stage, receives history items from the previous stage, syncs them
// and once completed, sends them off to the next stage.
// ↓ ch[1]
// ... There may be a different number of pipeline stages in between.
// ↓ ch[n-1]
// pipeline[n-1]() Last actual sync stage, once it's done, sends the history item to the final stage.
// ↓ ch[n]
// deleteFromRedis() After all stages have processed a message successfully, this final stage deletes
// the history entry from the Redis stream as it is now persisted in the database.
//
// Each history entry is processed by at most one stage at any time. Each stage must forward the entry after
// it has processed it, even if the stage itself does not do anything with this specific entry. It should only
// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
// executed successfully.
for _, structifier := range hs.structifiers {
redis2struct := make(chan redis.XMessage, bulkSize)
struct2db := make(chan contracts.Entity, bulkSize)
succeeded := make(chan contracts.Entity, bulkSize)
ch := make([]chan redis.XMessage, len(pipeline)+1)
for i := range ch {
if i == 0 {
// Make the first channel buffered so that all items of one read iteration fit into the channel.
// This allows starting the next Redis XREAD right after the previous one has finished.
ch[i] = make(chan redis.XMessage, s.redis.Options.XReadCount)
} else {
ch[i] = make(chan redis.XMessage)
}
}
// rowIds are IDs of to be synced Redis messages by database row.
rowIds := map[contracts.Entity]string{}
rowIdsMtx := &sync.Mutex{}
g.Go(func() error {
return s.readFromRedis(ctx, key, ch[0])
})
redis2structs = append(redis2structs, redis2struct)
for i, stage := range pipeline {
i := i
stage := stage
g.Go(structifyStream(ctx, structifier, redis2struct, struct2db, rowIds, rowIdsMtx))
g.Go(fwdSucceeded(ctx, insertedMessages, succeeded, rowIds, rowIdsMtx))
// Upserts from struct2db.
g.Go(func() error {
defer close(succeeded)
return s.db.UpsertStreamed(ctx, struct2db, succeeded)
return stage(ctx, s, key, ch[i], ch[i+1])
})
}
g.Go(s.xRead(ctx, redis2structs, stream))
g.Go(s.cleanup(ctx, hs, insertedMessages, messageProgress, messageProgressMtx, stream))
g.Go(func() error {
return s.deleteFromRedis(ctx, key, ch[len(pipeline)])
})
}
return g.Wait()
}
// xRead reads from the Redis stream and broadcasts the data to redis2structs.
func (s Sync) xRead(ctx context.Context, redis2structs []chan<- redis.XMessage, stream string) func() error {
return func() error {
defer func() {
for _, r2s := range redis2structs {
close(r2s)
}
}()
// readFromRedis is the first stage of the history sync pipeline. It reads the history stream from Redis
// and feeds the history entries into the next stage.
func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis.XMessage) error {
defer close(output)
xra := &redis.XReadArgs{
Streams: []string{stream, "0-0"},
Count: bulkSize,
Block: 10 * time.Second,
xra := &redis.XReadArgs{
Streams: []string{"icinga:history:stream:" + key, "0-0"},
Count: int64(s.redis.Options.XReadCount),
Block: 10 * time.Second,
}
for {
cmd := s.redis.XRead(ctx, xra)
streams, err := cmd.Result()
if err != nil && err != redis.Nil {
return icingaredis.WrapCmdErr(cmd)
}
for {
cmd := s.redis.XRead(ctx, xra)
streams, err := cmd.Result()
for _, stream := range streams {
for _, message := range stream.Messages {
xra.Streams[1] = message.ID
if err != nil && err != redis.Nil {
select {
case output <- message:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
}
// deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second to last
// pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry.
func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error {
const logInterval = 20 * time.Second
var count uint64 // Count of synced entries for periodic logging.
stream := "icinga:history:stream:" + key
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
bulks := com.BulkXMessages(ctx, input, s.redis.Options.HScanCount)
for {
select {
case bulk := <-bulks:
ids := make([]string, len(bulk))
for i := range bulk {
ids[i] = bulk[i].ID
}
cmd := s.redis.XDel(ctx, stream, ids...)
if _, err := cmd.Result(); err != nil {
return icingaredis.WrapCmdErr(cmd)
}
for _, stream := range streams {
for _, message := range stream.Messages {
xra.Streams[1] = message.ID
count += uint64(len(ids))
for _, r2s := range redis2structs {
select {
case <-ctx.Done():
return ctx.Err()
case r2s <- message:
}
}
}
case <-logTicker.C:
if count > 0 {
s.logger.Infof("Inserted %d %s history entries in the last %s", count, key, logInterval)
count = 0
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// structifyStream structifies from redis2struct to struct2db.
func structifyStream(
ctx context.Context, structifier structify.MapStructifier, redis2struct <-chan redis.XMessage,
struct2db chan<- contracts.Entity, rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex,
) func() error {
return func() error {
defer close(struct2db)
// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
// once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information
// about which pipeline this function is running in, i.e. "notification"), an in channel for the stage to read history
// events from and an out channel to forward history entries to after processing them successfully. A stage function
// is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On
// error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis
// and can be processed at a later time.
type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error
for {
select {
case <-ctx.Done():
return ctx.Err()
case message, ok := <-redis2struct:
if !ok {
return nil
}
func stageFuncForEntity(structPtr interface{}) stageFunc {
structifier := structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json")
ptr, err := structifier(message.Values)
if err != nil {
return errors.Wrapf(err, "can't structify values %#v", message.Values)
}
return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
type State struct {
Message redis.XMessage // Original event from Redis.
Pending int // Number of pending entities. When reaching 0, the message is forwarded to out.
}
ue := ptr.(v1.UpserterEntity)
bufSize := s.db.Options.MaxPlaceholdersPerStatement
insert := make(chan contracts.Entity, bufSize) // Events sent to the database for insertion.
inserted := make(chan contracts.Entity) // Events returned by the database after successful insertion.
state := make(map[contracts.Entity]*State) // Shared state between all entities created by one event.
var stateMu sync.Mutex // Synchronizes concurrent access to state.
rowIdsMtx.Lock()
rowIds[ue] = message.ID
rowIdsMtx.Unlock()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(insert)
for {
select {
case <-ctx.Done():
return ctx.Err()
case struct2db <- ue:
}
}
}
}
}
case e, ok := <-in:
if !ok {
return nil
}
// fwdSucceeded informs insertedMessages about successfully inserted rows according to succeeded.
func fwdSucceeded(
ctx context.Context, insertedMessages chan<- insertedMessage, succeeded <-chan contracts.Entity,
rowIds map[contracts.Entity]string, rowIdsMtx *sync.Mutex,
) func() error {
return func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case row, ok := <-succeeded:
if !ok {
return nil
}
ptr, err := structifier(e.Values)
if err != nil {
return errors.Wrapf(err, "can't structify values %#v", e.Values)
}
rowIdsMtx.Lock()
ue := ptr.(v1.UpserterEntity)
id, ok := rowIds[row]
if ok {
delete(rowIds, row)
}
st := &State{
Message: e,
Pending: 1,
}
stateMu.Lock()
state[ue] = st
stateMu.Unlock()
rowIdsMtx.Unlock()
if ok {
select {
case insert <- ue:
case <-ctx.Done():
return ctx.Err()
case insertedMessages <- insertedMessage{id, reflect.TypeOf(row).Elem()}:
}
case <-ctx.Done():
return ctx.Err()
}
}
}
}
}
})
// cleanup collects completely inserted messages from insertedMessages and deletes them from Redis.
func (s Sync) cleanup(
ctx context.Context, hs historyStream, insertedMessages <-chan insertedMessage,
messageProgress map[string]map[reflect.Type]struct{}, messageProgressMtx *sync.Mutex, stream string,
) func() error {
return func() error {
var ids []string
var count uint64
var timeout <-chan time.Time
g.Go(func() error {
defer close(inserted)
const period = 20 * time.Second
periodically := time.NewTicker(period)
defer periodically.Stop()
return s.db.UpsertStreamed(ctx, insert, inserted)
})
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-periodically.C:
if count > 0 {
s.logger.Infof("Inserted %d %s history entries in the last %s", count, hs.kind, period)
count = 0
}
case msg := <-insertedMessages:
messageProgressMtx.Lock()
g.Go(func() error {
defer close(out)
mp, ok := messageProgress[msg.redisId]
if !ok {
mp = map[reflect.Type]struct{}{}
messageProgress[msg.redisId] = mp
}
for {
select {
case e, ok := <-inserted:
if !ok {
return nil
}
mp[msg.structType] = struct{}{}
stateMu.Lock()
st := state[e]
delete(state, e)
stateMu.Unlock()
if ok = len(mp) == len(hs.structifiers); ok {
delete(messageProgress, msg.redisId)
}
messageProgressMtx.Unlock()
if ok {
ids = append(ids, msg.redisId)
count++
switch len(ids) {
case 1:
timeout = time.After(time.Second / 4)
case bulkSize:
cmd := s.redis.XDel(ctx, stream, ids...)
if _, err := cmd.Result(); err != nil {
return icingaredis.WrapCmdErr(cmd)
st.Pending--
if st.Pending == 0 {
select {
case out <- st.Message:
case <-ctx.Done():
return ctx.Err()
}
ids = nil
timeout = nil
}
}
case <-timeout:
cmd := s.redis.XDel(ctx, stream, ids...)
if _, err := cmd.Result(); err != nil {
return icingaredis.WrapCmdErr(cmd)
}
ids = nil
timeout = nil
case <-ctx.Done():
return ctx.Err()
}
}
}
})
return g.Wait()
}
}
// historyStream represents a Redis history stream.
type historyStream struct {
// kind specifies the stream's purpose.
kind string
// structifiers lists the factories of the model structs the stream data shall be copied to.
structifiers []structify.MapStructifier
// syncPipelines maps each history stream key (the suffix appended to "icinga:history:stream:" when
// reading from Redis) to the ordered list of sync stages run for entries of that stream.
// Sync chains the stages of one pipeline sequentially, so a stage only receives a message after the
// stage listed before it has forwarded it. The order of the entries within each list is therefore
// significant: the trailing comments name the table a stage writes to and the table it depends on.
var syncPipelines = map[string][]stageFunc{
"notification": {
stageFuncForEntity((*v1.NotificationHistory)(nil)), // notification_history
stageFuncForEntity((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
},
"usernotification": {
stageFuncForEntity((*v1.UserNotificationHistory)(nil)),
},
"state": {
stageFuncForEntity((*v1.StateHistory)(nil)), // state_history
stageFuncForEntity((*v1.HistoryState)(nil)), // history (depends on state_history)
},
"downtime": {
stageFuncForEntity((*v1.DowntimeHistory)(nil)), // downtime_history
stageFuncForEntity((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history)
},
"comment": {
stageFuncForEntity((*v1.CommentHistory)(nil)), // comment_history
stageFuncForEntity((*v1.HistoryComment)(nil)), // history (depends on comment_history)
},
"flapping": {
stageFuncForEntity((*v1.FlappingHistory)(nil)), // flapping_history
stageFuncForEntity((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
},
"acknowledgement": {
stageFuncForEntity((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
stageFuncForEntity((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
},
}
// historyStreams lists every Redis history stream to sync. Each entry pairs the stream's kind
// with one structifier per model struct the stream's data is copied to.
var historyStreams = func() []historyStream {
	// Declarative table: stream kind and typed nil pointers to the model structs of that stream.
	definitions := []struct {
		kind       string
		structPtrs []v1.UpserterEntity
	}{
		{"notification", []v1.UpserterEntity{(*v1.NotificationHistory)(nil), (*v1.HistoryNotification)(nil)}},
		{"usernotification", []v1.UpserterEntity{(*v1.UserNotificationHistory)(nil)}},
		{"state", []v1.UpserterEntity{(*v1.StateHistory)(nil), (*v1.HistoryState)(nil)}},
		{"downtime", []v1.UpserterEntity{(*v1.DowntimeHistory)(nil), (*v1.HistoryDowntime)(nil)}},
		{"comment", []v1.UpserterEntity{(*v1.CommentHistory)(nil), (*v1.HistoryComment)(nil)}},
		{"flapping", []v1.UpserterEntity{(*v1.FlappingHistory)(nil), (*v1.HistoryFlapping)(nil)}},
		{"acknowledgement", []v1.UpserterEntity{(*v1.AcknowledgementHistory)(nil), (*v1.HistoryAck)(nil)}},
	}

	var result []historyStream
	for _, def := range definitions {
		var factories []structify.MapStructifier
		for _, ptr := range def.structPtrs {
			// The table holds pointers, so Elem() yields the struct type the structifier is made for.
			factory := structify.MakeMapStructifier(reflect.TypeOf(ptr).Elem(), "json")
			factories = append(factories, factory)
		}

		result = append(result, historyStream{kind: def.kind, structifiers: factories})
	}

	return result
}()

View file

@ -31,6 +31,7 @@ type Options struct {
MaxHMGetConnections int `yaml:"max_hmget_connections" default:"4096"`
HMGetCount int `yaml:"hmget_count" default:"4096"`
HScanCount int `yaml:"hscan_count" default:"4096"`
XReadCount int `yaml:"xread_count" default:"4096"`
}
// Validate checks constraints in the supplied Redis options and returns an error if they are violated.