diff --git a/connection/redis.go b/connection/redis.go index 657d1f4f..881e0fbf 100644 --- a/connection/redis.go +++ b/connection/redis.go @@ -6,6 +6,7 @@ import ( "github.com/go-redis/redis" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + "strings" "sync" "sync/atomic" "time" @@ -105,7 +106,15 @@ func (rdbw *RDBWrapper) CompareAndSetConnected(connected bool) (swapped bool) { func NewRDBWrapper(address string) *RDBWrapper { log.Info("Connecting to Redis") + + // TODO: remove this in favor of https://github.com/go-redis/redis/pull/1165 + var net string + if strings.HasPrefix(address, "/") { + net = "unix" + } + rdb := redis.NewClient(&redis.Options{ + Network: net, Addr: address, DialTimeout: time.Minute / 2, ReadTimeout: time.Minute, diff --git a/connection/redis_pubsub_test.go b/connection/redis_pubsub_test.go index 8e97827c..ad592763 100644 --- a/connection/redis_pubsub_test.go +++ b/connection/redis_pubsub_test.go @@ -1,20 +1,24 @@ package connection import ( + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "github.com/go-redis/redis" "github.com/stretchr/testify/assert" - "os" "testing" "time" ) func TestPubSubWrapper(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { diff --git a/connection/redis_test.go b/connection/redis_test.go index ea24eac7..164197b1 100644 --- a/connection/redis_test.go +++ b/connection/redis_test.go @@ -1,9 +1,9 @@ package connection import ( + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "github.com/go-redis/redis" "github.com/stretchr/testify/assert" - "os" "sync" "sync/atomic" "testing" @@ -17,7 +17,18 @@ func NewTestRDBW(rdb RedisClient) RDBWrapper { } func TestNewRDBWrapper(t *testing.T) { - rdbw := NewRDBWrapper(os.Getenv("ICINGADB_TEST_REDIS")) + var server redisd.Server + + client, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + defer client.Close() + + rdbw := NewRDBWrapper(client.Options().Addr) assert.True(t, rdbw.CheckConnection(false), "Redis should be connected") rdbw = NewRDBWrapper("asdasdasdasdasd:5123") @@ -59,14 +70,19 @@ func TestRDBWrapper_GetConnectionCheckInterval(t *testing.T) { } func TestRDBWrapper_CheckConnection(t *testing.T) { + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(nil) - rdbw.Rdb = redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + rdbw.Rdb = rdb atomic.StoreUint32(rdbw.ConnectionLostCounterAtomic, 512312312) assert.True(t, rdbw.CheckConnection(false), "DBWrapper should be connected") assert.Equal(t, uint32(0), atomic.LoadUint32(rdbw.ConnectionLostCounterAtomic)) @@ -87,12 +103,16 @@ func TestRDBWrapper_CheckConnection(t *testing.T) { } func TestRDBWrapper_HGetAll(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -169,12 +189,16 @@ func TestRDBWrapper_HMGet(t *testing.T) { } func TestRDBWrapper_XRead(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -207,12 +231,16 @@ func TestRDBWrapper_XRead(t *testing.T) { } func TestRDBWrapper_XDel(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -242,12 +270,16 @@ func TestRDBWrapper_XDel(t *testing.T) { } func TestRDBWrapper_Publish(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -278,12 +310,16 @@ func TestRDBWrapper_Publish(t *testing.T) { } func TestRDBWrapper_TxPipelined(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { diff --git a/connection/redisd/config.go b/connection/redisd/config.go new file mode 100644 index 00000000..9c12e35b --- /dev/null +++ b/connection/redisd/config.go @@ -0,0 +1,65 @@ +package redisd + +import log "github.com/sirupsen/logrus" + +// configLogLevels maps the logrus log levels to Redis log levels (as in config). +var configLogLevels = map[log.Level]string{ + log.FatalLevel: "warning", + log.ErrorLevel: "warning", + log.WarnLevel: "warning", + log.InfoLevel: "notice", + log.DebugLevel: "verbose", +} + +// configTemplate is the constant part of Server's Redis config. +const configTemplate = ` +daemonize no +supervised no + +loglevel %s +logfile "" +syslog-enabled no + +dir "%s" + + +unixsocket "%s" +unixsocketperm 700 + +port 0 +bind 127.0.0.1 +protected-mode yes + +timeout 0 + + +databases 1 + +save "" + +stop-writes-on-bgsave-error yes + +rdbcompression yes +rdbchecksum yes +dbfilename dump.rdb + +appendonly no + + +maxclients 42 + +lua-time-limit 0 + +hash-max-ziplist-entries 512 +hash-max-ziplist-value 64 +list-max-ziplist-size -2 +list-compress-depth 0 +set-max-intset-entries 512 +zset-max-ziplist-entries 128 +zset-max-ziplist-value 64 +hll-sparse-max-bytes 3000 +activerehashing yes + +client-output-buffer-limit normal 0 0 0 +client-output-buffer-limit pubsub 0 0 0 +` diff --git a/connection/redisd/server.go b/connection/redisd/server.go new file mode 100644 index 00000000..01f0cc4b --- /dev/null +++ b/connection/redisd/server.go @@ -0,0 +1,223 @@ +package redisd + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "github.com/go-redis/redis" + log "github.com/sirupsen/logrus" + "io" + "io/ioutil" + "os" + "os/exec" + "path" + "regexp" + "strconv" + "syscall" + "time" +) + +// logLineRgx matches e.g. "1:M 23 Sep 2019 10:02:05.882 * Ready to accept connections". +var logLineRgx = regexp.MustCompile(`\A(\d+):([CM]) (.*?) ([-*#]) (.+)\z`) + +// roles contains the Redis processes' roles' descriptions by their indicator characters. +var roles = map[byte]string{ + 'C': "RDB writer", + 'M': "master", +} + +// logLevels maps the Redis log levels (as in log messages) to logrus log levels. +var logLevels = map[byte]log.Level{ + '-': log.DebugLevel, + '*': log.InfoLevel, + '#': log.WarnLevel, +} + +// Server represents a managed Redis server. +type Server struct { + // basedir is a directory containing the Redis server context. + basedir string + // cmd represents the main Redis process if any. + cmd *exec.Cmd + // stopped is closed as soon as the main Redis process is stopped. + stopped chan struct{} +} + +// redisVersion matches e.g. "redis_version:5.0.5". +var redisVersion = regexp.MustCompile(`(?m)^redis_version:(.*?)\s*$`) + +// redisVersionNumber matches e.g. "5". +var redisVersionNumber = regexp.MustCompile(`\d+`) + +// Start starts *s and returns a client for connecting to it. +func (s *Server) Start() (*redis.Client, error) { + { + var errTD error + s.basedir, errTD = ioutil.TempDir("", "") + + if errTD != nil { + return nil, errTD + } + } + + log.WithFields(log.Fields{"basedir": s.basedir}).Info("starting Redis server") + + workDir := path.Join(s.basedir, "work-dir") + + if errMkdir := os.Mkdir(workDir, 0700); errMkdir != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return nil, errMkdir + } + + config := &bytes.Buffer{} + socket := path.Join(s.basedir, "socket") + fmt.Fprintf(config, configTemplate, configLogLevels[log.GetLevel()], workDir, socket) + + cmd := exec.Command("redis-server", "-") + cmd.Dir = s.basedir + cmd.Stdin = config + + stdout, errStdout := cmd.StdoutPipe() + if errStdout != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return nil, errStdout + } + + if errStart := cmd.Start(); errStart != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return nil, errStart + } + + s.cmd = cmd + s.stopped = make(chan struct{}) + + go s.stdout2log(stdout) + + log.WithFields(log.Fields{"basedir": s.basedir}).Debug("checking the Redis server for actual serving") + + client := redis.NewClient(&redis.Options{ + Network: "unix", + Addr: socket, + ReadTimeout: time.Minute, + WriteTimeout: time.Minute, + }) + + for { + info, errInfo := client.Info().Result() + if errInfo == nil { + version := redisVersion.FindStringSubmatch(info) + if version == nil { + client.Close() + s.Stop() + return nil, errors.New("Redis server didn't tell its version") + } + + major := redisVersionNumber.FindString(version[1]) + if major == "" { + client.Close() + s.Stop() + return nil, errors.New("bad Redis server version: " + version[1]) + } + + majorInt, errMI := strconv.ParseUint(major, 10, 64) + if errMI != nil { + majorInt = ^uint64(0) + } + + if majorInt < 5 { + client.Close() + s.Stop() + return nil, errors.New(fmt.Sprintf("Redis server is too old (%s < 5)", version[1])) + } + + log.WithFields(log.Fields{ + "basedir": s.basedir, + "version": version[1], + }).Debug("Redis server is actually serving now") + return client, nil + } + + select { + case <-s.stopped: + client.Close() + return nil, errInfo + + default: + log.WithFields(log.Fields{ + "basedir": s.basedir, + "error": errInfo, + }).Debug("Redis server isn't actually serving, yet") + + time.Sleep(time.Second) + } + } +} + +// Stop stops *s. +func (s *Server) Stop() error { + log.Info("stopping Redis server") + + proc := s.cmd.Process + + if errSignal := proc.Signal(syscall.SIGTERM); errSignal != nil { + return errSignal + } + + <-s.stopped + return nil +} + +// stdout2log forwards the Redis server's log from stdout to logrus and manages s.basedir and s.stopped. +func (s *Server) stdout2log(stdout io.Reader) { + buffer := bufio.NewReader(stdout) + + for { + line, errRead := buffer.ReadBytes('\n') + if errRead != nil { + if errRead != io.EOF || len(line) > 0 { + log.WithFields(log.Fields{"error": errRead}).Error( + "got unexpected error while forwarding Redis server logs", + ) + } + + break + } + + line = line[:len(line)-1] + + if len(line) > 0 && '0' <= line[0] && line[0] <= '9' { + if submatch := logLineRgx.FindSubmatch(line); submatch != nil { + timeStamp, errTime := time.ParseInLocation( + "2 Jan 2006 15:04:05.999999999", string(submatch[3]), time.Local, + ) + if errTime != nil { + timeStamp = time.Now() + } + + pid, errPid := strconv.ParseUint(string(submatch[1]), 10, 64) + if errPid != nil { + pid = 0 + } + + log.WithTime(timeStamp).WithFields(map[string]interface{}{ + "component": "redis-server", + "pid": pid, + "role": roles[submatch[2][0]], + }).Log(logLevels[submatch[4][0]], string(submatch[5])) + } + } + } + + if errWait := s.cmd.Wait(); errWait != nil { + log.WithFields(log.Fields{"error": errWait}).Error("Redis server terminated with an error") + } + + os.RemoveAll(s.basedir) + s.basedir = "" + + close(s.stopped) +} diff --git a/ha/heartbeat_test.go b/ha/heartbeat_test.go index 55790453..00fffc84 100644 --- a/ha/heartbeat_test.go +++ b/ha/heartbeat_test.go @@ -3,9 +3,9 @@ package ha import ( "encoding/json" "git.icinga.com/icingadb/icingadb-main/connection" + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" "testing" "time" ) @@ -19,7 +19,18 @@ var icingastate = "{\"IcingaApplication\":" + "}}}}, \"config_dump_in_progress\": false}" func TestIcingaHeartbeatListener(t *testing.T) { - rdb := connection.NewRDBWrapper(os.Getenv("ICINGADB_TEST_REDIS")) + var server redisd.Server + + client, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + defer client.Close() + + rdb := connection.NewRDBWrapper(client.Options().Addr) assert.True(t, rdb.CheckConnection(false), "This test needs a working Redis connection") chEnv := make(chan *Environment)