本文主要讲解请求应答模式和发布订阅模式,其他通信模式,如果感兴趣可以参考官方文档。
4.请求端(RequestSocket)接收响应端(ResponseSocket)发送的信息。
public class ZeroMQRequest: IDisposable { public Action < string > Received; public Action < string > Sended; private string url = string.Empty; private RequestSocket request; public ZeroMQRequest(string url) { //堆代码 duidaima.com this.request = new RequestSocket(); this.url = url; } public void Connect() { request.Connect(this.url); } public void BeginReceive() { string msg = this.request.ReceiveFrameString(); Received ? .Invoke(msg); } public void SendMsg(string msg) { this.request.SendFrame(msg); if(Sended != null) { Sended.Invoke(msg); } } public void Disconnect() { request.Disconnect(this.url); } public void Dispose() { request.Close(); request.Dispose(); } }响应端接收消息之前,需要先进行绑定(Bind)到对应的网络端口,然后才能接收消息。示例代码如下所示:
public class ZeroMQResponse: IDisposable { public Action < string > Received; public Action < string > Sended; private string url = string.Empty; private ResponseSocket response; public ZeroMQResponse(string url) { this.url = url; this.response = new ResponseSocket(); this.response.Bind(this.url); } public void BeginReceive() { Task.Run(() => { while(true) { string msg = this.response.ReceiveFrameString(); Received ? .Invoke(msg); //收到回复 Send("Ok"); } }); } public void Send(string msg) { this.response.SendFrame(msg); if(Sended != null) { Sended.Invoke(msg); } } public void Dispose() { this.response.Dispose(); } }上述代码是将ReuqestSocket和ResponseSocket进行封装,并通过委托Action公开了接收和发送后响应接口,在使用时进行调用即可。
public class ZeroMQPublisher: IDisposable { private string url = string.Empty; private PublisherSocket publisher; public Action < string > Sended; public ZeroMQPublisher(string url) { this.url = url; this.publisher = new PublisherSocket(); this.publisher.Bind(url); } public void Send(string topic, string msg) { this.publisher.SendMoreFrame(topic); this.publisher.SendFrame(msg); if(Sended != null) { Sended.Invoke($ "send msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString(" yyyy - MM - dd HH: mm: ss ")}"); } } public void Dispose() { this.publisher.Close(); this.publisher.Dispose(); } }消息订阅类(SubscriberSocket),在消息接收之前,首先连接端口,订阅主题(Subscribe方法),然后才能进行消息的接收,示例代码如下所示:
public class ZeroMQSubscriber: IDisposable { private string url = string.Empty; private SubscriberSocket subscriber; public Action < string > Received; private bool isRunning = false; public ZeroMQSubscriber(string url) { this.url = url; this.subscriber = new SubscriberSocket(); this.subscriber.Connect(url); this.subscriber.Subscribe(string.Empty); this.isRunning = true; } public void BeginReceive() { Task.Run(() => { while(isRunning) { var topic = this.subscriber.ReceiveFrameString(); var msg = this.subscriber.ReceiveFrameString(); if(Received != null) { Received.Invoke($ "received msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString(" yyyy - MM - dd HH: mm: ss ")}"); } } }); } public void DisConnect() { isRunning = false; this.subscriber.Disconnect(this.url); } public void Dispose() { this.isRunning = false; this.subscriber.Close(); this.subscriber ? .Dispose(); } }注意,发布订阅模式是单向触发的,即消息发布者,不可以接收消息;消息接收者,也不可以发布消息。接收端在调用Subscribe方法时,如果主题为空,则表示可以订阅任何主题。
发布订阅模式示例截图如下所示: