• 基于Go语言实现的五种broadcaster方法
  • 发布于 2个月前
  • 181 热度
    0 评论
昨天看到 Jaana Dogan 创建了一个 broadcaster 的库, 话说美女 Jaana Dogan 又回到了 Google 了么。她的实现我们就当做 broadcaster 的第一个实现吧。什么是 broadcaster?就是村口的大喇叭,一播音,全村都知道了。Jaana Dogan 实现的这个 broadcaster 只有通知的功能,没有传递消息,也不能重用。我们就以这个库为基准,看看我们能够实现几种方式。

1、sync.Cond 实现
Jaana Dogan 使用sync.Cond实现。
package main

import (
 "sync"
)

type Broadcaster struct {
 mu       *sync.Mutex
 cond     *sync.Cond
 signaled bool
}

func NewBroadcaster() *Broadcaster {
 var mu sync.Mutex
 return &Broadcaster{
  mu:       &mu,
  cond:     sync.NewCond(&mu),
  signaled: false,
 }
}

func (b *Broadcaster) Go(fn func()) {
 go func() {
  b.cond.L.Lock()
  defer b.cond.L.Unlock()

  for !b.signaled {
   b.cond.Wait()
  }
  fn()
 }()
}

func (b *Broadcaster) Broadcast() {
 b.cond.L.Lock()
 b.signaled = true
 b.cond.L.Unlock()
 // 堆代码 duidaima.com
 b.cond.Broadcast()
}
sync.Cond是一个条件变量,可以用来实现广播通知。sync.Cond的Wait方法会阻塞当前所有调用的 goroutine,直到一个 goroutine 调用Broadcast方法。我们可以写为它写一个单元测试,其他四种也使用同样的单元测试代码,就不赘述了。
package main

import (
 "sync"
 "testing"
)

func TestNewBroadcaster(t *testing.T) {
 b := NewBroadcaster()

 var wg sync.WaitGroup
 wg.Add(2)

 b.Go(func() {
  t.Log("function 1 finished")
  wg.Done()
 })
 b.Go(func() {
  t.Log("function 2 finished")
  wg.Done()
 })

 b.Broadcast()

 wg.Wait()
}
我觉得直接使用sync.Cond也可以,只不过 Jaana Dogan 将它包装了一下,更方便使用。
需要阻塞等待通知的逻辑放在Go方法中

Broadcast方法用来通知所有等待的 goroutine


2、channel 实现
使用channle可以更简洁的实现。
package main

type Broadcaster struct {
 signal chan struct{}
}

func NewBroadcaster() *Broadcaster {
 return &Broadcaster{
  signal: make(chan struct{}),
 }
}

func (b *Broadcaster) Go(fn func()) {
 go func() {
  <-b.signal
  fn()
 }()
}

func (b *Broadcaster) Broadcast() {
 close(b.signal)
}
channel的特性是可以关闭,关闭后的channel会一直返回零值,所以我们可以使用close来通知所有等待的 goroutine。这是常常使用 channel 实现的一种通知机制。

3、context 实现
context是 Go1.7 引入的一个标准库,用来传递请求的上下文,可以用来实现广播通知。
package broadcaster

import (
 "context"
)

type Broadcaster struct {
 ctx    context.Context
 cancel context.CancelFunc
}

func NewBroadcaster() *Broadcaster {
 ctx, cancel := context.WithCancel(context.Background())
 return &Broadcaster{
  ctx:    ctx,
  cancel: cancel,
 }
}

func (b *Broadcaster) Go(fn func()) {
 go func() {
  <-b.ctx.Done()
  fn()
 }()
}

func (b *Broadcaster) Broadcast() {
 b.cancel()
}
平心而论,context也是一种不错的选择。

4、sync.WaitGroup 实现
sync.WaitGroup也可以实现广播通知。
package main

import (
 "sync"
 "sync/atomic"
)

type Broadcaster struct {
 done    int32
 trigger sync.WaitGroup
}

func NewBroadcaster() *Broadcaster {
 b := &Broadcaster{}
 b.trigger.Add(1)
 return b
}

func (b *Broadcaster) Go(fn func()) {
 go func() {
  if atomic.LoadInt32(&b.done) == 1 {
   return
  }

  b.trigger.Wait()
  fn()
 }()
}

func (b *Broadcaster) Broadcast() {
 if atomic.CompareAndSwapInt32(&b.done, 0, 1) {
  b.trigger.Done()
 }
}
sync.WaitGroup的Wait方法会阻塞等待Done方法的调用,所以我们可以使用WaitGroup来实现广播通知。一旦Wait被放行,所有阻塞在Wait的 goroutine 都会被放行。 所以一开始我们将 WaitGroup 的计数器设置为 1,然后调用Wait方法,一旦Broadcast方法被调用,计数器减 1,Wait方法放行。

5、sync.RWMutex 实现
sync.RWMutex也可以实现广播通知。
package main

import (
 "sync"
)

type Broadcaster struct {
 mu *sync.RWMutex
}

func NewBroadcaster() *Broadcaster {
 var mu sync.RWMutex
 mu.Lock()

 return &Broadcaster{mu: &mu}
}

func (b *Broadcaster) Go(fn func()) {
 go func() {
  b.mu.RLock()
  defer b.mu.RUnlock()
  fn()
 }()
}

func (b *Broadcaster) Broadcast() {
 b.mu.Unlock()
}

读写锁的特性是写锁会阻塞读锁,所以我们可以使用读写锁来实现广播通知。一开始我们将写锁锁住,然后调用Go方法请求读锁,肯定获取不到而阻塞,一旦Broadcast方法被调用,释放写锁,所有阻塞在读锁的 goroutine 都会被放行。

以上罗列了五种实现 broadcaster 的方法,你喜欢哪一种?你还有什么更好的实现方法吗?欢迎留言讨论。
用户评论