Do not store ctx inside Cond and add Godoc

This commit is contained in:
Eric Lippmann 2021-07-15 14:06:21 +02:00
parent 2eb914a48c
commit b7ecfb9df2

View file

@ -2,81 +2,89 @@ package com
import (
"context"
"errors"
"github.com/pkg/errors"
)
var ErrCondClosed = errors.New("condition closed")
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Cond implements a channel-based synchronization for goroutines that wait for signals or send them.
// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation,
// which is only started when NewCond is called. Thus the zero value cannot be used.
type Cond struct {
ctx context.Context
cancel context.CancelFunc
broadcast chan struct{}
done chan struct{}
cancel context.CancelFunc
listeners chan chan struct{}
}
// NewCond returns a new Cond and starts the controller loop.
func NewCond(ctx context.Context) *Cond {
done, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
c := &Cond{
broadcast: make(chan struct{}),
cancel: cancel,
ctx: done,
done: make(chan struct{}),
listeners: make(chan chan struct{}),
}
go c.controller()
go c.controller(ctx)
return c
}
// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait.
// Panics if the controller loop has already ended.
func (c *Cond) Broadcast() {
select {
case c.broadcast <- struct{}{}:
case <-c.done:
panic(errors.New("condition closed"))
}
}
// Close stops the controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (c *Cond) Close() error {
c.cancel()
<-c.done
return nil
}
// Done returns a channel that will be closed when the controller loop has ended.
func (c *Cond) Done() <-chan struct{} {
return c.done
}
// Wait returns a channel that is closed with the next signal.
// Panics if the controller loop has already ended.
func (c *Cond) Wait() <-chan struct{} {
select {
case l := <-c.listeners:
return l
case <-c.done:
panic(errors.New("condition closed"))
}
}
// controller loop.
func (c *Cond) controller() {
func (c *Cond) controller(ctx context.Context) {
defer close(c.done)
// Note that the notify channel does not close when the controller loop ends
// in order not to notify pending listeners.
notify := make(chan struct{})
for {
select {
case <-c.broadcast:
// all current receivers get a closed channel
// Close channel to notify all current listeners.
close(notify)
// set up next batch of receivers.
// Create a new channel for the next listeners.
notify = make(chan struct{})
case c.listeners <- notify:
// great. A Receiver has our channel
case <-c.ctx.Done():
// A new listener received the channel.
case <-ctx.Done():
return
}
}
}
// Close implements the io.Closer interface.
func (c *Cond) Close() error {
c.cancel()
return nil
}
// Wait returns a channel on which the next (close) signal will be sent.
func (c *Cond) Wait() <-chan struct{} {
select {
case l := <-c.listeners:
return l
case <-c.ctx.Done():
panic(ErrCondClosed)
}
}
// Broadcast wakes all current listeners.
func (c *Cond) Broadcast() {
select {
case c.broadcast <- struct{}{}:
case <-c.ctx.Done():
panic(ErrCondClosed)
}
}
func (c *Cond) Done() <-chan struct{} {
return c.ctx.Done()
}