// A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() }Ants的自旋锁是基于CAS机制和指数退避算法实现的一种自旋锁,主要利用了下面几个关键的点:
• runtime.Gosched() 让当前goroutine让出CPU时间片
备注: Go语言中 sync/atomic包提供了底层的原子级内存操作,可实用CAS 函数(Compare And Swap)。
指数退避算法以指数方式重试请求,请求失败后重试间隔分别是 1、2、4 ...,2的n次方秒增加//实现Locker接口 type spinLock uint32 //最大回退次数 const maxBackoff = 16 // 加锁 func (sl *spinLock) Lock() { backoff := 1 //基于CAS机制,尝试获取锁 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) { //执行backoff次 cpu让出时间片次数 for i := 0; i < backoff; i++ { //使当前goroutine让出CPU时间片 runtime.Gosched() } if backoff < maxBackoff { //左移后赋值 等于 backoff = backoff << 1 //左移一位就是乘以 2的1次方 backoff <<= 1 } } } //堆代码 duidaima.com //释放锁 func (sl *spinLock) Unlock() { atomic.StoreUint32((*uint32)(sl), 0) }Gosched()使当前goroutine程放弃处理器,以让其它goroutine运行,它不会挂起当前goroutine,因此当前goroutine未来会恢复执行。
type Pool struct { // 协程池容量 capacity int32 // 当前协程池中正在运行的协程数 running int32 // ants 实现的自旋锁,用于同步并发操作 lock sync.Locker // 存放一组Worker workers workerArray // 协程池状态 (1-关闭、0-开启) state int32 // 并发协调器,用于阻塞模式下,挂起和唤醒等待资源的协程 cond *sync.Cond // worker 对象池 workerCache sync.Pool // 等待的协程数量 waiting int32 // 回收协程是否关闭 heartbeatDone int32 // 闭回收协程的控制器函数 stopHeartbeat context.CancelFunc // 协程池的配置 options *Options }这里对几个配置着重讲一下:
type goWorker struct { //goWorker 所属的协程池 pool *Pool //接收实际执行任务的管道 task chan func() //goWorker 回收到协程池的时间 recycleTime time.Time }WorkerArray
type workerArray interface { // worker 列表长度 len() int // 是否为空 isEmpty() bool // 插入一个goworker insert(worker *goWorker) error // 从WorkerArray获取可用的goworker detach() *goWorker // 清理pool.workers中的过期goworker retrieveExpiry(duration time.Duration) []*goWorker // 重置,清空WorkerArray中所有的goWorker reset() }
func NewPool(size int, options ...Option) (*Pool, error) { //读取一些自定义的配置 opts := loadOptions(options...) ... // 创建 Pool 对象 p := &Pool{ capacity: int32(size), lock: internal.NewSpinLock(), options: opts, } // 指定 sync.Pool 创建 worker 的方法 p.workerCache.New = func() interface{} { return &goWorker{ pool: p, task: make(chan func(), workerChanCap), } } // 初始化Pool时是否进行内存预分配 // 区分workerArray 的实现方式 if p.options.PreAlloc { if size == -1 { return nil, ErrInvalidPreAllocSize } // 预先分配固定 Size 的池子 p.workers = newWorkerArray(loopQueueType, size) } else { // 初始化不创建,运行时再创建 p.workers = newWorkerArray(stackType, 0) } p.cond = sync.NewCond(p.lock) // 开启一个goroutine清理过期的 worker go p.purgePeriodically() return p, nil }workerChanCap:确定工作程序的通道是否应为缓冲通道,当获取给GOMAXPROCS设置的值等于1时表示单核执行,此时的通道是无缓冲通道,否则是有缓冲通道,且容量是1。这里讲的是默认未进行预分配,采用 workerStack 栈实现workerArray的初始化。
func (p *Pool) purgePeriodically(ctx context.Context) { // ExpiryDuration 默认是1s heartbeat := time.NewTicker(p.options.ExpiryDuration) ... for { select { case <-heartbeat.C: case <-ctx.Done(): return } // pool关闭 if p.IsClosed() { break } // 从 workers 中获取过期的 worker p.lock.Lock() expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) p.lock.Unlock() // 清理过期的worker for i := range expiredWorkers { expiredWorkers[i].task <- nil expiredWorkers[i] = nil } // 唤醒所有等待的线程 if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) { p.cond.Broadcast() } } }清理流程如下:
func (p *Pool) Submit(task func()) error { // pool是否关闭 if p.IsClosed() { return ErrPoolClosed } var w *goWorker // 尝试获取空闲的goWorker if w = p.retrieveWorker(); w == nil { return ErrPoolOverload } // 发送到 goWorker的channel中 w.task <- task return nil }获取可用goWork
func (p *Pool) retrieveWorker() (w *goWorker) { //创建一个新的goWorker,并执行 spawnWorker := func() { //实例化 worker w = p.workerCache.Get().(*goWorker) // 运行 w.run() } // 加锁 p.lock.Lock() // 从workers 中取出一个 goWorker // workerStack 实现了p.workers的方法 w = p.workers.detach() if w != nil { p.lock.Unlock() // Pool容量大于正在工作的 goWorker 数量) //则调用 spawnWorker() 新建一个 goWorker } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() spawnWorker() } else { // options设置了非阻塞选项,直接返回 nil if p.options.Nonblocking { p.lock.Unlock() return } retry: //option设置了最大阻塞队列,当前阻塞等待的任务数量已经达设置上限,直接返回 nil if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { p.lock.Unlock() return } ... var nw int //如果正在执行的worker数量为0时,则重新创建woker if nw = p.Running(); nw == 0 { p.lock.Unlock() spawnWorker() return } //p.workers中获取可用的worker //执行开头创建的spawnWorker if w = p.workers.detach(); w == nil { if nw < p.Cap() { p.lock.Unlock() spawnWorker() return } goto retry } p.lock.Unlock() } return }看完注释后理一理retrieveWorker的执行逻辑:
func (w *goWorker) run() { // pool的running 加 一 w.pool.addRunning(1) go func() { defer func() { ... if p := recover(); p != nil { //处理捕获的panic } w.pool.cond.Signal() }() //任务执行 for f := range w.task { if f == nil { return } f() //执行完后回收worker if ok := w.pool.revertWorker(w); !ok { return } } }() }goWorker放回pool
func (p *Pool) revertWorker(worker *goWorker) bool { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { p.cond.Broadcast() return false } // 重置空闲计时器,用于判定过期 worker.recycleTime = p.nowTime() p.lock.Lock() ... // 调用works的insert方法放回Pool err := p.workers.insert(worker) if err != nil { p.lock.Unlock() return false } // p.cond.Signal() 唤醒一个可能等待的线程 p.cond.Signal() p.lock.Unlock() return true }文章比较长,看完需要耐心,不会你应该会学习到不少东西,觉得不错的话分享给你朋友吧!