|
|
|
@ -9,14 +9,12 @@ import cPickle
|
|
|
|
|
import cStringIO
|
|
|
|
|
import errno
|
|
|
|
|
import fcntl
|
|
|
|
|
import hmac
|
|
|
|
|
import imp
|
|
|
|
|
import itertools
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import random
|
|
|
|
|
import select
|
|
|
|
|
import sha
|
|
|
|
|
import socket
|
|
|
|
|
import struct
|
|
|
|
|
import sys
|
|
|
|
@ -510,8 +508,6 @@ class Stream(BasicStream):
|
|
|
|
|
self._router = router
|
|
|
|
|
self.remote_id = remote_id
|
|
|
|
|
self.key = key
|
|
|
|
|
self._rhmac = hmac.new(key, digestmod=sha)
|
|
|
|
|
self._whmac = self._rhmac.copy()
|
|
|
|
|
self.name = 'default'
|
|
|
|
|
self.construct(**kwargs)
|
|
|
|
|
|
|
|
|
@ -534,16 +530,15 @@ class Stream(BasicStream):
|
|
|
|
|
if not buf:
|
|
|
|
|
return self.on_disconnect(broker)
|
|
|
|
|
|
|
|
|
|
HEADER_FMT = '>20shhLLL'
|
|
|
|
|
HEADER_FMT = '>hhLLL'
|
|
|
|
|
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
|
|
|
|
MAC_LEN = sha.digest_size
|
|
|
|
|
|
|
|
|
|
def _receive_one(self, broker):
|
|
|
|
|
if len(self._input_buf) < self.HEADER_LEN:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
msg = Message()
|
|
|
|
|
(msg_mac, msg.dst_id, msg.src_id,
|
|
|
|
|
(msg.dst_id, msg.src_id,
|
|
|
|
|
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
|
|
|
|
self.HEADER_FMT,
|
|
|
|
|
self._input_buf[:self.HEADER_LEN]
|
|
|
|
@ -554,16 +549,6 @@ class Stream(BasicStream):
|
|
|
|
|
self, msg_len, len(self._input_buf) - self.HEADER_LEN)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
self._rhmac.update(self._input_buf[
|
|
|
|
|
self.MAC_LEN : (msg_len + self.HEADER_LEN)
|
|
|
|
|
])
|
|
|
|
|
expected_mac = self._rhmac.digest()
|
|
|
|
|
if msg_mac != expected_mac:
|
|
|
|
|
raise StreamError('bad MAC: %r != got %r; %r',
|
|
|
|
|
msg_mac.encode('hex'),
|
|
|
|
|
expected_mac.encode('hex'),
|
|
|
|
|
self._input_buf[24:msg_len+24])
|
|
|
|
|
|
|
|
|
|
msg.data = self._input_buf[self.HEADER_LEN:self.HEADER_LEN+msg_len]
|
|
|
|
|
self._input_buf = self._input_buf[self.HEADER_LEN+msg_len:]
|
|
|
|
|
self._router.route(msg)
|
|
|
|
@ -590,8 +575,7 @@ class Stream(BasicStream):
|
|
|
|
|
pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id,
|
|
|
|
|
msg.handle, msg.reply_to or 0, len(msg.data)
|
|
|
|
|
) + msg.data
|
|
|
|
|
self._whmac.update(pkt)
|
|
|
|
|
self._output_buf += self._whmac.digest() + pkt
|
|
|
|
|
self._output_buf += pkt
|
|
|
|
|
self._router.broker.start_transmit(self)
|
|
|
|
|
|
|
|
|
|
def on_disconnect(self, broker):
|
|
|
|
|