mitogen/econtext/utils.py

79 lines
2.1 KiB
Python

"""
A random assortment of utility functions useful on masters and slaves.
"""

import logging
import sys

import econtext
import econtext.core
import econtext.master


LOG = logging.getLogger('econtext')


def disable_site_packages():
    """Remove all entries mentioning site-packages or Extras from the system
    path. Used primarily for testing on OS X within a virtualenv, where OS X
    bundles some ancient version of the 'six' module."""
    for entry in sys.path[:]:
        if 'site-packages' in entry or 'Extras' in entry:
            sys.path.remove(entry)
Introduce econtext.core.Router, refactor everything * Header now contains (src, dst) context IDs for routing. * econtext.context_id now contains current process' context ID. * Now do 16kb-sized reads rather than 4kb. * econtext package is uniformly imported in econtext/core.py in slave and master. * Introduce econtext.core.Message() to centralize pickling policy, and various function interfaces, may rip it out again later. * Teach slave/first stage to preserve the copy of econtext.core sent to it, so that it can be used for subsequent slave-of-slave bootstraps. * Disconnect Stream from Context, and teach Context to send messages via Router. In this way the Context class works identically for slaves directly connected via a Stream, or those for whom other slaves are acting as proxies. * Implement Router, which knows a list of contexts reachable via a Stream. Move context registry out of Broker and into Router. * Move _invoke crap out of stream and into Context. * Try to avoid pickling on the Broker thread wherever possible. * Delete connection-specific fields from Context, they live on the associated Stream subclass now instead. * Merge alloc_handle() and add_handle_cb() into add_handler(). * s/enqueue/send/ * Add a hacky guard to prevent send_await() deadlock from Broker thread. * Temporarily break shutdown logic: graceful shutdown is broken since Broker doesn't know about which contexts exist any more. * Handle EIO in iter_read() too. Also need to support ECONNRESET in here. * Make iter_read() show last 100 bytes on failure. * econtext.master.connect() is now econtext.master.Router.connect(), move most of the context/stream construction cutpaste into a single function, and Stream.construct(). * Stop using sys.executable, since it is the empty string when Python has been started with a custom argv[0]. Hard-wire python2.7 for now. * Streams now have names, which are used as the default name for the associated Context during construction. 
That way the Stream<->Context association is still fairly obvious and Stream.repr() prints something nice.
7 years ago

def log_to_tmp():
    import os
    log_to_file(path='/tmp/econtext.%s.log' % (os.getpid(),))


def log_to_file(path=None, io=True, level=logging.DEBUG):
    """Install a new :py:class:`logging.Handler` writing application logs to
    the filesystem. Useful when debugging slave IO problems."""
    log = logging.getLogger('')
    if path:
        # Line-buffered, and closed on exec so children do not inherit it.
        fp = open(path, 'w', 1)
        econtext.core.set_cloexec(fp.fileno())
    else:
        fp = sys.stderr

    log.setLevel(level)
    if io:
        logging.getLogger('econtext.io').setLevel(level)

    fmt = '%(asctime)s %(levelname).1s %(name)s: %(message)s'
    datefmt = '%H:%M:%S'
    handler = logging.StreamHandler(fp)
    handler.formatter = logging.Formatter(fmt, datefmt)
    log.handlers.insert(0, handler)


def run_with_router(func, *args, **kwargs):
    """Arrange for `func(router, *args, **kwargs)` to run with a temporary
    :py:class:`econtext.master.Router`, ensuring the Router and Broker are
    correctly shut down during normal or exceptional return."""
    broker = econtext.master.Broker()
    router = econtext.master.Router(broker)
    try:
        return func(router, *args, **kwargs)
    finally:
        broker.shutdown()
        broker.join()


def with_router(func):
    """Decorator version of :py:func:`run_with_router`. Example:

    .. code-block:: python

        @with_router
        def do_stuff(router, arg):
            pass

        do_stuff(123)
    """
    def wrapper(*args, **kwargs):
        return run_with_router(func, *args, **kwargs)
    # __name__ is the portable spelling of Python 2's func_name.
    wrapper.__name__ = func.__name__
    return wrapper
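
The cleanup behaviour of `run_with_router` and `with_router` can be tried without a real connection. The following is a minimal sketch, not the library's implementation: `StubBroker` and `StubRouter` are hypothetical stand-ins for `econtext.master.Broker` and `econtext.master.Router` that only record calls, and `functools.wraps` is swapped in for the manual name copy.

```python
import functools


class StubBroker(object):
    """Hypothetical stand-in for econtext.master.Broker; records calls only."""

    def __init__(self):
        self.calls = []

    def shutdown(self):
        self.calls.append('shutdown')

    def join(self):
        self.calls.append('join')


class StubRouter(object):
    """Hypothetical stand-in for econtext.master.Router."""

    def __init__(self, broker):
        self.broker = broker


def run_with_router(func, *args, **kwargs):
    # Same shape as the real helper: the router is passed as the first
    # argument, and the broker is stopped whether func returns or raises.
    broker = StubBroker()
    router = StubRouter(broker)
    try:
        return func(router, *args, **kwargs)
    finally:
        broker.shutdown()
        broker.join()


def with_router(func):
    # functools.wraps copies the wrapped function's name and docstring.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return run_with_router(func, *args, **kwargs)
    return wrapper


@with_router
def do_stuff(router, arg):
    # Return the broker so the caller can inspect what happened to it.
    return router.broker, arg


# The caller supplies only `arg`; the decorator injects the router.
broker, value = do_stuff(123)
```

Because the shutdown sits in a `finally` clause, the stub broker records `shutdown` followed by `join` even when the decorated function raises.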