diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 22702470..10490769 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -179,10 +179,6 @@ class MuxProcess(object): cls.worker_sock = None self = cls() self.worker_main() - # Test frameworks living somewhere higher on the stack of the - # original parent process may try to catch sys.exit(), so do a C - # level exit instead. - os._exit(0) def worker_main(self): """ @@ -193,10 +189,19 @@ class MuxProcess(object): self._setup_master() self._setup_services() - # Let the parent know our listening socket is ready. - mitogen.core.io_op(self.child_sock.send, b('1')) - # Block until the socket is closed, which happens on parent exit. - mitogen.core.io_op(self.child_sock.recv, 1) + try: + # Let the parent know our listening socket is ready. + mitogen.core.io_op(self.child_sock.send, b('1')) + # Block until the socket is closed, which happens on parent exit. + mitogen.core.io_op(self.child_sock.recv, 1) + finally: + self.broker.shutdown() + self.broker.join() + + # Test frameworks living somewhere higher on the stack of the + # original parent process may try to catch sys.exit(), so do a C + # level exit instead. + os._exit(0) def _enable_router_debug(self): if 'MITOGEN_ROUTER_DEBUG' in os.environ: @@ -236,10 +241,14 @@ class MuxProcess(object): """ Construct a Router, Broker, and mitogen.unix listener """ - self.router = mitogen.master.Router(max_message_size=4096 * 1048576) + self.broker = mitogen.master.Broker(install_watcher=False) + self.router = mitogen.master.Router( + broker=self.broker, + max_message_size=4096 * 1048576, + ) self._setup_responder(self.router.responder) - mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown) - mitogen.core.listen(self.router.broker, 'exit', self.on_broker_exit) + mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown) + mitogen.core.listen(self.broker, 'exit', self.on_broker_exit) self.listener = mitogen.unix.Listener( router=self.router, path=self.unix_listener_path, @@ -273,12 +282,6 @@ class MuxProcess(object): threads to exit gracefully. """ self.pool.stop(join=False) - try: - os.unlink(self.listener.path) - except OSError as e: - # Prevent a shutdown race with the parent process. - if e.args[0] != errno.ENOENT: - raise def on_broker_exit(self): """ diff --git a/mitogen/unix.py b/mitogen/unix.py index 7cd5edfe..57c0f84d 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -60,7 +60,7 @@ def is_path_dead(path): def make_socket_path(): - return tempfile.mktemp(prefix='mitogen_unix_') + return tempfile.mktemp(prefix='mitogen_unix_', suffix='.sock') class Listener(mitogen.core.BasicStream): @@ -88,8 +88,17 @@ class Listener(mitogen.core.BasicStream): self.receive_side = mitogen.core.Side(self, self._sock.fileno()) router.broker.start_receive(self) + def _unlink_socket(self): + try: + os.unlink(self.path) + except OSError as e: + # Prevent a shutdown race with the parent process. + if e.args[0] != errno.ENOENT: + raise + def on_shutdown(self, broker): broker.stop_receive(self) + self._unlink_socket() self._sock.close() self.receive_side.closed = True