ZeroMQ:许多工人和一个主人的负载均衡
问题描述:
假设我有一个主进程,负责把需要并行处理的数据拆分开。假设有1000个数据块和100个节点来运行计算。
是否有某种方法可以用REQ/REP让所有worker都保持忙碌?我试过使用指南中的负载均衡器模式,但在只有单个客户端时,sock.recv()
会一直阻塞,直到收到某个worker的响应。
以下代码在zmq指南的负载均衡器示例基础上稍作修改:启动1个客户端、10个worker,以及位于中间的负载均衡器/代理(broker)。我怎样才能让所有这些worker同时工作?
from __future__ import print_function
from multiprocessing import Process
import zmq
import time
import uuid
import random
def client_task():
    """Basic request-reply client using a REQ socket.

    Connects to the broker frontend and sends 100 ``WORK`` requests one
    after another, printing every reply.  The loop calls ``recv()`` right
    after each ``send()``, so a single client has at most one request in
    flight at any time.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # Socket identities and message frames must be bytes on Python 3;
    # encoding keeps the code working on Python 2 as well.
    socket.identity = str(uuid.uuid4()).encode("ascii")
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)
def worker_task():
    """Worker task, using a REQ socket to do load-balancing.

    Announces itself to the broker backend with ``READY`` and then loops
    forever: receive a ``[client address, empty, request]`` message, sleep
    1-4 seconds to simulate work, and reply with ``OK : <identity>``
    addressed to the same client.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # Keep the identity as bytes: it is both the socket identity and part
    # of the reply payload, and bytes + str would raise on Python 3.
    identity = str(uuid.uuid4()).encode("ascii")
    socket.identity = identity
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        # Simulated workload of random duration
        time.sleep(random.randint(1, 4))
        socket.send_multipart([address, b"", b"OK : " + identity])
def broker():
    """Load-balancing broker between clients (frontend) and workers (backend).

    Both sides are ROUTER sockets.  Workers announce availability with a
    ``READY`` message or implicitly by sending back a reply; the broker
    queues their identities and hands each incoming client request to the
    worker that has been waiting longest.  The frontend is only polled
    while at least one worker is queued.

    The loop runs until an exception (for example ``KeyboardInterrupt`` or
    a ZeroMQ error) escapes; the sockets and context are released on the
    way out.
    """
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)
    try:
        while True:
            sockets = dict(poller.poll())
            if backend in sockets:
                # Handle worker activity on the backend
                request = backend.recv_multipart()
                worker, empty, client = request[:3]
                if not workers:
                    # Poll for clients now that a worker is available
                    poller.register(frontend, zmq.POLLIN)
                workers.append(worker)
                if client != b"READY" and len(request) > 3:
                    # If client reply, send rest back to frontend
                    empty, reply = request[3:]
                    frontend.send_multipart([client, b"", reply])
            if frontend in sockets:
                # Get next client request, route to the worker that has
                # been waiting longest (front of the queue)
                client, empty, request = frontend.recv_multipart()
                worker = workers.pop(0)
                backend.send_multipart([worker, b"", client, b"", request])
                if not workers:
                    # Don't poll clients if no workers are available
                    poller.unregister(frontend)
    finally:
        # Clean up; placed in ``finally`` because the loop above never
        # exits normally, so code after it would be unreachable.
        backend.close()
        frontend.close()
        context.term()
def main():
    """Spawn the broker, the clients and the workers, each in its own process.

    Processes are started in that order: one broker, ``NUM_CLIENTS``
    clients, then ``NUM_WORKERS`` workers.
    """
    NUM_CLIENTS = 1
    NUM_WORKERS = 10
    # Launch order matters only in that the broker is started first.
    launch_order = [broker]
    launch_order += [client_task] * NUM_CLIENTS
    launch_order += [worker_task] * NUM_WORKERS
    for target in launch_order:
        Process(target=target, args=()).start()


if __name__ == "__main__":
    main()
答
我想有不同的方法来做到这一点:
- 例如,你可以使用threading
模块,从单个客户端并发地发出所有请求,类似这样:
import threading

result_list = []  # Collects (index, reply) pairs produced by the client threads
rlock = threading.RLock()  # Serializes appends to result_list


def client_thread(client_url, request, i):
    """Send one request over its own REQ socket and record the reply.

    Args:
        client_url: Endpoint to connect to.
        request: Request payload as text; it is encoded before sending.
        i: Index of the request, used as the socket identity and stored
            alongside the reply.

    The ``(i, reply)`` pair is appended to the module-level ``result_list``.
    """
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    try:
        socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
        socket.connect(client_url)
        socket.send(request.encode())
        reply = socket.recv()
    finally:
        # One socket is created per request; release it even if the
        # exchange fails so sockets do not pile up.
        socket.close()
    with rlock:
        result_list.append((i, reply))
def client_task():
    """Issue every task concurrently, one thread per request.

    Starts a ``client_thread`` for each entry of ``tasks`` and then waits
    for all of them, so that every thread has finished by the time this
    function returns.
    """
    # tasks = list with all your tasks
    url_client = "ipc://frontend.ipc"
    threads = []
    for i, task in enumerate(tasks):
        thread = threading.Thread(target=client_thread,
                                  args=(url_client, task, i))
        thread.start()
        threads.append(thread)
    # Without joining, the function would return while requests are still
    # in flight and the collected results would be incomplete.
    for thread in threads:
        thread.join()
- 你也可以利用asyncio这类事件驱动的库
(pyzmq自带子模块zmq.asyncio,另外还有aiozmq库,后者提供了更高的抽象级别)。在这种情况下,你会按顺序把请求发送给worker,但不必阻塞等待每个响应(也不会让主循环一直被占用),而是在返回主循环时取得结果。大致如下:
import asyncio
import zmq.asyncio
async def client_async(request, context, i, client_url):
    """Basic client sending a request (REQ) to a ROUTER (the broker).

    Args:
        request: Request payload as text; it is encoded before sending.
        context: Context used to create the socket; ``send``/``recv`` are
            awaited, so it must produce awaitable sockets.
        i: Index of the request, used as the socket identity.
        client_url: Endpoint to connect to.

    Returns:
        The reply received for this request.
    """
    socket = context.socket(zmq.REQ)
    try:
        socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
        socket.connect(client_url)
        await socket.send(request.encode())
        return await socket.recv()
    finally:
        # Close the socket even when send/recv raises or the task is
        # cancelled, otherwise it would be leaked.
        socket.close()
async def run(loop):
    """Schedule one ``client_async`` call per task and gather the replies.

    Args:
        loop: Accepted for the caller's convenience; not used here.

    Returns:
        The list of replies produced by ``asyncio.gather``.
    """
    # tasks = list full of tasks
    url_client = "ipc://frontend.ipc"
    ctx = zmq.asyncio.Context()
    # Every request is scheduled up front; nothing is awaited until all
    # of them have been created.
    pending = [
        asyncio.ensure_future(client_async(job, ctx, idx, url_client))
        for idx, job in enumerate(tasks)
    ]
    return await asyncio.gather(*pending)
# NOTE(review): zmq.asyncio.install() is reported as deprecated and unnecessary
# from pyzmq 17 onwards — confirm the target pyzmq version before keeping it.
zmq.asyncio.install()
# Drive the `run` coroutine to completion and keep whatever it returns.
loop = asyncio.get_event_loop()
results = loop.run_until_complete(run(loop))
我没有测试这两个片段,但它们都来自我在与你的问题类似的配置中使用ZMQ的代码(为适应本问题做了一些修改)。
我最终采用了zmq指南中的“asynchronous”req/rep模式,从而解决了这个问题。中间代理的轮询方式并不理想,但对我的用例来说已经足够了 – reptilicus