From 09f3bda1bb8f437e9fb1220114153e3562c0cd94 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Fri, 10 Jan 2014 11:45:55 +0000
Subject: [PATCH] econtext-head

---
 econtext.py      | 157 ++++++++++++++++++++++++++++++++---------------
 econtext_test.py |  10 +++
 st.py            |  25 +++++++-
 3 files changed, 139 insertions(+), 53 deletions(-)

diff --git a/econtext.py b/econtext.py
index 20547629..10da048a 100755
--- a/econtext.py
+++ b/econtext.py
@@ -71,9 +71,13 @@ def Log(fmt, *args):
 
 def CreateChild(*args):
   '''
-  Create a child process whos stdin/stdout is connected to a socket.
+  Create a child process whose stdin/stdout is connected to a socket.
+
+  Args:
+    *args: executable name and process arguments.
+
   Returns:
-    pid, socket
+    pid, sock
   '''
   sock1, sock2 = socket.socketpair()
   pid = os.fork()
@@ -87,6 +91,9 @@
 
 
 class PartialFunction(object):
+  '''
+  Partial function implementation.
+  '''
   def __init__(self, fn, *partial_args):
     self.fn = fn
     self.partial_args = partial_args
@@ -191,22 +198,16 @@ class Channel(object):
 
 class SlaveModuleImporter(object):
   '''
-  This implements the import protocol described in PEP 302. It works like so:
-
-    - Python asks it if it can import a module.
-    - It asks Python (via imp module) if it can import the module.
-    - If Python says yes, it says no.
-    - If Python says no, it asks the parent context for the module.
-    - If the module isn't returned by the parent, asplode, otherwise ask Python
-      to load the returned module.
-
-  This roundabout crap is necessary because the built-in importer is tried only
-  after custom hooks are. A class method is provided for the parent context to
-  satisfy the module request; it will only return modules that have been loaded
-  in the parent context.
+  Import protocol implementation that fetches modules from the parent process.
   '''
 
   def __init__(self, context):
+    '''
+    Initialise a new instance.
+
+    Args:
+      context: Context instance this importer will communicate via.
+    '''
     self._context = context
 
   def find_module(self, fullname, path=None):
@@ -231,6 +232,12 @@ class SlaveModuleImporter(object):
 
 class Stream(object):
   def __init__(self, context):
+    '''
+    Initialize a new Stream instance.
+
+    Args:
+      context: econtext.Context
+    '''
     self._context = context
     self._input_buf = self._output_buf = ''
 
@@ -255,6 +262,15 @@ class Stream(object):
     self._unpickler.persistent_load = self._LoadFunctionFromPerID
 
   def Pickle(self, obj):
+    '''
+    Serialize the given object using the pickler.
+
+    Args:
+      obj: object
+
+    Returns:
+      str
+    '''
     self._pickler.dump(obj)
     data = self._pickler_file.getvalue()
     self._pickler_file.seek(0)
@@ -262,6 +278,15 @@ class Stream(object):
     return data
 
   def Unpickle(self, data):
+    '''
+    Unserialize the given string using the unpickler.
+
+    Args:
+      data: str
+
+    Returns:
+      object
+    '''
     Log('%r.Unpickle(%r)', self, data)
     self._unpickler_file.write(data)
     self._unpickler_file.seek(0)
@@ -272,8 +297,8 @@ class Stream(object):
 
   def _CheckFunctionPerID(self, obj):
     '''
-    Please see the cPickle documentation. Given an object, return None
-    indicating normal pickle processing or a string 'persistent ID'.
+    Return None or a persistent ID for an object.
+    Please see the cPickle documentation.
 
     Args:
       obj: object
@@ -288,8 +313,8 @@ class Stream(object):
 
   def _LoadFunctionFromPerID(self, pid):
     '''
-    Please see the cPickle documentation. Given a string created by
-    _CheckFunctionPerID, turn it into an object again.
+    Load an object from a persistent ID.
+    Please see the cPickle documentation.
 
     Args:
      pid: str
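Note (illustrative, not part of the diff): _CheckFunctionPerID and
_LoadFunctionFromPerID above are the two hooks of cPickle's persistent-ID
protocol, which Stream wires up around its pickler and unpickler. The
standalone sketch below shows the same protocol in isolation; the REGISTRY
and helper names are invented for the example and do not exist in econtext.py.

    import cPickle
    import cStringIO

    # Objects we refuse to pickle by value; they travel as string IDs instead.
    REGISTRY = {'greet': lambda name: 'hello ' + name}

    def check_per_id(obj):
      # Return a string persistent ID for known objects, None for the rest.
      for per_id, value in REGISTRY.items():
        if obj is value:
          return per_id
      return None

    def load_per_id(per_id):
      # Map a string written by check_per_id back to the real object.
      return REGISTRY[per_id]

    buf = cStringIO.StringIO()
    pickler = cPickle.Pickler(buf)
    pickler.persistent_id = check_per_id
    pickler.dump((0L, REGISTRY['greet']))      # the lambda is never pickled

    unpickler = cPickle.Unpickler(cStringIO.StringIO(buf.getvalue()))
    unpickler.persistent_load = load_per_id
    handle, fn = unpickler.load()
    print fn('econtext')                       # -> 'hello econtext'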
@@ -317,13 +342,12 @@ class Stream(object):
 
   def AddHandleCB(self, fn, handle, persist=True):
     '''
-    Arrange to invoke the given function for all messages tagged with the given
-    handle. By default, process one message and discard this arrangement.
+    Invoke a function for all messages with the given handle.
 
     Args:
       fn: callable
       handle: long
-      persist: bool
+      persist: False to only receive a single message.
     '''
     Log('%r.AddHandleCB(%r, %r, persist=%r)', self, fn, handle, persist)
     self._handle_lock.acquire()
@@ -373,23 +397,33 @@ class Stream(object):
 
   def Transmit(self):
     '''
-    Transmit pending messages. Raises IOError on failure. Return value
-    indicates whether there is still data buffered.
+    Transmit buffered messages.
 
     Returns:
-      bool
+      bool: more data left in buffer?
+
+    Raises:
+      IOError
     '''
     Log('%r.Transmit()', self)
     written = os.write(self._fd, self._output_buf[:4096])
     self._output_buf = self._output_buf[written:]
     return bool(self._output_buf)
 
-  def Enqueue(self, handle, data):
-    Log('%r.Enqueue(%r, %r)', self, handle, data)
+  def Enqueue(self, handle, obj):
+    '''
+    Serialize an object, send it to the given handle, and tell our context's
+    broker we have output.
+
+    Args:
+      handle: long
+      obj: object
+    '''
+    Log('%r.Enqueue(%r, %r)', self, handle, obj)
     self._output_buf_lock.acquire()
     try:
-      encoded = self.Pickle((handle, data))
+      encoded = self.Pickle((handle, obj))
       msg = struct.pack('>L', len(encoded)) + encoded
       self._whmac.update(msg)
       self._output_buf += self._whmac.digest() + msg
@@ -399,13 +433,14 @@ class Stream(object):
 
   def Disconnect(self):
     '''
-    Called to handle disconnects.
+    Close our associated file descriptor and tell any registered callbacks
+    that the connection has been destroyed.
     '''
     Log('%r.Disconnect()', self)
     try:
       os.close(self._fd)
     except OSError, e:
-      Log('WARNING: %s', e)
+      Log('%r.Disconnect(): did not close fd %s: %s', self, self._fd, e)
 
     for handle, (persist, fn) in self._handle_map.iteritems():
       Log('%r.Disconnect(): killing stale callback handle=%r; fn=%r',
@@ -413,16 +448,20 @@ class Stream(object):
       fn(True, None)
 
   @classmethod
-  def Accept(cls, broker, sock):
-    context = Context(broker)
+  def Accept(cls, context, sock):
+    '''
+    Accept a new connection and register it with the Context's broker.
+    '''
     stream = cls(context)
     context.SetStream(stream)
-    broker.Register(context)
+    context.broker.Register(context)
 
   def Connect(self):
     '''
-    Connect to a Broker at the address given in the Context instance.
+    Connect to a Broker at the address specified in our associated Context.
     '''
+
+    Log('%r.Connect()', self)
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     self._fd = sock.fileno()
     sock.connect(self._context.parent_addr)
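Note (illustrative, not part of the diff): Enqueue() above frames each message
as a running HMAC digest, followed by a 4-byte big-endian length and the
pickled (handle, obj) pair. The receive path is not in these hunks, so the
decoder below, the shared key, and the SHA-1 digestmod are assumptions; only
the frame layout is taken from the hunk.

    import cPickle
    import hashlib
    import hmac
    import struct

    KEY = 'shared-secret'

    def encode_frame(handle, obj, whmac):
      # Mirrors Enqueue(): length-prefixed pickle, preceded by the running
      # HMAC digest over every framed byte written so far.
      encoded = cPickle.dumps((handle, obj))
      msg = struct.pack('>L', len(encoded)) + encoded
      whmac.update(msg)
      return whmac.digest() + msg

    def decode_frame(frame, rhmac):
      # Split off the digest, check it against our own running HMAC, then
      # unpack the length-prefixed pickle.
      digest, msg = frame[:rhmac.digest_size], frame[rhmac.digest_size:]
      rhmac.update(msg)
      if rhmac.digest() != digest:
        raise IOError('HMAC mismatch')
      length, = struct.unpack('>L', msg[:4])
      return cPickle.loads(msg[4:4 + length])

    whmac = hmac.new(KEY, digestmod=hashlib.sha1)
    rhmac = hmac.new(KEY, digestmod=hashlib.sha1)
    frame = encode_frame(100L, 'hello', whmac)
    print decode_frame(frame, rhmac)           # -> (100L, 'hello')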
@@ -510,7 +549,7 @@ class LocalStream(Stream):
     Log('%r.Connect()', self)
     pid, sock = CreateChild(*self.GetBootCommand())
     self._fd = sock.fileno()
-    Log('%r.Connect(): chlid process stdin/stdout=%r', self, self._fd)
+    Log('%r.Connect(): child process stdin/stdout=%r', self, self._fd)
 
     source = inspect.getsource(sys.modules[__name__])
     source += '\nExternalContextMain(%r, %r, %r)\n' %\
@@ -596,7 +635,14 @@ class Context(object):
     Log('%r.CallWithDeadline(%r, %r, *%r, **%r)', self, fn, deadline,
         args, kwargs)
 
-    call = (fn.__module__, fn.__name__, args, kwargs)
+    use_channel = bool(kwargs.pop('use_channel', False))
+    if isinstance(fn, types.MethodType) and \
+       isinstance(fn.im_self, (type, types.ClassType)):
+      fn_class = fn.im_self.__name__
+    else:
+      fn_class = None
+
+    call = (use_channel, fn.__module__, fn_class, fn.__name__, args, kwargs)
     success, result = self.EnqueueAwaitReply(CALL_FUNCTION, deadline, call)
 
     if success:
@@ -628,23 +674,31 @@ class Broker(object):
 
     self._contexts = {}
     self._wake_rfd, self._wake_wfd = os.pipe()
+    self._listen_sock = None
     self._poller.register(self._wake_rfd)
 
-    self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    self._listen_sock.bind(('0.0.0.0', 0)) # plz 2 allocate 4 me kthx.
-    self._listen_sock.listen(5)
-    self._listen_addr = self._listen_sock.getsockname()
-    self._poller.register(self._listen_sock)
-
     self._thread = threading.Thread(target=self.Loop, name='Broker')
     self._thread.setDaemon(True)
     self._thread.start()
 
+  def CreateListener(self, address=None, backlog=30):
+    '''
+    Create a socket to accept connections from newly spawned contexts.
+    Args:
+      address: The IPv4 address tuple to listen on.
+      backlog: Number of connections to accept while broker thread is busy.
+    '''
+    self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self._listen_sock.bind(address or ('0.0.0.0', 0))
+    self._listen_sock.listen(backlog)
+    self._listen_addr = self._listen_sock.getsockname()
+    self._poller.register(self._listen_sock)
+
   def Register(self, context):
     '''
     Put a context under control of this broker.
     '''
-    Log('%r.Register(%r)', self, context)
+    Log('%r.Register(%r) -> fd=%r', self, context, context.GetStream().fileno())
     self._poller_lock.acquire()
     os.write(self._wake_wfd, ' ')
     try:
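Note (illustrative, not part of the diff): Context.CallWithDeadline() above now
packs calls as a (use_channel, module, class, name, args, kwargs) tuple so that
calls to classmethods survive the trip. The remote dispatch code is not in this
patch, so the resolver below is only a sketch of how such a tuple could be
turned back into a callable; os.path.basename is used purely as a stand-in
target.

    def resolve_call(call):
      # Unpack the 6-tuple and walk module -> (optional class) -> function.
      use_channel, mod_name, class_name, func_name, args, kwargs = call
      module = __import__(mod_name, {}, {}, [''])
      obj = getattr(module, class_name) if class_name else module
      fn = getattr(obj, func_name)
      return use_channel, fn, args, kwargs

    call = (False, 'os.path', None, 'basename', ('/tmp/uptime.txt',), {})
    use_channel, fn, args, kwargs = resolve_call(call)
    print fn(*args, **kwargs)                  # -> 'uptime.txt'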
@@ -692,25 +746,26 @@ class Broker(object):
         if fd == self._wake_rfd:
           Log('%r: got event on wake_rfd=%d.', self, self._wake_rfd)
           os.read(self._wake_rfd, 1)
-          break
-        elif fd == self._listen_sock.fileno():
-          Stream.Accept(self, self._listen_sock.accept())
+          continue
+        elif self._listen_sock and fd == self._listen_sock.fileno():
+          context = Context(self)
+          Stream.Accept(context, self._listen_sock.accept())
           continue
 
         obj = self._poller_fd_map[fd]
         if event & select.POLLHUP:
-          Log('%r: POLLHUP on %r', self, obj)
+          Log('%r: POLLHUP for %d, %r', self, fd, obj)
           obj.Disconnect()
         elif event & select.POLLIN:
-          Log('%r: POLLIN on %r', self, obj)
+          Log('%r: POLLIN for %d, %r', self, fd, obj)
           obj.Receive()
         elif event & select.POLLOUT:
-          Log('%r: POLLOUT on %r', self, obj)
+          Log('%r: POLLOUT for %d, %r', self, fd, obj)
           if not obj.Transmit():  # If no output buffered, unset POLLOUT.
             self._poller.unregister(obj)
             self._poller.register(obj, select.POLLIN)
         elif event & select.POLLNVAL:
-          Log('%r: POLLNVAL for %r', self, obj)
+          Log('%r: POLLNVAL for %d, %r', self, fd, obj)
           obj.Disconnect()
           self._poller.unregister(obj)
 
@@ -754,4 +809,4 @@ def ExternalContextMain(context_name, parent_addr, key):
     try:
       stream.Enqueue(reply_handle, (True, fn(*args, **kwargs)))
     except Exception, e:
-      stram.Enqueue(reply_handle, (False, (e, traceback.extract_stack())))
+      stream.Enqueue(reply_handle, (False, (e, traceback.extract_stack())))
diff --git a/econtext_test.py b/econtext_test.py
index c7958019..513ae6dc 100755
--- a/econtext_test.py
+++ b/econtext_test.py
@@ -12,6 +12,16 @@
   try:
     ret = localhost.Evaluate(DoStuff)
   except OSError, e:
+
+
+Tests
+  - Test Channel objects to destruction.
+  - External contexts sometimes don't appear to die during a crash. This needs
+    to be tested to destruction.
+  - Test reconnecting to previously idle-killed contexts.
+  - Test remote context longevity to destruction. They should never stay
+    around after parent dies or disconnects.
+
 """
 
 import sys
diff --git a/st.py b/st.py
index 35811263..50b238ca 100644
--- a/st.py
+++ b/st.py
@@ -1,4 +1,25 @@
-def try_something_silly(name):
-  file('/tmp/foo', 'w').write("hello " + name)
+import os
+import socket
+def GetCurrentHostname():
+  '''
+  Fetch the current hostname.
+  '''
+  return socket.gethostname()
+
+
+def LogCurrentUptime(hostname, pathname='/tmp/uptime.txt'):
+  '''
+  Log the current uptime along with the process ID that logs it.
+
+  Args:
+    hostname: the string hostname.
+  '''
+
+  fp = file(pathname, 'a')
+  fp.write('%d %s %s\n' % (os.getpid(), hostname, os.popen('uptime').read()))
+  fp.close()
+
+
+def try_something_silly(arg):
+  file('tty', 'w').write('ARG WAS: ' + str(arg) + '\n')
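Note (illustrative, not part of the diff): the POLLOUT branch in Broker.Loop()
in the econtext.py hunks above only keeps POLLOUT registered while a stream
still has buffered output, flipping the registration back to POLLIN once
Transmit() drains the buffer. Below is a standalone sketch of that pattern
using a plain pipe; the buffer contents and sizes are made up.

    import os
    import select

    rfd, wfd = os.pipe()
    output_buf = 'x' * 20000          # pretend Enqueue() buffered this much

    poller = select.poll()
    poller.register(wfd, select.POLLIN | select.POLLOUT)

    while output_buf:
      for fd, event in poller.poll():
        if event & select.POLLOUT:
          # Mirrors Transmit(): write at most 4KB, keep the remainder.
          written = os.write(fd, output_buf[:4096])
          output_buf = output_buf[written:]
          if not output_buf:
            # No output buffered any more: unset POLLOUT.
            poller.unregister(fd)
            poller.register(fd, select.POLLIN)

    print 'flushed; %d bytes waiting on rfd' % len(os.read(rfd, 32768))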