package main import ( "sync" ) type Broadcaster struct { mu *sync.Mutex cond *sync.Cond signaled bool } func NewBroadcaster() *Broadcaster { var mu sync.Mutex return &Broadcaster{ mu: &mu, cond: sync.NewCond(&mu), signaled: false, } } func (b *Broadcaster) Go(fn func()) { go func() { b.cond.L.Lock() defer b.cond.L.Unlock() for !b.signaled { b.cond.Wait() } fn() }() } func (b *Broadcaster) Broadcast() { b.cond.L.Lock() b.signaled = true b.cond.L.Unlock() // 堆代码 duidaima.com b.cond.Broadcast() }sync.Cond是一个条件变量,可以用来实现广播通知。sync.Cond的Wait方法会阻塞当前所有调用的 goroutine,直到一个 goroutine 调用Broadcast方法。我们可以写为它写一个单元测试,其他四种也使用同样的单元测试代码,就不赘述了。
package main import ( "sync" "testing" ) func TestNewBroadcaster(t *testing.T) { b := NewBroadcaster() var wg sync.WaitGroup wg.Add(2) b.Go(func() { t.Log("function 1 finished") wg.Done() }) b.Go(func() { t.Log("function 2 finished") wg.Done() }) b.Broadcast() wg.Wait() }我觉得直接使用sync.Cond也可以,只不过 Jaana Dogan 将它包装了一下,更方便使用。
Broadcast方法用来通知所有等待的 goroutine
package main type Broadcaster struct { signal chan struct{} } func NewBroadcaster() *Broadcaster { return &Broadcaster{ signal: make(chan struct{}), } } func (b *Broadcaster) Go(fn func()) { go func() { <-b.signal fn() }() } func (b *Broadcaster) Broadcast() { close(b.signal) }channel的特性是可以关闭,关闭后的channel会一直返回零值,所以我们可以使用close来通知所有等待的 goroutine。这是常常使用 channel 实现的一种通知机制。
package broadcaster import ( "context" ) type Broadcaster struct { ctx context.Context cancel context.CancelFunc } func NewBroadcaster() *Broadcaster { ctx, cancel := context.WithCancel(context.Background()) return &Broadcaster{ ctx: ctx, cancel: cancel, } } func (b *Broadcaster) Go(fn func()) { go func() { <-b.ctx.Done() fn() }() } func (b *Broadcaster) Broadcast() { b.cancel() }平心而论,context也是一种不错的选择。
package main import ( "sync" "sync/atomic" ) type Broadcaster struct { done int32 trigger sync.WaitGroup } func NewBroadcaster() *Broadcaster { b := &Broadcaster{} b.trigger.Add(1) return b } func (b *Broadcaster) Go(fn func()) { go func() { if atomic.LoadInt32(&b.done) == 1 { return } b.trigger.Wait() fn() }() } func (b *Broadcaster) Broadcast() { if atomic.CompareAndSwapInt32(&b.done, 0, 1) { b.trigger.Done() } }sync.WaitGroup的Wait方法会阻塞等待Done方法的调用,所以我们可以使用WaitGroup来实现广播通知。一旦Wait被放行,所有阻塞在Wait的 goroutine 都会被放行。 所以一开始我们将 WaitGroup 的计数器设置为 1,然后调用Wait方法,一旦Broadcast方法被调用,计数器减 1,Wait方法放行。
package main import ( "sync" ) type Broadcaster struct { mu *sync.RWMutex } func NewBroadcaster() *Broadcaster { var mu sync.RWMutex mu.Lock() return &Broadcaster{mu: &mu} } func (b *Broadcaster) Go(fn func()) { go func() { b.mu.RLock() defer b.mu.RUnlock() fn() }() } func (b *Broadcaster) Broadcast() { b.mu.Unlock() }