From 17dc99b078b31025bb7d44e74cc6ab6eeee0bf16 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 6 Oct 2021 12:11:16 +0200 Subject: [PATCH] Only do anything while icinga:schema version meets our expectations ... not to work on unknown data structure. --- cmd/icingadb/main.go | 59 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index a8f7e847..c7eadec5 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal/command" "github.com/icinga/icingadb/internal/logging" "github.com/icinga/icingadb/pkg/com" @@ -24,9 +25,10 @@ import ( ) const ( - ExitSuccess = 0 - ExitFailure = 1 - expectedSchemaVersion = 2 + ExitSuccess = 0 + ExitFailure = 1 + expectedRedisSchemaVersion = "1" + expectedDbSchemaVersion = 2 ) func main() { @@ -77,6 +79,15 @@ func run() int { } } + { + pos, err := checkRedisSchema(logger, rc, "0-0") + if err != nil { + logger.Fatalf("%+v", err) + } + + go monitorRedisSchema(logger, rc, pos) + } + ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -285,9 +296,47 @@ func checkDbSchema(ctx context.Context, db *icingadb.DB) error { return errors.Wrap(err, "can't check database schema version") } - if version != expectedSchemaVersion { - return errors.Errorf("expected database schema v%d, got v%d", expectedSchemaVersion, version) + if version != expectedDbSchemaVersion { + return errors.Errorf("expected database schema v%d, got v%d", expectedDbSchemaVersion, version) } return nil } + +// monitorRedisSchema monitors rc's icinga:schema version validity. +func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) { + for { + var err error + pos, err = checkRedisSchema(logger, rc, pos) + + if err != nil { + logger.Fatalf("%+v", err) + } + } +} + +// checkRedisSchema verifies rc's icinga:schema version. +func checkRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) (newPos string, err error) { + if pos == "0-0" { + defer time.AfterFunc(3*time.Second, func() { logger.Info("Waiting for current Redis schema version") }).Stop() + } else { + logger.Debug("Waiting for new Redis schema version") + } + + cmd := rc.XRead(context.Background(), &redis.XReadArgs{Streams: []string{"icinga:schema", pos}}) + xRead, err := cmd.Result() + + if err != nil { + return "", icingaredis.WrapCmdErr(cmd) + } + + message := xRead[0].Messages[0] + if version := message.Values["version"]; version != expectedRedisSchemaVersion { + return "", errors.Errorf( + "unexpected Redis schema version: %q (expected %q)", version, expectedRedisSchemaVersion, + ) + } + + logger.Debug("Redis schema version is correct") + return message.ID, nil +}