|
|
|
@ -347,7 +347,11 @@ class Stream(BasicStream):
|
|
|
|
|
"""Transmit buffered messages."""
|
|
|
|
|
IOLOG.debug('%r.on_transmit()', self)
|
|
|
|
|
written = os.write(self.write_side.fd, self._output_buf[:4096])
|
|
|
|
|
self._output_buf = self._output_buf[written:]
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
self._output_buf = self._output_buf[written:]
|
|
|
|
|
finally:
|
|
|
|
|
self._lock.release()
|
|
|
|
|
if (not self._output_buf) and not self._context.broker.graceful_count:
|
|
|
|
|
self.on_disconnect()
|
|
|
|
|
|
|
|
|
@ -357,10 +361,10 @@ class Stream(BasicStream):
|
|
|
|
|
def enqueue(self, handle, obj):
|
|
|
|
|
"""Enqueue `obj` to `handle`, and tell the broker we have output."""
|
|
|
|
|
IOLOG.debug('%r.enqueue(%r, %r)', self, handle, obj)
|
|
|
|
|
encoded = cPickle.dumps((handle, obj), protocol=2)
|
|
|
|
|
msg = struct.pack('>L', len(encoded)) + encoded
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
encoded = cPickle.dumps((handle, obj), protocol=2)
|
|
|
|
|
msg = struct.pack('>L', len(encoded)) + encoded
|
|
|
|
|
self._whmac.update(msg)
|
|
|
|
|
self._output_buf += self._whmac.digest() + msg
|
|
|
|
|
finally:
|
|
|
|
|