• Go中如何实现对 Http Client 请求的管道化处理
  • 发布于 2个月前
  • 298 热度
    0 评论
目标
在使用 net/http client 对外发送 http 请求的时候,实现对请求的管道化(切面或拦截器)处理。

什么是管道化处理
管道化处理英文为 pipeline,这种设计模式和责任链模式很像,是责任链模式的一种变体。在管道中存在多个 handler(处理器),每个 handler 负责一部分功能,例如日志、重试、熔断等;不同的 handler 之间,通过 context 共享数据;所有的 handler 串联在一起,进而形成 pipeline 管道。

pipeline、hander 和 context 是管道模式中三个最重要的概念。
管道化设计的优势
1.管道模式极大地体现了 _单一职责原则_:每个 handler 负责一个功能,有利于提高代码的可测试性;

2.如果需要新增功能,也只需要新增一个 handler 即可,这也是 开闭原则 的体现,能够实现对系统更加友好的运维。


如何实现
如前所述,管道模式有三个重要的功能:pipeline、hander 和 context,我们会依次实现这三个部分。但是对于 Go net/http 库来说,还要有相应的适配。

PipelineTransport
由于这个 pipeline 是面向 net/http 使用,因此需要实现 RoundTripper 接口——RoundTripper 负责 HTTP 请求的建立,发送,接收 HTTP 应答以及关闭——在 RoundTripper 的实现 PipelineTransport 中,达到对 http 请求管道 handler 管理的目的。
具体实现上,我们直接嵌入 http.Transport 进行扩展(我喜欢称之为继承)即可:
// 有 pipeline 能力的 Transport:PipelineTransport
type PipelineTransport struct {
  http.Transport  // 继承 Transport
  pipelineManager *PipelineManager
}

// http 请求的入口,重写 http.Transport 中对 RoundTripper.RoundTrip 的实现
func (transportHandler *PipelineTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  return transportHandler.pipelineManager.Invoke(req)
}

// 执行基类的 RoundTrip
func (transportHandler *PipelineTransport) BaseRoundTrip(req *http.Request) (*http.Response, error) {
  return transportHandler.Transport.RoundTrip(req)
}
// 堆代码 duidaima.com
// 创建 PipelineTransport
func NewPipelineTransport(messageHandlers ...MessageHandler) *PipelineTransport {
  pipelineTransport := &PipelineTransport{}
  pipelineTransport.pipelineManager = NewPipelineManager(messageHandlers, pipelineTransport)
  return pipelineTransport
}
context
context 用来实现 handler 之间的数据共享。既然是为 http client 创建的 pipeline,那么 context 中最重要的就是 http.Request。实际上,如果不共享其他数据,也可以直接将 http.Request 作为 context。
type PipelineContext struct {
  Req *http.Request
}

// 创建 PipelineContext
func NewPipelineContext(req *http.Request) *PipelineContext {
  return &PipelineContext{
    Req: req,
  }
}
hander
handler 是 pipeline 中对 http 消息进行处理的处理器,用于规范业务使用中的实现,只需要定义接口即可。这里还定义了一个 Next 类型,底层类型是一个函数(可以认为是这个函数的别名),用于在 handler 中调用下一个 handler(如果需要)。当然,这个 Next 函数,也可以在 context 中定义。
type Next func(context *PipelineContext) (*http.Response, error)

// 消息处理器
type MessageHandler interface {
  Handle(*PipelineContext, Next) (*http.Response, error)
}
pipeline
首先定义一个管道模型接口 Pipeline,该接口有一个方法 Invoke,用于作为管道模型的入口。其次定义接口 Pipeline 的实现 PipelineManager。PipelineManager 中有两个字段:messageHandlers 和 pipelineTransport。_messageHandlers_ 用来保存 pipeline 中的所有 handler。如果对于不同的 request,有不同的 handler,可以将所有的 handler(messageHandlers) 保存到 context 中。_pipelineTransport_ 用于在 handler 全部执行完毕后,向远程主机发出 http 请求(如果需要)。

在 Pipeline 接口中的 Invoke 函数,定义了一个Next 类型匿名函数 next,用于传递给 handler。
// 管道模型 PipelineManager 接口
type Pipeline interface {
  Invoke(req *http.Request) (*http.Response, error)
}

type PipelineManager struct {
  pipelineTransport *PipelineTransport
  messageHandlers   []MessageHandler
}

func (pipeline *PipelineManager) Invoke(req *http.Request) (*http.Response, error) {
  var next Next
  curIndex := 0
  next = func(context *PipelineContext) (*http.Response, error) {
    if curIndex < len(pipeline.messageHandlers) {
      messageHandler := pipeline.messageHandlers[curIndex]
      curIndex += 1
      return messageHandler.Handle(context, next)
    }
    // 中间件全部执行完了,执行父 Transport 中的 RoundTrip 方法
    return pipeline.pipelineTransport.BaseRoundTrip(context.Req)
  }

  context := NewPipelineContext(req)
  return next(context)
}

// 创建 PipelineManager
func NewPipelineManager(messageHandlers []MessageHandler, pipelineTransport *PipelineTransport) *PipelineManager {
  return &PipelineManager{
    pipelineTransport: pipelineTransport,
    messageHandlers:   messageHandlers,
  }
}

使用
实现一个 handler
假设,上述中的 pipeline 实现存放在 pipeline 包中,我们可以定义一个 LogMsgHandler:对每次 http 请求进行日志输出。
type LogMsgHandler struct{}

func (s *LogMsgHandler) Handle(context *pipeline.PipelineContext, next pipeline.Next) (resp *http.Response, respError error) {
  //  处理异常
  defer func() {
    if r := recover(); r != nil {
      log.Printf("【LogMsgHandler】http request end with panic recover: %v", r)
      respError = errors.New("【LogMsgHandler】http request end with panic recover")
    }
  }()
 
  log.Printf("【LogMsgHandler】http request start...")

  start := time.Now()
  resp, respError = next(context)
  duration := time.Since(start)
  ms := duration.Milliseconds()

  log.Printf("【LogMsgHandler】http request end, duration: %d ms", ms)
  return
}
使用 handler
现在,通过如下步骤,我们即可使用对 http 请求进行管道化处理:
1.通过 NewPipelineTransport 创建一个 RoundTripper 接口:transport
2.将 transport 传递给 http.Client
3.发送一个 http 请求
func createHttpClient() *http.Client {
  // order
  // req: LogMsgHandler --> MockHandler
  // rsp: MockHandler --> req:LogMsgHandler
  transport := pipeline.NewPipelineTransport(
    &sample.LogMsgHandler{},
    &sample.MockHandler{},
  )

  return &http.Client{
    Transport: transport,
  }
}

func main() {
  httpClient := createHttpClient()

  // using httpClient
  httpReq, _ := http.NewRequest("GET", "https://example.com/", nil)
  httpResp, err := httpClient.Do(httpReq)
 
 // parse http response
  if err != nil {
    log.Printf("send http return error: %v", err)
  } else {
    log.Printf("httpResp StatusCode: %d", httpResp.StatusCode)
    respBody, err := io.ReadAll(httpResp.Body)
    if err != nil {
      log.Printf("httpResp respBody error: %v", err)
    } else {
      log.Printf("httpResp respBody: %s", string(respBody))
    }
    if httpResp.Body != nil {
      httpResp.Body.Close()
    }
  }
}

用户评论