闽公网安备 35020302035485号
const preloadManger = (urls, maxCount = 5) => {
let count = 0; // 计数 -- 用于控制并发数
const createTask = () => {
if (count < maxCount) {
const url = urls.pop(); // 从请求数组中取值
if (url) {
// 无论请求是否成功,都要执行taskFinish
loader(url).finally(taskFinish);
// 添加下一个请求
count++;
createTask();
}
}
};
const taskFinish = () => {
count--;
createTask();
};
createTask();
};
// 堆代码 duidaima.com
// 进行异步请求
const loader = async (url) => {
const res = await fetch(url).then(res=>res.json());
console.log("res",res);
return res
}
const urls = [];
for (let i = 1; i <= 20; i++) {
urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
preloadManger(urls, 5)
请求状态:

5.结果数组results和urls数组的顺序保持一致,方便存取
// 并发请求函数
const concurrencyRequest = (urls, maxNum) => {
return new Promise((resolve) => {
if (urls.length === 0) {
resolve([]);
return;
}
const results = [];
let index = 0; // 下一个请求的下标
let count = 0; // 当前请求完成的数量
// 发送请求
async function request() {
if (index === urls.length) return;
const i = index; // 保存序号,使result和urls相对应
const url = urls[index];
index++;
console.log(url);
try {
const resp = await fetch(url);
// resp 加入到results
results[i] = resp;
} catch (err) {
// err 加入到results
results[i] = err;
} finally {
count++;
// 判断是否所有的请求都已完成
if (count === urls.length) {
console.log('完成了');
resolve(results);
}
request();
}
}
// maxNum和urls.length取最小进行调用
const times = Math.min(maxNum, urls.length);
for(let i = 0; i < times; i++) {
request();
}
})
}
测试代码:const urls = [];
for (let i = 1; i <= 20; i++) {
urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
concurrencyRequest(urls, 5).then(res => {
console.log(res);
})
请求结果:
import Queue from 'yocto-queue';
import {AsyncResource} from '#async_hooks';
export default function pLimit(concurrency) {
// 判断这个参数是否是一个大于0的整数,如果不是就抛出一个错误
if (
!((Number.isInteger(concurrency)
|| concurrency === Number.POSITIVE_INFINITY)
&& concurrency > 0)
) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
// 创建队列 -- 用于存取请求
const queue = new Queue();
// 计数
let activeCount = 0;
// 用来处理并发数的函数
const next = () => {
activeCount--;
if (queue.size > 0) {
// queue.dequeue()可以理解为[].shift(),取出队列中的第一个任务,由于确定里面是一个函数,所以直接执行就可以了;
queue.dequeue()();
}
};
// run函数就是用来执行异步并发任务
const run = async (function_, resolve, arguments_) => {
// activeCount加1,表示当前并发数加1
activeCount++;
// 执行传入的异步函数,将结果赋值给result,注意:现在的result是一个处在pending状态的Promise
const result = (async () => function_(...arguments_))();
// resolve函数就是enqueue函数中返回的Promise的resolve函数
resolve(result);
// 等待result的状态发生改变,这里使用了try...catch,因为result可能会出现异常,所以需要捕获异常;
try {
await result;
} catch {}
next();
};
// 将run函数添加到请求队列中
const enqueue = (function_, resolve, arguments_) => {
queue.enqueue(
// 将run函数绑定到AsyncResource上,不需要立即执行,对此添加了一个bind方法
AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
);
// 立即执行一个异步函数,等待下一个微任务(注意:因为activeCount是异步更新的,所以需要等待下一个微任务执行才能获取新的值)
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
// 判断activeCount是否小于concurrency,并且队列中有任务,如果满足条件就会将队列中的任务取出来执行
if (activeCount < concurrency && queue.size > 0) {
// 注意:queue.dequeue()()执行的是run函数
queue.dequeue()();
}
})();
};
// 接收一个函数fn和参数args,然后返回一个Promise,执行出队操作
const generator = (function_, ...arguments_) => new Promise(resolve => {
enqueue(function_, resolve, arguments_);
});
// 向外暴露当前的并发数和队列中的任务数,并且手动清空队列
Object.defineProperties(generator, {
// 当前并发数
activeCount: {
get: () => activeCount,
},
// 队列中的任务数
pendingCount: {
get: () => queue.size,
},
// 清空队列
clearQueue: {
value() {
queue.clear();
},
},
});
return generator;
}
整个库只有短短71行代码,在代码中导入了yocto-queue库,它是一个微型的队列数据结构。class PLimit {
constructor(concurrency) {
this.concurrency = concurrency;
this.activeCount = 0;
this.queue = [];
return (fn, ...args) => {
return new Promise(resolve => {
this.enqueue(fn, resolve, args);
});
}
}
enqueue(fn, resolve, args) {
this.queue.push(this.run.bind(this, fn, resolve, args));
(async () => {
await Promise.resolve();
if (this.activeCount < this.concurrency && this.queue.length > 0) {
this.queue.shift()();
}
})();
}
async run(fn, resolve, args) {
this.activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
this.next();
}
next() {
this.activeCount--;
if (this.queue.length > 0) {
this.queue.shift()();
}
}
}
小结