Add Redis Pipeliner Wrapper

This commit is contained in:
Noah Hilverling 2019-03-12 12:48:47 +01:00
parent 48f438c7e6
commit 7f02a6a1b2
2 changed files with 54 additions and 0 deletions

View file

@ -65,6 +65,7 @@ type RedisClient interface {
HKeys(key string) *redis.StringSliceCmd
HGetAll(key string) *redis.StringStringMapCmd
TxPipelined(fn func(redis.Pipeliner) error) ([]redis.Cmder, error)
Pipeline() redis.Pipeliner
Subscribe(channels ...string) *redis.PubSub
}
@ -321,6 +322,12 @@ func (rdbw *RDBWrapper) TxPipelined(fn func(pipeliner redis.Pipeliner) error) ([
}
}
func (rdbw *RDBWrapper) Pipeline() PipelinerWrapper {
pipeliner := rdbw.Rdb.Pipeline()
plw := PipelinerWrapper{pipeliner: pipeliner, rdbw: rdbw}
return plw
}
func (rdbw *RDBWrapper) Subscribe() PubSubWrapper {
ps := rdbw.Rdb.Subscribe()
psw := PubSubWrapper{ps: ps, rdbw: rdbw}

47
redis_pipeliner.go Normal file
View file

@ -0,0 +1,47 @@
package icingadb_connection
import "github.com/go-redis/redis"
type PipelinerWrapper struct {
pipeliner redis.Pipeliner
rdbw *RDBWrapper
}
func (plw *PipelinerWrapper) Exec() ([]redis.Cmder, error) {
for {
if !plw.rdbw.IsConnected() {
plw.rdbw.WaitForConnection()
continue
}
cmder, err := plw.pipeliner.Exec()
if err != nil {
if !plw.rdbw.CheckConnection(false) {
continue
}
}
return cmder, err
}
}
func (plw *PipelinerWrapper) HMGet(key string, fields ...string) *redis.SliceCmd {
for {
if !plw.rdbw.IsConnected() {
plw.rdbw.WaitForConnection()
continue
}
cmd := plw.pipeliner.HMGet(key, fields...)
_, err := cmd.Result()
if err != nil {
if !plw.rdbw.CheckConnection(false) {
continue
}
}
return cmd
}
}