diff --git a/docs/api.rst b/docs/api.rst index 58b6b500..edec8a5f 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -83,9 +83,9 @@ contexts. total = 0 recvs = [c.call_async(long_running_operation) for c in contexts] - for recv, (msg, data) in mitogen.master.Select(recvs): - print 'Got %s from %s' % (data, recv) - total += data + for msg in mitogen.master.Select(recvs): + print 'Got %s from %s' % (msg, msg.receiver) + total += msg.unpickle() # Iteration ends when last Receiver yields a result. print 'Received total %s from %s receivers' % (total, len(recvs)) @@ -96,8 +96,8 @@ contexts. with mitogen.master.Select(oneshot=False) as select: while running(): - for recv, (msg, data) in select: - process_result(recv.context, msg.unpickle()) + for msg in select: + process_result(msg.receiver.context, msg.unpickle()) for context, workfunc in get_new_work(): select.add(context.call_async(workfunc)) @@ -114,8 +114,8 @@ contexts. ]) ] - for recv, (msg, data) in mitogen.master.Select(selects): - print data + for msg in mitogen.master.Select(selects): + print msg.unpickle() .. py:method:: get (timeout=None) @@ -123,11 +123,14 @@ contexts. :py:class:`mitogen.core.TimeoutError` if no value is available within `timeout` seconds. + On success, the message's :py:attr:`receiver + ` attribute is set to the receiver. + :param float timeout: Timeout in seconds. :return: - `(receiver, (msg, data))` + :py:class:`mitogen.core.Message` .. py:method:: __bool__ () diff --git a/mitogen/core.py b/mitogen/core.py index 6c58da16..aae36a76 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -223,6 +223,7 @@ class Message(object): data = '' router = None + receiver = None def __init__(self, **kwargs): self.src_id = mitogen.context_id diff --git a/mitogen/master.py b/mitogen/master.py index 236127e9..4f8d5135 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -146,8 +146,7 @@ class Select(object): def __iter__(self): while self._receivers: - recv, msg = self.get() - yield recv, msg + yield self.get() loop_msg = 'Adding this Select instance would create a Select cycle' @@ -206,7 +205,8 @@ class Select(object): msg = recv.get(block=False) if self._oneshot: self.remove(recv) - return recv, msg + msg.receiver = recv + return msg except mitogen.core.TimeoutError: # A receiver may have been queued with no result if another # thread drained it before we woke up, or because another diff --git a/tests/select_test.py b/tests/select_test.py index bf73561d..b057d322 100644 --- a/tests/select_test.py +++ b/tests/select_test.py @@ -183,7 +183,7 @@ class IterTest(testlib.RouterMixin, testlib.TestCase): select = self.klass([recv]) msg = mitogen.core.Message.pickled('123') recv._on_receive(msg) - self.assertEquals([(recv, (msg, '123'))], list(select)) + self.assertEquals([msg], list(select)) class OneShotTest(testlib.RouterMixin, testlib.TestCase): @@ -194,7 +194,7 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase): select = self.klass([recv]) msg = mitogen.core.Message.pickled('123') recv._on_receive(msg) - recv, (msg_, data) = select.get() + msg_ = select.get() self.assertEquals(msg, msg_) self.assertEquals(0, len(select._receivers)) self.assertEquals(None, recv.notify) @@ -204,7 +204,8 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase): select = self.klass([recv], oneshot=False) msg = mitogen.core.Message.pickled('123') recv._on_receive(msg) - self.assertEquals((recv, (msg, '123')), select.get()) + + self.assertEquals(msg, select.get()) self.assertEquals(1, len(select._receivers)) self.assertEquals(recv, select._receivers[0]) self.assertEquals(select._put, recv.notify) @@ -241,22 +242,29 @@ class GetTest(testlib.RouterMixin, testlib.TestCase): recv = mitogen.core.Receiver(self.router) recv._on_receive(mitogen.core.Message.pickled('123')) select = self.klass([recv]) - recv, (msg, data) = select.get() - self.assertEquals('123', data) + msg = select.get() + self.assertEquals('123', msg.unpickle()) def test_nonempty_after_add(self): recv = mitogen.core.Receiver(self.router) select = self.klass([recv]) recv._on_receive(mitogen.core.Message.pickled('123')) - recv, (msg, data) = select.get() - self.assertEquals('123', data) + msg = select.get() + self.assertEquals('123', msg.unpickle()) + + def test_nonempty_receiver_attr_set(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + recv._on_receive(mitogen.core.Message.pickled('123')) + msg = select.get() + self.assertEquals(msg.receiver, recv) def test_drained_by_other_thread(self): recv = mitogen.core.Receiver(self.router) recv._on_receive(mitogen.core.Message.pickled('123')) select = self.klass([recv]) - msg, data = recv.get() - self.assertEquals('123', data) + msg = recv.get() + self.assertEquals('123', msg.unpickle()) self.assertRaises(mitogen.core.TimeoutError, lambda: select.get(timeout=0.0))