diff --git a/docs/api.rst b/docs/api.rst index b412a098..b2f4321b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -3,49 +3,50 @@ API Reference ************* +Package Layout +============== + + econtext Package -================ +---------------- .. automodule:: econtext -.. autodata:: econtext.slave - econtext.core -============= +------------- .. automodule:: econtext.core -Exceptions ----------- +econtext.master +--------------- -.. autoclass:: econtext.core.Error -.. autoclass:: econtext.core.CallError -.. autoclass:: econtext.core.ChannelError -.. autoclass:: econtext.core.StreamError -.. autoclass:: econtext.core.TimeoutError +.. automodule:: econtext.master -Stream Classes --------------- -.. autoclass:: econtext.core.Stream - :members: +Context Factories +================= + +.. autofunction:: econtext.master.connect +.. autofunction:: econtext.ssh.connect Broker Class ------------- +============ -.. autoclass:: econtext.core.Broker +.. autoclass:: econtext.master.Broker :members: + :inherited-members: Context Class -------------- +============= -.. autoclass:: econtext.core.Context +.. autoclass:: econtext.master.Context :members: + :inherited-members: Channel Class @@ -55,56 +56,6 @@ Channel Class :members: -ExternalContext Class ---------------------- - -.. class:: econtext.core.ExternalContext - - External context implementation. - - .. attribute:: broker - - The :py:class:`econtext.core.Broker` instance. - - .. attribute:: context - - The :py:class:`econtext.core.Context` instance. - - .. attribute:: channel - - The :py:class:`econtext.core.Channel` over which - :py:data:`CALL_FUNCTION` requests are received. - - .. attribute:: stdout_log - - The :py:class:`econtext.core.IoLogger` connected to ``stdout``. - - .. attribute:: importer - - The :py:class:`econtext.core.Importer` instance. - - .. attribute:: stdout_log - - The :py:class:`IoLogger` connected to ``stdout``. - - .. attribute:: stderr_log - - The :py:class:`IoLogger` connected to ``stderr``. - - -econtext.master -=============== - -.. automodule:: econtext.master - - -Broker Class ------------- - -.. autoclass:: econtext.master.Broker - :members: - - Context Class ------------- @@ -112,8 +63,24 @@ Context Class :members: -econtext.utils -============== +Detecting A Slave +================= + +.. autodata:: econtext.slave + + +Utility Functions +================= .. automodule:: econtext.utils :members: + + +Exceptions +========== + +.. autoclass:: econtext.core.Error +.. autoclass:: econtext.core.CallError +.. autoclass:: econtext.core.ChannelError +.. autoclass:: econtext.core.StreamError +.. autoclass:: econtext.core.TimeoutError diff --git a/docs/index.rst b/docs/index.rst index 503550b6..ba005f04 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -209,7 +209,7 @@ usual into the slave process. print __doc__ sys.exit(1) - context = broker.get_remote(sys.argv[1]) + context = econtext.ssh.connect(broker, sys.argv[1]) context.call(install_app) if __name__ == '__main__' and not econtext.slave: diff --git a/docs/internals.rst b/docs/internals.rst index 638a930b..7a8d0320 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -20,15 +20,33 @@ Stream Classes .. autoclass:: econtext.core.BasicStream :members: +.. autoclass:: econtext.core.Stream + :members: -.. autoclass:: econtext.core.IoLogger +.. autoclass:: econtext.master.Stream :members: +.. autoclass:: econtext.ssh.Stream + :members: + + +Other Stream Subclasses +----------------------- + +.. autoclass:: econtext.core.IoLogger + :members: .. autoclass:: econtext.core.Waker :members: + +ExternalContext Class +--------------------- + +.. autoclass:: econtext.core.ExternalContext + + econtext.master =============== @@ -39,13 +57,3 @@ Helper Functions .. autofunction:: econtext.master.create_child .. autofunction:: econtext.master.get_child_modules .. autofunction:: econtext.master.minimize_source - - -Stream Classes --------------- - -.. autoclass:: econtext.master.LocalStream - :members: - -.. autoclass:: econtext.master.SshStream - :members: diff --git a/econtext/__init__.py b/econtext/__init__.py index a3d7315d..1eff1525 100644 --- a/econtext/__init__.py +++ b/econtext/__init__.py @@ -13,7 +13,7 @@ be expected. On the slave, it is built dynamically during startup. #: os.system('hostname') #: #: def main(broker): -#: context = broker.get_local() +#: context = econtext.master.connect(broker) #: context.call(do_work) # Causes slave to import __main__. #: #: if __name__ == '__main__' and not econtext.slave: diff --git a/econtext/ansible/connection.py b/econtext/ansible/connection.py index d21f8371..ef6bd2bf 100644 --- a/econtext/ansible/connection.py +++ b/econtext/ansible/connection.py @@ -16,6 +16,7 @@ Enable it by: """ import econtext.master +import econtext.ssh import econtext.utils from econtext.ansible import helpers @@ -38,9 +39,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): return self.broker = econtext.master.Broker() if self._play_context.remote_addr == 'localhost': - self.context = self.broker.get_local() + self.context = econtext.master.connect(self.broker) else: - self.context = self.broker.get_remote(self._play_context.remote_addr) + self.context = econtext.ssh.connect(broker, + self._play_context.remote_addr) def exec_command(self, cmd, in_data=None, sudoable=True): super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) diff --git a/econtext/core.py b/econtext/core.py index 35821505..f0a61e39 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -457,16 +457,6 @@ class Stream(BasicStream): set_cloexec(self.transmit_side.fd) self._context.stream = self - def connect(self): - """Connect to a Broker at the address specified in our associated - Context.""" - LOG.debug('%r.connect()', self) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.receive_side = Side(self, sock.fileno()) - self.transmit_side = Side(self, sock.fileno()) - sock.connect(self._context.parent_addr) - self.enqueue(0, self._context.name) - def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self._context) @@ -780,6 +770,38 @@ class Broker(object): class ExternalContext(object): + """ + External context implementation. + + .. attribute:: broker + + The :py:class:`econtext.core.Broker` instance. + + .. attribute:: context + + The :py:class:`econtext.core.Context` instance. + + .. attribute:: channel + + The :py:class:`econtext.core.Channel` over which + :py:data:`CALL_FUNCTION` requests are received. + + .. attribute:: stdout_log + + The :py:class:`econtext.core.IoLogger` connected to ``stdout``. + + .. attribute:: importer + + The :py:class:`econtext.core.Importer` instance. + + .. attribute:: stdout_log + + The :py:class:`IoLogger` connected to ``stdout``. + + .. attribute:: stderr_log + + The :py:class:`IoLogger` connected to ``stderr``. + """ def _setup_master(self, key): self.broker = Broker() self.context = Context(self.broker, 'master', key=key) diff --git a/econtext/master.py b/econtext/master.py index 6eee353f..20e9324f 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -4,7 +4,6 @@ starting new contexts via SSH. Its size is also restricted, since it must be sent to any context that will be used to establish additional child contexts. """ -import commands import getpass import imp import inspect @@ -69,24 +68,6 @@ def create_child(*args): return pid, parentfp -class Listener(econtext.core.BasicStream): - def __init__(self, broker, address=None, backlog=30): - self._broker = broker - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.bind(address or ('0.0.0.0', 0)) - self._sock.listen(backlog) - econtext.core.set_cloexec(self._sock.fileno()) - self._listen_addr = self._sock.getsockname() - self.receive_side = econtext.core.Side(self, self._sock.fileno()) - broker.update_stream(self) - - def on_receive(self, broker): - sock, addr = self._sock.accept() - context = Context(self._broker, name=addr) - stream = econtext.core.Stream(context) - stream.accept(sock.fileno(), sock.fileno()) - - class LogForwarder(object): def __init__(self, context): self._context = context @@ -199,7 +180,7 @@ class ModuleResponder(object): self._context.enqueue(reply_to, None) -class LocalStream(econtext.core.Stream): +class Stream(econtext.core.Stream): """ Base for streams capable of starting new slaves. """ @@ -207,7 +188,7 @@ class LocalStream(econtext.core.Stream): python_path = sys.executable def __init__(self, context): - super(LocalStream, self).__init__(context) + super(Stream, self).__init__(context) self._permitted_classes = set([ ('econtext.core', 'CallError'), ('econtext.core', 'Dead'), @@ -290,49 +271,9 @@ class LocalStream(econtext.core.Stream): raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s) -class SshStream(LocalStream): - python_path = 'python' - #: The path to the SSH binary. - ssh_path = 'ssh' - - def get_boot_command(self): - bits = [self.ssh_path] - if self._context.username: - bits += ['-l', self._context.username] - bits.append(self._context.hostname) - base = super(SshStream, self).get_boot_command() - return bits + map(commands.mkarg, base) - - class Broker(econtext.core.Broker): shutdown_timeout = 5.0 - def create_listener(self, address=None, backlog=30): - """Listen on `address` for connections from newly spawned contexts.""" - self._listener = Listener(self, address, backlog) - - def get_local(self, name='default', python_path=None): - """Get the named context running on the local machine, creating it if - it does not exist.""" - context = Context(self, name) - context.stream = LocalStream(context) - if python_path: - context.stream.python_path = python_path - context.stream.connect() - return self.register(context) - - def get_remote(self, hostname, username=None, name=None, python_path=None): - """Get the named remote context, creating it if it does not exist.""" - if name is None: - name = hostname - - context = Context(self, name, hostname, username) - context.stream = SshStream(context) - if python_path: - context.stream.python_path = python_path - context.stream.connect() - return self.register(context) - class Context(econtext.core.Context): def __init__(self, *args, **kwargs): @@ -346,8 +287,9 @@ class Context(econtext.core.Context): def call_with_deadline(self, deadline, with_context, fn, *args, **kwargs): """Invoke `fn([context,] *args, **kwargs)` in the external context. - If `with_context` is True, pass its - :py:class:`econtext.core.ExternalContext` instance as first parameter. + If `with_context` is ``True``, pass its + :py:class:`ExternalContext ` instance as + the first parameter. If `deadline` is not ``None``, expire the call after `deadline` seconds. If `deadline` is ``None``, the invocation may block @@ -371,3 +313,14 @@ class Context(econtext.core.Context): def call(self, fn, *args, **kwargs): """Invoke `fn(*args, **kwargs)` in the external context.""" return self.call_with_deadline(None, False, fn, *args, **kwargs) + + +def connect(broker, name='default', python_path=None): + """Get the named context running on the local machine, creating it if + it does not exist.""" + context = Context(broker, name) + context.stream = Stream(context) + if python_path: + context.stream.python_path = python_path + context.stream.connect() + return broker.register(context) diff --git a/econtext/ssh.py b/econtext/ssh.py new file mode 100644 index 00000000..d46b0da5 --- /dev/null +++ b/econtext/ssh.py @@ -0,0 +1,34 @@ +""" +Functionality to allow establishing new slave contexts over an SSH connection. +""" + +import commands + +import econtext.master + + +class Stream(econtext.master.Stream): + python_path = 'python' + #: The path to the SSH binary. + ssh_path = 'ssh' + + def get_boot_command(self): + bits = [self.ssh_path] + if self._context.username: + bits += ['-l', self._context.username] + bits.append(self._context.hostname) + base = super(Stream, self).get_boot_command() + return bits + map(commands.mkarg, base) + + +def connect(broker, hostname, username=None, name=None, python_path=None): + """Get the named remote context, creating it if it does not exist.""" + if name is None: + name = hostname + + context = econtext.master.Context(broker, name, hostname, username) + context.stream = Stream(context) + if python_path: + context.stream.python_path = python_path + context.stream.connect() + return broker.register(context) diff --git a/examples/ansible_demo.py b/examples/ansible_demo.py index 40c8fb10..85138cbd 100644 --- a/examples/ansible_demo.py +++ b/examples/ansible_demo.py @@ -7,6 +7,7 @@ import logging import time import econtext +import econtext.master import econtext.utils # Prevent accident import of an Ansible module from hanging on stdin read. @@ -100,7 +101,7 @@ def main(broker): level = logging.INFO logging.basicConfig(level=level, format=fmt, datefmt=datefmt) - context = broker.get_local() + context = econtext.master.connect(broker) print context.call(run_module, 'ansible.modules.core.system.setup') for x in xrange(10): print context.call(run_module, 'ansible.modules.core.commands.command', 'hostname') diff --git a/tests/data/self_contained_program.py b/tests/data/self_contained_program.py index c3bfc7b9..9d9fccf6 100644 --- a/tests/data/self_contained_program.py +++ b/tests/data/self_contained_program.py @@ -12,7 +12,7 @@ def repr_stuff(): def main(): broker = econtext.master.Broker() try: - context = broker.get_local() + context = econtext.master.connect(broker) print context.call(repr_stuff) finally: broker.shutdown() diff --git a/tests/responder_test.py b/tests/responder_test.py index 405c65a0..ef5f515d 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -4,7 +4,6 @@ import subprocess import unittest import sys -import econtext.master import econtext.master import testlib @@ -16,13 +15,13 @@ class GoodModulesTest(testlib.BrokerMixin, unittest.TestCase): def test_plain_old_module(self): # The simplest case: a top-level module with no interesting imports or # package machinery damage. - context = self.broker.get_local() + context = econtext.master.connect(self.broker) self.assertEquals(256, context.call(plain_old_module.pow, 2, 8)) def test_simple_pkg(self): # Ensure success of a simple package containing two submodules, one of # which imports the other. - context = self.broker.get_local() + context = econtext.master.connect(self.broker) self.assertEquals(3, context.call(simple_pkg.a.subtract_one_add_two, 2))