NetMQ接收/响应循环不工作
问题描述:
我拿了https://netmq.readthedocs.io/上的一个简单的接收/请求套接字示例,并希望使它在无限循环中与parametrizedThread一起工作。 代码工作正常几圈后,它抛出NetMQ接收/响应循环不工作
非阻塞套接字操作无法立即
完成了什么,我得到了上面应该在第一循环结束后立即发生而不是随机的。这里有什么问题?这听起来像是必须刷新才能再次获得干净的连接(不确定)。
class Program
{
public class Connector
{
public String connection { get; set; }
public ResponseSocket server { get; set; }
public Connector(string address, ResponseSocket server_)
{
this.connection = address;
this.server = server_;
}
}
static void Main(string[] args)
{
string connection = "tcp://localhost:5555";
using (var server = new ResponseSocket())
{
while (true)
{
try
{
server.Bind(connection);
}
catch (NetMQException e)
{
Console.WriteLine(e.ErrorCode);
}
Connector c = new Connector(connection, server);
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(c);
//runClientSide(connection, server);
}
}
}
private static void runClientSide(object param)
{
Connector conn = (Connector)param;
string connection = conn.connection;
ResponseSocket server = conn.server;
using (var client = new RequestSocket())
{
client.Connect(connection);
client.SendFrame("Hello");
string fromClientMessage = server.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
server.SendFrame("Hi Back");
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
//Console.ReadLine();
}
}
答
NetMQSockets不是线程安全的,而且您正在从客户端线程内部访问服务器以发送/接收数据。无论如何,客户端不应该有权访问服务器套接字。
首先将Bind移到while循环的外部,它只需要一次,而不是为每个创建的客户端。 要等待消息使用NetMQPoller,它将为您处理其他所有事情,并在接收到消息后引发服务器ReceiveReady事件。
static void Main(string[] args) {
string connection = "tcp://localhost:5555";
using (var poller = new NetMQPoller()) {
using (var server = new ResponseSocket()) {
server.ReceiveReady += Server_ReceiveReady;
poller.Add(server);
poller.RunAsync();
server.Bind(connection);
// start 10000 clients
for(int i = 0; i < 10000; i++) {
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(connection);
}
Console.ReadLine(); //let server run until user pressed Enter key
}
}
}
//server (e.Socket) is receiving data here and can answer it
private static void Server_ReceiveReady(object sender, NetMQSocketEventArgs e) {
string fromClientMessage = e.Socket.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
e.Socket.SendFrame("Hi Back");
}
private static void runClientSide(object param) {
string connection = (string) param;
using (var client = new RequestSocket()) {
client.Connect(connection);
client.SendFrame("Hello");
//Removed server side code here and put it into ReceiveReady event
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
}
}
+0
谢谢,我现在看到我做错了。非常感谢帮助! – yp10
您只能使用相同的端口号打开一个到服务器的连接。您必须在打开新连接之前关闭连接,或者在每封邮件后都不要关闭连接。 while循环看起来不正确。 – jdweng
不确定问题是否循环。事实上,如果你删除了parametrizedThread部分并使用注释掉的方法(修改下面的输入参数),那么代码在循环中工作正常。对于我所知道的请求/响应不是线程安全的。不知道有什么问题。 – yp10
main()方法与跨线程的Thread不在同一进程中,并且需要使用“Invoke”在两个进程之间传输数据。 – jdweng