From fe900087a2835aaea1c260c1ef9cb75fc8c35b5e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 18 Mar 2018 19:03:56 +0545 Subject: [PATCH] issue #144: service: working service.Pool object. It knows how to dispatch messages from multiple receivers (associated with multiple services) to multiple threads, where the service implementation is invoked on the message. It wakes a maximum of one thread per received message. It knows how to shut down gracefully. Implication: due to the latch use, there are 2 file descriptors burned for every thread. We don't need interruptibility here, so in future, it might be nice to allow swapping a diferent queueing primitive into Select (maybe a subclass?) just for this case. --- mitogen/service.py | 62 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index f096e8dc..4d0fbaab 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -26,6 +26,8 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import sys +import threading import mitogen.core import mitogen.master @@ -41,28 +43,21 @@ class Service(object): def __init__(self, router): self.router = router self.recv = mitogen.core.Receiver(router, self.handle) + self.recv.service = self self.handle = self.recv.handle self.running = True def validate_args(self, args): return True - def run_once(self): - try: - msg = self.recv.get() - except mitogen.core.ChannelError, e: - # Channel closed due to broker shutdown, exit gracefully. - LOG.debug('%r: channel closed: %s', self, e) - self.running = False - return - + def dispatch_one(self, msg): if len(msg.data) > self.max_message_size: LOG.error('%r: larger than permitted size: %r', self, msg) msg.reply(mitogen.core.CallError('Message size exceeded')) return args = msg.unpickle(throw=False) - if ( args == mitogen.core._DEAD or + if (args == mitogen.core._DEAD or isinstance(args, mitogen.core.CallError) or not self.validate_args(args)): LOG.warning('Received junk message: %r', args) @@ -74,6 +69,17 @@ class Service(object): LOG.exception('While invoking %r.dispatch()', self) msg.reply(mitogen.core.CallError(e)) + def run_once(self): + try: + msg = self.recv.get() + except mitogen.core.ChannelError, e: + # Channel closed due to broker shutdown, exit gracefully. + LOG.debug('%r: channel closed: %s', self, e) + self.running = False + return + + self.dispatch_one(msg) + def run(self): while self.running: self.run_once() @@ -82,7 +88,41 @@ class Service(object): class Pool(object): def __init__(self, router, services, size=1): self.services = list(services) - self.select = mitogen.master.Select() + self.select = mitogen.master.Select( + receivers=[ + service.recv + for service in self.services + ], + oneshot=False, + ) + self.threads = [] + for x in xrange(size): + thread = threading.Thread( + name='mitogen.service.Pool.worker-%d' % (x,), + target=self._worker_main, + ) + thread.start() + self.threads.append(thread) + + def stop(self): + self.select.close() + for th in self.threads: + th.join() + + def _worker_main(self): + while True: + try: + msg = self.select.get() + except (mitogen.core.ChannelError, mitogen.core.LatchError): + e = sys.exc_info()[1] + LOG.error('%r: channel or latch closed, exitting: %s', self, e) + return + + service = msg.receiver.service + try: + service.dispatch_one(msg) + except Exception: + LOG.exception('While handling %r using %r', msg, service) def call(context, handle, obj):