2. 如果需要新增功能，只需新增一个 handler 即可，无需改动已有代码。这正是开闭原则（对扩展开放、对修改关闭）的体现，也使系统的运维更加友好。
// PipelineTransport is an http.RoundTripper that sends every request through
// a chain of MessageHandlers before handing it to the embedded http.Transport.
type PipelineTransport struct {
	// Transport is the embedded base transport that performs the real round
	// trip. NewPipelineTransport leaves it at its zero value, so settings
	// such as Proxy or timeouts stay unset unless the caller configures them
	// on the returned value.
	http.Transport

	pipelineManager *PipelineManager
}

// Compile-time check that PipelineTransport can be used as a client transport.
var _ http.RoundTripper = (*PipelineTransport)(nil)

// RoundTrip implements http.RoundTripper. It shadows the embedded
// http.Transport.RoundTrip so that requests enter the handler pipeline first.
// A PipelineTransport without a pipeline manager (for example a zero value)
// behaves like the plain embedded transport.
func (t *PipelineTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if t.pipelineManager == nil {
		return t.BaseRoundTrip(req)
	}
	return t.pipelineManager.Invoke(req)
}

// BaseRoundTrip bypasses the pipeline and calls RoundTrip on the embedded
// http.Transport directly.
func (t *PipelineTransport) BaseRoundTrip(req *http.Request) (*http.Response, error) {
	return t.Transport.RoundTrip(req)
}

// NewPipelineTransport creates a PipelineTransport whose requests pass through
// messageHandlers in the given order before reaching the embedded transport.
func NewPipelineTransport(messageHandlers ...MessageHandler) *PipelineTransport {
	pipelineTransport := &PipelineTransport{}
	pipelineTransport.pipelineManager = NewPipelineManager(messageHandlers, pipelineTransport)
	return pipelineTransport
}

// ----- context -----

// PipelineContext is the value passed from handler to handler while a single
// request travels through the pipeline.
type PipelineContext struct {
	// Req is the request being processed.
	Req *http.Request
}

// NewPipelineContext wraps req in a new PipelineContext.
func NewPipelineContext(req *http.Request) *PipelineContext {
	return &PipelineContext{
		Req: req,
	}
}

// ----- handler -----

// Next invokes the remainder of the pipeline: the handlers positioned after
// the current one and, once they are exhausted, the base transport.
type Next func(ctx *PipelineContext) (*http.Response, error)

// MessageHandler is one stage of the pipeline. Handle may inspect the context,
// call next to continue the chain (possibly more than once), or return a
// response/error itself without calling next at all.
type MessageHandler interface {
	Handle(*PipelineContext, Next) (*http.Response, error)
}

// ----- pipeline -----

// Pipeline is the pipeline-model abstraction: it turns a request into a
// response by running it through the configured stages.
type Pipeline interface {
	Invoke(req *http.Request) (*http.Response, error)
}

// PipelineManager is the Pipeline implementation that runs an ordered list of
// MessageHandlers in front of a PipelineTransport.
type PipelineManager struct {
	pipelineTransport *PipelineTransport
	messageHandlers   []MessageHandler
}

// Compile-time check that PipelineManager satisfies Pipeline.
var _ Pipeline = (*PipelineManager)(nil)

// Invoke runs req through every handler in order and finally through the base
// transport's BaseRoundTrip, returning whatever the outermost handler returns.
func (m *PipelineManager) Invoke(req *http.Request) (*http.Response, error) {
	return m.nextFrom(0)(NewPipelineContext(req))
}

// nextFrom returns the continuation that executes the handler at position idx
// followed by everything after it.
//
// Each continuation is bound to its own position instead of sharing a mutable
// cursor, so a handler may call next several times (e.g. to retry) and every
// call re-runs the same downstream chain.
func (m *PipelineManager) nextFrom(idx int) Next {
	return func(ctx *PipelineContext) (*http.Response, error) {
		if idx < len(m.messageHandlers) {
			return m.messageHandlers[idx].Handle(ctx, m.nextFrom(idx+1))
		}
		// All handlers have run: perform the real round trip on the base transport.
		return m.pipelineTransport.BaseRoundTrip(ctx.Req)
	}
}

// NewPipelineManager creates a PipelineManager that runs messageHandlers in
// order and then delegates to pipelineTransport.BaseRoundTrip. The handler
// slice is copied, so later changes to the caller's slice do not affect the
// pipeline.
func NewPipelineManager(messageHandlers []MessageHandler, pipelineTransport *PipelineTransport) *PipelineManager {
	return &PipelineManager{
		pipelineTransport: pipelineTransport,
		messageHandlers:   append([]MessageHandler(nil), messageHandlers...),
	}
}
type LogMsgHandler struct{} func (s *LogMsgHandler) Handle(context *pipeline.PipelineContext, next pipeline.Next) (resp *http.Response, respError error) { // 处理异常 defer func() { if r := recover(); r != nil { log.Printf("【LogMsgHandler】http request end with panic recover: %v", r) respError = errors.New("【LogMsgHandler】http request end with panic recover") } }() log.Printf("【LogMsgHandler】http request start...") start := time.Now() resp, respError = next(context) duration := time.Since(start) ms := duration.Milliseconds() log.Printf("【LogMsgHandler】http request end, duration: %d ms", ms) return }使用 handler
func createHttpClient() *http.Client { // order // req: LogMsgHandler --> MockHandler // rsp: MockHandler --> req:LogMsgHandler transport := pipeline.NewPipelineTransport( &sample.LogMsgHandler{}, &sample.MockHandler{}, ) return &http.Client{ Transport: transport, } } func main() { httpClient := createHttpClient() // using httpClient httpReq, _ := http.NewRequest("GET", "https://example.com/", nil) httpResp, err := httpClient.Do(httpReq) // parse http response if err != nil { log.Printf("send http return error: %v", err) } else { log.Printf("httpResp StatusCode: %d", httpResp.StatusCode) respBody, err := io.ReadAll(httpResp.Body) if err != nil { log.Printf("httpResp respBody error: %v", err) } else { log.Printf("httpResp respBody: %s", string(respBody)) } if httpResp.Body != nil { httpResp.Body.Close() } } }