From 0b0663e21b559ea093ca8c3539634fd2102dde0e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 10 Aug 2016 22:13:37 +0100 Subject: [PATCH] flake8 --- econtext/core.py | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index ecafcd77..def167b0 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -17,7 +17,6 @@ import os import random import select import sha -import signal import socket import struct import sys @@ -74,9 +73,11 @@ class CallError(ContextError): class Dead(object): def __eq__(self, other): return type(other) is Dead + def __repr__(self): return '' + _DEAD = Dead() @@ -124,12 +125,12 @@ class Channel(object): def Close(self): """Indicate this channel is closed to the remote side.""" IOLOG.debug('%r.Close()', self) - self._context.Enqueue(handle, _DEAD) + self._context.Enqueue(self._handle, _DEAD) def Send(self, data): """Send `data` to the remote.""" IOLOG.debug('%r.Send(%r)', self, data) - self._context.Enqueue(handle, data) + self._context.Enqueue(self._handle, data) def Receive(self, timeout=None): """Receive an object from the remote, or return ``None`` if `timeout` @@ -205,7 +206,7 @@ class MasterModuleResponder(object): module = __import__(fullname) source = zlib.compress(inspect.getsource(module)) self._context.Enqueue(reply_to, (module.__file__, source)) - except Exception, e: + except Exception: LOG.exception('While importing %r', fullname) self._context.Enqueue(reply_to, None) @@ -319,9 +320,10 @@ class Stream(BasicStream): self._rhmac.update(self._input_buf[20:msg_len+24]) expected_mac = self._rhmac.digest() if msg_mac != expected_mac: - raise CorruptMessageError('%r invalid MAC: expected %r, got %r', + raise CorruptMessageError('%r bad MAC: %r != got %r; %r', self, msg_mac.encode('hex'), - expected_mac.encode('hex')) + expected_mac.encode('hex'), + self._input_buf[24:msg_len+24]) try: handle, data = self.Unpickle(self._input_buf[24:msg_len+24]) @@ -336,8 +338,8 @@ class Stream(BasicStream): LOG.debug('%r._Invoke(): handle=%r; data=%r', self, handle, data) try: persist, fn = self._context._handle_map[handle] - except KeyError, ex: - raise CorruptMessageError('%r got invalid handle: %r', self, handle) + except KeyError: + raise CorruptMessageError('%r: invalid handle: %r', self, handle) if not persist: del self._context._handle_map[handle] @@ -543,7 +545,8 @@ class Context(object): self._handle_map[handle] = persist, fn def Enqueue(self, handle, obj): - self._stream.Enqueue(handle, obj) + if self.stream: + self.stream.Enqueue(handle, obj) def EnqueueAwaitReply(self, handle, deadline, data): """Send `data` to `handle` and wait for a response with an optional @@ -579,7 +582,7 @@ class Context(object): self, deadline, with_context, fn, args, kwargs) if isinstance(fn, types.MethodType) and \ - isinstance(fn.im_self, (type, types.ClassType)): + isinstance(fn.im_self, (type, types.ClassType)): klass = fn.im_self.__name__ else: klass = None @@ -594,8 +597,8 @@ class Context(object): return self.CallWithDeadline(None, False, fn, *args, **kwargs) def __repr__(self): - bits = map(repr, filter(None, [self.name, self.hostname, self.username])) - return 'Context(%s)' % ', '.join(bits) + bits = filter(None, (self.name, self.hostname, self.username)) + return 'Context(%s)' % ', '.join(map(repr, bits)) class Waker(BasicStream): @@ -651,7 +654,7 @@ class IoLogger(BasicStream): def _LogLines(self): while self._buf.find('\n') != -1: line, _, self._buf = self._buf.partition('\n') - self._log.debug('%s: %s', self._name, line.rstrip('\n')) + self._log.info('%s', line.rstrip('\n')) def Receive(self): LOG.debug('%r.Receive()', self) @@ -727,8 +730,8 @@ class Broker(object): def GetRemote(self, hostname, username, name=None, python_path=None): """Get the named remote context, creating it if it does not exist.""" if name is None: - name = 'econtext[%s@%s:%d]' %\ - (username, socket.gethostname(), os.getpid()) + name = '%s@%s:%d' + name %= (getpass.getuser(), socket.gethostname(), os.getpid()) context = Context(self, name, hostname, username) context.stream = SSHStream(context) @@ -740,15 +743,15 @@ class Broker(object): def _CallAndUpdate(self, stream, func): try: func() - except Exception, e: + except Exception: LOG.exception('%r crashed', stream) stream.Disconnect() self._UpdateStream(stream) def _LoopOnce(self): IOLOG.debug('%r.Loop()', self) - #IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers]) - #IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers]) + # IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers]) + # IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers]) rsides, wsides, _ = select.select(self._readers, self._writers, ()) for side in rsides: IOLOG.debug('%r: POLLIN for %r', self, side.stream) @@ -826,6 +829,10 @@ class ExternalContext(object): os.dup2(self.stdout_log.write_side.fd, 1) os.dup2(self.stderr_log.write_side.fd, 2) + # Why is this necessary? + sys.stdout = os.fdopen(self.stdout_log.write_side.fd, 'w', 0) + sys.stderr = os.fdopen(self.stderr_log.write_side.fd, 'w', 0) + fp = file('/dev/null') try: os.dup2(fp.fileno(), 0) @@ -856,6 +863,8 @@ class ExternalContext(object): self._SetupImporter() self._SetupStdio() + # signal.signal(signal.SIGINT, lambda *_: self.broker.Finalize()) + self.broker.Register(self.context) self._DispatchCalls() self.broker.Wait()