From d9717d5bbbe973b565ddd78d916c03975e1d331a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 23 Sep 2019 14:04:58 +0200 Subject: [PATCH 1/6] Tests: connect to temporary Redis --- connection/redis.go | 9 ++ connection/redis_pubsub_test.go | 18 ++- connection/redis_test.go | 112 ++++++++++------ connection/redisd/config.go | 65 ++++++++++ connection/redisd/server.go | 223 ++++++++++++++++++++++++++++++++ ha/heartbeat_test.go | 15 ++- 6 files changed, 395 insertions(+), 47 deletions(-) create mode 100644 connection/redisd/config.go create mode 100644 connection/redisd/server.go diff --git a/connection/redis.go b/connection/redis.go index 657d1f4f..881e0fbf 100644 --- a/connection/redis.go +++ b/connection/redis.go @@ -6,6 +6,7 @@ import ( "github.com/go-redis/redis" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + "strings" "sync" "sync/atomic" "time" @@ -105,7 +106,15 @@ func (rdbw *RDBWrapper) CompareAndSetConnected(connected bool) (swapped bool) { func NewRDBWrapper(address string) *RDBWrapper { log.Info("Connecting to Redis") + + // TODO: remove this in favor of https://github.com/go-redis/redis/pull/1165 + var net string + if strings.HasPrefix(address, "/") { + net = "unix" + } + rdb := redis.NewClient(&redis.Options{ + Network: net, Addr: address, DialTimeout: time.Minute / 2, ReadTimeout: time.Minute, diff --git a/connection/redis_pubsub_test.go b/connection/redis_pubsub_test.go index 8e97827c..ad592763 100644 --- a/connection/redis_pubsub_test.go +++ b/connection/redis_pubsub_test.go @@ -1,20 +1,24 @@ package connection import ( + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "github.com/go-redis/redis" "github.com/stretchr/testify/assert" - "os" "testing" "time" ) func TestPubSubWrapper(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { diff --git a/connection/redis_test.go b/connection/redis_test.go index ea24eac7..164197b1 100644 --- a/connection/redis_test.go +++ b/connection/redis_test.go @@ -1,9 +1,9 @@ package connection import ( + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "github.com/go-redis/redis" "github.com/stretchr/testify/assert" - "os" "sync" "sync/atomic" "testing" @@ -17,7 +17,18 @@ func NewTestRDBW(rdb RedisClient) RDBWrapper { } func TestNewRDBWrapper(t *testing.T) { - rdbw := NewRDBWrapper(os.Getenv("ICINGADB_TEST_REDIS")) + var server redisd.Server + + client, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + defer client.Close() + + rdbw := NewRDBWrapper(client.Options().Addr) assert.True(t, rdbw.CheckConnection(false), "Redis should be connected") rdbw = NewRDBWrapper("asdasdasdasdasd:5123") @@ -59,14 +70,19 @@ func TestRDBWrapper_GetConnectionCheckInterval(t *testing.T) { } func TestRDBWrapper_CheckConnection(t *testing.T) { + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(nil) - rdbw.Rdb = redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + rdbw.Rdb = rdb atomic.StoreUint32(rdbw.ConnectionLostCounterAtomic, 512312312) assert.True(t, rdbw.CheckConnection(false), "DBWrapper should be connected") assert.Equal(t, uint32(0), atomic.LoadUint32(rdbw.ConnectionLostCounterAtomic)) @@ -87,12 +103,16 @@ func TestRDBWrapper_CheckConnection(t *testing.T) { } func TestRDBWrapper_HGetAll(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -169,12 +189,16 @@ func TestRDBWrapper_HMGet(t *testing.T) { } func TestRDBWrapper_XRead(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -207,12 +231,16 @@ func TestRDBWrapper_XRead(t *testing.T) { } func TestRDBWrapper_XDel(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -242,12 +270,16 @@ func TestRDBWrapper_XDel(t *testing.T) { } func TestRDBWrapper_Publish(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -278,12 +310,16 @@ func TestRDBWrapper_Publish(t *testing.T) { } func TestRDBWrapper_TxPipelined(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { diff --git a/connection/redisd/config.go b/connection/redisd/config.go new file mode 100644 index 00000000..9c12e35b --- /dev/null +++ b/connection/redisd/config.go @@ -0,0 +1,65 @@ +package redisd + +import log "github.com/sirupsen/logrus" + +// configLogLevels maps the logrus log levels to Redis log levels (as in config). +var configLogLevels = map[log.Level]string{ + log.FatalLevel: "warning", + log.ErrorLevel: "warning", + log.WarnLevel: "warning", + log.InfoLevel: "notice", + log.DebugLevel: "verbose", +} + +// configTemplate is the constant part of Server's Redis config. +const configTemplate = ` +daemonize no +supervised no + +loglevel %s +logfile "" +syslog-enabled no + +dir "%s" + + +unixsocket "%s" +unixsocketperm 700 + +port 0 +bind 127.0.0.1 +protected-mode yes + +timeout 0 + + +databases 1 + +save "" + +stop-writes-on-bgsave-error yes + +rdbcompression yes +rdbchecksum yes +dbfilename dump.rdb + +appendonly no + + +maxclients 42 + +lua-time-limit 0 + +hash-max-ziplist-entries 512 +hash-max-ziplist-value 64 +list-max-ziplist-size -2 +list-compress-depth 0 +set-max-intset-entries 512 +zset-max-ziplist-entries 128 +zset-max-ziplist-value 64 +hll-sparse-max-bytes 3000 +activerehashing yes + +client-output-buffer-limit normal 0 0 0 +client-output-buffer-limit pubsub 0 0 0 +` diff --git a/connection/redisd/server.go b/connection/redisd/server.go new file mode 100644 index 00000000..01f0cc4b --- /dev/null +++ b/connection/redisd/server.go @@ -0,0 +1,223 @@ +package redisd + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "github.com/go-redis/redis" + log "github.com/sirupsen/logrus" + "io" + "io/ioutil" + "os" + "os/exec" + "path" + "regexp" + "strconv" + "syscall" + "time" +) + +// logLineRgx matches e.g. "1:M 23 Sep 2019 10:02:05.882 * Ready to accept connections". +var logLineRgx = regexp.MustCompile(`\A(\d+):([CM]) (.*?) ([-*#]) (.+)\z`) + +// roles contains the Redis processes' roles' descriptions by their indicator characters. +var roles = map[byte]string{ + 'C': "RDB writer", + 'M': "master", +} + +// logLevels maps the Redis log levels (as in log messages) to logrus log levels. +var logLevels = map[byte]log.Level{ + '-': log.DebugLevel, + '*': log.InfoLevel, + '#': log.WarnLevel, +} + +// Server represents a managed Redis server. +type Server struct { + // basedir is a directory containing the Redis server context. + basedir string + // cmd represents the main Redis process if any. + cmd *exec.Cmd + // stopped is closed as soon as the main Redis process is stopped. + stopped chan struct{} +} + +// redisVersion matches e.g. "redis_version:5.0.5". +var redisVersion = regexp.MustCompile(`(?m)^redis_version:(.*?)\s*$`) + +// redisVersionNumber matches e.g. "5". +var redisVersionNumber = regexp.MustCompile(`\d+`) + +// Start starts *s and returns a client for connecting to it. +func (s *Server) Start() (*redis.Client, error) { + { + var errTD error + s.basedir, errTD = ioutil.TempDir("", "") + + if errTD != nil { + return nil, errTD + } + } + + log.WithFields(log.Fields{"basedir": s.basedir}).Info("starting Redis server") + + workDir := path.Join(s.basedir, "work-dir") + + if errMkdir := os.Mkdir(workDir, 0700); errMkdir != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return nil, errMkdir + } + + config := &bytes.Buffer{} + socket := path.Join(s.basedir, "socket") + fmt.Fprintf(config, configTemplate, configLogLevels[log.GetLevel()], workDir, socket) + + cmd := exec.Command("redis-server", "-") + cmd.Dir = s.basedir + cmd.Stdin = config + + stdout, errStdout := cmd.StdoutPipe() + if errStdout != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return nil, errStdout + } + + if errStart := cmd.Start(); errStart != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return nil, errStart + } + + s.cmd = cmd + s.stopped = make(chan struct{}) + + go s.stdout2log(stdout) + + log.WithFields(log.Fields{"basedir": s.basedir}).Debug("checking the Redis server for actual serving") + + client := redis.NewClient(&redis.Options{ + Network: "unix", + Addr: socket, + ReadTimeout: time.Minute, + WriteTimeout: time.Minute, + }) + + for { + info, errInfo := client.Info().Result() + if errInfo == nil { + version := redisVersion.FindStringSubmatch(info) + if version == nil { + client.Close() + s.Stop() + return nil, errors.New("Redis server didn't tell its version") + } + + major := redisVersionNumber.FindString(version[1]) + if major == "" { + client.Close() + s.Stop() + return nil, errors.New("bad Redis server version: " + version[1]) + } + + majorInt, errMI := strconv.ParseUint(major, 10, 64) + if errMI != nil { + majorInt = ^uint64(0) + } + + if majorInt < 5 { + client.Close() + s.Stop() + return nil, errors.New(fmt.Sprintf("Redis server is too old (%s < 5)", version[1])) + } + + log.WithFields(log.Fields{ + "basedir": s.basedir, + "version": version[1], + }).Debug("Redis server is actually serving now") + return client, nil + } + + select { + case <-s.stopped: + client.Close() + return nil, errInfo + + default: + log.WithFields(log.Fields{ + "basedir": s.basedir, + "error": errInfo, + }).Debug("Redis server isn't actually serving, yet") + + time.Sleep(time.Second) + } + } +} + +// Stop stops *s. +func (s *Server) Stop() error { + log.Info("stopping Redis server") + + proc := s.cmd.Process + + if errSignal := proc.Signal(syscall.SIGTERM); errSignal != nil { + return errSignal + } + + <-s.stopped + return nil +} + +// stdout2log forwards the Redis server's log from stdout to logrus and manages s.basedir and s.stopped. +func (s *Server) stdout2log(stdout io.Reader) { + buffer := bufio.NewReader(stdout) + + for { + line, errRead := buffer.ReadBytes('\n') + if errRead != nil { + if errRead != io.EOF || len(line) > 0 { + log.WithFields(log.Fields{"error": errRead}).Error( + "got unexpected error while forwarding Redis server logs", + ) + } + + break + } + + line = line[:len(line)-1] + + if len(line) > 0 && '0' <= line[0] && line[0] <= '9' { + if submatch := logLineRgx.FindSubmatch(line); submatch != nil { + timeStamp, errTime := time.ParseInLocation( + "2 Jan 2006 15:04:05.999999999", string(submatch[3]), time.Local, + ) + if errTime != nil { + timeStamp = time.Now() + } + + pid, errPid := strconv.ParseUint(string(submatch[1]), 10, 64) + if errPid != nil { + pid = 0 + } + + log.WithTime(timeStamp).WithFields(map[string]interface{}{ + "component": "redis-server", + "pid": pid, + "role": roles[submatch[2][0]], + }).Log(logLevels[submatch[4][0]], string(submatch[5])) + } + } + } + + if errWait := s.cmd.Wait(); errWait != nil { + log.WithFields(log.Fields{"error": errWait}).Error("Redis server terminated with an error") + } + + os.RemoveAll(s.basedir) + s.basedir = "" + + close(s.stopped) +} diff --git a/ha/heartbeat_test.go b/ha/heartbeat_test.go index 55790453..00fffc84 100644 --- a/ha/heartbeat_test.go +++ b/ha/heartbeat_test.go @@ -3,9 +3,9 @@ package ha import ( "encoding/json" "git.icinga.com/icingadb/icingadb-main/connection" + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" "testing" "time" ) @@ -19,7 +19,18 @@ var icingastate = "{\"IcingaApplication\":" + "}}}}, \"config_dump_in_progress\": false}" func TestIcingaHeartbeatListener(t *testing.T) { - rdb := connection.NewRDBWrapper(os.Getenv("ICINGADB_TEST_REDIS")) + var server redisd.Server + + client, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + defer client.Close() + + rdb := connection.NewRDBWrapper(client.Options().Addr) assert.True(t, rdb.CheckConnection(false), "This test needs a working Redis connection") chEnv := make(chan *Environment) From fa9941d0c9a2ff57e69b31b6679d765a014bbfde Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 24 Sep 2019 16:21:37 +0200 Subject: [PATCH 2/6] Tests: connect to temporary MySQLd --- connection/mysql_test.go | 92 ++++++++++++- connection/mysqld/server.go | 261 ++++++++++++++++++++++++++++++++++++ 2 files changed, 350 insertions(+), 3 deletions(-) create mode 100644 connection/mysqld/server.go diff --git a/connection/mysql_test.go b/connection/mysql_test.go index b7067c15..7b6f27c4 100644 --- a/connection/mysql_test.go +++ b/connection/mysql_test.go @@ -1,14 +1,16 @@ package connection import ( + "bytes" "context" "database/sql" "errors" + "fmt" + "git.icinga.com/icingadb/icingadb-main/connection/mysqld" "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "os" "sync" "sync/atomic" "testing" @@ -161,7 +163,22 @@ func TestDBWrapper_SqlBegin(t *testing.T) { } func TestDBWrapper_SqlTransaction(t *testing.T) { - dbw, err := NewDBWrapper(os.Getenv("ICINGADB_TEST_MYSQL")) + var server mysqld.Server + + host, errSt := server.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer server.Stop() + + if errMTD := mkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + dbw, err := NewDBWrapper(fmt.Sprintf("icingadb:icingadb@%s/icingadb", host)) require.NoError(t, err, "Is the MySQL server running?") err = dbw.SqlTransaction(false, true, false, func(tx DbTransaction) error { @@ -295,7 +312,22 @@ func TestGetConnectionCheckInterval(t *testing.T) { } func TestDBWrapper_SqlFetchAll(t *testing.T) { - dbw, err := NewDBWrapper(os.Getenv("ICINGADB_TEST_MYSQL")) + var server mysqld.Server + + host, errSt := server.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer server.Stop() + + if errMTD := mkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + dbw, err := NewDBWrapper(fmt.Sprintf("icingadb:icingadb@%s/icingadb", host)) require.NoError(t, err, "Is the MySQL server running?") _, err = dbw.Db.Exec("CREATE TABLE testing0815 (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, name varchar(255) NOT NULL)") @@ -324,3 +356,57 @@ func TestDBWrapper_SqlFetchAll(t *testing.T) { _, err = dbw.Db.Exec("DROP TABLE testing0815") assert.NoError(t, err) } + +var cComment = regexp.MustCompile(`/\*.*?\*/`) + +func mkTestDb(host string) error { + noDb, errNoDb := sql.Open("mysql", fmt.Sprintf("root@%s/", host)) + if errNoDb != nil { + return errNoDb + } + + defer noDb.Close() + + for _, ddl := range []string{ + "CREATE DATABASE icingadb", + "GRANT ALL ON icingadb.* TO icingadb@localhost IDENTIFIED BY 'icingadb'", + } { + if _, errEx := noDb.Exec(ddl); errEx != nil { + return errEx + } + } + + db, errDb := sql.Open("mysql", fmt.Sprintf("icingadb:icingadb@%s/icingadb", host)) + if errDb != nil { + return errDb + } + + defer db.Close() + + _, thisFile, _, _ := runtime.Caller(0) + schema := path.Join(filepath.Dir(filepath.Dir(thisFile)), "etc/schema/mysql") + + entries, errRD := ioutil.ReadDir(schema) + if errRD != nil { + return errRD + } + + for _, entry := range entries { + if name := entry.Name(); strings.HasSuffix(name, ".sql") { + ddls, errRF := ioutil.ReadFile(path.Join(schema, name)) + if errRF != nil { + return errRF + } + + for _, ddl := range bytes.Split(ddls, []byte{';'}) { + if ddl = bytes.TrimSpace(cComment.ReplaceAll(ddl, nil)); len(ddl) > 0 { + if _, errEx := db.Exec(string(ddl)); errEx != nil { + return errEx + } + } + } + } + } + + return nil +} diff --git a/connection/mysqld/server.go b/connection/mysqld/server.go new file mode 100644 index 00000000..a0026c2a --- /dev/null +++ b/connection/mysqld/server.go @@ -0,0 +1,261 @@ +package mysqld + +import ( + "bufio" + "database/sql" + "fmt" + log "github.com/sirupsen/logrus" + "io" + "io/ioutil" + "os" + "os/exec" + "os/user" + "path" + "regexp" + "strconv" + "syscall" + "time" +) + +// logLineRgx matches e.g. "2019-09-23 13:39:55 0 [Note] InnoDB: Using Linux native AIO". +var logLineRgx = regexp.MustCompile(`\A(\S+ \S+) (\d+) \[(\w+)] (.+)\z`) + +// logLevels maps the MySQLd log levels (as in log messages) to logrus log levels. +var logLevels = map[string]log.Level{ + "Note": log.InfoLevel, + "Warning": log.WarnLevel, + "ERROR": log.ErrorLevel, +} + +// Server represents a managed MySQL server. +type Server struct { + // basedir is a directory containing the MySQL server context. + basedir string + // cmd represents the main MySQLd process if any. + cmd *exec.Cmd + // stopped is closed as soon as the main MySQLd process is stopped. + stopped chan struct{} + // errorLogEof is closed on the main MySQLd process' error log EOF. + errorLogEof chan struct{} + // logPipeCloser is the result of opening the log pipe for writing to ensure our reader's termination. + logPipeCloser io.Closer +} + +// Start starts *s and returns the host to connect to. +func (s *Server) Start() (string, error) { + me, errUC := user.Current() + if errUC != nil { + return "", errUC + } + + { + var errTD error + s.basedir, errTD = ioutil.TempDir("", "") + + if errTD != nil { + return "", errTD + } + } + + log.WithFields(log.Fields{"basedir": s.basedir}).Info("starting MySQL server") + + socket := path.Join(s.basedir, "socket") + host := fmt.Sprintf("unix(%s)", socket) + + db, errOpen := sql.Open("mysql", fmt.Sprintf("root@%s/", host)) + if errOpen != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return "", errOpen + } + + defer db.Close() + + dataDir := path.Join(s.basedir, "data") + if errMkdir := os.Mkdir(dataDir, 0700); errMkdir != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return "", errMkdir + } + + logPipe := path.Join(s.basedir, "log") + if errMkfifo := syscall.Mkfifo(logPipe, 0700); errMkfifo != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return "", errMkfifo + } + + params := []string{ + "--no-defaults", + "--user=" + me.Username, + "--pid-file=" + path.Join(s.basedir, "pid"), + "--socket=" + socket, + "--basedir=/usr", + "--datadir=" + dataDir, + "--tmpdir=/tmp", + "--lc-messages-dir=/usr/share/mysql", + "--skip-networking", + "--query_cache_size=16M", + "--expire_logs_days=10", + "--character-set-server=utf8mb4", + "--collation-server=utf8mb4_general_ci", + } + + { + cmd := exec.Command("mysql_install_db", append(params, "--log_error=/dev/null")...) + cmd.Dir = s.basedir + + if errRun := cmd.Run(); errRun != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return "", errRun + } + } + + s.errorLogEof = make(chan struct{}) + go s.file2log(logPipe) + + logPipeWriter, errCr := os.Create(logPipe) + if errCr != nil { + os.RemoveAll(s.basedir) + s.basedir = "" + return "", errCr + } + + cmd := exec.Command("mysqld", append(params, "--log_error="+logPipe)...) + cmd.Dir = s.basedir + + stderr, errStderr := cmd.StderrPipe() + if errStderr != nil { + logPipeWriter.Close() + + os.RemoveAll(s.basedir) + s.basedir = "" + + return "", errStderr + } + + if errStart := cmd.Start(); errStart != nil { + logPipeWriter.Close() + + os.RemoveAll(s.basedir) + s.basedir = "" + + return "", errStart + } + + s.cmd = cmd + s.stopped = make(chan struct{}) + s.logPipeCloser = logPipeWriter + + go s.stderr2log(stderr) + + log.WithFields(log.Fields{"basedir": s.basedir}).Debug("checking the MySQL server for actual serving") + + for { + errPing := db.Ping() + if errPing == nil { + log.WithFields(log.Fields{"basedir": s.basedir}).Debug("MySQL server is actually serving now") + return host, nil + } + + select { + case <-s.stopped: + return "", errPing + + default: + log.WithFields(log.Fields{ + "basedir": s.basedir, + "error": errPing, + }).Debug("MySQL server isn't actually serving, yet") + + time.Sleep(time.Second) + } + } +} + +// Stop stops *s. +func (s *Server) Stop() error { + log.Info("stopping MySQL server") + + if errSignal := s.cmd.Process.Signal(syscall.SIGTERM); errSignal != nil { + return errSignal + } + + <-s.stopped + return nil +} + +// file2log forwards the MySQL server's log from path to logrus. +func (s *Server) file2log(path string) { + defer close(s.errorLogEof) + + stream, errOpen := os.Open(path) + if errOpen != nil { + log.WithFields(log.Fields{"source": "log file", "error": errOpen}).Error( + "got unexpected error while forwarding MySQL server logs", + ) + return + } + + defer stream.Close() + + stream2log(stream, "log file") +} + +// stderr2log forwards the MySQL server's log from stderr to logrus and cleans up *s. +func (s *Server) stderr2log(stderr io.Reader) { + stream2log(stderr, "stderr") + + if errWait := s.cmd.Wait(); errWait != nil { + log.WithFields(log.Fields{"error": errWait}).Error("MySQL server terminated with an error") + } + + s.logPipeCloser.Close() + <-s.errorLogEof + + os.RemoveAll(s.basedir) + s.basedir = "" + + close(s.stopped) +} + +// stream2log forwards the MySQL server's log from stream to logrus. +func stream2log(stream io.Reader, source string) { + buffer := bufio.NewReader(stream) + + for { + line, errRead := buffer.ReadBytes('\n') + if errRead != nil { + if errRead != io.EOF || len(line) > 0 { + log.WithFields(log.Fields{"source": source, "error": errRead}).Error( + "got unexpected error while forwarding MySQL server logs", + ) + } + + break + } + + line = line[:len(line)-1] + + if len(line) > 0 { + if submatch := logLineRgx.FindSubmatch(line); submatch != nil { + timeStamp, errTime := time.ParseInLocation("2006-01-02 15:04:05", string(submatch[1]), time.Local) + if errTime != nil { + timeStamp = time.Now() + } + + thread, errPU := strconv.ParseUint(string(submatch[2]), 10, 64) + if errPU != nil { + thread = 0 + } + + log.WithTime(timeStamp).WithFields(log.Fields{ + "component": "mysqld", + "source": source, + "thread": thread, + }).Log(logLevels[string(submatch[3])], string(submatch[4])) + } + } + } +} From c9bcfd772934aea872177a8519d93a6543495f08 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 26 Sep 2019 14:36:14 +0200 Subject: [PATCH 3/6] Temporary MySQLd: support MacOS --- connection/mysqld/server.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/connection/mysqld/server.go b/connection/mysqld/server.go index a0026c2a..14846c5c 100644 --- a/connection/mysqld/server.go +++ b/connection/mysqld/server.go @@ -90,10 +90,8 @@ func (s *Server) Start() (string, error) { "--user=" + me.Username, "--pid-file=" + path.Join(s.basedir, "pid"), "--socket=" + socket, - "--basedir=/usr", "--datadir=" + dataDir, "--tmpdir=/tmp", - "--lc-messages-dir=/usr/share/mysql", "--skip-networking", "--query_cache_size=16M", "--expire_logs_days=10", @@ -102,7 +100,21 @@ func (s *Server) Start() (string, error) { } { - cmd := exec.Command("mysql_install_db", append(params, "--log_error=/dev/null")...) + cmd := exec.Command("mysql_install_db") + + realPath, errRL := os.Readlink(cmd.Path) + if errRL != nil { + realPath = cmd.Path + } + + if !path.IsAbs(realPath) { + realPath = path.Join(path.Dir(cmd.Path), realPath) + } + + basedir := path.Dir(path.Dir(realPath)) + params = append(params, "--basedir="+basedir, "--lc-messages-dir="+path.Join(basedir, "share/mysql/english")) + + cmd.Args = append(params, "--log_error="+path.Join(s.basedir, "install")) cmd.Dir = s.basedir if errRun := cmd.Run(); errRun != nil { From 88359e841d9b6f3890b59e7d574fc68e14c482f0 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 26 Sep 2019 16:21:50 +0200 Subject: [PATCH 4/6] Temporary MySQLd: support MariaDB 10.4 --- connection/mysql_test.go | 9 ++------- connection/mysqld/server.go | 1 + 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/connection/mysql_test.go b/connection/mysql_test.go index 7b6f27c4..b26fea07 100644 --- a/connection/mysql_test.go +++ b/connection/mysql_test.go @@ -367,13 +367,8 @@ func mkTestDb(host string) error { defer noDb.Close() - for _, ddl := range []string{ - "CREATE DATABASE icingadb", - "GRANT ALL ON icingadb.* TO icingadb@localhost IDENTIFIED BY 'icingadb'", - } { - if _, errEx := noDb.Exec(ddl); errEx != nil { - return errEx - } + if _, errEx := noDb.Exec("CREATE DATABASE icingadb"); errEx != nil { + return errEx } db, errDb := sql.Open("mysql", fmt.Sprintf("icingadb:icingadb@%s/icingadb", host)) diff --git a/connection/mysqld/server.go b/connection/mysqld/server.go index 14846c5c..7ba29c3a 100644 --- a/connection/mysqld/server.go +++ b/connection/mysqld/server.go @@ -97,6 +97,7 @@ func (s *Server) Start() (string, error) { "--expire_logs_days=10", "--character-set-server=utf8mb4", "--collation-server=utf8mb4_general_ci", + "--skip-grant-tables", } { From c8b16b4d88b2465dfbfc54da5fda951372ff3fa1 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Mon, 30 Sep 2019 09:58:25 +0200 Subject: [PATCH 5/6] Redis: Use temporary redis while testing --- connection/redis_test.go | 64 +++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/connection/redis_test.go b/connection/redis_test.go index 164197b1..628bba71 100644 --- a/connection/redis_test.go +++ b/connection/redis_test.go @@ -145,12 +145,16 @@ func TestRDBWrapper_HGetAll(t *testing.T) { } func TestRDBWrapper_HKeys(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -168,12 +172,16 @@ func TestRDBWrapper_HKeys(t *testing.T) { } func TestRDBWrapper_HMGet(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -357,12 +365,16 @@ func TestRDBWrapper_TxPipelined(t *testing.T) { } func TestRDBWrapper_PipeConfigChunks(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { @@ -382,12 +394,16 @@ func TestRDBWrapper_PipeConfigChunks(t *testing.T) { } func TestRDBWrapper_PipeChecksumChunks(t *testing.T) { - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) + var server redisd.Server + + rdb, errSrv := server.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer server.Stop() + rdbw := NewTestRDBW(rdb) if !rdbw.CheckConnection(true) { From 35d6f481254a0c0db111a8a567c556bc34941a70 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 30 Sep 2019 11:09:23 +0200 Subject: [PATCH 6/6] ha/ha_test.go: connect to temporary Redis and MySQLd --- connection/mysql_test.go | 54 +--------- connection/mysqld/testdb.go | 62 ++++++++++++ ha/ha_test.go | 196 +++++++++++++++++++++++++++++++----- 3 files changed, 237 insertions(+), 75 deletions(-) create mode 100644 connection/mysqld/testdb.go diff --git a/connection/mysql_test.go b/connection/mysql_test.go index b26fea07..486b9557 100644 --- a/connection/mysql_test.go +++ b/connection/mysql_test.go @@ -1,7 +1,6 @@ package connection import ( - "bytes" "context" "database/sql" "errors" @@ -173,7 +172,7 @@ func TestDBWrapper_SqlTransaction(t *testing.T) { defer server.Stop() - if errMTD := mkTestDb(host); errMTD != nil { + if errMTD := mysqld.MkTestDb(host); errMTD != nil { t.Fatal(errMTD) return } @@ -322,7 +321,7 @@ func TestDBWrapper_SqlFetchAll(t *testing.T) { defer server.Stop() - if errMTD := mkTestDb(host); errMTD != nil { + if errMTD := mysqld.MkTestDb(host); errMTD != nil { t.Fatal(errMTD) return } @@ -356,52 +355,3 @@ func TestDBWrapper_SqlFetchAll(t *testing.T) { _, err = dbw.Db.Exec("DROP TABLE testing0815") assert.NoError(t, err) } - -var cComment = regexp.MustCompile(`/\*.*?\*/`) - -func mkTestDb(host string) error { - noDb, errNoDb := sql.Open("mysql", fmt.Sprintf("root@%s/", host)) - if errNoDb != nil { - return errNoDb - } - - defer noDb.Close() - - if _, errEx := noDb.Exec("CREATE DATABASE icingadb"); errEx != nil { - return errEx - } - - db, errDb := sql.Open("mysql", fmt.Sprintf("icingadb:icingadb@%s/icingadb", host)) - if errDb != nil { - return errDb - } - - defer db.Close() - - _, thisFile, _, _ := runtime.Caller(0) - schema := path.Join(filepath.Dir(filepath.Dir(thisFile)), "etc/schema/mysql") - - entries, errRD := ioutil.ReadDir(schema) - if errRD != nil { - return errRD - } - - for _, entry := range entries { - if name := entry.Name(); strings.HasSuffix(name, ".sql") { - ddls, errRF := ioutil.ReadFile(path.Join(schema, name)) - if errRF != nil { - return errRF - } - - for _, ddl := range bytes.Split(ddls, []byte{';'}) { - if ddl = bytes.TrimSpace(cComment.ReplaceAll(ddl, nil)); len(ddl) > 0 { - if _, errEx := db.Exec(string(ddl)); errEx != nil { - return errEx - } - } - } - } - } - - return nil -} diff --git a/connection/mysqld/testdb.go b/connection/mysqld/testdb.go new file mode 100644 index 00000000..88af96d9 --- /dev/null +++ b/connection/mysqld/testdb.go @@ -0,0 +1,62 @@ +package mysqld + +import ( + "bytes" + "database/sql" + "fmt" + "io/ioutil" + "path" + "path/filepath" + "regexp" + "runtime" + "strings" +) + +var cComment = regexp.MustCompile(`/\*.*?\*/`) + +func MkTestDb(host string) error { + noDb, errNoDb := sql.Open("mysql", fmt.Sprintf("root@%s/", host)) + if errNoDb != nil { + return errNoDb + } + + defer noDb.Close() + + if _, errEx := noDb.Exec("CREATE DATABASE icingadb"); errEx != nil { + return errEx + } + + db, errDb := sql.Open("mysql", fmt.Sprintf("icingadb:icingadb@%s/icingadb", host)) + if errDb != nil { + return errDb + } + + defer db.Close() + + _, thisFile, _, _ := runtime.Caller(0) + schema := path.Join(filepath.Dir(filepath.Dir(filepath.Dir(thisFile))), "etc/schema/mysql") + + entries, errRD := ioutil.ReadDir(schema) + if errRD != nil { + return errRD + } + + for _, entry := range entries { + if name := entry.Name(); strings.HasSuffix(name, ".sql") { + ddls, errRF := ioutil.ReadFile(path.Join(schema, name)) + if errRF != nil { + return errRF + } + + for _, ddl := range bytes.Split(ddls, []byte{';'}) { + if ddl = bytes.TrimSpace(cComment.ReplaceAll(ddl, nil)); len(ddl) > 0 { + if _, errEx := db.Exec(string(ddl)); errEx != nil { + return errEx + } + } + } + } + } + + return nil +} diff --git a/ha/ha_test.go b/ha/ha_test.go index 64c5ee15..04b00018 100644 --- a/ha/ha_test.go +++ b/ha/ha_test.go @@ -2,23 +2,25 @@ package ha import ( "crypto/sha1" + "fmt" "git.icinga.com/icingadb/icingadb-main/connection" + "git.icinga.com/icingadb/icingadb-main/connection/mysqld" + "git.icinga.com/icingadb/icingadb-main/connection/redisd" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/go-redis/redis" "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" "sync" "testing" "time" ) -func createTestingHA(t *testing.T) *HA { - redisConn := connection.NewRDBWrapper(os.Getenv("ICINGADB_TEST_REDIS")) +func createTestingHA(t *testing.T, redisAddr, mysqlHost string) *HA { + redisConn := connection.NewRDBWrapper(redisAddr) - mysqlConn, err := connection.NewDBWrapper(os.Getenv("ICINGADB_TEST_MYSQL")) + mysqlConn, err := connection.NewDBWrapper(fmt.Sprintf("icingadb:icingadb@%s/icingadb", mysqlHost)) if err != nil { assert.Fail(t, "This test needs a working Redis connection!") } @@ -50,7 +52,33 @@ func createTestingHA(t *testing.T) *HA { var mysqlTestObserver = connection.DbIoSeconds.WithLabelValues("mysql", "test") func TestHA_InsertInstance(t *testing.T) { - ha := createTestingHA(t) + var redisServer redisd.Server + + client, errSrv := redisServer.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer redisServer.Stop() + defer client.Close() + + var mysqlServer mysqld.Server + + host, errSt := mysqlServer.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer mysqlServer.Stop() + + if errMTD := mysqld.MkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + ha := createTestingHA(t, client.Options().Addr, host) err := ha.insertInstance() require.NoError(t, err, "insertInstance should not return an error") @@ -70,7 +98,33 @@ func TestHA_InsertInstance(t *testing.T) { } func TestHA_checkResponsibility(t *testing.T) { - ha := createTestingHA(t) + var redisServer redisd.Server + + client, errSrv := redisServer.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer redisServer.Stop() + defer client.Close() + + var mysqlServer mysqld.Server + + host, errSt := mysqlServer.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer mysqlServer.Stop() + + if errMTD := mysqld.MkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + ha := createTestingHA(t, client.Options().Addr, host) ha.checkResponsibility() assert.Equal(t, true, ha.isActive, "HA should be responsible, if no other instance is active") @@ -103,7 +157,33 @@ func TestHA_checkResponsibility(t *testing.T) { } func TestHA_waitForEnvironment(t *testing.T) { - ha := createTestingHA(t) + var redisServer redisd.Server + + client, errSrv := redisServer.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer redisServer.Stop() + defer client.Close() + + var mysqlServer mysqld.Server + + host, errSt := mysqlServer.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer mysqlServer.Stop() + + if errMTD := mysqld.MkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + ha := createTestingHA(t, client.Options().Addr, host) chEnv := make(chan *Environment) @@ -138,7 +218,33 @@ func TestHA_waitForEnvironment(t *testing.T) { } func TestHA_runHA(t *testing.T) { - ha := createTestingHA(t) + var redisServer redisd.Server + + client, errSrv := redisServer.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer redisServer.Stop() + defer client.Close() + + var mysqlServer mysqld.Server + + host, errSt := mysqlServer.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer mysqlServer.Stop() + + if errMTD := mysqld.MkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + ha := createTestingHA(t, client.Options().Addr, host) ha.heartbeatTimer = time.NewTimer(10 * time.Second) chEnv := make(chan *Environment) @@ -175,7 +281,33 @@ func TestHA_runHA(t *testing.T) { } func TestHA_NotificationListeners(t *testing.T) { - ha := createTestingHA(t) + var redisServer redisd.Server + + client, errSrv := redisServer.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer redisServer.Stop() + defer client.Close() + + var mysqlServer mysqld.Server + + host, errSt := mysqlServer.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer mysqlServer.Stop() + + if errMTD := mysqld.MkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + ha := createTestingHA(t, client.Options().Addr, host) chHost := ha.RegisterNotificationListener("host") wg := sync.WaitGroup{} @@ -213,18 +345,36 @@ func TestHA_NotificationListeners(t *testing.T) { } func TestHA_EventListener(t *testing.T) { - ha := createTestingHA(t) + var redisServer redisd.Server + + client, errSrv := redisServer.Start() + if errSrv != nil { + t.Fatal(errSrv) + return + } + + defer redisServer.Stop() + + var mysqlServer mysqld.Server + + host, errSt := mysqlServer.Start() + if errSt != nil { + t.Fatal(errSt) + return + } + + defer mysqlServer.Stop() + + if errMTD := mysqld.MkTestDb(host); errMTD != nil { + t.Fatal(errMTD) + return + } + + ha := createTestingHA(t, client.Options().Addr, host) ha.isActive = true go ha.StartEventListener() - rdb := redis.NewClient(&redis.Options{ - Addr: os.Getenv("ICINGADB_TEST_REDIS"), - DialTimeout: time.Minute / 2, - ReadTimeout: time.Minute, - WriteTimeout: time.Minute, - }) - - rdb.Del("icinga:dump") + client.Del("icinga:dump") chHost := ha.RegisterNotificationListener("host") chService := ha.RegisterNotificationListener("service") @@ -247,11 +397,11 @@ func TestHA_EventListener(t *testing.T) { wg.Done() }() - rdb.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}}) - rdb.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}}) - rdb.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}}) - rdb.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}}) - rdb.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}}) + client.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "done"}}) + client.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "host", "state": "wip"}}) + client.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "done"}}) + client.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "*", "state": "wip"}}) + client.XAdd(&redis.XAddArgs{Stream: "icinga:dump", Values: map[string]interface{}{"type": "service", "state": "done"}}) wg.Wait() } \ No newline at end of file