闽公网安备 35020302035485号
// Greeter is the interface that we're exposing as a plugin.
type Greeter interface {
Greet() string
}
然后定义了一个实现这个插件接口的Greeter,这里是通过RPC去实现,所以需要一个rpc client// Here is an implementation that talks over RPC
type GreeterRPC struct{ client *rpc.Client }
func (g *GreeterRPC) Greet() string {
var resp string
err := g.client.Call("Plugin.Greet", new(interface{}), &resp)
if err != nil {
// You usually want your interfaces to return errors. If they don't,
// there isn't much other choice here.
panic(err)
}
return resp
}
紧接着,又定义了一个RPCServer包装了一遍// Here is the RPC server that GreeterRPC talks to, conforming to
// the requirements of net/rpc
type GreeterRPCServer struct {
// This is the real implementation
Impl Greeter
}
func (s *GreeterRPCServer) Greet(args interface{}, resp *string) error {
*resp = s.Impl.Greet()
return nil
}
最后的最后,才是插件的实现,主要是Server和Client这2个方法:type GreeterPlugin struct {
// Impl Injection
Impl Greeter
}
func (p *GreeterPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
return &GreeterRPCServer{Impl: p.Impl}, nil
}
func (GreeterPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &GreeterRPC{client: c}, nil
}
RPC Plugin接口的定义:// Plugin is the interface that is implemented to serve/connect to an
// inteface implementation.
type Plugin interface {
// Server should return the RPC server compatible struct to serve
// the methods that the Client calls over net/rpc.
Server(*MuxBroker) (interface{}, error)
// Client returns an interface implementation for the plugin you're
// serving that communicates to the server end of the plugin.
Client(*MuxBroker, *rpc.Client) (interface{}, error)
}
最终,在层层包装之下,这个文件定义了一个插件的框架,以及要实现的方法,但是还缺一个实现,实现是在greeter_impl.go文件里面// Here is a real implementation of Greeter
type GreeterHello struct {
logger hclog.Logger
}
func (g *GreeterHello) Greet() string {
g.logger.Debug("message from GreeterHello.Greet")
return "Hello!"
}
然后就是main里面的内容,这块的操作简单说就是设置一些参数,启动一个RPC服务,等待请求的到来:func main() {
logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
})
greeter := &GreeterHello{
logger: logger,
}
// pluginMap is the map of plugins we can dispense.
var pluginMap = map[string]plugin.Plugin{
"greeter": &example.GreeterPlugin{Impl: greeter},
}
logger.Debug("message from plugin", "foo", "bar")
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "BASIC_PLUGIN",
MagicCookieValue: "hello",
},
Plugins: pluginMap,
})
}
最后别忘了编译插件,插件的编译实际上和普通Go程序没有什么区别,会得到一个二进制文件,后面会用到go build -o ./plugin/greeter ./plugin/greeter_impl.go3.使用插件
func main() {
// We're a host! Start by launching the plugin process.
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "BASIC_PLUGIN",
MagicCookieValue: "hello",
},
Plugins: map[string]plugin.Plugin{
"greeter": &example.GreeterPlugin{},
},
Cmd: exec.Command("./plugin/greeter"),
Logger: hclog.New(&hclog.LoggerOptions{
Name: "plugin",
Output: os.Stdout,
Level: hclog.Debug,
}),
})
defer client.Kill()
// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
log.Fatal(err)
}
// Request the plugin
raw, err := rpcClient.Dispense("greeter")
if err != nil {
log.Fatal(err)
}
// We should have a Greeter now! This feels like a normal interface
// implementation but is in fact over an RPC connection.
greeter := raw.(example.Greeter)
fmt.Println(greeter.Greet())
}
这里面需要注意的是一个handshakeConfig里面的配置要和插件实现里面的一致,另外就是设置二进制执行文件的位置。其次,Plugins是一个map里面存储了插件名和插件定义的映射关系。func Serve(opts *ServeConfig) {
// 一些配置
...
// Register a listener so we can accept a connection
listener, err := serverListener()
if err != nil {
logger.Error("plugin init error", "error", err)
return
}
// Close the listener on return. We wrap this in a func() on purpose
// because the "listener" reference may change to TLS.
defer func() {
listener.Close()
}()
// TLS的配置
...
// Create the channel to tell us when we're done
doneCh := make(chan struct{})
// Build the server type
var server ServerProtocol
switch protoType {
case ProtocolNetRPC:
// If we have a TLS configuration then we wrap the listener
// ourselves and do it at that level.
if tlsConfig != nil {
listener = tls.NewListener(listener, tlsConfig)
}
// Create the RPC server to dispense
server = &RPCServer{
Plugins: pluginSet,
Stdout: stdout_r,
Stderr: stderr_r,
DoneCh: doneCh,
}
case ProtocolGRPC:
// Create the gRPC server
server = &GRPCServer{
Plugins: pluginSet,
Server: opts.GRPCServer,
TLS: tlsConfig,
Stdout: stdout_r,
Stderr: stderr_r,
DoneCh: doneCh,
logger: logger,
}
default:
panic("unknown server protocol: " + protoType)
}
// Initialize the servers
if err := server.Init(); err != nil {
logger.Error("protocol init", "error", err)
return
}
...
// Accept connections and wait for completion
go server.Serve(listener)
ctx := context.Background()
if opts.Test != nil && opts.Test.Context != nil {
ctx = opts.Test.Context
}
select {
case <-ctx.Done():
listener.Close()
if s, ok := server.(*GRPCServer); ok {
s.Stop()
}
// Wait for the server itself to shut down
<-doneCh
case <-doneCh:
}
}
代码很多,这里只展示了核心代码,其实正做的一件事就是初始化并启动RPC服务,做好接受请求的准备。更多的代码在插件使用这块,首先我们New了一个Client,这个Client是维护插件的,而且是一个插件一个Client,所以如果你要实现多插件共存,可以去实现一个插件和Client的映射关系即可。type Client struct {
config *ClientConfig
exited bool
l sync.Mutex
address net.Addr
process *os.Process
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
ctxCancel context.CancelFunc
negotiatedVersion int
// clientWaitGroup is used to manage the lifecycle of the plugin management
// goroutines.
clientWaitGroup sync.WaitGroup
// stderrWaitGroup is used to prevent the command's Wait() function from
// being called before we've finished reading from the stderr pipe.
stderrWaitGroup sync.WaitGroup
// processKilled is used for testing only, to flag when the process was
// forcefully killed.
processKilled bool
}
在main里面当我们New完Client之后,依次调用了Client和Dispense2个方法,这个2个方法非常重要:// 堆代码 duidaima.com
// Client returns the protocol client for this connection.
// Subsequent calls to this will return the same client.
func (c *Client) Client() (ClientProtocol, error) {
_, err := c.Start()
if err != nil {
return nil, err
}
c.l.Lock()
defer c.l.Unlock()
if c.client != nil {
return c.client, nil
}
switch c.protocol {
case ProtocolNetRPC:
c.client, err = newRPCClient(c)
case ProtocolGRPC:
c.client, err = newGRPCClient(c.doneCtx, c)
default:
return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
}
if err != nil {
c.client = nil
return nil, err
}
return c.client, nil
}
其中c.Start这个方法干了很多事情,简单说就是根据配置里面的cmd,也就是咱们编译插件之后得到二进制可执行文件,启动插件的rpc服务。然后再根据协议的不同,启动RPC服务或者GRPC服务,得到一个真正可用Client,相当于就是通道已经打通了,接下来就是发起请求。func (c *RPCClient) Dispense(name string) (interface{}, error) {
p, ok := c.plugins[name]
if !ok {
return nil, fmt.Errorf("unknown plugin type: %s", name)
}
var id uint32
if err := c.control.Call(
"Dispenser.Dispense", name, &id); err != nil {
return nil, err
}
conn, err := c.broker.Dial(id)
if err != nil {
return nil, err
}
return p.Client(c.broker, rpc.NewClient(conn))
}
Dispense方法就是根据插件名拿到对应的插件对象,然后又包装了一层拿到一个Client对象,还记得最初定义插件时候那个Client吗?func (GreeterPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &GreeterRPC{client: c}, nil
}
最后,断言并调用插件的方法,这时候才是真正发起了RPC请求,并获取返回结果:greeter := raw.(example.Greeter) greeter.Greet()有一点,我感觉特别奇怪,从这个插件的实现来看,Dispense这步更像是细分了插件里面的插件,因为在我理解,一个二进制文件就是一个插件,一个插件只有一个实现。但是很明显,这个库并不这么认为,它认为一个插件文件里面可以实现多个插件,所以它增加了一个Plugins来存储插件的映射关系,也就是说你可以在一个插件里面实现多个接口。
// getPluginSet returns list of plugins supported
func getPluginSet() goplugin.PluginSet {
return goplugin.PluginSet{
"diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{},
"resource": &grpcplugin.ResourceGRPCPlugin{},
"data": &grpcplugin.DataGRPCPlugin{},
"stream": &grpcplugin.StreamGRPCPlugin{},
"renderer": &pluginextensionv2.RendererGRPCPlugin{},
}
}
五.畅想4.如果真的要想好好用起这个库,确实还得花不少功夫,有一个好得地方是我们可以参考Hashicorp家的其它开源项目代码来完善,其实我也在想能不能稍
微把这个库封装一下提供一个简单易用的接口。