diff --git a/redis.go b/redis.go index 3aa42c5b..a3540ea1 100644 --- a/redis.go +++ b/redis.go @@ -272,3 +272,9 @@ func (rdbw *RDBWrapper) TxPipelined(fn func(pipeliner redis.Pipeliner) error) ([ return c, e } } + +func (rdbw *RDBWrapper) Subscribe() PubSubWrapper { + ps := rdbw.Rdb.Subscribe() + psw := PubSubWrapper{ps: ps, rdbw: rdbw} + return psw +} \ No newline at end of file diff --git a/redis_pubsub.go b/redis_pubsub.go new file mode 100644 index 00000000..1cedb303 --- /dev/null +++ b/redis_pubsub.go @@ -0,0 +1,67 @@ +package icingadb_connection + +import ( + "github.com/go-redis/redis" +) + +type PubSubWrapper struct { + ps *redis.PubSub + rdbw *RDBWrapper +} + +func (psw *PubSubWrapper) Subscribe(channels ...string) error { + for { + if !psw.rdbw.IsConnected() { + psw.rdbw.WaitForConnection() + continue + } + + err := psw.ps.Subscribe(channels...) + + if err != nil { + if !psw.rdbw.CheckConnection(false) { + continue + } + } + + return err + } +} + +func (psw *PubSubWrapper) ReceiveMessage() (*redis.Message, error) { + for { + if !psw.rdbw.IsConnected() { + psw.rdbw.WaitForConnection() + continue + } + + msg, err := psw.ps.ReceiveMessage() + + if err != nil { + if !psw.rdbw.CheckConnection(false) { + continue + } + } + + return msg, err + } +} + +func (psw *PubSubWrapper) Close() error { + for { + if !psw.rdbw.IsConnected() { + psw.rdbw.WaitForConnection() + continue + } + + err := psw.ps.Close() + + if err != nil { + if !psw.rdbw.CheckConnection(false) { + continue + } + } + + return err + } +} \ No newline at end of file