|
|
|
"""
|
|
|
|
A random assortment of utility functions useful on masters and slaves.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import logging
|
|
|
|
import sys
|
|
|
|
|
|
|
|
import econtext
|
|
|
|
import econtext.core
|
|
|
|
import econtext.master
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger registered under the 'econtext' name.
LOG = logging.getLogger('econtext')
|
|
|
|
|
|
|
|
|
|
|
|
def disable_site_packages():
    """Strip every ``sys.path`` entry that mentions ``site-packages`` or
    ``Extras``.

    Intended mainly for test runs on OS X inside a virtualenv, where OS X
    bundles some ancient version of the 'six' module.
    """
    unwanted = ('site-packages', 'Extras')
    # Slice-assign so the existing sys.path list object is mutated in place
    # rather than rebound to a new list.
    sys.path[:] = [
        entry for entry in sys.path
        if not any(marker in entry for marker in unwanted)
    ]
|
|
|
|
|
|
|
|
|
Introduce econtext.core.Router, refactor everything
* Header now contains (src, dst) context IDs for routing.
* econtext.context_id now contains current process' context ID.
* Now do 16kb-sized reads rather than 4kb.
* econtext package is uniformly imported in econtext/core.py in slave
and master.
* Introduce econtext.core.Message() to centralize pickling policy, and
various function interfaces, may rip it out again later.
* Teach slave/first stage to preserve the copy of econtext.core sent to
it, so that it can be used for subsequent slave-of-slave bootstraps.
* Disconnect Stream from Context, and teach Context to send messages via
Router. In this way the Context class works identically for slaves
directly connected via a Stream, or those for whom other slaves are
acting as proxies.
* Implement Router, which knows a list of contexts reachable via a
Stream. Move context registry out of Broker and into Router.
* Move _invoke crap out of stream and into Context.
* Try to avoid pickling on the Broker thread wherever possible.
* Delete connection-specific fields from Context, they live on the
associated Stream subclass now instead.
* Merge alloc_handle() and add_handle_cb() into add_handler().
* s/enqueue/send/
* Add a hacky guard to prevent send_await() deadlock from Broker thread.
* Temporarily break shutdown logic: graceful shutdown is broken since
Broker doesn't know about which contexts exist any more.
* Handle EIO in iter_read() too. Also need to support ECONNRESET in here.
* Make iter_read() show last 100 bytes on failure.
* econtext.master.connect() is now econtext.master.Router.connect(),
move most of the context/stream construction cutpaste into a single
function, and Stream.construct().
* Stop using sys.executable, since it is the empty string when Python
has been started with a custom argv[0]. Hard-wire python2.7 for now.
* Streams now have names, which are used as the default name for the
associated Context during construction. That way Stream<->Context
association is still fairly obvious and Stream.repr() prints
something nice.
7 years ago
|
|
|
def log_to_tmp():
    """Route logging to ``/tmp/econtext.<pid>.log`` via
    :py:func:`log_to_file`, where ``<pid>`` is the current process ID."""
    import os
    log_path = '/tmp/econtext.%s.log' % (os.getpid(),)
    log_to_file(path=log_path)
|
|
|
|
|
|
|
|
|
|
|
|
def log_to_file(path=None, io=True, level=logging.DEBUG):
    """Install a new :py:class:`logging.StreamHandler` at the front of the
    root logger's handler list. Useful when debugging slave IO problems.

    :param path: file to write log output to; it is opened for writing
        (truncating) with line buffering. When falsy, output goes to
        ``sys.stderr`` instead.
    :param io: when true, the ``econtext.io`` logger is also set to `level`.
    :param level: level applied to the root logger.
    """
    if path:
        stream = open(path, 'w', 1)
        # NOTE(review): presumably marks the descriptor close-on-exec so it
        # is not inherited by child processes -- confirm in econtext.core.
        econtext.core.set_cloexec(stream.fileno())
    else:
        stream = sys.stderr

    root = logging.getLogger('')
    root.setLevel(level)
    if io:
        logging.getLogger('econtext.io').setLevel(level)

    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname).1s %(name)s: %(message)s',
        '%H:%M:%S',
    ))
    # Insert at the head of the list rather than appending, so this handler
    # precedes any handlers that were already installed.
    root.handlers.insert(0, handler)
|
|
|
|
|
|
|
|
|
Introduce econtext.core.Router, refactor everything
* Header now contains (src, dst) context IDs for routing.
* econtext.context_id now contains current process' context ID.
* Now do 16kb-sized reads rather than 4kb.
* econtext package is uniformly imported in econtext/core.py in slave
and master.
* Introduce econtext.core.Message() to centralize pickling policy, and
various function interfaces, may rip it out again later.
* Teach slave/first stage to preserve the copy of econtext.core sent to
it, so that it can be used for subsequent slave-of-slave bootstraps.
* Disconnect Stream from Context, and teach Context to send messages via
Router. In this way the Context class works identically for slaves
directly connected via a Stream, or those for whom other slaves are
acting as proxies.
* Implement Router, which knows a list of contexts reachable via a
Stream. Move context registry out of Broker and into Router.
* Move _invoke crap out of stream and into Context.
* Try to avoid pickling on the Broker thread wherever possible.
* Delete connection-specific fields from Context, they live on the
associated Stream subclass now instead.
* Merge alloc_handle() and add_handle_cb() into add_handler().
* s/enqueue/send/
* Add a hacky guard to prevent send_await() deadlock from Broker thread.
* Temporarily break shutdown logic: graceful shutdown is broken since
Broker doesn't know about which contexts exist any more.
* Handle EIO in iter_read() too. Also need to support ECONNRESET in here.
* Make iter_read() show last 100 bytes on failure.
* econtext.master.connect() is now econtext.master.Router.connect(),
move most of the context/stream construction cutpaste into a single
function, and Stream.construct().
* Stop using sys.executable, since it is the empty string when Python
has been started with a custom argv[0]. Hard-wire python2.7 for now.
* Streams now have names, which are used as the default name for the
associated Context during construction. That way Stream<->Context
association is still fairly obvious and Stream.repr() prints
something nice.
7 years ago
|
|
|
def run_with_router(func, *args, **kwargs):
    """Arrange for `func(router, *args, **kwargs)` to run with a temporary
    :py:class:`econtext.master.Router`, ensuring its Broker is shut down and
    joined during normal or exceptional return.

    :param func: callable invoked with the temporary router as its first
        positional argument, followed by `args` and `kwargs`.
    :returns: whatever `func` returns.
    :raises: any exception raised by `func` or by Router construction
        propagates after the broker has been shut down and joined.
    """
    broker = econtext.master.Broker()
    try:
        # Build the router inside the guarded region so the broker is still
        # shut down and joined if Router construction itself fails.
        router = econtext.master.Router(broker)
        return func(router, *args, **kwargs)
    finally:
        # NOTE(review): only the broker is torn down explicitly here;
        # presumably that also releases the router -- confirm in
        # econtext.master.
        broker.shutdown()
        broker.join()
|
|
|
|
|
|
|
|
|
Introduce econtext.core.Router, refactor everything
* Header now contains (src, dst) context IDs for routing.
* econtext.context_id now contains current process' context ID.
* Now do 16kb-sized reads rather than 4kb.
* econtext package is uniformly imported in econtext/core.py in slave
and master.
* Introduce econtext.core.Message() to centralize pickling policy, and
various function interfaces, may rip it out again later.
* Teach slave/first stage to preserve the copy of econtext.core sent to
it, so that it can be used for subsequent slave-of-slave bootstraps.
* Disconnect Stream from Context, and teach Context to send messages via
Router. In this way the Context class works identically for slaves
directly connected via a Stream, or those for whom other slaves are
acting as proxies.
* Implement Router, which knows a list of contexts reachable via a
Stream. Move context registry out of Broker and into Router.
* Move _invoke crap out of stream and into Context.
* Try to avoid pickling on the Broker thread wherever possible.
* Delete connection-specific fields from Context, they live on the
associated Stream subclass now instead.
* Merge alloc_handle() and add_handle_cb() into add_handler().
* s/enqueue/send/
* Add a hacky guard to prevent send_await() deadlock from Broker thread.
* Temporarily break shutdown logic: graceful shutdown is broken since
Broker doesn't know about which contexts exist any more.
* Handle EIO in iter_read() too. Also need to support ECONNRESET in here.
* Make iter_read() show last 100 bytes on failure.
* econtext.master.connect() is now econtext.master.Router.connect(),
move most of the context/stream construction cutpaste into a single
function, and Stream.construct().
* Stop using sys.executable, since it is the empty string when Python
has been started with a custom argv[0]. Hard-wire python2.7 for now.
* Streams now have names, which are used as the default name for the
associated Context during construction. That way Stream<->Context
association is still fairly obvious and Stream.repr() prints
something nice.
7 years ago
|
|
|
def with_router(func):
    """Decorator version of :py:func:`run_with_router`. Example:

    .. code-block:: python

        @with_router
        def do_stuff(router, arg):
            pass

        do_stuff(123)

    The decorated function receives the temporary router as its first
    argument; callers pass only the remaining arguments.

    :param func: callable taking a router as its first positional argument.
    :returns: a wrapper carrying `func`'s name and docstring.
    """
    # Function-scope import, matching log_to_tmp(), so this block does not
    # depend on a change to the module's import list.
    import functools

    # functools.wraps copies __name__/__doc__ portably; the previous
    # ``func.func_name`` attribute only exists on Python 2.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return run_with_router(func, *args, **kwargs)
    return wrapper
|