Make channels more specific

This commit is contained in:
Alexander A. Klimov 2021-03-15 16:34:58 +01:00
parent 6ccbc7d091
commit 4dffbad76e
3 changed files with 22 additions and 27 deletions

View file

@ -42,7 +42,7 @@ func main() {
s := icingadb.NewSync(db, rc, logger)
// For temporary exit after sync
done := make(chan interface{}, 0)
done := make(chan struct{}, 0)
// Main loop
for {

View file

@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"encoding/hex"
"errors"
"github.com/google/uuid"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
@ -26,9 +25,9 @@ type HA struct {
heartbeat *icingaredis.Heartbeat
logger *zap.SugaredLogger
responsible bool
handover chan interface{}
takeover chan interface{}
done chan interface{}
handover chan struct{}
takeover chan struct{}
done chan struct{}
mu *sync.Mutex
err error
errOnce sync.Once
@ -46,9 +45,9 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
db: db,
heartbeat: heartbeat,
logger: logger,
handover: make(chan interface{}),
takeover: make(chan interface{}),
done: make(chan interface{}),
handover: make(chan struct{}),
takeover: make(chan struct{}),
done: make(chan struct{}),
mu: &sync.Mutex{},
}
@ -67,7 +66,7 @@ func (h *HA) Close() error {
return h.Err()
}
func (h *HA) Done() <-chan interface{} {
func (h *HA) Done() <-chan struct{} {
return h.done
}
@ -78,11 +77,11 @@ func (h *HA) Err() error {
return h.err
}
func (h *HA) Handover() chan interface{} {
func (h *HA) Handover() chan struct{} {
return h.handover
}
func (h *HA) Takeover() chan interface{} {
func (h *HA) Takeover() chan struct{} {
return h.takeover
}
@ -104,16 +103,12 @@ func (h *HA) controller() {
for {
select {
case b, ok := <-h.heartbeat.Beat():
case m, ok := <-h.heartbeat.Beat():
if !ok {
// Beat channel closed.
return
}
now := time.Now()
m, ok := b.(icingaredisv1.StatsMessage)
if !ok {
h.abort(errors.New("bad message"))
}
t, err := m.Time()
if err != nil {
h.abort(err)

View file

@ -19,9 +19,9 @@ type Heartbeat struct {
client *Client
logger *zap.SugaredLogger
active bool
beat chan interface{}
lost chan interface{}
done chan interface{}
beat chan v1.StatsMessage
lost chan struct{}
done chan struct{}
mu *sync.Mutex
err error
}
@ -34,9 +34,9 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
cancel: cancel,
client: client,
logger: logger,
beat: make(chan interface{}),
lost: make(chan interface{}),
done: make(chan interface{}),
beat: make(chan v1.StatsMessage),
lost: make(chan struct{}),
done: make(chan struct{}),
mu: &sync.Mutex{},
}
@ -55,7 +55,7 @@ func (h Heartbeat) Close() error {
return h.Err()
}
func (h Heartbeat) Done() <-chan interface{} {
func (h Heartbeat) Done() <-chan struct{} {
return h.done
}
@ -66,17 +66,17 @@ func (h Heartbeat) Err() error {
return h.err
}
func (h Heartbeat) Beat() <-chan interface{} {
func (h Heartbeat) Beat() <-chan v1.StatsMessage {
return h.beat
}
func (h Heartbeat) Lost() <-chan interface{} {
func (h Heartbeat) Lost() <-chan struct{} {
return h.lost
}
// controller loop.
func (h Heartbeat) controller() {
messages := make(chan interface{})
messages := make(chan v1.StatsMessage)
defer close(messages)
g, ctx := errgroup.WithContext(h.ctx)
@ -95,7 +95,7 @@ func (h Heartbeat) controller() {
}
select {
case messages <- v1.StatsMessage(streams[0].Messages[0].Values):
case messages <- streams[0].Messages[0].Values:
case <-ctx.Done():
return ctx.Err()
}