Rearrange docs, split connection methods into submodules.

pull/35/head
David Wilson 8 years ago
parent 87a4206015
commit ff903b1bcd

@ -3,49 +3,50 @@ API Reference
************* *************
Package Layout
==============
econtext Package econtext Package
================ ----------------
.. automodule:: econtext .. automodule:: econtext
.. autodata:: econtext.slave
econtext.core econtext.core
============= -------------
.. automodule:: econtext.core .. automodule:: econtext.core
Exceptions econtext.master
---------- ---------------
.. autoclass:: econtext.core.Error .. automodule:: econtext.master
.. autoclass:: econtext.core.CallError
.. autoclass:: econtext.core.ChannelError
.. autoclass:: econtext.core.StreamError
.. autoclass:: econtext.core.TimeoutError
Stream Classes
--------------
.. autoclass:: econtext.core.Stream Context Factories
:members: =================
.. autofunction:: econtext.master.connect
.. autofunction:: econtext.ssh.connect
Broker Class Broker Class
------------ ============
.. autoclass:: econtext.core.Broker .. autoclass:: econtext.master.Broker
:members: :members:
:inherited-members:
Context Class Context Class
------------- =============
.. autoclass:: econtext.core.Context .. autoclass:: econtext.master.Context
:members: :members:
:inherited-members:
Channel Class Channel Class
@ -55,56 +56,6 @@ Channel Class
:members: :members:
ExternalContext Class
---------------------
.. class:: econtext.core.ExternalContext
External context implementation.
.. attribute:: broker
The :py:class:`econtext.core.Broker` instance.
.. attribute:: context
The :py:class:`econtext.core.Context` instance.
.. attribute:: channel
The :py:class:`econtext.core.Channel` over which
:py:data:`CALL_FUNCTION` requests are received.
.. attribute:: stdout_log
The :py:class:`econtext.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :py:class:`econtext.core.Importer` instance.
.. attribute:: stdout_log
The :py:class:`IoLogger` connected to ``stdout``.
.. attribute:: stderr_log
The :py:class:`IoLogger` connected to ``stderr``.
econtext.master
===============
.. automodule:: econtext.master
Broker Class
------------
.. autoclass:: econtext.master.Broker
:members:
Context Class Context Class
------------- -------------
@ -112,8 +63,24 @@ Context Class
:members: :members:
econtext.utils Detecting A Slave
============== =================
.. autodata:: econtext.slave
Utility Functions
=================
.. automodule:: econtext.utils .. automodule:: econtext.utils
:members: :members:
Exceptions
==========
.. autoclass:: econtext.core.Error
.. autoclass:: econtext.core.CallError
.. autoclass:: econtext.core.ChannelError
.. autoclass:: econtext.core.StreamError
.. autoclass:: econtext.core.TimeoutError

@ -209,7 +209,7 @@ usual into the slave process.
print __doc__ print __doc__
sys.exit(1) sys.exit(1)
context = broker.get_remote(sys.argv[1]) context = econtext.ssh.connect(broker, sys.argv[1])
context.call(install_app) context.call(install_app)
if __name__ == '__main__' and not econtext.slave: if __name__ == '__main__' and not econtext.slave:

@ -20,15 +20,33 @@ Stream Classes
.. autoclass:: econtext.core.BasicStream .. autoclass:: econtext.core.BasicStream
:members: :members:
.. autoclass:: econtext.core.Stream
:members:
.. autoclass:: econtext.core.IoLogger .. autoclass:: econtext.master.Stream
:members: :members:
.. autoclass:: econtext.ssh.Stream
:members:
Other Stream Subclasses
-----------------------
.. autoclass:: econtext.core.IoLogger
:members:
.. autoclass:: econtext.core.Waker .. autoclass:: econtext.core.Waker
:members: :members:
ExternalContext Class
---------------------
.. autoclass:: econtext.core.ExternalContext
econtext.master econtext.master
=============== ===============
@ -39,13 +57,3 @@ Helper Functions
.. autofunction:: econtext.master.create_child .. autofunction:: econtext.master.create_child
.. autofunction:: econtext.master.get_child_modules .. autofunction:: econtext.master.get_child_modules
.. autofunction:: econtext.master.minimize_source .. autofunction:: econtext.master.minimize_source
Stream Classes
--------------
.. autoclass:: econtext.master.LocalStream
:members:
.. autoclass:: econtext.master.SshStream
:members:

@ -13,7 +13,7 @@ be expected. On the slave, it is built dynamically during startup.
#: os.system('hostname') #: os.system('hostname')
#: #:
#: def main(broker): #: def main(broker):
#: context = broker.get_local() #: context = econtext.master.connect(broker)
#: context.call(do_work) # Causes slave to import __main__. #: context.call(do_work) # Causes slave to import __main__.
#: #:
#: if __name__ == '__main__' and not econtext.slave: #: if __name__ == '__main__' and not econtext.slave:

@ -16,6 +16,7 @@ Enable it by:
""" """
import econtext.master import econtext.master
import econtext.ssh
import econtext.utils import econtext.utils
from econtext.ansible import helpers from econtext.ansible import helpers
@ -38,9 +39,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return return
self.broker = econtext.master.Broker() self.broker = econtext.master.Broker()
if self._play_context.remote_addr == 'localhost': if self._play_context.remote_addr == 'localhost':
self.context = self.broker.get_local() self.context = econtext.master.connect(self.broker)
else: else:
self.context = self.broker.get_remote(self._play_context.remote_addr) self.context = econtext.ssh.connect(broker,
self._play_context.remote_addr)
def exec_command(self, cmd, in_data=None, sudoable=True): def exec_command(self, cmd, in_data=None, sudoable=True):
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)

@ -457,16 +457,6 @@ class Stream(BasicStream):
set_cloexec(self.transmit_side.fd) set_cloexec(self.transmit_side.fd)
self._context.stream = self self._context.stream = self
def connect(self):
"""Connect to a Broker at the address specified in our associated
Context."""
LOG.debug('%r.connect()', self)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.receive_side = Side(self, sock.fileno())
self.transmit_side = Side(self, sock.fileno())
sock.connect(self._context.parent_addr)
self.enqueue(0, self._context.name)
def __repr__(self): def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self._context) return '%s(%r)' % (self.__class__.__name__, self._context)
@ -780,6 +770,38 @@ class Broker(object):
class ExternalContext(object): class ExternalContext(object):
"""
External context implementation.
.. attribute:: broker
The :py:class:`econtext.core.Broker` instance.
.. attribute:: context
The :py:class:`econtext.core.Context` instance.
.. attribute:: channel
The :py:class:`econtext.core.Channel` over which
:py:data:`CALL_FUNCTION` requests are received.
.. attribute:: stdout_log
The :py:class:`econtext.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :py:class:`econtext.core.Importer` instance.
.. attribute:: stdout_log
The :py:class:`IoLogger` connected to ``stdout``.
.. attribute:: stderr_log
The :py:class:`IoLogger` connected to ``stderr``.
"""
def _setup_master(self, key): def _setup_master(self, key):
self.broker = Broker() self.broker = Broker()
self.context = Context(self.broker, 'master', key=key) self.context = Context(self.broker, 'master', key=key)

@ -4,7 +4,6 @@ starting new contexts via SSH. Its size is also restricted, since it must be
sent to any context that will be used to establish additional child contexts. sent to any context that will be used to establish additional child contexts.
""" """
import commands
import getpass import getpass
import imp import imp
import inspect import inspect
@ -69,24 +68,6 @@ def create_child(*args):
return pid, parentfp return pid, parentfp
class Listener(econtext.core.BasicStream):
def __init__(self, broker, address=None, backlog=30):
self._broker = broker
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.bind(address or ('0.0.0.0', 0))
self._sock.listen(backlog)
econtext.core.set_cloexec(self._sock.fileno())
self._listen_addr = self._sock.getsockname()
self.receive_side = econtext.core.Side(self, self._sock.fileno())
broker.update_stream(self)
def on_receive(self, broker):
sock, addr = self._sock.accept()
context = Context(self._broker, name=addr)
stream = econtext.core.Stream(context)
stream.accept(sock.fileno(), sock.fileno())
class LogForwarder(object): class LogForwarder(object):
def __init__(self, context): def __init__(self, context):
self._context = context self._context = context
@ -199,7 +180,7 @@ class ModuleResponder(object):
self._context.enqueue(reply_to, None) self._context.enqueue(reply_to, None)
class LocalStream(econtext.core.Stream): class Stream(econtext.core.Stream):
""" """
Base for streams capable of starting new slaves. Base for streams capable of starting new slaves.
""" """
@ -207,7 +188,7 @@ class LocalStream(econtext.core.Stream):
python_path = sys.executable python_path = sys.executable
def __init__(self, context): def __init__(self, context):
super(LocalStream, self).__init__(context) super(Stream, self).__init__(context)
self._permitted_classes = set([ self._permitted_classes = set([
('econtext.core', 'CallError'), ('econtext.core', 'CallError'),
('econtext.core', 'Dead'), ('econtext.core', 'Dead'),
@ -290,49 +271,9 @@ class LocalStream(econtext.core.Stream):
raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s) raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s)
class SshStream(LocalStream):
python_path = 'python'
#: The path to the SSH binary.
ssh_path = 'ssh'
def get_boot_command(self):
bits = [self.ssh_path]
if self._context.username:
bits += ['-l', self._context.username]
bits.append(self._context.hostname)
base = super(SshStream, self).get_boot_command()
return bits + map(commands.mkarg, base)
class Broker(econtext.core.Broker): class Broker(econtext.core.Broker):
shutdown_timeout = 5.0 shutdown_timeout = 5.0
def create_listener(self, address=None, backlog=30):
"""Listen on `address` for connections from newly spawned contexts."""
self._listener = Listener(self, address, backlog)
def get_local(self, name='default', python_path=None):
"""Get the named context running on the local machine, creating it if
it does not exist."""
context = Context(self, name)
context.stream = LocalStream(context)
if python_path:
context.stream.python_path = python_path
context.stream.connect()
return self.register(context)
def get_remote(self, hostname, username=None, name=None, python_path=None):
"""Get the named remote context, creating it if it does not exist."""
if name is None:
name = hostname
context = Context(self, name, hostname, username)
context.stream = SshStream(context)
if python_path:
context.stream.python_path = python_path
context.stream.connect()
return self.register(context)
class Context(econtext.core.Context): class Context(econtext.core.Context):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -346,8 +287,9 @@ class Context(econtext.core.Context):
def call_with_deadline(self, deadline, with_context, fn, *args, **kwargs): def call_with_deadline(self, deadline, with_context, fn, *args, **kwargs):
"""Invoke `fn([context,] *args, **kwargs)` in the external context. """Invoke `fn([context,] *args, **kwargs)` in the external context.
If `with_context` is True, pass its If `with_context` is ``True``, pass its
:py:class:`econtext.core.ExternalContext` instance as first parameter. :py:class:`ExternalContext <econtext.core.ExternalContext>` instance as
the first parameter.
If `deadline` is not ``None``, expire the call after `deadline` If `deadline` is not ``None``, expire the call after `deadline`
seconds. If `deadline` is ``None``, the invocation may block seconds. If `deadline` is ``None``, the invocation may block
@ -371,3 +313,14 @@ class Context(econtext.core.Context):
def call(self, fn, *args, **kwargs): def call(self, fn, *args, **kwargs):
"""Invoke `fn(*args, **kwargs)` in the external context.""" """Invoke `fn(*args, **kwargs)` in the external context."""
return self.call_with_deadline(None, False, fn, *args, **kwargs) return self.call_with_deadline(None, False, fn, *args, **kwargs)
def connect(broker, name='default', python_path=None):
"""Get the named context running on the local machine, creating it if
it does not exist."""
context = Context(broker, name)
context.stream = Stream(context)
if python_path:
context.stream.python_path = python_path
context.stream.connect()
return broker.register(context)

@ -0,0 +1,34 @@
"""
Functionality to allow establishing new slave contexts over an SSH connection.
"""
import commands
import econtext.master
class Stream(econtext.master.Stream):
python_path = 'python'
#: The path to the SSH binary.
ssh_path = 'ssh'
def get_boot_command(self):
bits = [self.ssh_path]
if self._context.username:
bits += ['-l', self._context.username]
bits.append(self._context.hostname)
base = super(Stream, self).get_boot_command()
return bits + map(commands.mkarg, base)
def connect(broker, hostname, username=None, name=None, python_path=None):
"""Get the named remote context, creating it if it does not exist."""
if name is None:
name = hostname
context = econtext.master.Context(broker, name, hostname, username)
context.stream = Stream(context)
if python_path:
context.stream.python_path = python_path
context.stream.connect()
return broker.register(context)

@ -7,6 +7,7 @@ import logging
import time import time
import econtext import econtext
import econtext.master
import econtext.utils import econtext.utils
# Prevent accident import of an Ansible module from hanging on stdin read. # Prevent accident import of an Ansible module from hanging on stdin read.
@ -100,7 +101,7 @@ def main(broker):
level = logging.INFO level = logging.INFO
logging.basicConfig(level=level, format=fmt, datefmt=datefmt) logging.basicConfig(level=level, format=fmt, datefmt=datefmt)
context = broker.get_local() context = econtext.master.connect(broker)
print context.call(run_module, 'ansible.modules.core.system.setup') print context.call(run_module, 'ansible.modules.core.system.setup')
for x in xrange(10): for x in xrange(10):
print context.call(run_module, 'ansible.modules.core.commands.command', 'hostname') print context.call(run_module, 'ansible.modules.core.commands.command', 'hostname')

@ -12,7 +12,7 @@ def repr_stuff():
def main(): def main():
broker = econtext.master.Broker() broker = econtext.master.Broker()
try: try:
context = broker.get_local() context = econtext.master.connect(broker)
print context.call(repr_stuff) print context.call(repr_stuff)
finally: finally:
broker.shutdown() broker.shutdown()

@ -4,7 +4,6 @@ import subprocess
import unittest import unittest
import sys import sys
import econtext.master
import econtext.master import econtext.master
import testlib import testlib
@ -16,13 +15,13 @@ class GoodModulesTest(testlib.BrokerMixin, unittest.TestCase):
def test_plain_old_module(self): def test_plain_old_module(self):
# The simplest case: a top-level module with no interesting imports or # The simplest case: a top-level module with no interesting imports or
# package machinery damage. # package machinery damage.
context = self.broker.get_local() context = econtext.master.connect(self.broker)
self.assertEquals(256, context.call(plain_old_module.pow, 2, 8)) self.assertEquals(256, context.call(plain_old_module.pow, 2, 8))
def test_simple_pkg(self): def test_simple_pkg(self):
# Ensure success of a simple package containing two submodules, one of # Ensure success of a simple package containing two submodules, one of
# which imports the other. # which imports the other.
context = self.broker.get_local() context = econtext.master.connect(self.broker)
self.assertEquals(3, self.assertEquals(3,
context.call(simple_pkg.a.subtract_one_add_two, 2)) context.call(simple_pkg.a.subtract_one_add_two, 2))

Loading…
Cancel
Save