const preloadManger = (urls, maxCount = 5) => { let count = 0; // 计数 -- 用于控制并发数 const createTask = () => { if (count < maxCount) { const url = urls.pop(); // 从请求数组中取值 if (url) { // 无论请求是否成功,都要执行taskFinish loader(url).finally(taskFinish); // 添加下一个请求 count++; createTask(); } } }; const taskFinish = () => { count--; createTask(); }; createTask(); }; // 堆代码 duidaima.com // 进行异步请求 const loader = async (url) => { const res = await fetch(url).then(res=>res.json()); console.log("res",res); return res } const urls = []; for (let i = 1; i <= 20; i++) { urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`); } preloadManger(urls, 5)请求状态:
5.结果数组results和urls数组的顺序保持一致,方便存取
// 并发请求函数 const concurrencyRequest = (urls, maxNum) => { return new Promise((resolve) => { if (urls.length === 0) { resolve([]); return; } const results = []; let index = 0; // 下一个请求的下标 let count = 0; // 当前请求完成的数量 // 发送请求 async function request() { if (index === urls.length) return; const i = index; // 保存序号,使result和urls相对应 const url = urls[index]; index++; console.log(url); try { const resp = await fetch(url); // resp 加入到results results[i] = resp; } catch (err) { // err 加入到results results[i] = err; } finally { count++; // 判断是否所有的请求都已完成 if (count === urls.length) { console.log('完成了'); resolve(results); } request(); } } // maxNum和urls.length取最小进行调用 const times = Math.min(maxNum, urls.length); for(let i = 0; i < times; i++) { request(); } }) }测试代码:
const urls = []; for (let i = 1; i <= 20; i++) { urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`); } concurrencyRequest(urls, 5).then(res => { console.log(res); })请求结果:
import Queue from 'yocto-queue'; import {AsyncResource} from '#async_hooks'; export default function pLimit(concurrency) { // 判断这个参数是否是一个大于0的整数,如果不是就抛出一个错误 if ( !((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0) ) { throw new TypeError('Expected `concurrency` to be a number from 1 and up'); } // 创建队列 -- 用于存取请求 const queue = new Queue(); // 计数 let activeCount = 0; // 用来处理并发数的函数 const next = () => { activeCount--; if (queue.size > 0) { // queue.dequeue()可以理解为[].shift(),取出队列中的第一个任务,由于确定里面是一个函数,所以直接执行就可以了; queue.dequeue()(); } }; // run函数就是用来执行异步并发任务 const run = async (function_, resolve, arguments_) => { // activeCount加1,表示当前并发数加1 activeCount++; // 执行传入的异步函数,将结果赋值给result,注意:现在的result是一个处在pending状态的Promise const result = (async () => function_(...arguments_))(); // resolve函数就是enqueue函数中返回的Promise的resolve函数 resolve(result); // 等待result的状态发生改变,这里使用了try...catch,因为result可能会出现异常,所以需要捕获异常; try { await result; } catch {} next(); }; // 将run函数添加到请求队列中 const enqueue = (function_, resolve, arguments_) => { queue.enqueue( // 将run函数绑定到AsyncResource上,不需要立即执行,对此添加了一个bind方法 AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)), ); // 立即执行一个异步函数,等待下一个微任务(注意:因为activeCount是异步更新的,所以需要等待下一个微任务执行才能获取新的值) (async () => { // This function needs to wait until the next microtask before comparing // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously // when the run function is dequeued and called. The comparison in the if-statement // needs to happen asynchronously as well to get an up-to-date value for `activeCount`. await Promise.resolve(); // 判断activeCount是否小于concurrency,并且队列中有任务,如果满足条件就会将队列中的任务取出来执行 if (activeCount < concurrency && queue.size > 0) { // 注意:queue.dequeue()()执行的是run函数 queue.dequeue()(); } })(); }; // 接收一个函数fn和参数args,然后返回一个Promise,执行出队操作 const generator = (function_, ...arguments_) => new Promise(resolve => { enqueue(function_, resolve, arguments_); }); // 向外暴露当前的并发数和队列中的任务数,并且手动清空队列 Object.defineProperties(generator, { // 当前并发数 activeCount: { get: () => activeCount, }, // 队列中的任务数 pendingCount: { get: () => queue.size, }, // 清空队列 clearQueue: { value() { queue.clear(); }, }, }); return generator; }整个库只有短短71行代码,在代码中导入了yocto-queue库,它是一个微型的队列数据结构。
class PLimit { constructor(concurrency) { this.concurrency = concurrency; this.activeCount = 0; this.queue = []; return (fn, ...args) => { return new Promise(resolve => { this.enqueue(fn, resolve, args); }); } } enqueue(fn, resolve, args) { this.queue.push(this.run.bind(this, fn, resolve, args)); (async () => { await Promise.resolve(); if (this.activeCount < this.concurrency && this.queue.length > 0) { this.queue.shift()(); } })(); } async run(fn, resolve, args) { this.activeCount++; const result = (async () => fn(...args))(); resolve(result); try { await result; } catch { } this.next(); } next() { this.activeCount--; if (this.queue.length > 0) { this.queue.shift()(); } } }小结