// Greeter is the interface that we're exposing as a plugin. type Greeter interface { Greet() string }然后定义了一个实现这个插件接口的Greeter,这里是通过RPC去实现,所以需要一个rpc client
// Here is an implementation that talks over RPC type GreeterRPC struct{ client *rpc.Client } func (g *GreeterRPC) Greet() string { var resp string err := g.client.Call("Plugin.Greet", new(interface{}), &resp) if err != nil { // You usually want your interfaces to return errors. If they don't, // there isn't much other choice here. panic(err) } return resp }紧接着,又定义了一个RPCServer包装了一遍
// Here is the RPC server that GreeterRPC talks to, conforming to // the requirements of net/rpc type GreeterRPCServer struct { // This is the real implementation Impl Greeter } func (s *GreeterRPCServer) Greet(args interface{}, resp *string) error { *resp = s.Impl.Greet() return nil }最后的最后,才是插件的实现,主要是Server和Client这2个方法:
type GreeterPlugin struct { // Impl Injection Impl Greeter } func (p *GreeterPlugin) Server(*plugin.MuxBroker) (interface{}, error) { return &GreeterRPCServer{Impl: p.Impl}, nil } func (GreeterPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { return &GreeterRPC{client: c}, nil }RPC Plugin接口的定义:
// Plugin is the interface that is implemented to serve/connect to an // inteface implementation. type Plugin interface { // Server should return the RPC server compatible struct to serve // the methods that the Client calls over net/rpc. Server(*MuxBroker) (interface{}, error) // Client returns an interface implementation for the plugin you're // serving that communicates to the server end of the plugin. Client(*MuxBroker, *rpc.Client) (interface{}, error) }最终,在层层包装之下,这个文件定义了一个插件的框架,以及要实现的方法,但是还缺一个实现,实现是在greeter_impl.go文件里面
// Here is a real implementation of Greeter type GreeterHello struct { logger hclog.Logger } func (g *GreeterHello) Greet() string { g.logger.Debug("message from GreeterHello.Greet") return "Hello!" }然后就是main里面的内容,这块的操作简单说就是设置一些参数,启动一个RPC服务,等待请求的到来:
func main() { logger := hclog.New(&hclog.LoggerOptions{ Level: hclog.Trace, Output: os.Stderr, JSONFormat: true, }) greeter := &GreeterHello{ logger: logger, } // pluginMap is the map of plugins we can dispense. var pluginMap = map[string]plugin.Plugin{ "greeter": &example.GreeterPlugin{Impl: greeter}, } logger.Debug("message from plugin", "foo", "bar") plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: plugin.HandshakeConfig{ ProtocolVersion: 1, MagicCookieKey: "BASIC_PLUGIN", MagicCookieValue: "hello", }, Plugins: pluginMap, }) }最后别忘了编译插件,插件的编译实际上和普通Go程序没有什么区别,会得到一个二进制文件,后面会用到
go build -o ./plugin/greeter ./plugin/greeter_impl.go3.使用插件
func main() { // We're a host! Start by launching the plugin process. client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: plugin.HandshakeConfig{ ProtocolVersion: 1, MagicCookieKey: "BASIC_PLUGIN", MagicCookieValue: "hello", }, Plugins: map[string]plugin.Plugin{ "greeter": &example.GreeterPlugin{}, }, Cmd: exec.Command("./plugin/greeter"), Logger: hclog.New(&hclog.LoggerOptions{ Name: "plugin", Output: os.Stdout, Level: hclog.Debug, }), }) defer client.Kill() // Connect via RPC rpcClient, err := client.Client() if err != nil { log.Fatal(err) } // Request the plugin raw, err := rpcClient.Dispense("greeter") if err != nil { log.Fatal(err) } // We should have a Greeter now! This feels like a normal interface // implementation but is in fact over an RPC connection. greeter := raw.(example.Greeter) fmt.Println(greeter.Greet()) }这里面需要注意的是一个handshakeConfig里面的配置要和插件实现里面的一致,另外就是设置二进制执行文件的位置。其次,Plugins是一个map里面存储了插件名和插件定义的映射关系。
func Serve(opts *ServeConfig) { // 一些配置 ... // Register a listener so we can accept a connection listener, err := serverListener() if err != nil { logger.Error("plugin init error", "error", err) return } // Close the listener on return. We wrap this in a func() on purpose // because the "listener" reference may change to TLS. defer func() { listener.Close() }() // TLS的配置 ... // Create the channel to tell us when we're done doneCh := make(chan struct{}) // Build the server type var server ServerProtocol switch protoType { case ProtocolNetRPC: // If we have a TLS configuration then we wrap the listener // ourselves and do it at that level. if tlsConfig != nil { listener = tls.NewListener(listener, tlsConfig) } // Create the RPC server to dispense server = &RPCServer{ Plugins: pluginSet, Stdout: stdout_r, Stderr: stderr_r, DoneCh: doneCh, } case ProtocolGRPC: // Create the gRPC server server = &GRPCServer{ Plugins: pluginSet, Server: opts.GRPCServer, TLS: tlsConfig, Stdout: stdout_r, Stderr: stderr_r, DoneCh: doneCh, logger: logger, } default: panic("unknown server protocol: " + protoType) } // Initialize the servers if err := server.Init(); err != nil { logger.Error("protocol init", "error", err) return } ... // Accept connections and wait for completion go server.Serve(listener) ctx := context.Background() if opts.Test != nil && opts.Test.Context != nil { ctx = opts.Test.Context } select { case <-ctx.Done(): listener.Close() if s, ok := server.(*GRPCServer); ok { s.Stop() } // Wait for the server itself to shut down <-doneCh case <-doneCh: } }代码很多,这里只展示了核心代码,其实正做的一件事就是初始化并启动RPC服务,做好接受请求的准备。更多的代码在插件使用这块,首先我们New了一个Client,这个Client是维护插件的,而且是一个插件一个Client,所以如果你要实现多插件共存,可以去实现一个插件和Client的映射关系即可。
type Client struct { config *ClientConfig exited bool l sync.Mutex address net.Addr process *os.Process client ClientProtocol protocol Protocol logger hclog.Logger doneCtx context.Context ctxCancel context.CancelFunc negotiatedVersion int // clientWaitGroup is used to manage the lifecycle of the plugin management // goroutines. clientWaitGroup sync.WaitGroup // stderrWaitGroup is used to prevent the command's Wait() function from // being called before we've finished reading from the stderr pipe. stderrWaitGroup sync.WaitGroup // processKilled is used for testing only, to flag when the process was // forcefully killed. processKilled bool }在main里面当我们New完Client之后,依次调用了Client和Dispense2个方法,这个2个方法非常重要:
// 堆代码 duidaima.com // Client returns the protocol client for this connection. // Subsequent calls to this will return the same client. func (c *Client) Client() (ClientProtocol, error) { _, err := c.Start() if err != nil { return nil, err } c.l.Lock() defer c.l.Unlock() if c.client != nil { return c.client, nil } switch c.protocol { case ProtocolNetRPC: c.client, err = newRPCClient(c) case ProtocolGRPC: c.client, err = newGRPCClient(c.doneCtx, c) default: return nil, fmt.Errorf("unknown server protocol: %s", c.protocol) } if err != nil { c.client = nil return nil, err } return c.client, nil }其中c.Start这个方法干了很多事情,简单说就是根据配置里面的cmd,也就是咱们编译插件之后得到二进制可执行文件,启动插件的rpc服务。然后再根据协议的不同,启动RPC服务或者GRPC服务,得到一个真正可用Client,相当于就是通道已经打通了,接下来就是发起请求。
func (c *RPCClient) Dispense(name string) (interface{}, error) { p, ok := c.plugins[name] if !ok { return nil, fmt.Errorf("unknown plugin type: %s", name) } var id uint32 if err := c.control.Call( "Dispenser.Dispense", name, &id); err != nil { return nil, err } conn, err := c.broker.Dial(id) if err != nil { return nil, err } return p.Client(c.broker, rpc.NewClient(conn)) }Dispense方法就是根据插件名拿到对应的插件对象,然后又包装了一层拿到一个Client对象,还记得最初定义插件时候那个Client吗?
func (GreeterPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { return &GreeterRPC{client: c}, nil }最后,断言并调用插件的方法,这时候才是真正发起了RPC请求,并获取返回结果:
greeter := raw.(example.Greeter) greeter.Greet()有一点,我感觉特别奇怪,从这个插件的实现来看,Dispense这步更像是细分了插件里面的插件,因为在我理解,一个二进制文件就是一个插件,一个插件只有一个实现。但是很明显,这个库并不这么认为,它认为一个插件文件里面可以实现多个插件,所以它增加了一个Plugins来存储插件的映射关系,也就是说你可以在一个插件里面实现多个接口。
// getPluginSet returns list of plugins supported func getPluginSet() goplugin.PluginSet { return goplugin.PluginSet{ "diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{}, "resource": &grpcplugin.ResourceGRPCPlugin{}, "data": &grpcplugin.DataGRPCPlugin{}, "stream": &grpcplugin.StreamGRPCPlugin{}, "renderer": &pluginextensionv2.RendererGRPCPlugin{}, } }五.畅想
4.如果真的要想好好用起这个库,确实还得花不少功夫,有一个好得地方是我们可以参考Hashicorp家的其它开源项目代码来完善,其实我也在想能不能稍
微把这个库封装一下提供一个简单易用的接口。