闽公网安备 35020302035485号
package main
import (
"sync"
)
type Broadcaster struct {
mu *sync.Mutex
cond *sync.Cond
signaled bool
}
func NewBroadcaster() *Broadcaster {
var mu sync.Mutex
return &Broadcaster{
mu: &mu,
cond: sync.NewCond(&mu),
signaled: false,
}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
b.cond.L.Lock()
defer b.cond.L.Unlock()
for !b.signaled {
b.cond.Wait()
}
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.cond.L.Lock()
b.signaled = true
b.cond.L.Unlock()
// 堆代码 duidaima.com
b.cond.Broadcast()
}
sync.Cond是一个条件变量,可以用来实现广播通知。sync.Cond的Wait方法会阻塞当前所有调用的 goroutine,直到一个 goroutine 调用Broadcast方法。我们可以写为它写一个单元测试,其他四种也使用同样的单元测试代码,就不赘述了。package main
import (
"sync"
"testing"
)
func TestNewBroadcaster(t *testing.T) {
b := NewBroadcaster()
var wg sync.WaitGroup
wg.Add(2)
b.Go(func() {
t.Log("function 1 finished")
wg.Done()
})
b.Go(func() {
t.Log("function 2 finished")
wg.Done()
})
b.Broadcast()
wg.Wait()
}
我觉得直接使用sync.Cond也可以,只不过 Jaana Dogan 将它包装了一下,更方便使用。Broadcast方法用来通知所有等待的 goroutine
package main
type Broadcaster struct {
signal chan struct{}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
signal: make(chan struct{}),
}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
<-b.signal
fn()
}()
}
func (b *Broadcaster) Broadcast() {
close(b.signal)
}
channel的特性是可以关闭,关闭后的channel会一直返回零值,所以我们可以使用close来通知所有等待的 goroutine。这是常常使用 channel 实现的一种通知机制。package broadcaster
import (
"context"
)
type Broadcaster struct {
ctx context.Context
cancel context.CancelFunc
}
func NewBroadcaster() *Broadcaster {
ctx, cancel := context.WithCancel(context.Background())
return &Broadcaster{
ctx: ctx,
cancel: cancel,
}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
<-b.ctx.Done()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.cancel()
}
平心而论,context也是一种不错的选择。package main
import (
"sync"
"sync/atomic"
)
type Broadcaster struct {
done int32
trigger sync.WaitGroup
}
func NewBroadcaster() *Broadcaster {
b := &Broadcaster{}
b.trigger.Add(1)
return b
}
func (b *Broadcaster) Go(fn func()) {
go func() {
if atomic.LoadInt32(&b.done) == 1 {
return
}
b.trigger.Wait()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
if atomic.CompareAndSwapInt32(&b.done, 0, 1) {
b.trigger.Done()
}
}
sync.WaitGroup的Wait方法会阻塞等待Done方法的调用,所以我们可以使用WaitGroup来实现广播通知。一旦Wait被放行,所有阻塞在Wait的 goroutine 都会被放行。 所以一开始我们将 WaitGroup 的计数器设置为 1,然后调用Wait方法,一旦Broadcast方法被调用,计数器减 1,Wait方法放行。package main
import (
"sync"
)
type Broadcaster struct {
mu *sync.RWMutex
}
func NewBroadcaster() *Broadcaster {
var mu sync.RWMutex
mu.Lock()
return &Broadcaster{mu: &mu}
}
func (b *Broadcaster) Go(fn func()) {
go func() {
b.mu.RLock()
defer b.mu.RUnlock()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.mu.Unlock()
}