Replace dodgy synchronization

Use Queue.Queue() rather than event.
pull/35/head
David Wilson 8 years ago
parent e62b891b9a
commit 07ba2de7b0

@ -4,7 +4,7 @@
Python External Execution Contexts. Python External Execution Contexts.
''' '''
import atexit import Queue
import cPickle import cPickle
import cStringIO import cStringIO
import commands import commands
@ -124,9 +124,7 @@ class Channel(object):
def __init__(self, stream, handle): def __init__(self, stream, handle):
self._stream = stream self._stream = stream
self._handle = handle self._handle = handle
self._wake_event = threading.Event() self._queue = Queue.Queue()
self._queue_lock = threading.Lock()
self._queue = []
self._stream.AddHandleCB(self._InternalReceive, handle) self._stream.AddHandleCB(self._InternalReceive, handle)
def _InternalReceive(self, killed, data): def _InternalReceive(self, killed, data):
@ -146,12 +144,7 @@ class Channel(object):
) )
''' '''
LOG.debug('%r._InternalReceive(%r, %r)', self, killed, data) LOG.debug('%r._InternalReceive(%r, %r)', self, killed, data)
self._queue_lock.acquire() self._queue.put((killed or data[0], killed or data[1]))
try:
self._queue.append((killed or data[0], killed or data[1]))
self._wake_event.set()
finally:
self._queue_lock.release()
def Close(self): def Close(self):
''' '''
@ -179,22 +172,15 @@ class Channel(object):
object object
''' '''
LOG.debug('%r.Receive(%r)', self, timeout) LOG.debug('%r.Receive(%r)', self, timeout)
if not self._queue:
self._wake_event.wait(timeout)
if not self._wake_event.isSet():
return
self._queue_lock.acquire()
try: try:
self._wake_event.clear() killed, data = self._queue.get(True, timeout)
LOG.debug('%r.Receive() queue is %r', self, self._queue) except Queue.Empty:
killed, data = self._queue.pop(0) return
LOG.debug('%r.Receive() got killed=%r, data=%r', self, killed, data)
if killed: LOG.debug('%r.Receive() got killed=%r, data=%r', self, killed, data)
raise ChannelError('Channel is closed.') if killed:
return data raise ChannelError('Channel is closed.')
finally: return data
self._queue_lock.release()
def __iter__(self): def __iter__(self):
''' '''
@ -298,6 +284,9 @@ class BasicStream(object):
class Stream(BasicStream): class Stream(BasicStream):
_input_buf = ''
_output_buf = ''
def __init__(self, context): def __init__(self, context):
''' '''
Initialize a new Stream instance. Initialize a new Stream instance.
@ -306,19 +295,13 @@ class Stream(BasicStream):
context: econtext.Context context: econtext.Context
''' '''
self._context = context self._context = context
self._lock = threading.Lock()
self._input_buf = self._output_buf = ''
self._input_buf_lock = threading.Lock()
self._output_buf_lock = threading.Lock()
self._rhmac = hmac.new(context.key, digestmod=sha.new) self._rhmac = hmac.new(context.key, digestmod=sha.new)
self._whmac = self._rhmac.copy() self._whmac = self._rhmac.copy()
self._last_handle = 1000L self._last_handle = 1000L
self._handle_map = {} self._handle_map = {}
self._handle_lock = threading.Lock()
self._func_refs = {}
self._func_ref_lock = threading.Lock()
self._pickler_file = cStringIO.StringIO() self._pickler_file = cStringIO.StringIO()
self._pickler = cPickle.Pickler(self._pickler_file, protocol=2) self._pickler = cPickle.Pickler(self._pickler_file, protocol=2)
@ -367,12 +350,12 @@ class Stream(BasicStream):
Returns: Returns:
long long
''' '''
self._handle_lock.acquire() self._lock.acquire()
try: try:
self._last_handle += 1L self._last_handle += 1L
return self._last_handle
finally: finally:
self._handle_lock.release() self._lock.release()
return self._last_handle
def AddHandleCB(self, fn, handle, persist=True): def AddHandleCB(self, fn, handle, persist=True):
''' '''
@ -385,11 +368,7 @@ class Stream(BasicStream):
''' '''
LOG.debug('%r.AddHandleCB(%r, %r, persist=%r)', LOG.debug('%r.AddHandleCB(%r, %r, persist=%r)',
self, fn, handle, persist) self, fn, handle, persist)
self._handle_lock.acquire() self._handle_map[handle] = persist, fn
try:
self._handle_map[handle] = persist, fn
finally:
self._handle_lock.release()
def Receive(self): def Receive(self):
''' '''
@ -465,14 +444,14 @@ class Stream(BasicStream):
''' '''
LOG.debug('%r.Enqueue(%r, %r)', self, handle, obj) LOG.debug('%r.Enqueue(%r, %r)', self, handle, obj)
self._output_buf_lock.acquire() self._lock.acquire()
try: try:
encoded = self.Pickle((handle, obj)) encoded = self.Pickle((handle, obj))
msg = struct.pack('>L', len(encoded)) + encoded msg = struct.pack('>L', len(encoded)) + encoded
self._whmac.update(msg) self._whmac.update(msg)
self._output_buf += self._whmac.digest() + msg self._output_buf += self._whmac.digest() + msg
finally: finally:
self._output_buf_lock.release() self._lock.release()
self._context.broker.UpdateStream(self, wake=True) self._context.broker.UpdateStream(self, wake=True)
def Disconnect(self): def Disconnect(self):
@ -579,7 +558,7 @@ class LocalStream(Stream):
# Hexed and passed to 'python -c'. It forks, dups 0->100, creates a pipe, # Hexed and passed to 'python -c'. It forks, dups 0->100, creates a pipe,
# then execs a new interpreter with a custom argv. CONTEXT_NAME is replaced # then execs a new interpreter with a custom argv. CONTEXT_NAME is replaced
# with the context name. Optimized for source size. # with the context name. Optimized for size.
def _FirstStage(): def _FirstStage():
import os,sys,zlib import os,sys,zlib
R,W=os.pipe() R,W=os.pipe()
@ -670,26 +649,24 @@ class Context(object):
reply_handle is the handle on which this function expects its reply. reply_handle is the handle on which this function expects its reply.
''' '''
reply_handle = self._stream.AllocHandle() reply_handle = self._stream.AllocHandle()
reply_event = threading.Event()
container = []
LOG.debug('%r.EnqueueAwaitReply(%r, %r, %r) -> reply handle %d', LOG.debug('%r.EnqueueAwaitReply(%r, %r, %r) -> reply handle %d',
self, handle, deadline, data, reply_handle) self, handle, deadline, data, reply_handle)
queue = Queue.Queue()
def _Receive(killed, data): def _Receive(killed, data):
LOG.debug('%r._Receive(%r, %r)', self, killed, data) LOG.debug('%r._Receive(%r, %r)', self, killed, data)
container.extend([killed, data]) queue.put((killed, data))
reply_event.set()
self._stream.AddHandleCB(_Receive, reply_handle, persist=False) self._stream.AddHandleCB(_Receive, reply_handle, persist=False)
self._stream.Enqueue(handle, (False, (reply_handle,) + data)) self._stream.Enqueue(handle, (False, (reply_handle,) + data))
reply_event.wait(deadline) try:
if not reply_event.isSet(): killed, data = queue.get(True, deadline)
self.Disconnect() except Queue.Empty:
self._stream.Disconnect()
raise TimeoutError('deadline exceeded.') raise TimeoutError('deadline exceeded.')
killed, data = container
if killed: if killed:
raise StreamError('lost connection during call.') raise StreamError('lost connection during call.')

Loading…
Cancel
Save