闽公网安备 35020302035485号

本文主要讲解请求应答模式和发布订阅模式,其他通信模式,如果感兴趣可以参考官方文档。
4.请求端(RequestSocket)接收响应端(ResponseSocket)发送的信息。
public class ZeroMQRequest: IDisposable
{
public Action < string > Received;
public Action < string > Sended;
private string url = string.Empty;
private RequestSocket request;
public ZeroMQRequest(string url)
{
//堆代码 duidaima.com
this.request = new RequestSocket();
this.url = url;
}
public void Connect()
{
request.Connect(this.url);
}
public void BeginReceive()
{
string msg = this.request.ReceiveFrameString();
Received ? .Invoke(msg);
}
public void SendMsg(string msg)
{
this.request.SendFrame(msg);
if(Sended != null)
{
Sended.Invoke(msg);
}
}
public void Disconnect()
{
request.Disconnect(this.url);
}
public void Dispose()
{
request.Close();
request.Dispose();
}
}
响应端接收消息之前,需要先进行绑定(Bind)到对应的网络端口,然后才能接收消息。示例代码如下所示:public class ZeroMQResponse: IDisposable
{
public Action < string > Received;
public Action < string > Sended;
private string url = string.Empty;
private ResponseSocket response;
public ZeroMQResponse(string url)
{
this.url = url;
this.response = new ResponseSocket();
this.response.Bind(this.url);
}
public void BeginReceive()
{
Task.Run(() =>
{
while(true)
{
string msg = this.response.ReceiveFrameString();
Received ? .Invoke(msg);
//收到回复
Send("Ok");
}
});
}
public void Send(string msg)
{
this.response.SendFrame(msg);
if(Sended != null)
{
Sended.Invoke(msg);
}
}
public void Dispose()
{
this.response.Dispose();
}
}
上述代码是将ReuqestSocket和ResponseSocket进行封装,并通过委托Action公开了接收和发送后响应接口,在使用时进行调用即可。


public class ZeroMQPublisher: IDisposable
{
private string url = string.Empty;
private PublisherSocket publisher;
public Action < string > Sended;
public ZeroMQPublisher(string url)
{
this.url = url;
this.publisher = new PublisherSocket();
this.publisher.Bind(url);
}
public void Send(string topic, string msg)
{
this.publisher.SendMoreFrame(topic);
this.publisher.SendFrame(msg);
if(Sended != null)
{
Sended.Invoke($ "send msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("
yyyy - MM - dd HH: mm: ss ")}");
}
}
public void Dispose()
{
this.publisher.Close();
this.publisher.Dispose();
}
}
消息订阅类(SubscriberSocket),在消息接收之前,首先连接端口,订阅主题(Subscribe方法),然后才能进行消息的接收,示例代码如下所示:public class ZeroMQSubscriber: IDisposable
{
private string url = string.Empty;
private SubscriberSocket subscriber;
public Action < string > Received;
private bool isRunning = false;
public ZeroMQSubscriber(string url)
{
this.url = url;
this.subscriber = new SubscriberSocket();
this.subscriber.Connect(url);
this.subscriber.Subscribe(string.Empty);
this.isRunning = true;
}
public void BeginReceive()
{
Task.Run(() =>
{
while(isRunning)
{
var topic = this.subscriber.ReceiveFrameString();
var msg = this.subscriber.ReceiveFrameString();
if(Received != null)
{
Received.Invoke($ "received msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("
yyyy - MM - dd HH: mm: ss ")}");
}
}
});
}
public void DisConnect()
{
isRunning = false;
this.subscriber.Disconnect(this.url);
}
public void Dispose()
{
this.isRunning = false;
this.subscriber.Close();
this.subscriber ? .Dispose();
}
}
注意,发布订阅模式是单向触发的,即消息发布者,不可以接收消息;消息接收者,也不可以发布消息。接收端在调用Subscribe方法时,如果主题为空,则表示可以订阅任何主题。发布订阅模式示例截图如下所示: