pyzmq DEALER not sending to the addressed worker
I'm having trouble getting a DEALER socket in pyzmq to send a message to a specific worker thread. I've read the documentation and am fairly sure I have followed the example correctly.
My code uses a ROUTER/DEALER setup. Multiple clients connect to the ROUTER and send requests. Each request should go to a free worker thread, which replies once it finishes its task. I don't want a long-running task to hold up other clients whose tasks return quickly.
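The intended dispatch policy (hand each request to the worker that has been idle longest, and put a worker back in the pool once it replies) is the same pop-from-front/append-to-back bookkeeping the broker code performs on its `workers` list. Below is a minimal pure-Python sketch of just that policy, with no sockets involved; `FreeWorkerQueue` is a hypothetical name, not part of pyzmq.

```python
from collections import deque


class FreeWorkerQueue:
    """Hypothetical model of a ready-worker list (least recently used first)."""

    def __init__(self, worker_ids):
        self._ready = deque(worker_ids)

    def has_free_worker(self):
        return bool(self._ready)

    def assign(self):
        # Take the worker that has been idle the longest (front of the queue).
        return self._ready.popleft()

    def release(self, worker_id):
        # A worker that has just replied goes to the back of the queue.
        self._ready.append(worker_id)


queue = FreeWorkerQueue(["w0", "w1", "w2"])
first = queue.assign()    # w0 gets the long-running task and stays busy
second = queue.assign()
queue.release(second)     # w1 replies quickly
third = queue.assign()
queue.release(third)      # w2 replies quickly
fourth = queue.assign()   # next request goes to the longest-idle free worker
print(first, second, third, fourth)
```

Replaying the sequence of events from the log, the queue picks w1 for the fourth request, so this bookkeeping on its own never selects the busy w0.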
I address each message to a free worker, but a message can end up at a busy worker instead.
```python
# code to setup ROUTER/DEALER protocol
self._frontend = self._context.socket(zmq.ROUTER)
self._frontend.bind(self._url)

url_worker = "inproc://workers"
self._backend = self._context.socket(zmq.DEALER)
self._backend.bind(url_worker)

for i in range(self._worker_threads):
    id = 'w%d' % i
    thread = threading.Thread(target=self.handle_request, args=(url_worker, self._context, id))
    thread.start()

self.load_balance_requests([w.encode('ascii') for w in self._workers_ready])

# handles request for workers
def handle_request(self, url_worker, context, id):
    self._logger.info('Connect to %s %s' % (url_worker, id))
    socket = context.socket(zmq.REP)
    # used to identify worker
    socket.identity = id.encode('ascii')
    socket.connect(url_worker)
    with self._workers_lock:
        self._workers_ready.append(id)
    while True:
        try:
            # Wait for next request from client
            self._logger.info('Wait for message')
            address, empty, zmsg = socket.recv_multipart()
            self._logger.info('Request received')

def load_balance_requests(self, workers):
    poller = zmq.Poller()
    poller.register(self._backend, zmq.POLLIN)
    poller.register(self._frontend, zmq.POLLIN)
    while True:
        sockets = dict(poller.poll())
        if self._backend in sockets:
            # Handle worker activity on the backend
            request = self._backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(self._frontend, zmq.POLLIN)
            workers.append(worker)
            empty, reply = request[3:]
            self._logger.info('Sending to %s from %s' % (str(client), str(worker)))
            self._frontend.send_multipart([client, b"", reply])
        if self._frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = self._frontend.recv_multipart()
            worker = workers.pop(0)
            self._logger.info('Sending to %s from %s' % (str(worker), str(client)))
            self._backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(self._frontend)
    # Clean up
    self._backend.close()
    self._frontend.close()
    self._context.term()
```
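The frame unpacking in `handle_request` and `load_balance_requests` relies on how a REP socket treats envelopes: on receive it keeps every frame up to and including the first empty delimiter and passes only the rest to the application, and on send it puts that saved envelope back in front of the reply. The following is a simplified pure-Python model of those application-visible frames (it is not pyzmq and ignores the socket's internal peer bookkeeping). It assumes the worker replies with `[address, b"", result]`, the shape the broker's unpacking expects; `rep_recv`, `rep_send` and `b"result"` are illustrative placeholders.

```python
def rep_recv(frames):
    """Model of REP receive: frames up to and including the first empty
    delimiter are kept as the envelope; the rest go to the application."""
    split = frames.index(b"") + 1
    return frames[:split], frames[split:]


def rep_send(envelope, frames):
    """Model of REP send: the saved envelope is prepended to the reply."""
    return envelope + frames


client_id = b"\x00\x80\x00\x00,"  # client identity as it appears in the log

# What the broker writes to its DEALER backend for a request "addressed" to w1.
sent = [b"w1", b"", client_id, b"", b"request"]

envelope, app_frames = rep_recv(sent)
address, empty, zmsg = app_frames              # same unpacking as handle_request
# Assumption: the worker replies with [address, b"", result].
reply = rep_send(envelope, [address, b"", b"result"])

worker, empty, client = reply[:3]              # same unpacking as load_balance_requests
print(worker)
```

Because the `b"w1"` frame travels inside the envelope and is echoed back unchanged, the broker reports the reply as coming from w1 regardless of which worker thread actually produced it.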
Below is my log output. Worker w0 runs on thread 31448, w1 on thread 14180, and w2 on thread 30932. The request sent to w0 is a long-running task. The next two requests go to w1 and w2, which handle them and reply. So far, so good. Then another request comes in and is addressed to w1. However, it blocks and eventually shows up on thread 31448, which is worker w0. It only goes through after w0's first task has finished.
Not sure what I'm doing wrong here.
```
2018-07-11 11:24:25,747 - myprogram INFO 31964 31448 - Connect to inproc://workers w0
2018-07-11 11:24:25,747 - myprogram INFO 31964 14180 - Connect to inproc://workers w1
2018-07-11 11:24:25,759 - myprogram INFO 31964 30932 - Connect to inproc://workers w2
2018-07-11 11:25:08,156 - myprogram INFO 31964 2036 - Sending to b'w0' from b'\x00\x80\x00\x00)'
2018-07-11 11:25:08,156 - myprogram INFO 31964 31448 - Request received
2018-07-11 11:25:12,801 - myprogram INFO 31964 2036 - Sending to b'w1' from b'\x00\x80\x00\x00*'
2018-07-11 11:25:12,801 - myprogram INFO 31964 14180 - Request to received
2018-07-11 11:25:13,635 - myprogram INFO 31964 2036 - Sending to b'\x00\x80\x00\x00*' from b'w1'
2018-07-11 11:25:15,346 - myprogram INFO 31964 2036 - Sending to b'w2' from b'\x00\x80\x00\x00+'
2018-07-11 11:25:15,351 - myprogram INFO 31964 30932 - Request received
2018-07-11 11:25:20,239 - myprogram INFO 31964 2036 - Sending to b'\x00\x80\x00\x00+' from b'w2'
2018-07-11 11:25:20,542 - myprogram INFO 31964 2036 - Sending to b'w1' from b'\x00\x80\x00\x00,'
2018-07-11 11:27:05,911 - myprogram INFO 31964 2036 - Sending to b'\x00\x80\x00\x00)' from b'w0'
2018-07-11 11:27:05,912 - myprogram INFO 31964 31448 - Request received
2018-07-11 11:27:06,468 - myprogram INFO 31964 2036 - Sending to b'\x00\x80\x00\x00,' from b'w1'
```
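The ZeroMQ documentation describes outgoing routing differently for the two socket types: a DEALER round-robins each outgoing message across its connected peers and treats every frame as payload, while a ROUTER removes the first frame and uses it as the identity of the destination peer. The toy model below (plain Python, not pyzmq; `DealerModel` and `RouterModel` are hypothetical names) replays the four "Sending to" worker addresses from the log, assuming the workers connected in the order w0, w1, w2 and ignoring high-water marks.

```python
from itertools import cycle


class DealerModel:
    """Toy model of DEALER outgoing routing (not pyzmq)."""

    def __init__(self, peers):
        # Round-robin over connected peers, in connection order.
        # (A real DEALER also skips peers whose pipes are full; not modeled.)
        self._next_peer = cycle(peers)

    def send_multipart(self, frames):
        # Every frame, including the first, is payload; the peer is chosen
        # purely by rotation.
        return next(self._next_peer), frames


class RouterModel:
    """Toy model of ROUTER outgoing routing (not pyzmq)."""

    def send_multipart(self, frames):
        # The first frame names the destination peer and is stripped off.
        # (Unknown identities, which a real ROUTER drops, are not modeled.)
        identity, *payload = frames
        return identity, payload


peers = [b"w0", b"w1", b"w2"]
addressed_to = [b"w0", b"w1", b"w2", b"w1"]  # worker frames from the log

dealer = DealerModel(peers)
router = RouterModel()
dealer_deliveries = [dealer.send_multipart([w, b"", b"client", b"", b"req"])[0]
                     for w in addressed_to]
router_deliveries = [router.send_multipart([w, b"", b"client", b"", b"req"])[0]
                     for w in addressed_to]
print(dealer_deliveries)  # [b'w0', b'w1', b'w2', b'w0']
print(router_deliveries)  # [b'w0', b'w1', b'w2', b'w1']
```

Under these assumptions the DEALER model delivers the fourth request to w0, the busy worker, which is consistent with thread 31448 receiving it only after its first task finished. ZeroMQ's load-balancing broker pattern uses a ROUTER, rather than a DEALER, as its backend socket so that it can address specific workers.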