diff --git a/pkg/com/cond.go b/pkg/com/cond.go index 63ef1950..72ba347c 100644 --- a/pkg/com/cond.go +++ b/pkg/com/cond.go @@ -2,81 +2,89 @@ package com import ( "context" - "errors" + "github.com/pkg/errors" ) -var ErrCondClosed = errors.New("condition closed") - -// Cond implements a condition variable, a rendezvous point -// for goroutines waiting for or announcing the occurrence -// of an event. +// Cond implements a channel-based synchronization for goroutines that wait for signals or send them. +// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, +// which is only started when NewCond is called. Thus the zero value cannot be used. type Cond struct { - ctx context.Context - cancel context.CancelFunc broadcast chan struct{} + done chan struct{} + cancel context.CancelFunc listeners chan chan struct{} } +// NewCond returns a new Cond and starts the controller loop. func NewCond(ctx context.Context) *Cond { - done, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) c := &Cond{ broadcast: make(chan struct{}), cancel: cancel, - ctx: done, + done: make(chan struct{}), listeners: make(chan chan struct{}), } - go c.controller() + go c.controller(ctx) return c } +// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. +// Panics if the controller loop has already ended. +func (c *Cond) Broadcast() { + select { + case c.broadcast <- struct{}{}: + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// Close stops the controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (c *Cond) Close() error { + c.cancel() + <-c.done + + return nil +} + +// Done returns a channel that will be closed when the controller loop has ended. +func (c *Cond) Done() <-chan struct{} { + return c.done +} + +// Wait returns a channel that is closed with the next signal. +// Panics if the controller loop has already ended. +func (c *Cond) Wait() <-chan struct{} { + select { + case l := <-c.listeners: + return l + case <-c.done: + panic(errors.New("condition closed")) + } +} + // controller loop. -func (c *Cond) controller() { +func (c *Cond) controller(ctx context.Context) { + defer close(c.done) + + // Note that the notify channel does not close when the controller loop ends + // in order not to notify pending listeners. notify := make(chan struct{}) for { select { case <-c.broadcast: - // all current receivers get a closed channel + // Close channel to notify all current listeners. close(notify) - // set up next batch of receivers. + // Create a new channel for the next listeners. notify = make(chan struct{}) case c.listeners <- notify: - // great. A Receiver has our channel - case <-c.ctx.Done(): + // A new listener received the channel. + case <-ctx.Done(): return } } } - -// Close implements the io.Closer interface. -func (c *Cond) Close() error { - c.cancel() - - return nil -} - -// Wait returns a channel on which the next (close) signal will be sent. -func (c *Cond) Wait() <-chan struct{} { - select { - case l := <-c.listeners: - return l - case <-c.ctx.Done(): - panic(ErrCondClosed) - } -} - -// Broadcast wakes all current listeners. -func (c *Cond) Broadcast() { - select { - case c.broadcast <- struct{}{}: - case <-c.ctx.Done(): - panic(ErrCondClosed) - } -} - -func (c *Cond) Done() <-chan struct{} { - return c.ctx.Done() -}