pull/35/head
David Wilson 8 years ago
parent 34595dd00f
commit 0b0663e21b

@ -17,7 +17,6 @@ import os
import random
import select
import sha
import signal
import socket
import struct
import sys
@ -74,9 +73,11 @@ class CallError(ContextError):
class Dead(object):
def __eq__(self, other):
return type(other) is Dead
def __repr__(self):
return '<Dead>'
_DEAD = Dead()
@ -124,12 +125,12 @@ class Channel(object):
def Close(self):
"""Indicate this channel is closed to the remote side."""
IOLOG.debug('%r.Close()', self)
self._context.Enqueue(handle, _DEAD)
self._context.Enqueue(self._handle, _DEAD)
def Send(self, data):
"""Send `data` to the remote."""
IOLOG.debug('%r.Send(%r)', self, data)
self._context.Enqueue(handle, data)
self._context.Enqueue(self._handle, data)
def Receive(self, timeout=None):
"""Receive an object from the remote, or return ``None`` if `timeout`
@ -205,7 +206,7 @@ class MasterModuleResponder(object):
module = __import__(fullname)
source = zlib.compress(inspect.getsource(module))
self._context.Enqueue(reply_to, (module.__file__, source))
except Exception, e:
except Exception:
LOG.exception('While importing %r', fullname)
self._context.Enqueue(reply_to, None)
@ -319,9 +320,10 @@ class Stream(BasicStream):
self._rhmac.update(self._input_buf[20:msg_len+24])
expected_mac = self._rhmac.digest()
if msg_mac != expected_mac:
raise CorruptMessageError('%r invalid MAC: expected %r, got %r',
raise CorruptMessageError('%r bad MAC: %r != got %r; %r',
self, msg_mac.encode('hex'),
expected_mac.encode('hex'))
expected_mac.encode('hex'),
self._input_buf[24:msg_len+24])
try:
handle, data = self.Unpickle(self._input_buf[24:msg_len+24])
@ -336,8 +338,8 @@ class Stream(BasicStream):
LOG.debug('%r._Invoke(): handle=%r; data=%r', self, handle, data)
try:
persist, fn = self._context._handle_map[handle]
except KeyError, ex:
raise CorruptMessageError('%r got invalid handle: %r', self, handle)
except KeyError:
raise CorruptMessageError('%r: invalid handle: %r', self, handle)
if not persist:
del self._context._handle_map[handle]
@ -543,7 +545,8 @@ class Context(object):
self._handle_map[handle] = persist, fn
def Enqueue(self, handle, obj):
self._stream.Enqueue(handle, obj)
if self.stream:
self.stream.Enqueue(handle, obj)
def EnqueueAwaitReply(self, handle, deadline, data):
"""Send `data` to `handle` and wait for a response with an optional
@ -579,7 +582,7 @@ class Context(object):
self, deadline, with_context, fn, args, kwargs)
if isinstance(fn, types.MethodType) and \
isinstance(fn.im_self, (type, types.ClassType)):
isinstance(fn.im_self, (type, types.ClassType)):
klass = fn.im_self.__name__
else:
klass = None
@ -594,8 +597,8 @@ class Context(object):
return self.CallWithDeadline(None, False, fn, *args, **kwargs)
def __repr__(self):
bits = map(repr, filter(None, [self.name, self.hostname, self.username]))
return 'Context(%s)' % ', '.join(bits)
bits = filter(None, (self.name, self.hostname, self.username))
return 'Context(%s)' % ', '.join(map(repr, bits))
class Waker(BasicStream):
@ -651,7 +654,7 @@ class IoLogger(BasicStream):
def _LogLines(self):
while self._buf.find('\n') != -1:
line, _, self._buf = self._buf.partition('\n')
self._log.debug('%s: %s', self._name, line.rstrip('\n'))
self._log.info('%s', line.rstrip('\n'))
def Receive(self):
LOG.debug('%r.Receive()', self)
@ -727,8 +730,8 @@ class Broker(object):
def GetRemote(self, hostname, username, name=None, python_path=None):
"""Get the named remote context, creating it if it does not exist."""
if name is None:
name = 'econtext[%s@%s:%d]' %\
(username, socket.gethostname(), os.getpid())
name = '%s@%s:%d'
name %= (getpass.getuser(), socket.gethostname(), os.getpid())
context = Context(self, name, hostname, username)
context.stream = SSHStream(context)
@ -740,15 +743,15 @@ class Broker(object):
def _CallAndUpdate(self, stream, func):
try:
func()
except Exception, e:
except Exception:
LOG.exception('%r crashed', stream)
stream.Disconnect()
self._UpdateStream(stream)
def _LoopOnce(self):
IOLOG.debug('%r.Loop()', self)
#IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers])
#IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers])
# IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers])
# IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers])
rsides, wsides, _ = select.select(self._readers, self._writers, ())
for side in rsides:
IOLOG.debug('%r: POLLIN for %r', self, side.stream)
@ -826,6 +829,10 @@ class ExternalContext(object):
os.dup2(self.stdout_log.write_side.fd, 1)
os.dup2(self.stderr_log.write_side.fd, 2)
# Why is this necessary?
sys.stdout = os.fdopen(self.stdout_log.write_side.fd, 'w', 0)
sys.stderr = os.fdopen(self.stderr_log.write_side.fd, 'w', 0)
fp = file('/dev/null')
try:
os.dup2(fp.fileno(), 0)
@ -856,6 +863,8 @@ class ExternalContext(object):
self._SetupImporter()
self._SetupStdio()
# signal.signal(signal.SIGINT, lambda *_: self.broker.Finalize())
self.broker.Register(self.context)
self._DispatchCalls()
self.broker.Wait()

Loading…
Cancel
Save