• 高效I/O并发处理:双缓冲和Exchanger
  • 发布于 2个月前
  • 107 热度
    0 评论
  • 旧巷
  • 0 粉丝 23 篇博客
  •   
双缓冲(double buffering)是高效处理 I/O 操作的一种并发技术,它使用两个 buffer,一个 goroutine 使用其中一个 buffer 进行写,而另一个 goroutine 使用另一个 buffer 进行读,然后进行交换。这样两个 goroutine 可能并发的执行,减少它们之间的等待和阻塞。

本文还提供了一个类似 Java 的java.util.concurrent.Exchanger[1]的 Go 并发原语,它可以用来在两个 goroutine 之间交换数据,快速实现双缓冲的模式。 这个并发原语可以在github.com/smallnest/exp/sync/Exchanger[2]找到。

double buffering 并发模式
双缓冲(double buffering)设计方式虽然在一些领域中被广泛的应用,但是我还没有看到它在并发模式中专门列出了,或者专门列为一种模式。这里我们不妨把它称之为双缓存模式。

这是一种在 I/O 处理领域广泛使用的用来提速的编程技术,它使用两个缓冲区来加速计算机,该计算机可以重叠 I/O 和处理。一个缓冲区中的数据正在处理,而下一组数据被读入另一个缓冲区。 在流媒体应用程序中,一个缓冲区中的数据被发送到声音或图形卡,而另一个缓冲区则被来自源(Internet、本地服务器等)的更多数据填充。 当视频显示在屏幕上时,一个缓冲区中的数据被填充,而另一个缓冲区中的数据正在显示。当在缓冲区之间移动数据的功能是在硬件电路中实现的,而不是由软件执行时,全动态视频的速度会加快,不但速度被加快,而且可以减少黑屏闪烁的可能。

在这个模式中,两个 goroutine 并发的执行,一个 goroutine 使用一个 buffer 进行写(不妨称为 buffer1),而另一个 goroutine 使用另一个 buffer 进行读(不妨称为 buffer2)。如图所示。 当左边的 writer 写满它当前使用的 buffer1 后,它申请和右边的 goroutine 的 buffer2 进行交换,这会出现两种情况:
1.右边的 reader 已经读完了它当前使用的 buffer2,那么它会立即交换,这样左边的 writer 可以继续写 buffer2,而右边的 reader 可以继续读 buffer1。

2.右边的 reader 还没有读完 buffer2,那么左边的 writer 就会阻塞,直到右边的 reader 读完 buffer2,然后交换。 周而复始。


同样右边的 goroutine 也是同样的处理,当它读完 buffer2 后,它会申请和左边的 goroutine 的 buffer1 进行交换,这会出现两种情况:
1.左边的 writer 已经写完了它当前使用的 buffer1,那么它会立即交换,这样右边的 reader 可以继续读 buffer1,而左边的 writer 可以继续写 buffer2。

2.左边的 writer 还没有写完 buffer1,那么右边的 reader 就会阻塞,直到左边的 writer 写完 buffer1,然后交换。 周而复始。


这样两个 goroutine 就可以并发的执行,而不用等待对方的读写操作。这样可以提高并发处理的效率。

不仅仅如此, double buffering 其实可以应用于更多的场景, 不仅仅是 buffer 的场景,如 Java 的垃圾回收机制中,HotSpot JVM 把年轻代分为了三部分:1 个 Eden 区和 2 个 Survivor 区(分别叫 from 和 to,或者 s0 和 s1),在 GC 开始的时候,对象只会存在于 Eden 区和名为“From”的 Survivor 区,Survivor 区“To”是空的。紧接着进行 GC,Eden 区中所有存活的对象都会被复制到“To”,而在“From”区中,仍存活的对象会根据他们的年龄值来决定去向。年龄达到一定值的对象会被移动到年老代中,没有达到阈值的对象会被复制到“To”区域。经过这次 GC 后,Eden 区和 From 区已经被清空。这个时候,“From”和“To”会交换(exchange)他们的角色,也就是新的“To”就是上次 GC 前的“From”,新的“From”就是上次 GC 前的“To”。不管怎样,都会保证名为 To 的 Survivor 区域是空的。Minor GC 会一直重复这样的过程,直到“To”区被填满,“To”区被填满之后,会将所有对象移动到年老代中。

Exchanger 的实现
既然有这样的场景,有这样的需求,所以我们需要针对这样场景的一个同步原语。Java 给我们做了一个很好的师范,接下来我们使用实现相应的 Go,但是我们的实现和 Java 的实现完全不同,我们要基于 Go 既有的同步原语来实现。

基于 Java 实现的 Exchanger 的功能,我们也实现一个Exchanger, 我们期望它的功能如下:
1.只用作两个 goroutine 之间的数据交换,不支持多个 goroutine 之间的数据交换。
2.可以重用。交换完之后还可以继续交换
3.支持泛型,可以交换任意类型的数据
4.如果对端还没有准备交换,就阻塞等待

5.在交换完之前,阻塞的 goroutine 不可能调用Exchange方法两次


Go 内存模型补充: 同一次交换, 一个 goroutine 在调用Exchange方法的完成,一定happens after另一个 goroutine 调用Exchange方法的开始。
如果你非常熟悉 Go 的各种同步原语,你可以很快的组合出这样一个同步原语。如果你还不是那么熟悉,建议你阅读《深入理解 Go 并发编程》这本书,京东有售。 下面是一个简单的实现,代码在Exchanger[3]。 我们只用left、right指代这两个 goroutine, goroutine 是 Go 语言中的并发单元,我们期望的就是这两个 goroutine 发生关系。

为了跟踪这两个 goroutine,我们需要使用 goroutine id 来标记这两个 goroutine,这样避免了第三者插入。
type Exchanger[T any] struct {
 leftGoID, rightGoID int64
 left, right         chan T
}
你必须使用 NewExchanger 创建一个Exchanger,它会返回一个Exchanger的指针。 初始化的时候我们把 left 和 right 的 id 都设置为-1,表示还没有 goroutine 使用它们,并且不会和所有的 goroutine 的 id 冲突。 同时我们创建两个 channel,一个用来左边的 goroutine 写,右边的 goroutine 读,另一个用来右边的 goroutine 写,左边的 goroutine 读。channel 的 buffer 设置为 1,这样可以避免死锁。
func NewExchanger[T any]( "T any") *Exchanger[T] {
 return &Exchanger[T]{
  leftGoID:  -1,
  rightGoID: -1,
  left:      make(chan T, 1),
  right:     make(chan T, 1),
 }
}
Exchange方法是核心方法,它用来交换数据,它的实现如下:
func (e *Exchanger[T]) Exchange(value T) T {
 goid := goroutine.ID()

 // left goroutine
 isLeft := atomic.CompareAndSwapInt64(&e.leftGoID, -1, goid)
 if !isLeft {
  isLeft = atomic.LoadInt64(&e.leftGoID) == goid
 }
 if isLeft {
  e.right <- value // send value to right
  return <-e.left  // wait for value from right
 }

 // right goroutine
 isRight := atomic.CompareAndSwapInt64(&e.rightGoID, -1, goid)
 if !isRight {
  isRight = atomic.LoadInt64(&e.rightGoID) == goid
 }
 if isRight {
  e.left <- value  // send value to left
  return <-e.right // wait for value from left
 }

 // other goroutine
 panic("sync: exchange called from neither left nor right goroutine")
}
当一个 goroutine 调用的时候,首先我们尝试把它设置为left,如果成功,那么它就是left。 如果不成功,我们就判断它是不是先前已经是left,如果是,那么它就是left。 如果先前,或者此时left已经被另一个 goroutine 占用了,它还有机会成为right,同样的逻辑检查和设置right。如果既不是left也不是right,那么就是第三者插入了,我们需要 panic,因为我们不希望第三者插足。

如果它是left,那么它就会把数据发送到right,然后等待right发送数据过来。 如果它是right,那么它就会把数据发送到left,然后等待left发送数据过来。
这样就实现了数据的交换。

Exchanger 的使用
我们使用一个简单的双缓冲例子来说明如何使用Exchanger,我们创建两个 goroutine,一个 goroutine 负责写,另一个 goroutine 负责读,它们之间通过Exchanger来交换数据。
 buf1 := bytes.NewBuffer(make([]byte, 1024))
 buf2 := bytes.NewBuffer(make([]byte, 1024))
 // 堆代码 duidaima.com
 exchanger := syncx.NewExchanger[*bytes.Buffer]( "*bytes.Buffer")

 var wg sync.WaitGroup
 wg.Add(2)

 expect := 0
 go func() { // g1
  defer wg.Done()

  buf := buf1
  for i := 0; i < 10; i++ {
   for j := 0; j < 1024; j++ {
    buf.WriteByte(byte(j / 256))
    expect += j / 256
   }

   buf = exchanger.Exchange(buf)
  }
 }()

 var got int
 go func() { // g2
  defer wg.Done()

  buf := buf2
  for i := 0; i < 10; i++ {
   buf = exchanger.Exchange(buf)
   for _, b := range buf.Bytes() {
    got += int(b)
   }
   buf.Reset()
  }
 }()

 wg.Wait()

 fmt.Println(got)
 fmt.Println(expect == got)
在这个例子中 g1负责写,每个 buffer 的容量是 1024,写满就交给另外一个读 g2,并从读 g2 中交换过来一个空的 buffer 继续写。 交换 10 次之后,两个 goroutine 都退出了,我们检查写入的数据和读取的数据是否一致,如果一致,那么就说明我们的Exchanger实现是正确的。

总结
文本介绍了一种类似 Java 的Exchanger的同步原语的实现,这个同步原语可以在双缓冲的场景中使用,提高并发处理的性能。
用户评论