Fix method naming everywhere.

pull/35/head
David Wilson 9 years ago
parent 4b11023558
commit 0e58a48611

@ -56,7 +56,7 @@ class TimeoutError(StreamError):
class CallError(ContextError): class CallError(ContextError):
"""Raised when .Call() fails""" """Raised when .call() fails"""
def __init__(self, e): def __init__(self, e):
name = '%s.%s' % (type(e).__module__, type(e).__name__) name = '%s.%s' % (type(e).__module__, type(e).__name__)
tb = sys.exc_info()[2] tb = sys.exc_info()[2]
@ -64,7 +64,7 @@ class CallError(ContextError):
stack = ''.join(traceback.format_tb(tb)) stack = ''.join(traceback.format_tb(tb))
else: else:
stack = '' stack = ''
ContextError.__init__(self, 'Call failed: %s: %s\n%s', name, e, stack) ContextError.__init__(self, 'call failed: %s: %s\n%s', name, e, stack)
class Dead(object): class Dead(object):
@ -98,33 +98,33 @@ class Channel(object):
self._context = context self._context = context
self._handle = handle self._handle = handle
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._context.AddHandleCB(self._Receive, handle) self._context.add_handle_cb(self._receive, handle)
def _Receive(self, data): def _receive(self, data):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._Receive(%r)', self, data) IOLOG.debug('%r._receive(%r)', self, data)
self._queue.put(data) self._queue.put(data)
def Close(self): def close(self):
"""Indicate this channel is closed to the remote side.""" """Indicate this channel is closed to the remote side."""
IOLOG.debug('%r.Close()', self) IOLOG.debug('%r.close()', self)
self._context.Enqueue(self._handle, _DEAD) self._context.enqueue(self._handle, _DEAD)
def Send(self, data): def send(self, data):
"""Send `data` to the remote.""" """Send `data` to the remote."""
IOLOG.debug('%r.Send(%r)', self, data) IOLOG.debug('%r.send(%r)', self, data)
self._context.Enqueue(self._handle, data) self._context.enqueue(self._handle, data)
def Receive(self, timeout=None): def on_receive(self, timeout=None):
"""Receive an object from the remote, or return ``None`` if `timeout` """on_receive an object from the remote, or return ``None`` if `timeout`
is reached.""" is reached."""
IOLOG.debug('%r.Receive(timeout=%r)', self, timeout) IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout)
try: try:
data = self._queue.get(True, timeout) data = self._queue.get(True, timeout)
except Queue.Empty: except Queue.Empty:
return return
IOLOG.debug('%r.Receive() got %r', self, data) IOLOG.debug('%r.on_receive() got %r', self, data)
if data == _DEAD: if data == _DEAD:
raise ChannelError('Channel is closed.') raise ChannelError('Channel is closed.')
return data return data
@ -134,7 +134,7 @@ class Channel(object):
is closed.""" is closed."""
while True: while True:
try: try:
yield self.Receive() yield self.on_receive()
except ChannelError: except ChannelError:
return return
@ -179,7 +179,7 @@ class SlaveModuleImporter(object):
def load_module(self, fullname): def load_module(self, fullname):
LOG.debug('SlaveModuleImporter.load_module(%r)', fullname) LOG.debug('SlaveModuleImporter.load_module(%r)', fullname)
ret = self._context.EnqueueAwaitReply(GET_MODULE, None, (fullname,)) ret = self._context.enqueue_await_reply(GET_MODULE, None, (fullname,))
if ret is None: if ret is None:
raise ImportError('Master does not have %r' % (fullname,)) raise ImportError('Master does not have %r' % (fullname,))
@ -211,7 +211,7 @@ class LogHandler(logging.Handler):
self.local.in_emit = True self.local.in_emit = True
try: try:
msg = self.format(rec) msg = self.format(rec)
self.context.Enqueue(FORWARD_LOG, (rec.name, rec.levelno, msg)) self.context.enqueue(FORWARD_LOG, (rec.name, rec.levelno, msg))
finally: finally:
self.local.in_emit = False self.local.in_emit = False
@ -244,17 +244,17 @@ class BasicStream(object):
read_side = None read_side = None
write_side = None write_side = None
def Disconnect(self): def on_disconnect(self):
LOG.debug('%r.Disconnect()', self) LOG.debug('%r.on_disconnect()', self)
self.read_side.close() self.read_side.close()
self.write_side.close() self.write_side.close()
def Shutdown(self): def shutdown(self):
LOG.debug('%r.Shutdown()', self) LOG.debug('%r.shutdown()', self)
self.read_side.close() self.read_side.close()
self.write_side.close() self.write_side.close()
def WriteMore(self): def has_output(self):
return False return False
@ -273,31 +273,31 @@ class Stream(BasicStream):
self._rhmac = hmac.new(context.key, digestmod=sha.new) self._rhmac = hmac.new(context.key, digestmod=sha.new)
self._whmac = self._rhmac.copy() self._whmac = self._rhmac.copy()
_FindGlobal = None _find_global = None
def Unpickle(self, data): def unpickle(self, data):
"""Deserialize `data` into an object.""" """Deserialize `data` into an object."""
IOLOG.debug('%r.Unpickle(%r)', self, data) IOLOG.debug('%r.unpickle(%r)', self, data)
fp = cStringIO.StringIO(data) fp = cStringIO.StringIO(data)
unpickler = cPickle.Unpickler(fp) unpickler = cPickle.Unpickler(fp)
if self._FindGlobal: if self._find_global:
unpickler.find_global = self._FindGlobal unpickler.find_global = self._find_global
return unpickler.load() return unpickler.load()
def Receive(self): def on_receive(self):
"""Handle the next complete message on the stream. Raise """Handle the next complete message on the stream. Raise
CorruptMessageError or IOError on failure.""" CorruptMessageError or IOError on failure."""
IOLOG.debug('%r.Receive()', self) IOLOG.debug('%r.on_receive()', self)
buf = os.read(self.read_side.fd, 4096) buf = os.read(self.read_side.fd, 4096)
self._input_buf += buf self._input_buf += buf
while self._ReceiveOne(): while self._receive_one():
pass pass
if not buf: if not buf:
return self.Disconnect() return self.on_disconnect()
def _ReceiveOne(self): def _receive_one(self):
if len(self._input_buf) < 24: if len(self._input_buf) < 24:
return False return False
@ -316,16 +316,16 @@ class Stream(BasicStream):
self._input_buf[24:msg_len+24]) self._input_buf[24:msg_len+24])
try: try:
handle, data = self.Unpickle(self._input_buf[24:msg_len+24]) handle, data = self.unpickle(self._input_buf[24:msg_len+24])
except (TypeError, ValueError), ex: except (TypeError, ValueError), ex:
raise CorruptMessageError('invalid message: %s', ex) raise CorruptMessageError('invalid message: %s', ex)
self._input_buf = self._input_buf[msg_len+24:] self._input_buf = self._input_buf[msg_len+24:]
self._Invoke(handle, data) self._invoke(handle, data)
return True return True
def _Invoke(self, handle, data): def _invoke(self, handle, data):
IOLOG.debug('%r._Invoke(): handle=%r; data=%r', self, handle, data) IOLOG.debug('%r._invoke(): handle=%r; data=%r', self, handle, data)
try: try:
persist, fn = self._context._handle_map[handle] persist, fn = self._context._handle_map[handle]
except KeyError: except KeyError:
@ -335,20 +335,20 @@ class Stream(BasicStream):
del self._context._handle_map[handle] del self._context._handle_map[handle]
fn(data) fn(data)
def Transmit(self): def on_transmit(self):
"""Transmit buffered messages.""" """on_transmit buffered messages."""
IOLOG.debug('%r.Transmit()', self) IOLOG.debug('%r.on_transmit()', self)
written = os.write(self.write_side.fd, self._output_buf[:4096]) written = os.write(self.write_side.fd, self._output_buf[:4096])
self._output_buf = self._output_buf[written:] self._output_buf = self._output_buf[written:]
if (not self._output_buf) and not self._context.broker.graceful_count: if (not self._output_buf) and not self._context.broker.graceful_count:
self.Disconnect() self.on_disconnect()
def WriteMore(self): def has_output(self):
return bool(self._output_buf) return bool(self._output_buf)
def Enqueue(self, handle, obj): def enqueue(self, handle, obj):
"""Enqueue `obj` to `handle`, and tell the broker we have output.""" """enqueue `obj` to `handle`, and tell the broker we have output."""
IOLOG.debug('%r.Enqueue(%r, %r)', self, handle, obj) IOLOG.debug('%r.enqueue(%r, %r)', self, handle, obj)
self._lock.acquire() self._lock.acquire()
try: try:
encoded = cPickle.dumps((handle, obj), protocol=2) encoded = cPickle.dumps((handle, obj), protocol=2)
@ -357,38 +357,38 @@ class Stream(BasicStream):
self._output_buf += self._whmac.digest() + msg self._output_buf += self._whmac.digest() + msg
finally: finally:
self._lock.release() self._lock.release()
self._context.broker.UpdateStream(self) self._context.broker.update_stream(self)
def Disconnect(self): def on_disconnect(self):
"""Close our associated file descriptor and tell registered callbacks """close our associated file descriptor and tell registered callbacks
the connection has been destroyed.""" the connection has been destroyed."""
super(Stream, self).Disconnect() super(Stream, self).on_disconnect()
if self._context.stream is self: if self._context.stream is self:
self._context.Disconnect() self._context.on_disconnect()
for handle, (persist, fn) in self._context._handle_map.iteritems(): for handle, (persist, fn) in self._context._handle_map.iteritems():
LOG.debug('%r.Disconnect(): killing %r: %r', self, handle, fn) LOG.debug('%r.on_disconnect(): killing %r: %r', self, handle, fn)
fn(_DEAD) fn(_DEAD)
def Shutdown(self): def shutdown(self):
"""Override BasicStream behaviour of immediately disconnecting.""" """Override BasicStream behaviour of immediately disconnecting."""
def Accept(self, rfd, wfd): def accept(self, rfd, wfd):
self.read_side = Side(self, os.dup(rfd)) self.read_side = Side(self, os.dup(rfd))
self.write_side = Side(self, os.dup(wfd)) self.write_side = Side(self, os.dup(wfd))
set_cloexec(self.read_side.fd) set_cloexec(self.read_side.fd)
set_cloexec(self.write_side.fd) set_cloexec(self.write_side.fd)
self._context.stream = self self._context.stream = self
def Connect(self): def connect(self):
"""Connect to a Broker at the address specified in our associated """connect to a Broker at the address specified in our associated
Context.""" Context."""
LOG.debug('%r.Connect()', self) LOG.debug('%r.connect()', self)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.read_side = Side(self, sock.fileno()) self.read_side = Side(self, sock.fileno())
self.write_side = Side(self, sock.fileno()) self.write_side = Side(self, sock.fileno())
sock.connect(self._context.parent_addr) sock.connect(self._context.parent_addr)
self.Enqueue(0, self._context.name) self.enqueue(0, self._context.name)
def __repr__(self): def __repr__(self):
return '%s(<context=%r>)' % (self.__class__.__name__, self._context) return '%s(<context=%r>)' % (self.__class__.__name__, self._context)
@ -413,22 +413,22 @@ class Context(object):
self._last_handle = 1000L self._last_handle = 1000L
self._handle_map = {} self._handle_map = {}
self._lock = threading.Lock() self._lock = threading.Lock()
self.AddHandleCB(self._Shutdown, SHUTDOWN) self.add_handle_cb(self._shutdown, SHUTDOWN)
def Shutdown(self): def shutdown(self):
"""Slave does nothing, _BrokerMain() will .Shutdown its streams.""" """Slave does nothing, _broker_main() will .shutdown its streams."""
def _Shutdown(self, data): def _shutdown(self, data):
if data != _DEAD and self.stream: if data != _DEAD and self.stream:
LOG.debug('Received SHUTDOWN') LOG.debug('Received SHUTDOWN')
self.broker.Shutdown() self.broker.shutdown()
def Disconnect(self): def on_disconnect(self):
self.stream = None self.stream = None
LOG.debug('Parent stream is gone, dying.') LOG.debug('Parent stream is gone, dying.')
self.broker.Shutdown() self.broker.shutdown()
def AllocHandle(self): def alloc_handle(self):
"""Allocate a handle.""" """Allocate a handle."""
self._lock.acquire() self._lock.acquire()
try: try:
@ -437,48 +437,48 @@ class Context(object):
finally: finally:
self._lock.release() self._lock.release()
def AddHandleCB(self, fn, handle, persist=True): def add_handle_cb(self, fn, handle, persist=True):
"""Register `fn(obj)` to run for each `obj` sent to `handle`. If """register `fn(obj)` to run for each `obj` sent to `handle`. If
`persist` is ``False`` then unregister after one delivery.""" `persist` is ``False`` then unregister after one delivery."""
IOLOG.debug('%r.AddHandleCB(%r, %r, persist=%r)', IOLOG.debug('%r.add_handle_cb(%r, %r, persist=%r)',
self, fn, handle, persist) self, fn, handle, persist)
self._handle_map[handle] = persist, fn self._handle_map[handle] = persist, fn
def Enqueue(self, handle, obj): def enqueue(self, handle, obj):
if self.stream: if self.stream:
self.stream.Enqueue(handle, obj) self.stream.enqueue(handle, obj)
def EnqueueAwaitReply(self, handle, deadline, data): def enqueue_await_reply(self, handle, deadline, data):
"""Send `data` to `handle` and wait for a response with an optional """Send `data` to `handle` and wait for a response with an optional
timeout. The message contains `(reply_to, data)`, where `reply_to` is timeout. The message contains `(reply_to, data)`, where `reply_to` is
the handle on which this function expects its reply.""" the handle on which this function expects its reply."""
reply_to = self.AllocHandle() reply_to = self.alloc_handle()
LOG.debug('%r.EnqueueAwaitReply(%r, %r, %r) -> reply handle %d', LOG.debug('%r.enqueue_await_reply(%r, %r, %r) -> reply handle %d',
self, handle, deadline, data, reply_to) self, handle, deadline, data, reply_to)
queue = Queue.Queue() queue = Queue.Queue()
def _PutReply(data): def _put_reply(data):
IOLOG.debug('%r._PutReply(%r)', self, data) IOLOG.debug('%r._put_reply(%r)', self, data)
queue.put(data) queue.put(data)
self.AddHandleCB(_PutReply, reply_to, persist=False) self.add_handle_cb(_put_reply, reply_to, persist=False)
self.stream.Enqueue(handle, (reply_to,) + data) self.stream.enqueue(handle, (reply_to,) + data)
try: try:
data = queue.get(True, deadline) data = queue.get(True, deadline)
except Queue.Empty: except Queue.Empty:
self.stream.Disconnect() self.stream.on_disconnect()
raise TimeoutError('deadline exceeded.') raise TimeoutError('deadline exceeded.')
if data == _DEAD: if data == _DEAD:
raise StreamError('lost connection during call.') raise StreamError('lost connection during call.')
IOLOG.debug('%r._EnqueueAwaitReply(): got reply: %r', self, data) IOLOG.debug('%r._enqueue_await_reply(): got reply: %r', self, data)
return data return data
def CallWithDeadline(self, deadline, with_context, fn, *args, **kwargs): def call_with_deadline(self, deadline, with_context, fn, *args, **kwargs):
LOG.debug('%r.CallWithDeadline(%r, %r, %r, *%r, **%r)', LOG.debug('%r.call_with_deadline(%r, %r, %r, *%r, **%r)',
self, deadline, with_context, fn, args, kwargs) self, deadline, with_context, fn, args, kwargs)
if isinstance(fn, types.MethodType) and \ if isinstance(fn, types.MethodType) and \
@ -488,13 +488,13 @@ class Context(object):
klass = None klass = None
call = (with_context, fn.__module__, klass, fn.__name__, args, kwargs) call = (with_context, fn.__module__, klass, fn.__name__, args, kwargs)
result = self.EnqueueAwaitReply(CALL_FUNCTION, deadline, call) result = self.enqueue_await_reply(CALL_FUNCTION, deadline, call)
if isinstance(result, CallError): if isinstance(result, CallError):
raise result raise result
return result return result
def Call(self, fn, *args, **kwargs): def call(self, fn, *args, **kwargs):
return self.CallWithDeadline(None, False, fn, *args, **kwargs) return self.call_with_deadline(None, False, fn, *args, **kwargs)
def __repr__(self): def __repr__(self):
bits = filter(None, (self.name, self.hostname, self.username)) bits = filter(None, (self.name, self.hostname, self.username))
@ -509,16 +509,16 @@ class Waker(BasicStream):
set_cloexec(wfd) set_cloexec(wfd)
self.read_side = Side(self, rfd) self.read_side = Side(self, rfd)
self.write_side = Side(self, wfd) self.write_side = Side(self, wfd)
broker.UpdateStream(self) broker.update_stream(self)
def __repr__(self): def __repr__(self):
return '<Waker>' return '<Waker>'
def Wake(self): def wake(self):
if self.write_side.fd: if self.write_side.fd:
os.write(self.write_side.fd, ' ') os.write(self.write_side.fd, ' ')
def Receive(self): def on_receive(self):
os.read(self.read_side.fd, 1) os.read(self.read_side.fd, 1)
@ -538,31 +538,31 @@ class IoLogger(BasicStream):
self.read_side = Side(self, self._rsock.fileno()) self.read_side = Side(self, self._rsock.fileno())
self.write_side = Side(self, dest_fd) self.write_side = Side(self, dest_fd)
broker.graceful_count += 1 broker.graceful_count += 1
self._broker.UpdateStream(self) self._broker.update_stream(self)
def __repr__(self): def __repr__(self):
return '<IoLogger %s fd %d>' % (self._name, self.read_side.fd) return '<IoLogger %s fd %d>' % (self._name, self.read_side.fd)
def _LogLines(self): def _log_lines(self):
while self._buf.find('\n') != -1: while self._buf.find('\n') != -1:
line, _, self._buf = self._buf.partition('\n') line, _, self._buf = self._buf.partition('\n')
self._log.info('%s', line.rstrip('\n')) self._log.info('%s', line.rstrip('\n'))
def Shutdown(self): def shutdown(self):
LOG.debug('%r.Shutdown()', self) LOG.debug('%r.shutdown()', self)
self._wsock.shutdown(socket.SHUT_WR) self._wsock.shutdown(socket.SHUT_WR)
self._wsock.close() self._wsock.close()
def Receive(self): def on_receive(self):
LOG.debug('%r.Receive()', self) LOG.debug('%r.on_receive()', self)
buf = os.read(self.read_side.fd, 4096) buf = os.read(self.read_side.fd, 4096)
if not buf: if not buf:
LOG.debug('%r decrement graceful_count', self) LOG.debug('%r decrement graceful_count', self)
self._broker.graceful_count -= 1 self._broker.graceful_count -= 1
return self.Disconnect() return self.on_disconnect()
self._buf += buf self._buf += buf
self._LogLines() self._log_lines()
class Broker(object): class Broker(object):
@ -582,12 +582,12 @@ class Broker(object):
self._writers = set() self._writers = set()
self._waker = Waker(self) self._waker = Waker(self)
self._thread = threading.Thread(target=self._BrokerMain, self._thread = threading.Thread(target=self._broker_main,
name='econtext-broker') name='econtext-broker')
self._thread.start() self._thread.start()
def _UpdateStream(self, stream): def _update_stream(self, stream):
IOLOG.debug('_UpdateStream(%r)', stream) IOLOG.debug('_update_stream(%r)', stream)
self._lock.acquire() self._lock.acquire()
try: try:
if stream.read_side.fd is not None: if stream.read_side.fd is not None:
@ -595,91 +595,91 @@ class Broker(object):
else: else:
self._readers.discard(stream.read_side) self._readers.discard(stream.read_side)
if stream.write_side.fd is not None and stream.WriteMore(): if stream.write_side.fd is not None and stream.has_output():
self._writers.add(stream.write_side) self._writers.add(stream.write_side)
else: else:
self._writers.discard(stream.write_side) self._writers.discard(stream.write_side)
finally: finally:
self._lock.release() self._lock.release()
def UpdateStream(self, stream): def update_stream(self, stream):
self._UpdateStream(stream) self._update_stream(stream)
if self._waker: if self._waker:
self._waker.Wake() self._waker.wake()
def Register(self, context): def register(self, context):
"""Put a context under control of this broker.""" """Put a context under control of this broker."""
LOG.debug('%r.Register(%r) -> r=%r w=%r', self, context, LOG.debug('%r.register(%r) -> r=%r w=%r', self, context,
context.stream.read_side, context.stream.read_side,
context.stream.write_side) context.stream.write_side)
self.UpdateStream(context.stream) self.update_stream(context.stream)
self._contexts[context.name] = context self._contexts[context.name] = context
return context return context
def _CallAndUpdate(self, stream, func): def _call_and_update(self, stream, func):
try: try:
func() func()
except Exception: except Exception:
LOG.exception('%r crashed', stream) LOG.exception('%r crashed', stream)
stream.Disconnect() stream.on_disconnect()
self._UpdateStream(stream) self._update_stream(stream)
def _LoopOnce(self, timeout=None): def _loop_once(self, timeout=None):
IOLOG.debug('%r._LoopOnce(%r)', self, timeout) IOLOG.debug('%r._loop_once(%r)', self, timeout)
#IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers]) #IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers])
#IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers]) #IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers])
rsides, wsides, _ = select.select(self._readers, self._writers, rsides, wsides, _ = select.select(self._readers, self._writers,
(), timeout) (), timeout)
for side in rsides: for side in rsides:
IOLOG.debug('%r: POLLIN for %r', self, side.stream) IOLOG.debug('%r: POLLIN for %r', self, side.stream)
self._CallAndUpdate(side.stream, side.stream.Receive) self._call_and_update(side.stream, side.stream.on_receive)
for side in wsides: for side in wsides:
IOLOG.debug('%r: POLLOUT for %r', self, side.stream) IOLOG.debug('%r: POLLOUT for %r', self, side.stream)
self._CallAndUpdate(side.stream, side.stream.Transmit) self._call_and_update(side.stream, side.stream.on_transmit)
def _BrokerMain(self): def _broker_main(self):
"""Handle events until Shutdown() is called.""" """Handle events until shutdown() is called."""
try: try:
while self._alive: while self._alive:
self._LoopOnce() self._loop_once()
for side in self._readers | self._writers: for side in self._readers | self._writers:
self._CallAndUpdate(side.stream, side.stream.Shutdown) self._call_and_update(side.stream, side.stream.shutdown)
deadline = time.time() + self.graceful_timeout deadline = time.time() + self.graceful_timeout
while ((self._readers or self._writers) and while ((self._readers or self._writers) and
(self.graceful_count or time.time() < deadline)): (self.graceful_count or time.time() < deadline)):
self._LoopOnce(1.0) self._loop_once(1.0)
for context in self._contexts.itervalues(): for context in self._contexts.itervalues():
stream = context.stream stream = context.stream
if stream: if stream:
stream.Disconnect() stream.on_disconnect()
self._UpdateStream(stream) self._update_stream(stream)
for side in self._readers | self._writers: for side in self._readers | self._writers:
LOG.error('_BrokerMain() force disconnecting %r', side) LOG.error('_broker_main() force disconnecting %r', side)
side.stream.Disconnect() side.stream.on_disconnect()
except Exception: except Exception:
LOG.exception('_BrokerMain() crashed') LOG.exception('_broker_main() crashed')
def Wait(self): def wait(self):
"""Wait for the broker to stop.""" """wait for the broker to stop."""
self._thread.join() self._thread.join()
def Shutdown(self): def shutdown(self):
"""Gracefully disconnect streams and wait for broker to stop.""" """Gracefully disconnect streams and wait for broker to stop."""
LOG.debug('%r.Shutdown()', self) LOG.debug('%r.shutdown()', self)
self._alive = False self._alive = False
self._waker.Wake() self._waker.wake()
def __repr__(self): def __repr__(self):
return 'Broker()' return 'Broker()'
class ExternalContext(object): class ExternalContext(object):
def _FixupMainModule(self): def _fixup_main_module(self):
main = sys.modules['__main__'] main = sys.modules['__main__']
main.__path__ = [] main.__path__ = []
main.core = main main.core = main
@ -690,7 +690,7 @@ class ExternalContext(object):
if hasattr(klass, '__module__'): if hasattr(klass, '__module__'):
klass.__module__ = 'econtext.core' klass.__module__ = 'econtext.core'
def _ReapFirstStage(self): def _reap_first_stage(self):
os.wait() os.wait()
os.dup2(100, 0) os.dup2(100, 0)
os.close(100) os.close(100)
@ -700,20 +700,20 @@ class ExternalContext(object):
self.context = Context(self.broker, 'master', key=key) self.context = Context(self.broker, 'master', key=key)
self.channel = Channel(self.context, CALL_FUNCTION) self.channel = Channel(self.context, CALL_FUNCTION)
self.context.stream = Stream(self.context) self.context.stream = Stream(self.context)
self.context.stream.Accept(0, 1) self.context.stream.accept(0, 1)
def _SetupLogging(self, log_level): def _setup_logging(self, log_level):
logging.basicConfig(level=log_level) logging.basicConfig(level=log_level)
root = logging.getLogger() root = logging.getLogger()
root.setLevel(log_level) root.setLevel(log_level)
root.handlers = [LogHandler(self.context)] root.handlers = [LogHandler(self.context)]
LOG.debug('Connected to %s', self.context) LOG.debug('Connected to %s', self.context)
def _SetupImporter(self): def _setup_importer(self):
self.importer = SlaveModuleImporter(self.context) self.importer = SlaveModuleImporter(self.context)
sys.meta_path.append(self.importer) sys.meta_path.append(self.importer)
def _SetupStdio(self): def _setup_stdio(self):
self.stdout_log = IoLogger(self.broker, 'stdout', 1) self.stdout_log = IoLogger(self.broker, 'stdout', 1)
self.stderr_log = IoLogger(self.broker, 'stderr', 2) self.stderr_log = IoLogger(self.broker, 'stderr', 2)
# Reopen with line buffering. # Reopen with line buffering.
@ -725,9 +725,9 @@ class ExternalContext(object):
finally: finally:
fp.close() fp.close()
def _DispatchCalls(self): def _dispatch_calls(self):
for data in self.channel: for data in self.channel:
LOG.debug('_DispatchCalls(%r)', data) LOG.debug('_dispatch_calls(%r)', data)
reply_to, with_context, modname, klass, func, args, kwargs = data reply_to, with_context, modname, klass, func, args, kwargs = data
if with_context: if with_context:
args = (self,) + args args = (self,) + args
@ -737,22 +737,22 @@ class ExternalContext(object):
if klass: if klass:
obj = getattr(obj, klass) obj = getattr(obj, klass)
fn = getattr(obj, func) fn = getattr(obj, func)
self.context.Enqueue(reply_to, fn(*args, **kwargs)) self.context.enqueue(reply_to, fn(*args, **kwargs))
except Exception, e: except Exception, e:
self.context.Enqueue(reply_to, CallError(e)) self.context.enqueue(reply_to, CallError(e))
def main(self, key, log_level): def main(self, key, log_level):
self._ReapFirstStage() self._reap_first_stage()
self._FixupMainModule() self._fixup_main_module()
self._SetupMaster(key) self._SetupMaster(key)
try: try:
self._SetupLogging(log_level) self._setup_logging(log_level)
self._SetupImporter() self._setup_importer()
self._SetupStdio() self._setup_stdio()
self.broker.Register(self.context) self.broker.register(self.context)
self._DispatchCalls() self._dispatch_calls()
self.broker.Wait() self.broker.wait()
LOG.debug('ExternalContext.main() exitting') LOG.debug('ExternalContext.main() exitting')
finally: finally:
self.broker.Shutdown() self.broker.shutdown()

@ -22,7 +22,7 @@ DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S)
COMMENT_RE = re.compile(r'^\s*#.*$', re.M) COMMENT_RE = re.compile(r'^\s*#.*$', re.M)
def MinimizeSource(source): def minimize_source(source):
"""Remove comments and docstrings from Python `source`, preserving line """Remove comments and docstrings from Python `source`, preserving line
numbers and syntax of empty blocks.""" numbers and syntax of empty blocks."""
subber = lambda match: '""' + ('\n' * match.group(0).count('\n')) subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
@ -31,13 +31,13 @@ def MinimizeSource(source):
return source.replace(' ', '\t') return source.replace(' ', '\t')
def GetChildModules(module, prefix): def get_child_modules(module, prefix):
"""Return the canonical names of all submodules of a package `module`.""" """Return the canonical names of all submodules of a package `module`."""
it = pkgutil.iter_modules(module.__path__, prefix) it = pkgutil.iter_modules(module.__path__, prefix)
return [name for _, name, _ in it] return [name for _, name, _ in it]
def CreateChild(*args): def create_child(*args):
"""Create a child process whose stdin/stdout is connected to a socket, """Create a child process whose stdin/stdout is connected to a socket,
returning `(pid, socket_obj)`.""" returning `(pid, socket_obj)`."""
parentfp, childfp = socket.socketpair() parentfp, childfp = socket.socketpair()
@ -51,7 +51,7 @@ def CreateChild(*args):
raise SystemExit raise SystemExit
childfp.close() childfp.close()
LOG.debug('CreateChild() child %d fd %d, parent %d, args %r', LOG.debug('create_child() child %d fd %d, parent %d, args %r',
pid, parentfp.fileno(), os.getpid(), args) pid, parentfp.fileno(), os.getpid(), args)
return pid, parentfp return pid, parentfp
@ -65,23 +65,23 @@ class Listener(econtext.core.BasicStream):
econtext.core.set_cloexec(self._sock.fileno()) econtext.core.set_cloexec(self._sock.fileno())
self._listen_addr = self._sock.getsockname() self._listen_addr = self._sock.getsockname()
self.read_side = econtext.core.Side(self, self._sock.fileno()) self.read_side = econtext.core.Side(self, self._sock.fileno())
broker.UpdateStream(self) broker.update_stream(self)
def Receive(self): def on_receive(self):
sock, addr = self._sock.accept() sock, addr = self._sock.accept()
context = Context(self._broker, name=addr) context = Context(self._broker, name=addr)
stream = econtext.core.Stream(context) stream = econtext.core.Stream(context)
stream.Accept(sock.fileno(), sock.fileno()) stream.accept(sock.fileno(), sock.fileno())
class LogForwarder(object): class LogForwarder(object):
def __init__(self, context): def __init__(self, context):
self._context = context self._context = context
self._context.AddHandleCB(self.ForwardLog, self._context.add_handle_cb(self.forward_log,
handle=econtext.core.FORWARD_LOG) handle=econtext.core.FORWARD_LOG)
self._log = RLOG.getChild(self._context.name) self._log = RLOG.getChild(self._context.name)
def ForwardLog(self, data): def forward_log(self, data):
if data == econtext.core._DEAD: if data == econtext.core._DEAD:
return return
@ -92,15 +92,15 @@ class LogForwarder(object):
class ModuleResponder(object): class ModuleResponder(object):
def __init__(self, context): def __init__(self, context):
self._context = context self._context = context
self._context.AddHandleCB(self.GetModule, self._context.add_handle_cb(self.get_module,
handle=econtext.core.GET_MODULE) handle=econtext.core.GET_MODULE)
def GetModule(self, data): def get_module(self, data):
if data == econtext.core._DEAD: if data == econtext.core._DEAD:
return return
reply_to, fullname = data reply_to, fullname = data
LOG.debug('GetModule(%r, %r)', reply_to, fullname) LOG.debug('get_module(%r, %r)', reply_to, fullname)
try: try:
module = __import__(fullname, fromlist=['']) module = __import__(fullname, fromlist=[''])
is_pkg = getattr(module, '__path__', None) is not None is_pkg = getattr(module, '__path__', None) is not None
@ -114,16 +114,16 @@ class ModuleResponder(object):
if is_pkg: if is_pkg:
prefix = module.__name__ + '.' prefix = module.__name__ + '.'
present = GetChildModules(module, prefix) present = get_child_modules(module, prefix)
else: else:
present = None present = None
compressed = zlib.compress(MinimizeSource(source)) compressed = zlib.compress(minimize_source(source))
reply = (is_pkg, present, path, compressed) reply = (is_pkg, present, path, compressed)
self._context.Enqueue(reply_to, reply) self._context.enqueue(reply_to, reply)
except Exception: except Exception:
LOG.debug('While importing %r', fullname, exc_info=True) LOG.debug('While importing %r', fullname, exc_info=True)
self._context.Enqueue(reply_to, None) self._context.enqueue(reply_to, None)
class LocalStream(econtext.core.Stream): class LocalStream(econtext.core.Stream):
@ -137,12 +137,12 @@ class LocalStream(econtext.core.Stream):
super(LocalStream, self).__init__(context) super(LocalStream, self).__init__(context)
self._permitted_classes = set([('econtext.core', 'CallError')]) self._permitted_classes = set([('econtext.core', 'CallError')])
def Shutdown(self): def shutdown(self):
"""Requesting the slave gracefully shut itself down.""" """Requesting the slave gracefully shut itself down."""
LOG.debug('%r enqueuing SHUTDOWN', self) LOG.debug('%r enqueuing SHUTDOWN', self)
self.Enqueue(econtext.core.SHUTDOWN, None) self.enqueue(econtext.core.SHUTDOWN, None)
def _FindGlobal(self, module_name, class_name): def _find_global(self, module_name, class_name):
"""Return the class implementing `module_name.class_name` or raise """Return the class implementing `module_name.class_name` or raise
`StreamError` if the module is not whitelisted.""" `StreamError` if the module is not whitelisted."""
if (module_name, class_name) not in self._permitted_classes: if (module_name, class_name) not in self._permitted_classes:
@ -151,14 +151,14 @@ class LocalStream(econtext.core.Stream):
self._context, class_name, module_name) self._context, class_name, module_name)
return getattr(sys.modules[module_name], class_name) return getattr(sys.modules[module_name], class_name)
def AllowClass(self, module_name, class_name): def allow_class(self, module_name, class_name):
"""Add `module_name` to the list of permitted modules.""" """Add `module_name` to the list of permitted modules."""
self._permitted_modules.add((module_name, class_name)) self._permitted_modules.add((module_name, class_name))
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a # base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
# pipe, then execs a new interpreter with a custom argv. CONTEXT_NAME is # pipe, then execs a new interpreter with a custom argv. CONTEXT_NAME is
# replaced with the context name. Optimized for size. # replaced with the context name. Optimized for size.
def _FirstStage(): def _first_stage():
import os,sys,zlib import os,sys,zlib
R,W=os.pipe() R,W=os.pipe()
if os.fork(): if os.fork():
@ -172,13 +172,13 @@ class LocalStream(econtext.core.Stream):
print 'OK' print 'OK'
sys.exit(0) sys.exit(0)
def GetBootCommand(self): def get_boot_command(self):
name = self._context.remote_name name = self._context.remote_name
if name is None: if name is None:
name = '%s@%s:%d' name = '%s@%s:%d'
name %= (getpass.getuser(), socket.gethostname(), os.getpid()) name %= (getpass.getuser(), socket.gethostname(), os.getpid())
source = inspect.getsource(self._FirstStage) source = inspect.getsource(self._first_stage)
source = textwrap.dedent('\n'.join(source.strip().split('\n')[1:])) source = textwrap.dedent('\n'.join(source.strip().split('\n')[1:]))
source = source.replace(' ', '\t') source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', repr(name)) source = source.replace('CONTEXT_NAME', repr(name))
@ -189,26 +189,26 @@ class LocalStream(econtext.core.Stream):
def __repr__(self): def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._context) return '%s(%s)' % (self.__class__.__name__, self._context)
def GetPreamble(self): def get_preamble(self):
source = inspect.getsource(econtext.core) source = inspect.getsource(econtext.core)
source += '\nExternalContext().main%r\n' % (( source += '\nExternalContext().main%r\n' % ((
self._context.key, self._context.key,
LOG.level or logging.getLogger().level or logging.INFO, LOG.level or logging.getLogger().level or logging.INFO,
),) ),)
compressed = zlib.compress(MinimizeSource(source)) compressed = zlib.compress(minimize_source(source))
return str(len(compressed)) + '\n' + compressed return str(len(compressed)) + '\n' + compressed
def Connect(self): def connect(self):
LOG.debug('%r.Connect()', self) LOG.debug('%r.connect()', self)
pid, sock = CreateChild(*self.GetBootCommand()) pid, sock = create_child(*self.get_boot_command())
self.read_side = econtext.core.Side(self, os.dup(sock.fileno())) self.read_side = econtext.core.Side(self, os.dup(sock.fileno()))
self.write_side = econtext.core.Side(self, os.dup(sock.fileno())) self.write_side = econtext.core.Side(self, os.dup(sock.fileno()))
sock.close() sock.close()
LOG.debug('%r.Connect(): child process stdin/stdout=%r', LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.read_side.fd) self, self.read_side.fd)
econtext.core.write_all(self.write_side.fd, self.GetPreamble()) econtext.core.write_all(self.write_side.fd, self.get_preamble())
s = os.read(self.read_side.fd, 4096) s = os.read(self.read_side.fd, 4096)
if s != 'OK\n': if s != 'OK\n':
raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s) raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s)
@ -218,12 +218,12 @@ class SSHStream(LocalStream):
#: The path to the SSH binary. #: The path to the SSH binary.
ssh_path = 'ssh' ssh_path = 'ssh'
def GetBootCommand(self): def get_boot_command(self):
bits = [self.ssh_path] bits = [self.ssh_path]
if self._context.username: if self._context.username:
bits += ['-l', self._context.username] bits += ['-l', self._context.username]
bits.append(self._context.hostname) bits.append(self._context.hostname)
base = super(SSHStream, self).GetBootCommand() base = super(SSHStream, self).get_boot_command()
return bits + map(commands.mkarg, base) return bits + map(commands.mkarg, base)
@ -231,21 +231,21 @@ class Broker(econtext.core.Broker):
#: Always allow time for slaves to drain. #: Always allow time for slaves to drain.
graceful_count = 1 graceful_count = 1
def CreateListener(self, address=None, backlog=30): def create_listener(self, address=None, backlog=30):
"""Listen on `address `for connections from newly spawned contexts.""" """Listen on `address `for connections from newly spawned contexts."""
self._listener = Listener(self, address, backlog) self._listener = Listener(self, address, backlog)
def GetLocal(self, name='default', python_path=None): def get_local(self, name='default', python_path=None):
"""Get the named context running on the local machine, creating it if """Get the named context running on the local machine, creating it if
it does not exist.""" it does not exist."""
context = Context(self, name) context = Context(self, name)
context.stream = LocalStream(context) context.stream = LocalStream(context)
if python_path: if python_path:
context.stream.python_path = python_path context.stream.python_path = python_path
context.stream.Connect() context.stream.connect()
return self.Register(context) return self.register(context)
def GetRemote(self, hostname, username, name=None, python_path=None): def get_remote(self, hostname, username, name=None, python_path=None):
"""Get the named remote context, creating it if it does not exist.""" """Get the named remote context, creating it if it does not exist."""
if name is None: if name is None:
name = hostname name = hostname
@ -254,8 +254,8 @@ class Broker(econtext.core.Broker):
context.stream = SSHStream(context) context.stream = SSHStream(context)
if python_path: if python_path:
context.stream.python_path = python_path context.stream.python_path = python_path
context.stream.Connect() context.stream.connect()
return self.Register(context) return self.register(context)
class Context(econtext.core.Context): class Context(econtext.core.Context):
@ -264,5 +264,5 @@ class Context(econtext.core.Context):
self.responder = ModuleResponder(self) self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self) self.log_forwarder = LogForwarder(self)
def Disconnect(self): def on_disconnect(self):
self.stream = None self.stream = None

@ -19,8 +19,8 @@ def run_with_broker(func, *args, **kwargs):
try: try:
return func(broker, *args, **kwargs) return func(broker, *args, **kwargs)
finally: finally:
broker.Shutdown() broker.shutdown()
broker.Wait() broker.wait()
def with_broker(func): def with_broker(func):

Loading…
Cancel
Save