 闽公网安备 35020302035485号
                
                闽公网安备 35020302035485号
                


// A Locker represents an object that can be locked and unlocked.
type Locker interface {
     Lock()
     Unlock()
}
Ants的自旋锁是基于CAS机制和指数退避算法实现的一种自旋锁,主要利用了下面几个关键的点:• runtime.Gosched() 让当前goroutine让出CPU时间片
备注: Go语言中 sync/atomic包提供了底层的原子级内存操作,可实用CAS 函数(Compare And Swap)。
指数退避算法以指数方式重试请求,请求失败后重试间隔分别是 1、2、4 ...,2的n次方秒增加//实现Locker接口
type spinLock uint32
//最大回退次数
const maxBackoff = 16
// 加锁
func (sl *spinLock) Lock() {
    backoff := 1
    //基于CAS机制,尝试获取锁
    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        //执行backoff次 cpu让出时间片次数
        for i := 0; i < backoff; i++ {
            //使当前goroutine让出CPU时间片
            runtime.Gosched()
        }
        if backoff < maxBackoff {
            //左移后赋值 等于 backoff = backoff << 1
            //左移一位就是乘以 2的1次方
            backoff <<= 1
        }
    }
}
//堆代码 duidaima.com
//释放锁
func (sl *spinLock) Unlock() {
    atomic.StoreUint32((*uint32)(sl), 0)
}
Gosched()使当前goroutine程放弃处理器,以让其它goroutine运行,它不会挂起当前goroutine,因此当前goroutine未来会恢复执行。

type Pool struct {
    // 协程池容量 
    capacity int32
    // 当前协程池中正在运行的协程数
    running int32
    // ants 实现的自旋锁,用于同步并发操作
    lock sync.Locker
    // 存放一组Worker
    workers workerArray
    // 协程池状态 (1-关闭、0-开启)
    state int32
    // 并发协调器,用于阻塞模式下,挂起和唤醒等待资源的协程
    cond *sync.Cond
    // worker 对象池
    workerCache sync.Pool
    // 等待的协程数量
    waiting int32
    // 回收协程是否关闭
    heartbeatDone int32
    // 闭回收协程的控制器函数
    stopHeartbeat context.CancelFunc
    // 协程池的配置
    options *Options
}
这里对几个配置着重讲一下:type goWorker struct {
    //goWorker 所属的协程池
    pool *Pool
    //接收实际执行任务的管道
    task chan func()
    //goWorker 回收到协程池的时间
    recycleTime time.Time
}
WorkerArray
type workerArray interface {
 // worker 列表长度
 len() int 
 // 是否为空
 isEmpty() bool
 // 插入一个goworker
 insert(worker *goWorker) error 
 // 从WorkerArray获取可用的goworker
 detach() *goWorker 
 // 清理pool.workers中的过期goworker
 retrieveExpiry(duration time.Duration) []*goWorker  
 // 重置,清空WorkerArray中所有的goWorker
 reset() 
}

func NewPool(size int, options ...Option) (*Pool, error) {
    //读取一些自定义的配置
    opts := loadOptions(options...)
    ...
    // 创建 Pool 对象
    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
     // 指定 sync.Pool 创建 worker 的方法
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    // 初始化Pool时是否进行内存预分配
    // 区分workerArray 的实现方式
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        // 预先分配固定 Size 的池子
        p.workers = newWorkerArray(loopQueueType, size)
    } else {
        // 初始化不创建,运行时再创建
        p.workers = newWorkerArray(stackType, 0)
    }
    p.cond = sync.NewCond(p.lock)
    // 开启一个goroutine清理过期的 worker
    go p.purgePeriodically()
    return p, nil
}
workerChanCap:确定工作程序的通道是否应为缓冲通道,当获取给GOMAXPROCS设置的值等于1时表示单核执行,此时的通道是无缓冲通道,否则是有缓冲通道,且容量是1。这里讲的是默认未进行预分配,采用 workerStack 栈实现workerArray的初始化。func (p *Pool) purgePeriodically(ctx context.Context) {
    // ExpiryDuration 默认是1s
    heartbeat := time.NewTicker(p.options.ExpiryDuration)
    ...
    for {
        select {
        case <-heartbeat.C:
        case <-ctx.Done():
            return
        }
        // pool关闭
        if p.IsClosed() {
            break
        }
        // 从 workers 中获取过期的 worker
        p.lock.Lock()
        expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
        p.lock.Unlock()
        // 清理过期的worker
        for i := range expiredWorkers {
            expiredWorkers[i].task <- nil
            expiredWorkers[i] = nil
        }
        // 唤醒所有等待的线程
        if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
            p.cond.Broadcast()
        }
    }
}
清理流程如下:func (p *Pool) Submit(task func()) error {
    // pool是否关闭
    if p.IsClosed() {
        return ErrPoolClosed
    }
    var w *goWorker
    // 尝试获取空闲的goWorker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    // 发送到 goWorker的channel中
    w.task <- task
    return nil
}
获取可用goWorkfunc (p *Pool) retrieveWorker() (w *goWorker) {
    //创建一个新的goWorker,并执行
    spawnWorker := func() {
        //实例化 worker
        w = p.workerCache.Get().(*goWorker)
        // 运行
        w.run()
    }
    // 加锁
    p.lock.Lock()
    // 从workers 中取出一个 goWorker
    // workerStack 实现了p.workers的方法
    w = p.workers.detach()
    if w != nil { 
        p.lock.Unlock()
    // Pool容量大于正在工作的 goWorker 数量)
    //则调用 spawnWorker() 新建一个 goWorker
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        
        p.lock.Unlock()
        spawnWorker()
    } else { 
          // options设置了非阻塞选项,直接返回 nil
          if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        //option设置了最大阻塞队列,当前阻塞等待的任务数量已经达设置上限,直接返回 nil
        if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        ...
        var nw int
        //如果正在执行的worker数量为0时,则重新创建woker
        if nw = p.Running(); nw == 0 { 
            p.lock.Unlock()
            spawnWorker()
            return
        }
        //p.workers中获取可用的worker
        //执行开头创建的spawnWorker
        if w = p.workers.detach(); w == nil {
            if nw < p.Cap() {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }
        p.lock.Unlock()
    }
    return
}
看完注释后理一理retrieveWorker的执行逻辑:func (w *goWorker) run() {
    // pool的running 加 一
    w.pool.addRunning(1)
    go func() {
        defer func() {
            ...
            if p := recover(); p != nil {
                //处理捕获的panic
            }
            w.pool.cond.Signal()
        }()
        //任务执行
        for f := range w.task {
            if f == nil {
                return
            }
            f()
            //执行完后回收worker
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
        }
    }()
}
goWorker放回poolfunc (p *Pool) revertWorker(worker *goWorker) bool {
    if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
        p.cond.Broadcast()
        return false
    }
    // 重置空闲计时器,用于判定过期
    worker.recycleTime = p.nowTime()
    p.lock.Lock()
    ...
    // 调用works的insert方法放回Pool
    err := p.workers.insert(worker)
    if err != nil {
        p.lock.Unlock()
        return false
    }
    // p.cond.Signal() 唤醒一个可能等待的线程
    p.cond.Signal()
    p.lock.Unlock()
    return true
}
文章比较长,看完需要耐心,不会你应该会学习到不少东西,觉得不错的话分享给你朋友吧!