• sync/atomic 设计与实现
  • 发布于 2个月前
  • 265 热度
    0 评论
概述
atomic 提供了原子同步操作原语,整个过程无需加锁,也不会产生 goroutine 上下文切换。
API
atomic API
Swap 操作由 SwapT 系列函数 (例如 SwapInt32, SwapInt64) 实现,等价于如下的原子操作:
old = *addr
*addr = new
return old
CAS 操作由 CompareAndSwapT 系列函数 (例如 CompareAndSwapInt32, CompareAndSwapInt64) 实现,等价于如下的原子操作:
if *addr == old {
    *addr = new
    return true
}
return false
Add 操作由 AddT 系列函数 (例如 AddInt32, AddInt64) 实现,等价于如下的原子操作:
*addr += delta
return *addr
Load And Store 操作由 LoadT and StoreT 系列函数 (例如 LoadInt32, LoadInt64) 实现,等价于如下的原子操作:
*addr = val

内部实现
我们来探究一下 sync/atomic 包的内部实现,文件目录路径为 $GOROOT/src/sync/atomic,笔者的 Go 版本为 go1.19 linux/amd64。需要注意的是,该文件内只给出了函数的定义,函数的实现为在对应的 asm.s 汇编文件中。

函数声明
Swap, CAS 等操作的函数原型全部定义在 doc.go 文件中。
// 堆代码 duidaima.com
// 在 386 上, 64 位函数使用的指令在 Pentium MMX 之前不可用
// 在非 Linux ARM 上,64 位函数使用的指令在 ARMv6k core 之前不可使用
// 在 ARM 上,386 和 32 位 MIPS,调用方负责以原子的方式访问 64 位对齐的 64 位字
// 全局变量、局部变量、已经分配的结构体、数组、切片中的第一个字可以依赖于 64 位对齐
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
...

func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
函数实现
Swap, CAS 等操作的函数实现全部在 asm.s 汇编文件中,但是该文件中的函数并不是直接实现,而是 “套了一层壳”, 最终的实现在平台体系结构对应的汇编文件中,目录为$GOROOT/src/runtime/internal/atomic。例如: 笔者的 Go 版本为 go1.19 linux/amd64, 那么就会跳转到 runtime/internal/atomic/atomic_amd64.s 文件对应的函数。

这里以 ·SwapInt32 汇编函数为例,来看下函数的跳转和实现。
# asm.s 文件
TEXT ·SwapInt32(SB),NOSPLIT,$0
 JMP runtime∕internal∕atomic·Xchg(SB)
函数的实现中使用了 JMP 跳转指令,最终跳转到了文件 $GOROOT/src/runtime/internal/atomic/atomic_amd64.s 中的 ·Xchg 函数。
// atomic_amd64.s 文件

TEXT ·Xchg(SB), NOSPLIT, $0-20
 MOVQ ptr+0(FP), BX   // 参数 1,8 字节 *int32 指针
 MOVL new+8(FP), AX   // 参数 2,4 字节 int32
 XCHGL AX, 0(BX)       // 交换指令
 MOVL AX, ret+16(FP)  // 交换后的 AX(old value) 写入 FP 伪寄存器返回值位
 RET

// 上面的汇编代码等价于如下 Go 代码
// uint32 Xchg(ptr *uint32, new uint32)
// Atomically:
// old := *ptr
// *ptr = new
// return old

atomic.Value
接下来看一下原子数据类型 atomic.Value 的内部实现。

Value 对象
Value 数据类型提供了一致性原子性的 Swap, CAS 等方法,其中:
Load 方法调用时,Value 的零值返回 nil
Store 方法一旦被调用,Value 对象就不能再复制
Value 对象一旦使用后,就不能再复制
type Value struct {
    v any
}
ifaceWords 对象
ifaceWords 对象是 Value 对象对应的 数据类型 + 值 的内部抽象表示,虽然 Value 对象相关方法的参数类型是 any,但是内部操作的都是 ifaceWords 对象 (通过类型转换机制)。
type ifaceWords struct {
 typ  unsafe.Pointer // 类型
 data unsafe.Pointer // 值
}
Value.Load 方法
Load 返回最新设置的值,如果未对该值调用过 Store 方法 (Value 处于零值状态),返回 nil。
func (v *Value) Load() (val any) {
 // 类型转化
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 原子获取值类型
 typ := LoadPointer(&vp.typ)
    // 从未调用过 Store 方法
 if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
  return nil
 }
 
 // 原子获取值
 data := LoadPointer(&vp.data)
 // 返回值类型转换
 vlp := (*ifaceWords)(unsafe.Pointer(&val))
 // 返回值类型
 vlp.typ = typ
 // 返回值数据值
 vlp.data = data
 return
}
Value.Store 方法
Store 方法设置 Value 对象的值,对于某个特定的 Value 对象,每次调用 Store 方法时都必须使用相同的数据类型值 (例如第一次存储的是 int 类型, 那么之后调用时必须传递 int 类型), 类型不一致时会产生 panic, 传递 nil 参数 ( Store(nil) ) 同样会产生 panic。
func (v *Value) Store(val any) {
 if val == nil {
  // 参数为 nil, 直接 panic
  panic("sync/atomic: store of nil value into Value")
 }
 
 // 参数类型转换
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 返回值类型转换
 vlp := (*ifaceWords)(unsafe.Pointer(&val))
 
 for {
  // 原子获取参数值数据类型
  typ := LoadPointer(&vp.typ)
  
  if typ == nil {
   // 如果类型为 nil, 尝试开始第一次 Store
   // 禁止抢占(避免操作未完成时,被其他 goroutine 抢占),其他 goroutine 可以使用自旋锁来等待完成
   // 同时可以避免 GC 的时候看到 unsafe.Pointer(^uintptr(0)) 这个中间状态的值
   
   runtime_procPin() // 禁止抢占 (具体的实现这里先忽略)
   
   if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
    // 如果设置值的类型时失败
    // 说明当前 goroutine 在禁止抢占执行结束前,已经有其他 goroutine 设置完了值类型
    runtime_procUnpin() // 恢复抢占
    continue
   }
   
   // 完成第一次 Store 操作
   StorePointer(&vp.data, vlp.data)
   StorePointer(&vp.typ, vlp.typ)
   
   runtime_procUnpin() // 恢复抢占 (具体的实现这里先忽略)
   return
  }
  
  if typ == unsafe.Pointer(&firstStoreInProgress) {
   // 第一次 Store 操作正在执行中,等待...
   // [待优化项] 因为在第一次 Store 禁用了抢占,所以可以使用自旋锁等待完成
   continue
  }
  
  // 已经完成了第一次 Store 操作, 检查类型并覆盖数据(检查是否和第一次设置的数据类型一致)
  if typ != vlp.typ {
   // 当前数据类型和第一次设置的数据类型不一致,产生 panic
   panic("sync/atomic: store of inconsistently typed value into Value")
  }
  
  StorePointer(&vp.data, vlp.data)
  return
 }
}
Value.Swap 方法
Swap 方法将新值存储到 Value 对象,并返回 Value 对象的旧值,如果 Value 为空, 则返回 nil。对于某个特定的 Value 对象,每次调用 Swap 方法时都必须使用相同的数据类型值 (例如第一次存储的是 int 类型, 那么之后调用时必须传递 int 类型), 类型不一致时会产生 panic, 传递 nil 参数 ( Swap(nil) ) 同样会产生 panic。
func (v *Value) Swap(new any) (old any) {
 if new == nil {
  // 参数为 nil, 直接 panic
  panic("sync/atomic: swap of nil value into Value")
 }
 
 // 当前值类型转换
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 参数类型转换
 np := (*ifaceWords)(unsafe.Pointer(&new))
 
 for {
  // 原子获取当前值数据类型
  typ := LoadPointer(&vp.typ)
  
  if typ == nil {
     // Value 对象还没有被设置
     // 尝试开始第一次 Store 操作
   
     // 流程和 Store 方法内部类型,这里直接省略掉
     ...

   return nil
  }
  
  ...
  
  // 已经完成了第一次 Store 操作, 检查类型并覆盖数据(检查是否和第一次设置的数据类型一致)
  if typ != np.typ {
   // 当前数据类型和第一次设置的数据类型不一致,产生 panic
   panic("sync/atomic: swap of inconsistently typed value into Value")
  }
  
  // 返回值类型转换
  op := (*ifaceWords)(unsafe.Pointer(&old))
  op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
  return old
 }
}
Value.CompareAndSwap 方法
CompareAndSwap 执行 CAS 操作,对于某个特定的 Value 对象,每次调用 CompareAndSwap 方法时都必须使用相同的数据类型值 (例如第一次存储的是 int 类型, 那么之后调用时必须传递 int 类型), 类型不一致时会产生 panic, 传递 nil 参数 ( CompareAndSwap(old, nil) ) 同样会产生 panic。
func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
 if new == nil {
  // 参数为 nil, 直接 panic
  panic("sync/atomic: compare and swap of nil value into Value")
 }
 
 // 当前值类型转换
 vp := (*ifaceWords)(unsafe.Pointer(v))
 // 参数新值类型转换
 np := (*ifaceWords)(unsafe.Pointer(&new))
 // 参数旧值类型转换
 op := (*ifaceWords)(unsafe.Pointer(&old))
 
 if op.typ != nil && np.typ != op.typ {
  // 新值和旧值数据类型不一致
  panic("sync/atomic: compare and swap of inconsistently typed values")
 }
 
 for {
  // 原子获取当前值数据类型
  typ := LoadPointer(&vp.typ)
  if typ == nil {
   if old != nil {
    // 当前值和参数值类型不一样
    return false
   }

    // 流程和 Store 方法内部类型,这里直接省略掉
   ...
   
   return true
  }
  
  ...
  
  if typ != np.typ {
   // 参数新值数据类型和当前值的数据类型不一致,产生 panic
   panic("sync/atomic: compare and swap of inconsistently typed value into Value")
  }
  
  // CompareAndSwapPointer 函数只能确保 vp.data 从获取到之后没有发生变化
  data := LoadPointer(&vp.data)
  
  // 拷贝当前值的变量,然后和参数旧值进行比较 
  var i any
  // 当前值类型
  (*ifaceWords)(unsafe.Pointer(&i)).typ = typ
  // 当前值数据
  (*ifaceWords)(unsafe.Pointer(&i)).data = data
  if i != old {
   // 当前值已经发生变化
   return false
  }
  
  // 调用 CAS 操作 (内部再调用对应的汇编)
  return CompareAndSwapPointer(&vp.data, data, np.data)
 }
}

小结
atomic 实现了同步算法的底层内存原子操作原语,其内部实现主要是汇编 (各个平台对应各自不同的指令,通过编译器链接),使用这些函数时需要更加谨慎, 官方给出的建议是除了特殊的、底层的应用程序外,其他情况最好使用 channel 或其他同步原语来完成 (但是从大多数开源组件实现代码来看,并没有遵守官方的建议)。

标准库还提供了 atomic.Value 原子数据类型,并且为该类型实现了常见的原子操作,如 Load, Store, CAS 等, 在实现类似 对象需要原子操作 这样的功能时可以直接复用该类型,例如标准库 context 包的内部实现中里面就用到了 atomic.Value。
用户评论