diff --git a/.gitignore b/.gitignore index 6092d04e..e244ca12 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .coverage .tox .venv +venvs/** **/.DS_Store *.pyc *.pyd diff --git a/.travis.yml b/.travis.yml index 5dfdae00..95d9c64c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,8 @@ cache: - /home/travis/virtualenv install: -- pip install -r dev_requirements.txt +# |cat to disable progress bar. +- pip install -r dev_requirements.txt |cat script: - | diff --git a/.travis/ansible_tests.py b/.travis/ansible_tests.py index 3b5e40db..efaca9fe 100755 --- a/.travis/ansible_tests.py +++ b/.travis/ansible_tests.py @@ -34,6 +34,7 @@ with ci_lib.Fold('job_setup'): os.chdir(TESTS_DIR) os.chmod('../data/docker/mitogen__has_sudo_pubkey.key', int('0600', 7)) + run("pip install -qr requirements.txt") # tests/ansible/requirements # Don't set -U as that will upgrade Paramiko to a non-2.6 compatible version. run("pip install -q ansible==%s", ci_lib.ANSIBLE_VERSION) diff --git a/.travis/ci_lib.py b/.travis/ci_lib.py index eb130a14..e92564b6 100644 --- a/.travis/ci_lib.py +++ b/.travis/ci_lib.py @@ -32,6 +32,17 @@ def subprocess__check_output(*popenargs, **kwargs): if not hasattr(subprocess, 'check_output'): subprocess.check_output = subprocess__check_output + +# ----------------- + +# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars. + +sys.stdout = os.popen('stdbuf -oL cat', 'w', 1) +os.dup2(sys.stdout.fileno(), 1) + +sys.stderr = sys.stdout +os.dup2(sys.stderr.fileno(), 2) + # ----------------- def _argv(s, *args): diff --git a/.travis/debops_common_tests.sh b/.travis/debops_common_tests.sh index 50e67ada..753d1c11 100755 --- a/.travis/debops_common_tests.sh +++ b/.travis/debops_common_tests.sh @@ -27,7 +27,7 @@ mkdir "$TMPDIR" echo travis_fold:start:job_setup -pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION} +pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION} |cat debops-init "$TMPDIR/project" cd "$TMPDIR/project" diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 708b6c13..f2725e9d 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -427,16 +427,30 @@ def config_from_hostvars(transport, inventory_name, connection, class CallChain(mitogen.parent.CallChain): + """ + Extend :class:`mitogen.parent.CallChain` to additionally cause the + associated :class:`Connection` to be reset if a ChannelError occurs. + + This only catches failures that occur while a call is pnding, it is a + stop-gap until a more general method is available to notice connection in + every situation. + """ call_aborted_msg = ( 'Mitogen was disconnected from the remote environment while a call ' 'was in-progress. If you feel this is in error, please file a bug. ' 'Original error was: %s' ) + def __init__(self, connection, context, pipelined=False): + super(CallChain, self).__init__(context, pipelined) + #: The connection to reset on CallError. + self._connection = connection + def _rethrow(self, recv): try: return recv.get().unpickle() except mitogen.core.ChannelError as e: + self._connection.reset() raise ansible.errors.AnsibleConnectionFailure( self.call_aborted_msg % (e,) ) @@ -550,7 +564,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): self.host_vars = task_vars['hostvars'] self.delegate_to_hostname = delegate_to_hostname self.loader_basedir = loader_basedir - self.close(new_task=True) + self._reset(mode='put') def get_task_var(self, key, default=None): if self._task_vars and key in self._task_vars: @@ -682,7 +696,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): raise ansible.errors.AnsibleConnectionFailure(dct['msg']) self.context = dct['context'] - self.chain = CallChain(self.context, pipelined=True) + self.chain = CallChain(self, self.context, pipelined=True) if self._play_context.become: self.login_context = dct['via'] else: @@ -709,6 +723,20 @@ class Connection(ansible.plugins.connection.ConnectionBase): self.get_chain().call_no_reply(os.mkdir, self._shell.tmpdir) return self._shell.tmpdir + def _reset_tmp_path(self): + """ + Called by _reset(); ask the remote context to delete any temporary + directory created for the action. CallChain is not used here to ensure + exception is logged by the context on failure, since the CallChain + itself is about to be destructed. + """ + if getattr(self._shell, 'tmpdir', None) is not None: + self.context.call_no_reply( + ansible_mitogen.target.prune_tree, + self._shell.tmpdir, + ) + self._shell.tmpdir = None + def _connect(self): """ Establish a connection to the master process's UNIX listener socket, @@ -727,38 +755,53 @@ class Connection(ansible.plugins.connection.ConnectionBase): stack = self._build_stack() self._connect_stack(stack) - def close(self, new_task=False): + def _reset(self, mode): """ - Arrange for the mitogen.master.Router running in the worker to - gracefully shut down, and wait for shutdown to complete. Safe to call - multiple times. + Forget everything we know about the connected context. + + :param str mode: + Name of ContextService method to use to discard the context, either + 'put' or 'reset'. """ - if getattr(self._shell, 'tmpdir', None) is not None: - # Avoid CallChain to ensure exception is logged on failure. - self.context.call_no_reply( - ansible_mitogen.target.prune_tree, - self._shell.tmpdir, - ) - self._shell.tmpdir = None + if not self.context: + return - if self.context: - self.chain.reset() - self.parent.call_service( - service_name='ansible_mitogen.services.ContextService', - method_name='put', - context=self.context - ) + self._reset_tmp_path() + self.chain.reset() + self.parent.call_service( + service_name='ansible_mitogen.services.ContextService', + method_name=mode, + context=self.context + ) self.context = None self.login_context = None self.init_child_result = None self.chain = None - if self.broker and not new_task: + + def close(self): + """ + Arrange for the mitogen.master.Router running in the worker to + gracefully shut down, and wait for shutdown to complete. Safe to call + multiple times. + """ + self._reset(mode='put') + if self.broker: self.broker.shutdown() self.broker.join() self.broker = None self.router = None + def reset(self): + """ + Explicitly terminate the connection to the remote host. This discards + any local state we hold for the connection, returns the Connection to + the 'disconnected' state, and informs ContextService the connection is + bad somehow, and should be shut down and discarded. + """ + self._connect() + self._reset(mode='reset') + def get_chain(self, use_login=False, use_fork=False): """ Return the :class:`mitogen.parent.CallChain` to use for executing diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 199f2116..dde44c89 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -89,6 +89,24 @@ def _get_candidate_temp_dirs(): return mitogen.utils.cast(dirs) +def key_from_dict(**kwargs): + """ + Return a unique string representation of a dict as quickly as possible. + Used to generated deduplication keys from a request. + """ + out = [] + stack = [kwargs] + while stack: + obj = stack.pop() + if isinstance(obj, dict): + stack.extend(sorted(obj.items())) + elif isinstance(obj, (list, tuple)): + stack.extend(obj) + else: + out.append(str(obj)) + return ''.join(out) + + class Error(Exception): pass @@ -113,7 +131,7 @@ class ContextService(mitogen.service.Service): super(ContextService, self).__init__(*args, **kwargs) self._lock = threading.Lock() #: Records the :meth:`get` result dict for successful calls, returned - #: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`. + #: for identical subsequent calls. Keyed by :meth:`key_from_dict`. self._response_by_key = {} #: List of :class:`mitogen.core.Latch` awaiting the result for a #: particular key. @@ -126,8 +144,27 @@ class ContextService(mitogen.service.Service): #: :attr:`max_interpreters` is reached, the most recently used context #: is destroyed to make room for any additional context. self._lru_by_via = {} - #: :meth:`key_from_kwargs` result by Context. + #: :func:`key_from_dict` result by Context. self._key_by_context = {} + #: Mapping of Context -> parent Context + self._via_by_context = {} + + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'context': mitogen.core.Context + }) + def reset(self, context): + """ + Return a reference, forcing close and discard of the underlying + connection. Used for 'meta: reset_connection' or when some other error + is detected. + """ + LOG.debug('%r.reset(%r)', self, context) + self._lock.acquire() + try: + self._shutdown_unlocked(context) + finally: + self._lock.release() @mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.arg_spec({ @@ -149,29 +186,13 @@ class ContextService(mitogen.service.Service): finally: self._lock.release() - def key_from_kwargs(self, **kwargs): - """ - Generate a deduplication key from the request. - """ - out = [] - stack = [kwargs] - while stack: - obj = stack.pop() - if isinstance(obj, dict): - stack.extend(sorted(obj.items())) - elif isinstance(obj, (list, tuple)): - stack.extend(obj) - else: - out.append(str(obj)) - return ''.join(out) - def _produce_response(self, key, response): """ Reply to every waiting request matching a configuration key with a response dictionary, deleting the list of waiters when done. :param str key: - Result of :meth:`key_from_kwargs` + Result of :meth:`key_from_dict` :param dict response: Response dictionary :returns: @@ -187,6 +208,19 @@ class ContextService(mitogen.service.Service): self._lock.release() return count + def _forget_context_unlocked(self, context): + key = self._key_by_context.get(context) + if key is None: + LOG.debug('%r: attempt to forget unknown %r', self, context) + return + + self._response_by_key.pop(key, None) + self._latches_by_key.pop(key, None) + self._key_by_context.pop(context, None) + self._refs_by_context.pop(context, None) + self._via_by_context.pop(context, None) + self._lru_by_via.pop(context, None) + def _shutdown_unlocked(self, context, lru=None, new_context=None): """ Arrange for `context` to be shut down, and optionally add `new_context` @@ -194,15 +228,15 @@ class ContextService(mitogen.service.Service): """ LOG.info('%r._shutdown_unlocked(): shutting down %r', self, context) context.shutdown() - - key = self._key_by_context[context] - del self._response_by_key[key] - del self._refs_by_context[context] - del self._key_by_context[context] - if lru and context in lru: - lru.remove(context) - if new_context: - lru.append(new_context) + via = self._via_by_context.get(context) + if via: + lru = self._lru_by_via.get(via) + if lru: + if context in lru: + lru.remove(context) + if new_context: + lru.append(new_context) + self._forget_context_unlocked(context) def _update_lru_unlocked(self, new_context, spec, via): """ @@ -223,6 +257,7 @@ class ContextService(mitogen.service.Service): 'but they are all marked as in-use.', via) return + self._via_by_context[new_context] = via self._shutdown_unlocked(context, lru=lru, new_context=new_context) def _update_lru(self, new_context, spec, via): @@ -241,7 +276,6 @@ class ContextService(mitogen.service.Service): try: for context in list(self._key_by_context): self._shutdown_unlocked(context) - self._lru_by_via = {} finally: self._lock.release() @@ -256,15 +290,12 @@ class ContextService(mitogen.service.Service): # in _latches_by_key below. self._lock.acquire() try: - for context, key in list(self._key_by_context.items()): - if context.context_id in stream.routes: + routes = self.router.route_monitor.get_routes(stream) + for context in list(self._key_by_context): + if context.context_id in routes: LOG.info('Dropping %r due to disconnect of %r', context, stream) - self._response_by_key.pop(key, None) - self._latches_by_key.pop(key, None) - self._refs_by_context.pop(context, None) - self._lru_by_via.pop(context, None) - self._refs_by_context.pop(context, None) + self._forget_context_unlocked(context) finally: self._lock.release() @@ -360,7 +391,7 @@ class ContextService(mitogen.service.Service): def _wait_or_start(self, spec, via=None): latch = mitogen.core.Latch() - key = self.key_from_kwargs(via=via, **spec) + key = key_from_dict(via=via, **spec) self._lock.acquire() try: response = self._response_by_key.get(key) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index ff6ed083..0e74f960 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -241,8 +241,7 @@ def is_good_temp_dir(path): try: os.chmod(tmp.name, int('0700', 8)) except OSError as e: - LOG.debug('temp dir %r unusable: %s: chmod failed: %s', - path, e) + LOG.debug('temp dir %r unusable: chmod failed: %s', path, e) return False try: @@ -250,7 +249,7 @@ def is_good_temp_dir(path): if not os.access(tmp.name, os.X_OK): raise OSError('filesystem appears to be mounted noexec') except OSError as e: - LOG.debug('temp dir %r unusable: %s: %s', path, e) + LOG.debug('temp dir %r unusable: %s', path, e) return False finally: tmp.close() diff --git a/dev_requirements.txt b/dev_requirements.txt index 68f0422a..f48006e5 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,10 +1,11 @@ -r docs/docs-requirements.txt -ansible==2.6.1 coverage==4.5.1 Django==1.6.11 # Last version supporting 2.6. mock==2.0.0 pytz==2018.5 -paramiko==2.3.2 # Last 2.6-compat version. +cffi==1.11.2 # Random pin to try and fix pyparser==2.18 not having effect +pycparser==2.18 # Last version supporting 2.6. +faulthandler==3.1; python_version < '3.3' # used by testlib pytest-catchlog==1.2.2 pytest==3.1.2 PyYAML==3.11; python_version < '2.7' @@ -14,4 +15,3 @@ unittest2==1.1.0 # Fix InsecurePlatformWarning while creating py26 tox environment # https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings urllib3[secure]; python_version < '2.7.9' -google-api-python-client==1.6.5 diff --git a/docs/api.rst b/docs/api.rst index 72a6b4db..52d5dcec 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1227,6 +1227,14 @@ Broker Class thread, or immediately if the current thread is the broker thread. Safe to call from any thread. + .. method:: defer_sync (func) + + Arrange for `func()` to execute on the broker thread, blocking the + current thread until a result or exception is available. + + :returns: + Call result. + .. method:: start_receive (stream) Mark the :attr:`receive_side ` on `stream` as diff --git a/docs/changelog.rst b/docs/changelog.rst index 099b253b..0d7d9766 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -15,6 +15,59 @@ Release Notes +v0.2.4 (2018-??-??) +------------------ + +Mitogen for Ansible +~~~~~~~~~~~~~~~~~~~ + +Enhancements +^^^^^^^^^^^^ + +* `#369 `_: :meth:`Connection.reset` + is implemented, allowing `meta: reset_connection + `_ to shut + down the remote interpreter as expected, and improving support for the + `reboot + `_ + module. + + +Fixes +^^^^^ + +Core Library +~~~~~~~~~~~~ + +* `#76 `_: routing maintains the set + of destination context ID ever received on each stream, and when + disconnection occurs, propagates ``DEL_ROUTE`` messages downwards towards + every stream that ever communicated with a disappearing peer, rather than + simply toward parents. + + Conversations between nodes in any level of the connection tree should + correctly receive ``DEL_ROUTE`` messages when a participant disconnects, + allowing receivers to be woken with :class:`mitogen.core.ChannelError` to + signal the connection has broken, even when one participant is not a parent + of the other. + +* `#405 `_: if a message is rejected + due to being too large, and it has a ``reply_to`` set, a dead message is + returned to the sender. This ensures function calls exceeding the configured + maximum size crash rather than hang. + +* `#411 `_: the SSH method typed + "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` + was configured. This would lead to connection timeouts due to the hung + response. + +* `16ca111e `_: handle OpenSSH + 7.5 permission denied prompts when ``~/.ssh/config`` rewrites are present. + +* `9ec360c2 `_: a new + :meth:`mitogen.core.Broker.defer_sync` utility function is provided. + + v0.2.3 (2018-10-23) ------------------- diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 1c92a559..20d1c2b3 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -246,7 +246,7 @@ Running User Functions ---------------------- So far we have used the interactive interpreter to call some standard library -functions, but if since source code typed at the interpreter cannot be +functions, but since the source code typed at the interpreter cannot be recovered, Mitogen is unable to execute functions defined in this way. We must therefore continue by writing our code as a script:: diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 2a4623eb..5e2c10f5 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -575,6 +575,28 @@ When ``sudo:node22a:webapp`` wants to send a message to .. image:: images/route.png +Disconnect Propagation +###################### + +To ensure timely shutdown when a failure occurs, where some context is awaiting +a response from another context that has become disconnected, +:class:`mitogen.core.Router` additionally records the destination context ID of +every message received on a particular stream. + +When ``DEL_ROUTE`` is generated locally or received on some other stream, +:class:`mitogen.parent.RouteMonitor` uses this to find every stream that ever +communicated with the route that is about to go away, and forwards the message +to each found. + +The recipient ``DEL_ROUTE`` handler in turn uses the message to find any +:class:`mitogen.core.Context` in the local process corresponding to the +disappearing route, and if found, fires a ``disconnected`` event on it. + +Any interested party, such as :class:`mitogen.core.Receiver`, may subscribe to +the event and use it to abort any threads that were asleep waiting for a reply +that will never arrive. + + .. _source-verification: Source Verification diff --git a/mitogen/core.py b/mitogen/core.py index dadf0924..642540a5 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1085,6 +1085,9 @@ class Stream(BasicStream): self._output_buf = collections.deque() self._input_buf_len = 0 self._output_buf_len = 0 + #: Routing records the dst_id of every message arriving from this + #: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID. + self.egress_ids = set() def construct(self): pass @@ -1279,7 +1282,7 @@ def _unpickle_context(router, context_id, name): (isinstance(name, UnicodeType) and len(name) < 100)) ): raise TypeError('cannot unpickle Context: bad input') - return router.context_class(router, context_id, name) + return router.context_by_id(context_id, name=name) class Poller(object): @@ -1716,10 +1719,27 @@ class Router(object): self._last_handle = itertools.count(1000) #: handle -> (persistent?, func(msg)) self._handle_map = {} + self.add_handler(self._on_del_route, DEL_ROUTE) def __repr__(self): return 'Router(%r)' % (self.broker,) + def _on_del_route(self, msg): + """ + Stub DEL_ROUTE handler; fires 'disconnect' events on the corresponding + member of :attr:`_context_by_id`. This handler is replaced by + :class:`mitogen.parent.RouteMonitor` in an upgraded context. + """ + LOG.error('%r._on_del_route() %r', self, msg) + if not msg.is_dead: + target_id_s, _, name = msg.data.partition(b(':')) + target_id = int(target_id_s, 10) + if target_id not in self._context_by_id: + LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg) + return + + fire(self._context_by_id[target_id], 'disconnect') + def on_stream_disconnect(self, stream): for context in self._context_by_id.values(): stream_ = self._stream_by_id.get(context.context_id) @@ -1732,6 +1752,15 @@ class Router(object): _, (_, func, _) = self._handle_map.popitem() func(Message.dead()) + def context_by_id(self, context_id, via_id=None, create=True, name=None): + context = self._context_by_id.get(context_id) + if create and not context: + context = self.context_class(self, context_id, name=name) + if via_id is not None: + context.via = self.context_by_id(via_id) + self._context_by_id[context_id] = context + return context + def register(self, context, stream): _v and LOG.debug('register(%r, %r)', context, stream) self._stream_by_id[context.context_id] = stream @@ -1803,11 +1832,17 @@ class Router(object): except Exception: LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) + def _maybe_send_dead(self, msg): + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead(), router=self) + def _async_route(self, msg, in_stream=None): _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) + if len(msg.data) > self.max_message_size: LOG.error('message too large (max %d bytes): %r', self.max_message_size, msg) + self._maybe_send_dead(msg) return # Perform source verification. @@ -1829,6 +1864,9 @@ class Router(object): if in_stream.auth_id is not None: msg.auth_id = in_stream.auth_id + # Maintain a set of IDs the source ever communicated with. + in_stream.egress_ids.add(msg.dst_id) + if msg.dst_id == mitogen.context_id: return self._invoke(msg, in_stream) @@ -1836,21 +1874,18 @@ class Router(object): if out_stream is None: out_stream = self._stream_by_id.get(mitogen.parent_id) - dead = False if out_stream is None: - LOG.error('%r: no route for %r, my ID is %r', - self, msg, mitogen.context_id) - dead = True + if msg.reply_to not in (0, IS_DEAD): + LOG.error('%r: no route for %r, my ID is %r', + self, msg, mitogen.context_id) + self._maybe_send_dead(msg) + return - if in_stream and self.unidirectional and not dead and \ - not (in_stream.is_privileged or out_stream.is_privileged): + if in_stream and self.unidirectional and not \ + (in_stream.is_privileged or out_stream.is_privileged): LOG.error('routing mode prevents forward of %r from %r -> %r', msg, in_stream, out_stream) - dead = True - - if dead: - if msg.reply_to and not msg.is_dead: - msg.reply(Message.dead(), router=self) + self._maybe_send_dead(msg) return out_stream._send(msg) @@ -1907,6 +1942,25 @@ class Broker(object): it = (side.keep_alive for (_, (side, _)) in self.poller.readers) return sum(it, 0) + def defer_sync(self, func): + """ + Block the calling thread while `func` runs on a broker thread. + + :returns: + Return value of `func()`. + """ + latch = Latch() + def wrapper(): + try: + latch.put(func()) + except Exception: + latch.put(sys.exc_info()[1]) + self.defer(wrapper) + res = latch.get() + if isinstance(res, Exception): + raise res + return res + def _call(self, stream, func): try: func(self) @@ -2067,11 +2121,6 @@ class ExternalContext(object): _v and LOG.debug('%r: parent stream is gone, dying.', self) self.broker.shutdown() - def _sync(self, func): - latch = Latch() - self.broker.defer(lambda: latch.put(func())) - return latch.get() - def detach(self): self.detached = True stream = self.router.stream_by_id(mitogen.parent_id) @@ -2080,7 +2129,7 @@ class ExternalContext(object): self.parent.send_await(Message(handle=DETACHING)) LOG.info('Detaching from %r; parent is %s', stream, self.parent) for x in range(20): - pending = self._sync(lambda: stream.pending_bytes()) + pending = self.broker.defer_sync(lambda: stream.pending_bytes()) if not pending: break time.sleep(0.05) diff --git a/mitogen/master.py b/mitogen/master.py index d4ee607a..73302910 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -335,6 +335,11 @@ class LogForwarder(object): class ModuleFinder(object): + """ + Given the name of a loaded module, make a best-effort attempt at finding + related modules likely needed by a child context requesting the original + module. + """ def __init__(self): #: Import machinery is expensive, keep :py:meth`:get_module_source` #: results around. @@ -477,7 +482,8 @@ class ModuleFinder(object): def resolve_relpath(self, fullname, level): """Given an ImportFrom AST node, guess the prefix that should be tacked on to an alias name to produce a canonical name. `fullname` is the name - of the module in which the ImportFrom appears.""" + of the module in which the ImportFrom appears. + """ mod = sys.modules.get(fullname, None) if hasattr(mod, '__path__'): fullname += '.__init__' @@ -499,7 +505,7 @@ class ModuleFinder(object): def find_related_imports(self, fullname): """ - Return a list of non-stdlb modules that are directly imported by + Return a list of non-stdlib modules that are directly imported by `fullname`, plus their parents. The list is determined by retrieving the source code of @@ -550,8 +556,8 @@ class ModuleFinder(object): Return a list of non-stdlib modules that are imported directly or indirectly by `fullname`, plus their parents. - This method is like :py:meth:`on_disconect`, but it also recursively - searches any modules which are imported by `fullname`. + This method is like :py:meth:`find_related_imports`, but also + recursively searches any modules which are imported by `fullname`. :param fullname: Fully qualified name of an _already imported_ module for which source code can be retrieved diff --git a/mitogen/parent.py b/mitogen/parent.py index a57ca20b..9e878e3f 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -69,6 +69,7 @@ from mitogen.core import IOLOG IS_WSL = 'Microsoft' in os.uname()[2] +itervalues = getattr(dict, 'itervalues', dict.values) if mitogen.core.PY3: xrange = range @@ -543,6 +544,7 @@ def _upgrade_broker(broker): len(old.readers), len(old.writers)) +@mitogen.core.takes_econtext def upgrade_router(econtext): if not isinstance(econtext.router, Router): # TODO econtext.broker.defer(_upgrade_broker, econtext.broker) @@ -911,9 +913,6 @@ class Stream(mitogen.core.Stream): def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) - #: List of contexts reachable via this stream; used to cleanup routes - #: during disconnection. - self.routes = set([self.remote_id]) def construct(self, max_message_size, remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, @@ -1428,45 +1427,75 @@ class RouteMonitor(object): persist=True, policy=is_immediate_child, ) + #: Mapping of Stream instance to integer context IDs reachable via the + #: stream; used to cleanup routes during disconnection. + self._routes_by_stream = {} - def propagate(self, handle, target_id, name=None): - # self.parent is None in the master. - if not self.parent: - return - + def _send_one(self, stream, handle, target_id, name): data = str(target_id) if name: data = '%s:%s' % (target_id, mitogen.core.b(name)) - self.parent.send( + stream.send( mitogen.core.Message( handle=handle, data=data.encode('utf-8'), + dst_id=stream.remote_id, ) ) + def _propagate(self, handle, target_id, name=None): + if not self.parent: + # self.parent is None in the master. + return + + stream = self.router.stream_by_id(self.parent.context_id) + self._send_one(stream, handle, target_id, name) + + def _child_propagate(self, handle, target_id): + """ + For DEL_ROUTE, we additionally want to broadcast the message to any + stream that has ever communicated with the disconnecting ID, so + core.py's :meth:`mitogen.core.Router._on_del_route` can turn the + message into a disconnect event. + """ + for stream in itervalues(self.router._stream_by_id): + if target_id in stream.egress_ids: + self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None) + def notice_stream(self, stream): """ When this parent is responsible for a new directly connected child stream, we're also responsible for broadcasting DEL_ROUTE upstream if/when that child disconnects. """ - self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id, - stream.name) + self._routes_by_stream[stream] = set([stream.remote_id]) + self._propagate(mitogen.core.ADD_ROUTE, stream.remote_id, + stream.name) mitogen.core.listen( obj=stream, name='disconnect', func=lambda: self._on_stream_disconnect(stream), ) + def get_routes(self, stream): + """ + Return the set of context IDs reachable on a stream. + + :param mitogen.core.Stream stream: + :returns: set([int]) + """ + return self._routes_by_stream.get(stream) or set() + def _on_stream_disconnect(self, stream): """ Respond to disconnection of a local stream by """ - LOG.debug('%r is gone; propagating DEL_ROUTE for %r', - stream, stream.routes) - for target_id in stream.routes: + routes = self._routes_by_stream.pop(stream) + LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes) + for target_id in routes: self.router.del_route(target_id) - self.propagate(mitogen.core.DEL_ROUTE, target_id) + self._propagate(mitogen.core.DEL_ROUTE, target_id) + self._child_propagate(mitogen.core.DEL_ROUTE, target_id) context = self.router.context_by_id(target_id, create=False) if context: @@ -1489,9 +1518,9 @@ class RouteMonitor(object): return LOG.debug('Adding route to %d via %r', target_id, stream) - stream.routes.add(target_id) + self._routes_by_stream[stream].add(target_id) self.router.add_route(target_id, stream) - self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) + self._propagate(mitogen.core.ADD_ROUTE, target_id, target_name) def _on_del_route(self, msg): if msg.is_dead: @@ -1505,14 +1534,21 @@ class RouteMonitor(object): target_id, stream, registered_stream) return - LOG.debug('Deleting route to %d via %r', target_id, stream) - stream.routes.discard(target_id) - self.router.del_route(target_id) - self.propagate(mitogen.core.DEL_ROUTE, target_id) context = self.router.context_by_id(target_id, create=False) if context: + LOG.debug('%r: Firing local disconnect for %r', self, context) mitogen.core.fire(context, 'disconnect') + LOG.debug('Deleting route to %d via %r', target_id, stream) + routes = self._routes_by_stream.get(stream) + if routes: + routes.discard(target_id) + + self.router.del_route(target_id) + if stream.remote_id != mitogen.parent_id: + self._propagate(mitogen.core.DEL_ROUTE, target_id) + self._child_propagate(mitogen.core.DEL_ROUTE, target_id) + class Router(mitogen.core.Router): context_class = Context @@ -1562,11 +1598,12 @@ class Router(mitogen.core.Router): def del_route(self, target_id): LOG.debug('%r.del_route(%r)', self, target_id) - try: - del self._stream_by_id[target_id] - except KeyError: - LOG.error('%r: cant delete route to %r: no such stream', - self, target_id) + # DEL_ROUTE may be sent by a parent if it knows this context sent + # messages to a peer that has now disconnected, to let us raise + # 'disconnect' event on the appropriate Context instance. In that case, + # we won't a matching _stream_by_id entry for the disappearing route, + # so don't raise an error for a missing key here. + self._stream_by_id.pop(target_id, None) def get_module_blacklist(self): if mitogen.context_id == 0: @@ -1581,15 +1618,6 @@ class Router(mitogen.core.Router): def allocate_id(self): return self.id_allocator.allocate() - def context_by_id(self, context_id, via_id=None, create=True): - context = self._context_by_id.get(context_id) - if create and not context: - context = self.context_class(self, context_id) - if via_id is not None: - context.via = self.context_by_id(via_id) - self._context_by_id[context_id] = context - return context - connection_timeout_msg = u"Connection timed out." def _connect(self, klass, name=None, **kwargs): diff --git a/mitogen/ssh.py b/mitogen/ssh.py index ee97425b..fba6e8f2 100644 --- a/mitogen/ssh.py +++ b/mitogen/ssh.py @@ -31,6 +31,7 @@ Functionality to allow establishing new slave contexts over an SSH connection. """ import logging +import re import time try: @@ -46,10 +47,16 @@ LOG = logging.getLogger('mitogen') # sshpass uses 'assword' because it doesn't lowercase the input. PASSWORD_PROMPT = b('password') -PERMDENIED_PROMPT = b('permission denied') HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?') HOSTKEY_FAIL = b('host key verification failed.') +# [user@host: ] permission denied +PERMDENIED_RE = re.compile( + ('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5 + 'Permission denied').encode(), + re.I +) + DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:')) @@ -258,7 +265,7 @@ class Stream(mitogen.parent.Stream): def _host_key_prompt(self): if self.check_host_keys == 'accept': LOG.debug('%r: accepting host key', self) - self.tty_stream.transmit_side.write(b('y\n')) + self.tty_stream.transmit_side.write(b('yes\n')) return # _host_key_prompt() should never be reached with ignore or enforce @@ -289,11 +296,7 @@ class Stream(mitogen.parent.Stream): self._host_key_prompt() elif HOSTKEY_FAIL in buf.lower(): raise HostKeyError(self.hostkey_failed_msg) - elif buf.lower().startswith(( - PERMDENIED_PROMPT, - b("%s@%s: " % (self.username, self.hostname)) - + PERMDENIED_PROMPT, - )): + elif PERMDENIED_RE.match(buf): # issue #271: work around conflict with user shell reporting # 'permission denied' e.g. during chdir($HOME) by only matching # it at the start of the line. diff --git a/mitogen/unix.py b/mitogen/unix.py index 4a4dfb65..417842bc 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -52,7 +52,7 @@ def is_path_dead(path): s.connect(path) except socket.error: e = sys.exc_info()[1] - return e[0] in (errno.ECONNREFUSED, errno.ENOENT) + return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT) return False @@ -82,7 +82,7 @@ class Listener(mitogen.core.BasicStream): sock.setblocking(True) try: pid, = struct.unpack('>L', sock.recv(4)) - except socket.error: + except (struct.error, socket.error): LOG.error('%r: failed to read remote identity: %s', self, sys.exc_info()[1]) return diff --git a/tests/ansible/ara_env.py b/tests/ansible/ara_env.py new file mode 100755 index 00000000..ab2b726e --- /dev/null +++ b/tests/ansible/ara_env.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +""" +Print shell environment exports adding ARA plugins to the list of plugins +from ansible.cfg in the CWD. +""" + +import os + +import ara.setup +import ansible.constants as C + +os.chdir(os.path.dirname(__file__)) + +print('export ANSIBLE_ACTION_PLUGINS=%s:%s' % ( + ':'.join(C.DEFAULT_ACTION_PLUGIN_PATH), + ara.setup.action_plugins, +)) + +print('export ANSIBLE_CALLBACK_PLUGINS=%s:%s' % ( + ':'.join(C.DEFAULT_CALLBACK_PLUGIN_PATH), + ara.setup.callback_plugins, +)) + +print('export ANSIBLE_LIBRARY=%s:%s' % ( + ':'.join(C.DEFAULT_MODULE_PATH), + ara.setup.library, +)) diff --git a/tests/ansible/bench/loop-100-copies.yml b/tests/ansible/bench/loop-100-copies.yml new file mode 100644 index 00000000..231bf4a1 --- /dev/null +++ b/tests/ansible/bench/loop-100-copies.yml @@ -0,0 +1,25 @@ + +- hosts: all + any_errors_fatal: true + tasks: + + - name: Create file tree + connection: local + shell: > + mkdir -p /tmp/filetree.in; + for i in `seq -f /tmp/filetree.in/%g 1 100`; do echo $RANDOM > $i; done; + + - name: Delete remote file tree + file: path=/tmp/filetree.out state=absent + when: 0 + + - file: + state: directory + path: /tmp/filetree.out + + - name: Trigger nasty process pileup + copy: + src: "{{item.src}}" + dest: "/tmp/filetree.out/{{item.path}}" + with_filetree: /tmp/filetree.in + when: item.state == 'file' diff --git a/tests/ansible/hosts/z b/tests/ansible/hosts/z new file mode 100644 index 00000000..61d27940 --- /dev/null +++ b/tests/ansible/hosts/z @@ -0,0 +1,25 @@ +z + +[z-x10] +z-[01:10] + +[z-x20] +z-[01:20] + +[z-x50] +z-[01:50] + +[z-x100] +z-[001:100] + +[z-x200] +z-[001:200] + +[z-x300] +z-[001:300] + +[z-x400] +z-[001:400] + +[z-x500] +z-[001:500] diff --git a/tests/ansible/integration/connection/all.yml b/tests/ansible/integration/connection/all.yml index 123e11c4..9c5a2837 100644 --- a/tests/ansible/integration/connection/all.yml +++ b/tests/ansible/integration/connection/all.yml @@ -1,5 +1,8 @@ --- +- import_playbook: disconnect_during_module.yml +- import_playbook: disconnect_resets_connection.yml - import_playbook: exec_command.yml -- import_playbook: put_small_file.yml - import_playbook: put_large_file.yml +- import_playbook: put_small_file.yml +- import_playbook: reset.yml diff --git a/tests/ansible/integration/connection/disconnect_during_module.yml b/tests/ansible/integration/connection/disconnect_during_module.yml new file mode 100644 index 00000000..f2943b44 --- /dev/null +++ b/tests/ansible/integration/connection/disconnect_during_module.yml @@ -0,0 +1,19 @@ +# issue 352: test ability to notice disconnection during a module invocation. +--- + +- name: integration/connection/disconnect_during_module.yml + hosts: test-targets localhost + gather_facts: no + any_errors_fatal: false + tasks: + - run_once: true # don't run against localhost + shell: | + kill -9 $PPID + register: out + ignore_errors: true + + - assert: + that: + - out.msg.startswith('Mitogen was disconnected from the remote environment while a call was in-progress.') + + - meta: clear_host_errors diff --git a/tests/ansible/integration/connection/disconnect_resets_connection.yml b/tests/ansible/integration/connection/disconnect_resets_connection.yml new file mode 100644 index 00000000..9e186182 --- /dev/null +++ b/tests/ansible/integration/connection/disconnect_resets_connection.yml @@ -0,0 +1,44 @@ +# issue 370: Connection should reset to 'disconnected' state when disconnect +# detected +# +# Previously the 'Mitogen was disconnected' error would fail the first task, +# but the Connection instance would still think it still had a valid +# connection. +# +# See also disconnect_during_module.yml + +--- + +- name: integration/connection/disconnect_resets_connection.yml + hosts: test-targets + gather_facts: no + any_errors_fatal: true + tasks: + - mitogen_action_script: + script: | + import sys + from ansible.errors import AnsibleConnectionFailure + + assert not self._connection.connected, \ + "Connection was not initially disconnected." + + self._low_level_execute_command('echo') + assert self._connection.connected, \ + "Connection was not connected after good command." + + try: + self._low_level_execute_command('kill -9 $PPID') + assert 0, 'AnsibleConnectionFailure was not raised' + except AnsibleConnectionFailure: + e = sys.exc_info()[1] + assert str(e).startswith('Mitogen was disconnected') + + assert not self._connection.connected, \ + "Connection did not reset." + + try: + self._low_level_execute_command('kill -9 $PPID') + assert 0, 'AnsibleConnectionFailure was not raised' + except AnsibleConnectionFailure: + e = sys.exc_info()[1] + assert str(e).startswith('Mitogen was disconnected') diff --git a/tests/ansible/integration/connection/reset.yml b/tests/ansible/integration/connection/reset.yml new file mode 100644 index 00000000..56e901b7 --- /dev/null +++ b/tests/ansible/integration/connection/reset.yml @@ -0,0 +1,38 @@ +# issue #369: Connection.reset() should cause destruction of the remote +# interpreter and any children. + +--- + +- name: integration/connection/reset.yml + hosts: test-targets + tasks: + - when: is_mitogen + block: + - custom_python_detect_environment: + register: out + + - custom_python_detect_environment: + become: true + register: out_become + + - meta: reset_connection + + - custom_python_detect_environment: + register: out2 + + - custom_python_detect_environment: + register: out_become2 + + - assert: + that: + # Interpreter PID has changed. + - out.pid != out2.pid + + # SSH PID has changed. + - out.ppid != out2.ppid + + # Interpreter PID has changed. + - out_become.pid != out_become2.pid + + # sudo PID has changed. + - out_become.ppid != out_become2.ppid diff --git a/tests/ansible/lib/action/mitogen_action_script.py b/tests/ansible/lib/action/mitogen_action_script.py new file mode 100644 index 00000000..e034345c --- /dev/null +++ b/tests/ansible/lib/action/mitogen_action_script.py @@ -0,0 +1,28 @@ +# I am an Ansible action plug-in. I run the script provided in the parameter in +# the context of the action. + +import sys + +from ansible.plugins.action import ActionBase + + +def execute(s, gbls, lcls): + if sys.version_info > (3,): + exec(s, gbls, lcls) + else: + exec('exec s in gbls, lcls') + + +class ActionModule(ActionBase): + def run(self, tmp=None, task_vars=None): + super(ActionModule, self).run(tmp=tmp, task_vars=task_vars) + lcls = { + 'self': self, + 'result': {} + } + execute(self._task.args['script'], globals(), lcls) + return lcls['result'] + + +if __name__ == '__main__': + main() diff --git a/tests/ansible/requirements.txt b/tests/ansible/requirements.txt new file mode 100644 index 00000000..fdabb0f6 --- /dev/null +++ b/tests/ansible/requirements.txt @@ -0,0 +1,2 @@ +paramiko==2.3.2 # Last 2.6-compat version. +google-api-python-client==1.6.5 diff --git a/tests/ansible/tests/__init__.py b/tests/ansible/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/ansible/tests/target_test.py b/tests/ansible/tests/target_test.py index e3d59433..7d6c0b46 100644 --- a/tests/ansible/tests/target_test.py +++ b/tests/ansible/tests/target_test.py @@ -28,10 +28,10 @@ class ApplyModeSpecTest(unittest2.TestCase): def test_simple(self): spec = 'u+rwx,go=x' - self.assertEquals(0711, self.func(spec, 0)) + self.assertEquals(int('0711', 8), self.func(spec, 0)) spec = 'g-rw' - self.assertEquals(0717, self.func(spec, 0777)) + self.assertEquals(int('0717', 8), self.func(spec, int('0777', 8))) class IsGoodTempDirTest(unittest2.TestCase): diff --git a/tests/bench/README.md b/tests/bench/README.md new file mode 100644 index 00000000..0ef27df3 --- /dev/null +++ b/tests/bench/README.md @@ -0,0 +1,5 @@ + +# tests/bench/ + +Various manually executed scripts to aid benchmarking, or trigger old +performance problems. diff --git a/tests/bench/large_messages.py b/tests/bench/large_messages.py new file mode 100644 index 00000000..24220023 --- /dev/null +++ b/tests/bench/large_messages.py @@ -0,0 +1,28 @@ + +# Verify _receive_one() quadratic behaviour fixed. + +import subprocess +import time +import socket +import mitogen + + +@mitogen.main() +def main(router): + c = router.fork() + + n = 1048576 * 127 + s = ' ' * n + print('bytes in %.2fMiB string...' % (n/1048576.0),) + + t0 = time.time() + for x in range(10): + tt0 = time.time() + assert n == c.call(len, s) + print('took %dms' % (1000 * (time.time() - tt0),)) + t1 = time.time() + print('total %dms / %dms avg / %.2fMiB/sec' % ( + 1000 * (t1 - t0), + (1000 * (t1 - t0)) / (x + 1), + ((n * (x + 1)) / (t1 - t0)) / 1048576.0, + )) diff --git a/tests/bench/roundtrip.py b/tests/bench/roundtrip.py index 13b9413d..7c5a9252 100644 --- a/tests/bench/roundtrip.py +++ b/tests/bench/roundtrip.py @@ -12,6 +12,6 @@ def do_nothing(): def main(router): f = router.fork() t0 = time.time() - for x in xrange(10000): + for x in range(1000): f.call(do_nothing) print '++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec' diff --git a/tests/bench/service.py b/tests/bench/service.py new file mode 100644 index 00000000..6d866b5c --- /dev/null +++ b/tests/bench/service.py @@ -0,0 +1,23 @@ +""" +Measure latency of local service RPC. +""" + +import time + +import mitogen.service +import mitogen + + +class MyService(mitogen.service.Service): + @mitogen.service.expose(policy=mitogen.service.AllowParents()) + def ping(self): + return 'pong' + + +@mitogen.main() +def main(router): + f = router.fork() + t0 = time.time() + for x in range(1000): + f.call_service(service_name=MyService, method_name='ping') + print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') diff --git a/tests/broker_test.py b/tests/broker_test.py new file mode 100644 index 00000000..7d070e3d --- /dev/null +++ b/tests/broker_test.py @@ -0,0 +1,32 @@ + +import threading + +import unittest2 + +import testlib + +import mitogen.core + + +class DeferSyncTest(testlib.TestCase): + klass = mitogen.core.Broker + + def test_okay(self): + broker = self.klass() + try: + th = broker.defer_sync(lambda: threading.currentThread()) + self.assertEquals(th, broker._thread) + finally: + broker.shutdown() + + def test_exception(self): + broker = self.klass() + try: + self.assertRaises(ValueError, + broker.defer_sync, lambda: int('dave')) + finally: + broker.shutdown() + + +if __name__ == '__main__': + unittest2.main() diff --git a/tests/call_function_test.py b/tests/call_function_test.py index dc9a2298..72991d62 100644 --- a/tests/call_function_test.py +++ b/tests/call_function_test.py @@ -103,7 +103,8 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): def test_accepts_returns_context(self): context = self.local.call(func_returns_arg, self.local) - self.assertIsNot(context, self.local) + # Unpickling now deduplicates Context instances. + self.assertIs(context, self.local) self.assertEqual(context.context_id, self.local.context_id) self.assertEqual(context.name, self.local.name) @@ -119,12 +120,12 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): lambda: recv.get().unpickle()) -class ChainTest(testlib.RouterMixin, testlib.TestCase): +class CallChainTest(testlib.RouterMixin, testlib.TestCase): # Verify mitogen_chain functionality. klass = mitogen.parent.CallChain def setUp(self): - super(ChainTest, self).setUp() + super(CallChainTest, self).setUp() self.local = self.router.fork() def test_subsequent_calls_produce_same_error(self): diff --git a/tests/data/main_with_no_exec_guard.py b/tests/data/main_with_no_exec_guard.py new file mode 100644 index 00000000..153e4743 --- /dev/null +++ b/tests/data/main_with_no_exec_guard.py @@ -0,0 +1,12 @@ + +import logging +import mitogen.master + +def foo(): + pass + +logging.basicConfig(level=logging.INFO) +router = mitogen.master.Router() + +l = router.local() +l.call(foo) diff --git a/tests/data/stubs/README.md b/tests/data/stubs/README.md new file mode 100644 index 00000000..02de6456 --- /dev/null +++ b/tests/data/stubs/README.md @@ -0,0 +1,5 @@ + +# tests/data/stubs/ + +Dummy implementations of various third party tools that just spawn local Python +interpreters. Used to roughly test the tools' associated Mitogen classes. diff --git a/tests/data/stubs/docker.py b/tests/data/stubs/docker.py new file mode 100755 index 00000000..341cc818 --- /dev/null +++ b/tests/data/stubs/docker.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python + +import sys +import os + +os.environ['ORIGINAL_ARGV'] = repr(sys.argv) +os.execv(sys.executable, sys.argv[sys.argv.index('-c') - 1:]) diff --git a/tests/data/fake_lxc.py b/tests/data/stubs/lxc-attach.py similarity index 100% rename from tests/data/fake_lxc.py rename to tests/data/stubs/lxc-attach.py diff --git a/tests/data/fake_lxc_attach.py b/tests/data/stubs/lxc.py similarity index 100% rename from tests/data/fake_lxc_attach.py rename to tests/data/stubs/lxc.py diff --git a/tests/data/fakessh.py b/tests/data/stubs/ssh.py similarity index 78% rename from tests/data/fakessh.py rename to tests/data/stubs/ssh.py index 8df5aa39..80c02835 100755 --- a/tests/data/fakessh.py +++ b/tests/data/stubs/ssh.py @@ -15,6 +15,9 @@ Are you sure you want to continue connecting (yes/no)? HOST_KEY_STRICT_MSG = """Host key verification failed.\n""" +PERMDENIED_CLASSIC_MSG = 'Permission denied (publickey,password)\n' +PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n' + def tty(msg): fp = open('/dev/tty', 'wb', 0) @@ -37,13 +40,23 @@ def confirm(msg): fp.close() -if os.getenv('FAKESSH_MODE') == 'ask': - assert 'y\n' == confirm(HOST_KEY_ASK_MSG) +mode = os.getenv('STUBSSH_MODE') + +if mode == 'ask': + assert 'yes\n' == confirm(HOST_KEY_ASK_MSG) -if os.getenv('FAKESSH_MODE') == 'strict': +elif mode == 'strict': stderr(HOST_KEY_STRICT_MSG) sys.exit(255) +elif mode == 'permdenied_classic': + stderr(PERMDENIED_CLASSIC_MSG) + sys.exit(255) + +elif mode == 'permdenied_75': + stderr(PERMDENIED_75_MSG) + sys.exit(255) + # # Set an env var if stderr was a TTY to make ssh_test tests easier to write. diff --git a/tests/docker_test.py b/tests/docker_test.py new file mode 100644 index 00000000..33ead10c --- /dev/null +++ b/tests/docker_test.py @@ -0,0 +1,28 @@ +import os + +import mitogen + +import unittest2 + +import testlib + + +class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): + def test_okay(self): + docker_path = testlib.data_path('stubs/docker.py') + context = self.router.docker( + container='container_name', + docker_path=docker_path, + ) + stream = self.router.stream_by_id(context.context_id) + + argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV')) + self.assertEquals(argv[0], docker_path) + self.assertEquals(argv[1], 'exec') + self.assertEquals(argv[2], '--interactive') + self.assertEquals(argv[3], 'container_name') + self.assertEquals(argv[4], stream.python_path) + + +if __name__ == '__main__': + unittest2.main() diff --git a/tests/lxc_test.py b/tests/lxc_test.py index a30cd186..3168aad2 100644 --- a/tests/lxc_test.py +++ b/tests/lxc_test.py @@ -11,9 +11,9 @@ def has_subseq(seq, subseq): return any(seq[x:x+len(subseq)] == subseq for x in range(0, len(seq))) -class FakeLxcAttachTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): def test_okay(self): - lxc_attach_path = testlib.data_path('fake_lxc_attach.py') + lxc_attach_path = testlib.data_path('stubs/lxc-attach.py') context = self.router.lxc( container='container_name', lxc_attach_path=lxc_attach_path, diff --git a/tests/lxd_test.py b/tests/lxd_test.py index 9c2397a2..41e9df15 100644 --- a/tests/lxd_test.py +++ b/tests/lxd_test.py @@ -7,9 +7,9 @@ import unittest2 import testlib -class FakeLxcTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): def test_okay(self): - lxc_path = testlib.data_path('fake_lxc.py') + lxc_path = testlib.data_path('stubs/lxc.py') context = self.router.lxd( container='container_name', lxc_path=lxc_path, diff --git a/tests/parent_test.py b/tests/parent_test.py index c9ccaf3f..aaf335b8 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -30,6 +30,11 @@ def wait_for_child(pid, timeout=1.0): assert False, "wait_for_child() timed out" +@mitogen.core.takes_econtext +def call_func_in_sibling(ctx, econtext): + ctx.call(time.sleep, 99999) + + class GetDefaultRemoteNameTest(testlib.TestCase): func = staticmethod(mitogen.parent.get_default_remote_name) @@ -74,7 +79,7 @@ class WstatusToStrTest(testlib.TestCase): (pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0) self.assertEquals( self.func(status), - 'exited due to signal %s (SIGKILL)' % (signal.SIGKILL,) + 'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),) ) # can't test SIGSTOP without POSIX sessions rabbithole @@ -298,5 +303,78 @@ class WriteAllTest(unittest2.TestCase): proc.terminate() +class DisconnectTest(testlib.RouterMixin, testlib.TestCase): + def test_child_disconnected(self): + # Easy mode: process notices its own directly connected child is + # disconnected. + c1 = self.router.fork() + recv = c1.call_async(time.sleep, 9999) + c1.shutdown(wait=True) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get()) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_indirect_child_disconnected(self): + # Achievement unlocked: process notices an indirectly connected child + # is disconnected. + c1 = self.router.fork() + c2 = self.router.fork(via=c1) + recv = c2.call_async(time.sleep, 9999) + c2.shutdown(wait=True) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get()) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_indirect_child_intermediary_disconnected(self): + # Battlefield promotion: process notices indirect child disconnected + # due to an intermediary child disconnecting. + c1 = self.router.fork() + c2 = self.router.fork(via=c1) + recv = c2.call_async(time.sleep, 9999) + c1.shutdown(wait=True) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get()) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_near_sibling_disconnected(self): + # Hard mode: child notices sibling connected to same parent has + # disconnected. + c1 = self.router.fork() + c2 = self.router.fork() + + # Let c1 call functions in c2. + self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id + c1.call(mitogen.parent.upgrade_router) + + recv = c1.call_async(call_func_in_sibling, c2) + c2.shutdown(wait=True) + e = self.assertRaises(mitogen.core.CallError, + lambda: recv.get().unpickle()) + self.assertTrue(e.args[0].startswith( + 'mitogen.core.ChannelError: Channel closed by local end.' + )) + + def test_far_sibling_disconnected(self): + # God mode: child of child notices child of child of parent has + # disconnected. + c1 = self.router.fork() + c11 = self.router.fork(via=c1) + + c2 = self.router.fork() + c22 = self.router.fork(via=c2) + + # Let c1 call functions in c2. + self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id + c11.call(mitogen.parent.upgrade_router) + + recv = c11.call_async(call_func_in_sibling, c22) + c22.shutdown(wait=True) + e = self.assertRaises(mitogen.core.CallError, + lambda: recv.get().unpickle()) + self.assertTrue(e.args[0].startswith( + 'mitogen.core.ChannelError: Channel closed by local end.' + )) + + if __name__ == '__main__': unittest2.main() diff --git a/tests/router_test.py b/tests/router_test.py index 68474e00..7b7e2896 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -1,6 +1,7 @@ import logging import subprocess import time +import zlib import unittest2 @@ -189,20 +190,35 @@ class AddHandlerTest(unittest2.TestCase): self.assertTrue(queue.get(timeout=5).is_dead) -class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): +class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase): klass = mitogen.master.Router def test_local_exceeded(self): router = self.klass(broker=self.broker, max_message_size=4096) - recv = mitogen.core.Receiver(router) logs = testlib.LogCapturer() logs.start() - sem = mitogen.core.Latch() + # Send message and block for one IO loop, so _async_route can run. router.route(mitogen.core.Message.pickled(' '*8192)) - router.broker.defer(sem.put, ' ') # wlil always run after _async_route - sem.get() + router.broker.defer_sync(lambda: None) + + expect = 'message too large (max 4096 bytes)' + self.assertTrue(expect in logs.stop()) + + def test_local_dead_message(self): + # Local router should generate dead message when reply_to is set. + router = self.klass(broker=self.broker, max_message_size=4096) + + logs = testlib.LogCapturer() + logs.start() + + # Try function call. Receiver should be woken by a dead message sent by + # router due to message size exceeded. + child = router.fork() + e = self.assertRaises(mitogen.core.ChannelError, + lambda: child.call(zlib.crc32, ' '*8192)) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) expect = 'message too large (max 4096 bytes)' self.assertTrue(expect in logs.stop()) @@ -314,5 +330,16 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase): self.assertTrue('policy refused message: ' in logs.stop()) +class EgressIdsTest(testlib.RouterMixin, testlib.TestCase): + def test_egress_ids_populated(self): + # Ensure Stream.egress_ids is populated on message reception. + c1 = self.router.fork() + stream = self.router.stream_by_id(c1.context_id) + self.assertEquals(set(), stream.egress_ids) + + c1.call(time.sleep, 0) + self.assertEquals(set([mitogen.context_id]), stream.egress_ids) + + if __name__ == '__main__': unittest2.main() diff --git a/tests/ssh_test.py b/tests/ssh_test.py index efca057d..bdee30dd 100644 --- a/tests/ssh_test.py +++ b/tests/ssh_test.py @@ -1,5 +1,6 @@ import os import sys +import tempfile import mitogen import mitogen.ssh @@ -11,19 +12,36 @@ import testlib import plain_old_module -class FakeSshTest(testlib.RouterMixin, unittest2.TestCase): +class StubSshMixin(testlib.RouterMixin): + """ + Mix-in that provides :meth:`stub_ssh` executing the stub 'ssh.py'. + """ + def stub_ssh(self, STUBSSH_MODE=None, **kwargs): + os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE) + try: + return self.router.ssh( + hostname='hostname', + username='mitogen__has_sudo', + ssh_path=testlib.data_path('stubs/ssh.py'), + **kwargs + ) + finally: + del os.environ['STUBSSH_MODE'] + + +class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): def test_okay(self): context = self.router.ssh( hostname='hostname', username='mitogen__has_sudo', - ssh_path=testlib.data_path('fakessh.py'), + ssh_path=testlib.data_path('stubs/ssh.py'), ) #context.call(mitogen.utils.log_to_file, '/tmp/log') #context.call(mitogen.utils.disable_site_packages) self.assertEquals(3, context.call(plain_old_module.add, 1, 2)) -class SshTest(testlib.DockerMixin, unittest2.TestCase): +class SshTest(testlib.DockerMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_stream_name(self): @@ -105,6 +123,47 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase): context.call(plain_old_module.get_sentinel_value), ) + def test_enforce_unknown_host_key(self): + fp = tempfile.NamedTemporaryFile() + try: + e = self.assertRaises(mitogen.ssh.HostKeyError, + lambda: self.docker_ssh( + username='mitogen__has_sudo_pubkey', + password='has_sudo_password', + ssh_args=['-o', 'UserKnownHostsFile ' + fp.name], + check_host_keys='enforce', + ) + ) + self.assertEquals(e.args[0], mitogen.ssh.Stream.hostkey_failed_msg) + finally: + fp.close() + + def test_accept_enforce_host_keys(self): + fp = tempfile.NamedTemporaryFile() + try: + context = self.docker_ssh( + username='mitogen__has_sudo', + password='has_sudo_password', + ssh_args=['-o', 'UserKnownHostsFile ' + fp.name], + check_host_keys='accept', + ) + context.shutdown(wait=True) + + fp.seek(0) + # Lame test, but we're about to use enforce mode anyway, which + # verifies the file contents. + self.assertTrue(len(fp.read()) > 0) + + context = self.docker_ssh( + username='mitogen__has_sudo', + password='has_sudo_password', + ssh_args=['-o', 'UserKnownHostsFile ' + fp.name], + check_host_keys='enforce', + ) + context.shutdown(wait=True) + finally: + fp.close() + class BannerTest(testlib.DockerMixin, unittest2.TestCase): # Verify the ability to disambiguate random spam appearing in the SSHd's @@ -124,39 +183,37 @@ class BannerTest(testlib.DockerMixin, unittest2.TestCase): self.assertEquals(name, context.name) -class RequirePtyTest(testlib.DockerMixin, testlib.TestCase): - stream_class = mitogen.ssh.Stream +class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase): + def test_classic_prompt(self): + self.assertRaises(mitogen.ssh.PasswordError, + lambda: self.stub_ssh(STUBSSH_MODE='permdenied_classic')) - def fake_ssh(self, FAKESSH_MODE=None, **kwargs): - os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE) - try: - return self.router.ssh( - hostname='hostname', - username='mitogen__has_sudo', - ssh_path=testlib.data_path('fakessh.py'), - **kwargs - ) - finally: - del os.environ['FAKESSH_MODE'] + def test_openssh_75_prompt(self): + self.assertRaises(mitogen.ssh.PasswordError, + lambda: self.stub_ssh(STUBSSH_MODE='permdenied_75')) + + +class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase): + stream_class = mitogen.ssh.Stream def test_check_host_keys_accept(self): # required=true, host_key_checking=accept - context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept') + context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept') self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) def test_check_host_keys_enforce(self): # required=false, host_key_checking=enforce - context = self.fake_ssh(check_host_keys='enforce') + context = self.stub_ssh(check_host_keys='enforce') self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) def test_check_host_keys_ignore(self): # required=false, host_key_checking=ignore - context = self.fake_ssh(check_host_keys='ignore') + context = self.stub_ssh(check_host_keys='ignore') self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) def test_password_present(self): # required=true, password is not None - context = self.fake_ssh(check_host_keys='ignore', password='willick') + context = self.stub_ssh(check_host_keys='ignore', password='willick') self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) diff --git a/tests/testlib.py b/tests/testlib.py index 63d96233..8f11337d 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -14,6 +14,11 @@ import mitogen.core import mitogen.master import mitogen.utils +try: + import faulthandler +except ImportError: + pass + try: import urlparse except ImportError: @@ -32,6 +37,9 @@ sys.path.append(DATA_DIR) if mitogen.is_master: mitogen.utils.log_to_file() +if faulthandler is not None: + faulthandler.enable() + def data_path(suffix): path = os.path.join(DATA_DIR, suffix) @@ -160,12 +168,12 @@ def sync_with_broker(broker, timeout=10.0): class CaptureStreamHandler(logging.StreamHandler): def __init__(self, *args, **kwargs): - super(CaptureStreamHandler, self).__init__(*args, **kwargs) + logging.StreamHandler.__init__(self, *args, **kwargs) self.msgs = [] def emit(self, msg): self.msgs.append(msg) - return super(CaptureStreamHandler, self).emit(msg) + logging.StreamHandler.emit(self, msg) class LogCapturer(object): diff --git a/tests/unix_test.py b/tests/unix_test.py new file mode 100644 index 00000000..67265c81 --- /dev/null +++ b/tests/unix_test.py @@ -0,0 +1,119 @@ + +import os +import socket +import sys +import time + +import unittest2 + +import mitogen +import mitogen.fork +import mitogen.master +import mitogen.service +import mitogen.unix + +import testlib + + +class MyService(mitogen.service.Service): + def __init__(self, latch, **kwargs): + super(MyService, self).__init__(**kwargs) + # used to wake up main thread once client has made its request + self.latch = latch + + @mitogen.service.expose(policy=mitogen.service.AllowParents()) + def ping(self, msg): + self.latch.put(None) + return { + 'src_id': msg.src_id, + 'auth_id': msg.auth_id, + } + + +class IsPathDeadTest(unittest2.TestCase): + func = staticmethod(mitogen.unix.is_path_dead) + path = '/tmp/stale-socket' + + def test_does_not_exist(self): + self.assertTrue(self.func('/tmp/does-not-exist')) + + def make_socket(self): + if os.path.exists(self.path): + os.unlink(self.path) + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.bind(self.path) + return s + + def test_conn_refused(self): + s = self.make_socket() + s.close() + self.assertTrue(self.func(self.path)) + + def test_is_alive(self): + s = self.make_socket() + s.listen(5) + self.assertFalse(self.func(self.path)) + s.close() + os.unlink(self.path) + + +class ListenerTest(testlib.RouterMixin, unittest2.TestCase): + klass = mitogen.unix.Listener + + def test_constructor_basic(self): + listener = self.klass(router=self.router) + self.assertFalse(mitogen.unix.is_path_dead(listener.path)) + os.unlink(listener.path) + + +class ClientTest(unittest2.TestCase): + klass = mitogen.unix.Listener + + def _try_connect(self, path): + # give server a chance to setup listener + for x in range(10): + try: + return mitogen.unix.connect(path) + except socket.error: + if x == 9: + raise + time.sleep(0.1) + + def _test_simple_client(self, path): + router, context = self._try_connect(path) + self.assertEquals(0, context.context_id) + self.assertEquals(1, mitogen.context_id) + self.assertEquals(0, mitogen.parent_id) + resp = context.call_service(service_name=MyService, method_name='ping') + self.assertEquals(mitogen.context_id, resp['src_id']) + self.assertEquals(0, resp['auth_id']) + + def _test_simple_server(self, path): + router = mitogen.master.Router() + latch = mitogen.core.Latch() + try: + try: + listener = self.klass(path=path, router=router) + pool = mitogen.service.Pool(router=router, services=[ + MyService(latch=latch, router=router), + ]) + latch.get() + # give broker a chance to deliver service resopnse + time.sleep(0.1) + finally: + pool.shutdown() + router.broker.shutdown() + finally: + os._exit(0) + + def test_simple(self): + path = mitogen.unix.make_socket_path() + if os.fork(): + self._test_simple_client(path) + else: + mitogen.fork.on_fork() + self._test_simple_server(path) + + +if __name__ == '__main__': + unittest2.main()