前端与后端交互通常是靠HTTP请求,即GET请求和POST请求,向后端传递参数并获取后端的返回数据。然而,考虑到后端服务器压力,前端通常不会无限制地发起请求,也就是需要做并发限制,限制同一时刻能发起多少个请求;此外考虑到不同请求之间存在优先级关系,如各页面的首屏数据的请求通常优先级最高,需要尽早发起,而如AB实验、不重要数据的请求优先级较低,可以推迟发起。
// 堆代码 duidaima.com // context.ts type MethodType = 'GET' | 'POST'; export type RequestType = { // 请求类型 url: string, // 请求url method: MethodType, // 请求的method params: Record<string, any>, // 请求参数 priority?: number, // 请求优先级 [key: string]: any // 其他属性,如header等 }; export type ResponseType = Record<string, any>; // 响应类型 export default class Context { public url: string; public method: MethodType; public request: RequestType; public response: ResponseType; constructor(url?: string, method: MethodType = 'GET', request: RequestType = null, response: ResponseType = null) { this.url = url; this.method = method; this.response = response; this.request = request; } }
context主要包括request和response属性,其中request属性在发起请求前就可以知道,而response属性则需要在服务端响应后才能获得。
// task.ts import Context from "./context"; export type AsyncCallback = () => Promise<Context>; export type ResolveCallback = (value: Context) => void; export type RejectCallback = (err: Error) => void; let id = 0; export default class Task { private asyncCallback: AsyncCallback; // 封装的请求函数 private resolveCallbacks: ResolveCallback[] = []; // 当asyncCallback有结果时触发 private rejectCallbacks: RejectCallback[] = []; // 当asyncCallback出错时触发 private id: number; // 任务id public priority: number; // 任务优先级 public context: Context; constructor(callback: AsyncCallback, resolve: ResolveCallback, reject: RejectCallback, priority: number = 0, context: Context) { this.asyncCallback = callback; this.resolveCallbacks.push(resolve); this.rejectCallbacks.push(reject); this.priority = priority; this.id = Date.now() + (++id); this.context = context; } }其中,asyncCallback是需要调度的请求函数,resolveCallbacks和rejectCallbacks为观察者队列,当调用asyncCallback函数返回结果时触发;priority为任务的优先级,数值越大表示优先级越高,而id为任务的编号。执行该task也很简单,其run函数定义如下:
export default class Task { // Task.prototype.run public run() { // 运行任务,即运行asyncCallback得到返回结果 return new Promise((resolve, reject) => { try { this.asyncCallback().then(value => { this.resolveCallbacks.forEach(callback => callback(value)); resolve(value); }, error => { this.rejectCallbacks.forEach(callback => callback(error)); reject(error); }) } catch (e) { this.rejectCallbacks.forEach(callback => callback(e)); reject(e); } }) } }
即执行asyncCallback,由于该函数返回一个Promise,故需要then一下拿到结果并触发resolveCallbacks中的所有回调函数;而函数调用出错时(then的第二个参数)则触发rejectCallbacks中的所有回调函数。
// taskQueue.ts import Task from "./task"; let runningQueue: Task[]; // running队列,存储运行中的task,有容量限制 let waitingQueue: Task[]; // waiting队列,存储待执行的task let capacity: number; // 容量 // 初始化函数 export function initializer(cap: number) { capacity = cap; runningQueue = []; waitingQueue = []; }当添加task时,如果running队列没满,即runningQueue.length < capacity,则直接添加到running队列中,并立即执行该task:
export function addTask(task: Task) { if (runningQueue.length < capacity) { runningQueue.push(task); run(task); } else { // 任务队列满了,执行添加到waiting队列中的逻辑 // 待实现 } }而若running队列满了,则需要添加到waiting队列中。值得注意的是,waiting队列需要考虑task的优先级,让优先级越高的越早执行。为此,不妨实现一个大顶堆(Heap),来维护这种优先级关系。
// heap.ts export type CompareType<T> = (a: T, b: T) => boolean; const {floor: int} = Math; // 数组内两个位置的值交换 function swap<T>(heap: T[], i: number, j: number) { [heap[i], heap[j]] = [heap[j], heap[i]]; } // 向上调整,执行push的时候 function shiftUp<T>(heap: T[], i: number, compare: CompareType<T>) { while (i > 0 && compare(heap[i], heap[int(i / 2)])) { swap(heap, i, int(i / 2)); i = int(i / 2); } } // 向下调整,执行pop的时候 function shiftDown<T>(heap: T[], i: number, compare: CompareType<T>) { while (i < heap.length) { let l = i * 2 + 1, r = l + 1, t = l; if (l >= heap.length) break; if (r < heap.length && compare(heap[r], heap[l])) t = r; if (compare(heap[i], heap[t])) break; swap(heap, i, t); i = t; } } export function push<T>(heap: T[], value: T, compare: CompareType<T>) { heap.push(value); shiftUp(heap, heap.length - 1, compare); } export function pop<T>(heap: T[], compare: CompareType<T>) { swap(heap, 0, heap.length - 1); const value = heap.pop(); shiftDown(heap, 0, compare); return value; } export function peek<T>(heap: T[]) { return heap[0]; }测试代码:
// 测试代码 const taskComparer: CompareType<Task> = (a, b) => a.priority > b.priority; const heap: Task[] = []; push(heap, new Task(async () => null, value => {}, err => {}, 1, null), taskComparer); push(heap, new Task(async () => null, value => {}, err => {}, 5, null), taskComparer); push(heap, new Task(async () => null, value => {}, err => {}, 3, null), taskComparer); push(heap, new Task(async () => null, value => {}, err => {}, 2, null), taskComparer); push(heap, new Task(async () => null, value => {}, err => {}, 0, null), taskComparer); push(heap, new Task(async () => null, value => {}, err => {}, 5, null), taskComparer); while (heap.length) { console.log("priority = ", pop(heap, taskComparer).priority); }这时,当running队列满了后需要添加至waiting队列,并考虑task的优先级时,只需要通过大顶堆的方式添加到waiting队列中即可:
const taskComparer: CompareType<Task> = (a, b) => a.priority > b.priority; export function addTask(task: Task) { if (runningQueue.length < capacity) { runningQueue.push(task); run(task); } else { // 任务队列满了,执行添加到waiting队列中的逻辑 push(waitingQueue, task, taskComparer); } }而run函数,则是调用task的run函数去执行当前task,并在task执行完毕后调用finished函数
function run(task: Task) { task.run().then(value => { // 调用task的run函数 finished(task); // 执行完毕调用finished函数 }, error => { finished(task); }) }其中,finished函数实现如下:
function finished(task: Task) { let index: number; // 从running队列中移除这个执行完的task if (runningQueue.length && (index = runningQueue.indexOf(task)) !== -1) runningQueue.splice(index, 1); // 从waiting队列中获取若干个task,扔到running队列中执行 while (runningQueue.length < capacity && waitingQueue.length) addTask(pop(waitingQueue, taskComparer)) }其逻辑也很清晰,即从running队列中移除这个已经执行完的task,并将waiting队列中优先级比较高的task取出来添加到running堆列中执行。
上述是一个非常简单的实现,但足以应付并发限制的场景;而如果考虑请求限流的话,还需要额外添加一个pending队列去处理需要限流的task,即通过setTimeout去推迟该task的执行,这个比较绕,以后再写~
此外,通过堆的方式去维护task的优先级会存在一直插入高优先级的task导致低优先级task没法执行的情况,这个需要看是否存在这个场景,如果有这种场景那么就需要使用插入排序的方式,将waiting队列维护成一个按优先级降序排序的队列,这样可以很方便的去修改低优先级task的优先级。
import Context from "./context"; // 堆代码 duidaima.com // interceptor.ts export interface InterceptorCallback<T = unknown> { // 拦截器函数定义 (param: T): Promise<T> } export default class Interceptor<T = unknown> { private list: InterceptorCallback<T>[] = []; public use(interceptorCallback: InterceptorCallback<T> | InterceptorCallback<T>[]) { // 添加拦截器 if (Array.isArray(interceptorCallback)) { this.list.push(...interceptorCallback); return; } this.list.push(interceptorCallback); } public remove(interceptorCallback: InterceptorCallback<T>) { // 移除一个拦截器 let index: number; if ((index = this.list.indexOf(interceptorCallback)) !== -1) this.list.splice(index, 1); } public async execute(param: T): Promise<T> { // 执行所有的拦截器 for (let callback of this.list) { param = await callback(param) } return param; } }举几个拦截器的例子,例如在进行接口请求前需要判断用户是否登录(未登录通常无法执行请求,并需要跳转到登录页面)
export const LoginInterceptor: InterceptorCallback<Context> = async (context) => { const { request } = context; // 如果本地没有存储登录信息并且不是调用登录接口,则执行登录操作(跳转到登录页等) if ((!user || !user.getOpenId()) && !request.url.includes('/user/info')) { await login.tryLogin(); } return context; }又例如向request的参数中添加每次请求都会携带的公共参数,即:
export const GlobalParamsInterceptor: InterceptorCallback<Context> = async (context) => { const { request } = context; const globalParams = { // 公共参数 token: '1234', }; for (let key in globalParams) { if (globalParams[key]) { request.params[key] = globalParams[key]; } } return context; }又比如拿到响应后对接口请求的耗时进行上报,如下:
export const ReportInterceptor: InterceptorCallback<Context> = async (context) => { const startTime = context.request.time; const endTime = Date.now(); // 上报请求-响应耗时 await reporter('requestTime', endTime - startTime); return context; };定义适配器
// adaptor.ts import Context from "./context"; export interface IFetchAdaptor { fetch(): Promise<Context>; use(context: Context): void; } // 请求适配器 export abstract class FetchAdaptor implements IFetchAdaptor { public context: Context; abstract fetch(); use(context: Context) { this.context = context; } }以web页面的XMLHttpRequest为例子,在web端的请求适配器实现如下:
// web端的请求适配器 export class WebFetchAdaptor extends FetchAdaptor { fetch() { return new Promise((resolve, reject) => { try { const xhr = new XMLHttpRequest(); let {url, method, request} = this.context; xhr.onerror = reject; xhr.onreadystatechange = (event) => { if (xhr.readyState === 4) { if ((xhr.status >= 200 && xhr.status < 300) || xhr.status === 304) { switch (xhr.responseType) { case "json": this.context.response = xhr.response; // 结果挂载至context上 resolve(this.context); break; case "text": this.context.response = JSON.parse(xhr.responseText); // 结果挂载在context上 resolve(this.context); break; default: reject(new Error()); break; } } } }; // get和post处理请求参数不同,一个放在url里,一个构造formData再send出去 switch (method) { case 'GET': xhr.open(method, `${url}&${stringify(request.params)}`, true); xhr.send(null); break; case 'POST': xhr.open(method, url, true); const formData = new FormData(); Object.keys(parent).forEach(k => { formData.append(k, typeof request.params[k] !== 'object' ? request.params[k] : JSON.stringify(request.params[k])); }); xhr.send(formData); } } catch (e) { reject(e); } }) } } // {a: 1, b: 2} => a=1&b=2 function stringify(params: Record<string | number, any>) { return Object.keys(params).reduce((acc, cur) => { return acc + `${cur}=${typeof params[cur] !== "object" ? params[cur] : JSON.stringify(params[cur])}&`; }, '').replace(/&$/g, ''); }组装请求框架
// http.ts import Interceptor, { InterceptorCallback } from "./Interceptor"; import {addTask, initializer} from './taskQueue'; import Task from "./task"; import Context, {RequestType} from "./context"; import {FetchAdaptor, WebFetchAdaptor} from "./adaptor"; export default class Http { private requestInterceptor: Interceptor<Context>; private responseInterceptor: Interceptor<Context>; private adaptor: FetchAdaptor; constructor( adaptor: FetchAdaptor = new WebFetchAdaptor(), requestInterceptors: InterceptorCallback<Context>[] = [], responseInterceptors: InterceptorCallback<Context>[] = [], capacity = 6 ) { initializer(capacity); // 初始化running队列容量 this.requestInterceptor = new Interceptor(); // 请求拦截器 this.responseInterceptor = new Interceptor(); // 响应拦截器 this.adaptor = adaptor; // 请求适配器 this.requestInterceptor.use(requestInterceptors); this.responseInterceptor.use(responseInterceptors); } private async send(req: RequestType): Promise<Context> { // 将最原始的context通过请求拦截器器进行相应的处理 const context = await this.requestInterceptor.execute(new Context( req.url, req.method, req )); // 适配器使用处理后的context this.adaptor.use(context); // 定义任务函数 const asyncCallback = async () => { return await this.responseInterceptor.execute( // 执行响应拦截器,对返回结果进行处理 await this.adaptor.fetch() // 调用适配器发起请求,返回结果是一个context ) }; return new Promise<Context>((resolve, reject) => { const task = new Task( // 封装成一个task asyncCallback, resolve, reject, req.priority, context, ); addTask(task); // 调用addTask,如果能添加到running队列里,则会直接执行asyncCallback,否则按照优先级插入到waiting队列中等待执行 }) } // 对外导出post函数 public async post(url: string, req: Omit<RequestType, 'method' | 'url'>) { return await this.send({ url, method: 'POST', ...req } as RequestType); } // 对外导出get函数 public async get(url: string, req: Omit<RequestType, 'method' | 'url'>) { return await this.send({ url, method: 'GET', ...req } as RequestType); } }简单的测试
let id = 0; export class TestFetchAdaptor extends FetchAdaptor { fetch(): Promise<Context> { return new Promise((resolve, reject) => { const p = Math.random(); const time = 3000 + p * (3000); this.context.response = { data: `Hello world! ${++id}`, errno: -1, time }; setTimeout(() => { resolve(this.context); }, time); // 3s ~ 6s 返回结果 }) } }index.ts中使用上述框架,代码比较简单:
// index.ts import Http from "./http"; import { TestFetchAdaptor } from "./adaptor"; import {ReportInterceptor, GlobalParamsInterceptor, LoginInterceptor} from "./Interceptor"; const requestInterceptors = [ LoginInterceptor, GlobalParamsInterceptor ]; const responseInterceptors = [ ReportInterceptor ]; const http = new Http( new TestFetchAdaptor(), requestInterceptors, responseInterceptors, 4 ); async function main() { await Promise.all([ http.get('url1', { params: {}, priority: 1}), http.get('url2', { params: {}, priority: 0}), http.get('url3', { params: {}, priority: 0}), http.post('url4', { params: {}, priority: 2}), http.get('url5', { params: {}, priority: 0}), http.post('url6', { params: {}, priority: 3}), http.get('url7', { params: {}, priority: 5}), http.get('url8', { params: {}, priority: 4}), http.get('url9', { params: {}, priority: 1}), http.get('url10', { params: {}, priority: 3}), http.post('url11', { params: {}, priority: 5 }), ]); } main().catch(console.error);并在finished函数中,每调用一次finished函数查看一次running队列和waiting队列的task情况,看是否按限制执行
let id = 0; function finished(task: Task) { let index: number; // 从running队列中移除这个执行完的task if (runningQueue.length && (index = runningQueue.indexOf(task)) !== -1) runningQueue.splice(index, 1); // 从waiting队列中获取若干个task,扔到running队列中执行 while (runningQueue.length < capacity && waitingQueue.length) addTask(pop(waitingQueue, taskComparer)) console.log(`-------------| Epoch ${id++} |-------------`) console.log("runningQueue: "); console.log(runningQueue.map(v => { return `{ url:${v.context.url}, priority: ${v.priority} }` })); console.log("waitingQueue: "); console.log(waitingQueue.map(v => { return `{ url:${v.context.url}, priority: ${v.priority} }` })); console.log('\n'); }ts-node index.ts命令执行如下: