如何在Erlang或Elixir中使用ZeroMQ实现非阻塞套接字?

如何在Erlang或Elixir中使用ZeroMQ实现非阻塞套接字?

问题描述:

在Python我有使用“轮询”对象,其轮询阻断等待消息插座和放开毫秒的指定次数后的选项(在壳体下面,1000,在同时真块):如何在Erlang或Elixir中使用ZeroMQ实现非阻塞套接字?

import zmq 

# now open up all the sockets 
context = zmq.Context() 
outsub = context.socket(zmq.SUB) 
outsub.bind("tcp://" + myip + ":" + str(args.outsubport)) 
outsub.setsockopt(zmq.SUBSCRIBE, b"") 
inreq = context.socket(zmq.ROUTER) 
inreq.bind("tcp://" + myip + ":" + str(args.inreqport)) 
outref = context.socket(zmq.ROUTER) 
outref.bind("tcp://" + myip + ":" + str(args.outrefport)) 
req = context.socket(zmq.ROUTER) 
req.bind("tcp://" + myip + ":" + str(args.reqport)) 
repub = context.socket(zmq.PUB) 
repub.bind("tcp://" + myip + ":" + str(args.repubport)) 

# sort out the poller 
poller = zmq.Poller() 
poller.register(inreq, zmq.POLLIN) 
poller.register(outsub, zmq.POLLIN) 
poller.register(outref, zmq.POLLIN) 
poller.register(req, zmq.POLLIN) 

# UDP socket setup for broadcasting this server's address 
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 

# housekeeping variables 
pulsecheck = datetime.utcnow() + timedelta(seconds = 1) 
alivelist = dict() 
pulsetimeout = 5 

while True: 
    polls = dict(poller.poll(1000)) 
    if inreq in polls: 
     msg = inreq.recv_multipart() 
     if msg[1] == b"pulse":   # handle pluse 
      ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode()) 
      if not msg[0] in alivelist.keys(): 
       handlechange(msg[0]) 
      alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout) 
    if outsub in polls: 
     msgin = outsub.recv_multipart()[0] 
     repub.send(msgin) # republish 
     msg = unpacker(msgin) 
     if isinstance(msg, dict): 
      valu = msg.get("value") 
      print(".", end = "", flush = True) 
     else: 
      ansi("green", False, textout = msg) 

    if req in polls: 
     msg = req.recv_multipart() 
     valmsg = validate_request(msg) 
     if not valmsg[0]: 
      ansi("red", True); print(valmsg[1]); ansi() 
     elif len(alivelist) > 0: 
      targetnode = random.choice(list(alivelist.keys())) 
      inreq.send_multipart([targetnode, packer(valmsg[1])]) 
      ansi("blue", True, textout = "sent to " + targetnode.decode()) 
     else: 
      ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO") 
    if outref in polls: 
     msg = outref.recv_multipart() 
     destinataire, correlid = msg[1].split(b"/") 
     req.send_multipart([destinataire, correlid, msg[2]]) 

我想在Elixir(或Erlang)中实现类似的东西,但我的首选本机库chumak似乎没有实现轮询。我如何在Erlang/Elixir中实现非阻塞接收,最好是使用Chumak,但如果需要,我会移至另一个Erlang zeroMQ库?我的套接字模式首选是路由器发送,经销商收到。

编辑

我的使用情况如下。我有第三方金融服务,根据请求提供数据,异步回答。因此,您可以发送多个请求,并在未指定的时间段后收到回复,但不一定按照您发送给他们的顺序回复。

所以我需要将这个服务连接到Erlang(实际上是Elixir)和ZeroMQ似乎是一个很好的契合。连接(通过Phoenix)到Erlang/Elixir的多个用户将发送请求,并且我需要将这些请求传递给此服务。

如果其中一个请求发生错误,或者第三方服务存在某种问题,则会出现问题。我将阻止 - 等待响应,然后无法为凤凰城的新请求提供服务。

基本上我想经常听新的请求,发送它们,但如果一个请求没有产生响应,我会有比请求少一个响应,这会导致永久等待。

我明白,如果我分别发送请求,那么好的会产生响应,所以即使随着时间的推移,我发送的请求和接收到的响应之间的数字差别也不会很大。也许设计理念是我不应该担心这个?或者我应该尝试追踪一对一的请求响应并以某种方式暂停非响应?这是一种有效的设计模式吗?

+0

使用一个进程拥有套接字,阻止接收和*反应*接收消息。使用另一个流程来完成其他任务(可能按照计划)。 *你不想轮询*,你想建立一个逻辑系统'链接进程,执行一些任务的合作*没有轮询*。这是Erlang并发方法的重点。如果你解释你试图达到的整体效果,它会有很大帮助 - 因为这几乎肯定是一个X-Y问题。有一个[Erlang/OTP聊天室](https://chat.*.com/rooms/75358/erlang-otp)。 – zxq9

+0

@ zxq9好吧我已经更新了这个问题。它更有意义吗?我应该把它搬到Erlang房间吗?问题是我更喜欢Elixir学习者(开始学习,但知道大部分基础知识,包括GenServer和应用程序)。 –

+0

@ zxq9我想我的问题是,如果Erlang进程在永远不会接收的接收上阻塞,它将无法处理来自Phoenix的新的*传入*请求。或者我的错误是我应该在另一个过程中处理所述请求? –

您的系统是否不断连接到异步查询资源,或者您是否与每个查询建立新的连接?

每种情况在Erlang都有自己的天然模型。

的情况:单(或池)长期连接(s)表示,保持与资源(与数据库的连接会的工作方式)会话

长期连接在系统中最自然地被建模为过程,它们唯一的工作就是表示外部资源。

这一进程的要求是:

  • 翻译外部资源的信息到有意义的内部邮件(不只是路过垃圾通过 - 不要让原材料,外部数据侵入你的系统,除非它是完全不透明给你)
  • 跟踪超时的请求(这可能需要的东西有点像投票,但可以更精确地erlang:send_after/3

当然这也意味着,完成的,该模块impleme在这个过程中需要说出该资源的协议。但是如果这样做完成,那么实际上不需要像MQ应用程序那样的消息传递代理。

这使您可以让进程成为被动模式并阻止接收,而您的程序的其余部分停止执行任何操作。如果没有一些任意的轮询,肯定会让你进入邪恶的黑色计划问题沼泽。

的情况:每次查询

到资源的新连接。如果每个查询需要一个新的连接模式是相似的,但在这里,你每次查询产生新的进程,并它代表查询本身在您的系统内。它阻止等待响应(超时),并且没有其他事情对它有影响。

实际上,这是更简单的模型,因为您不必清理过去可能超时的请求永远不会返回的列表,也不必与通过发送的一组暂停超时消息进行交互erlang:send_after/3,并且您将您的抽象移近您的问题的实际模型

你不知道什么时候这些查询会返回,并且会导致一些潜在的混淆 - 因此将每个实际查询建模为有生命的事物是切断逻辑混乱的最佳方式。

无论哪种方式,模型自然的问题:作为一个并发的,非同步系统

在任何情况下,但是,你要真正做到轮询你在Python或C或任何会的方式。这是一个并发的问题,因此建模将为您提供更多的逻辑*度,并且更有可能产生正确的解决方案,从而导致出现奇怪的情况。

+0

我的情况是A.是的,我听说你没有通过垃圾邮件,而且我确认了,但我仍然没有得到回复。我认为问题的一部分是我首先使用阻塞的zeroMQ接收套接字。我需要做的是“消除并忘记”这个查询,并且“希望”能够反驳,如果我没有在一定的时间内得到答案,我就会放弃一条错误消息。这需要一个ZeroMQ pub-sub套接字设置,而不是我使用的代理路由器模式。 从那里我一定会去与“外部服务代表”的过程模式。 –

+1

@ThomasBrowne我不确定我是否会在这种情况下使用ZeroMQ - Erlang中的套接字处理非常简单,因此设置一个标记的消息以便将来发送以指示超时。如果我确实使用了ZeroMQ,我可能会在第二种情况下将系统内部建模,其中每个查询*都是您为了跟踪其状态而产生的一个进程,并且它自己接收一个或多个回复,但拥有它将其查询内容发送给ZeroMQ处理程序(或为其自身创建订阅),就好像ZeroMQ进程本身就是外部资源一样。 – zxq9