@ -26,6 +26,7 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import pprint
import sys
import threading
@ -35,11 +36,20 @@ from mitogen.core import LOG
class Service ( object ) :
#: Sentinel object to suppress reply generation, since returning ``None``
#: will trigger a response message containing the pickled ``None``.
NO_REPLY = object ( )
#: If ``None``, a handle is dynamically allocated, otherwise the fixed
#: integer handle to use.
handle = None
max_message_size = 0
#: Mapping from required key names to their required corresponding types,
#: used by the default :py:meth:`validate_args` implementation to validate
#: requests.
required_args = { }
def __init__ ( self , router ) :
self . router = router
self . recv = mitogen . core . Receiver ( router , self . handle )
@ -48,7 +58,14 @@ class Service(object):
self . running = True
def validate_args ( self , args ) :
return True
return (
isinstance ( args , dict ) and
all ( isinstance ( args . get ( k ) , t )
for k , t in self . required_args . iteritems ( ) )
)
def dispatch ( self , args , msg ) :
raise NotImplementedError ( )
def dispatch_one ( self , msg ) :
if len ( msg . data ) > self . max_message_size :
@ -64,7 +81,9 @@ class Service(object):
return
try :
msg . reply ( self . dispatch ( args , msg ) )
response = self . dispatch ( args , msg )
if response is not self . NO_REPLY :
msg . reply ( response )
except Exception , e :
LOG . exception ( ' While invoking %r .dispatch() ' , self )
msg . reply ( mitogen . core . CallError ( e ) )
@ -85,8 +104,90 @@ class Service(object):
self . run_once ( )
class DeduplicatingService ( Service ) :
"""
A service that deduplicates and caches expensive responses . Requests are
deduplicated according to a customizable key , and the single expensive
response is broadcast to all requestors .
A side effect of this class is that processing of the single response is
always serialized according to the result of : py : meth : ` key_from_request ` .
Only one pool thread is blocked during generation of the response ,
regardless of the number of requestors .
"""
def __init__ ( self , router ) :
super ( DeduplicatingService , self ) . __init__ ( router )
self . _responses = { }
self . _waiters = { }
self . _lock = threading . Lock ( )
def key_from_request ( self , args ) :
"""
Generate a deduplication key from the request . The default
implementation returns a string based on a stable representation of the
input dictionary generated by : py : func : ` pprint . pformat ` .
"""
return pprint . pformat ( args )
def get_response ( self , args ) :
raise NotImplementedError ( )
def _produce_response ( self , key , response ) :
self . _lock . acquire ( )
try :
assert key not in self . _responses
assert key in self . _waiters
self . _responses [ key ] = response
for msg in self . _waiters . pop ( key ) :
msg . reply ( response )
finally :
self . _lock . release ( )
def dispatch ( self , args , msg ) :
key = self . key_from_request ( args )
self . _lock . acquire ( )
try :
if key in self . _responses :
return self . _responses [ key ]
if key in self . _waiters :
self . _waiters [ key ] . append ( msg )
return self . NO_REPLY
self . _waiters [ key ] = [ msg ]
finally :
self . _lock . release ( )
# I'm the unlucky thread that must generate the response.
try :
self . _produce_response ( key , self . get_response ( args ) )
except Exception , e :
self . _produce_response ( key , mitogen . core . CallError ( e ) )
return self . NO_REPLY
class Pool ( object ) :
"""
Manage a pool of at least one thread that will be used to process messages
for a collection of services .
Internally this is implemented by subscribing every : py : class : ` Service ` ' s
: py : class : ` mitogen . core . Receiver ` using a single
: py : class : ` mitogen . master . Select ` , then arranging for every thread to
consume messages delivered to that select .
In this way the threads are fairly shared by all available services , and no
resources are dedicated to a single idle service .
There is no penalty for exposing large numbers of services ; the list of
exposed services could even be generated dynamically in response to your
program ' s configuration or its input data.
"""
def __init__ ( self , router , services , size = 1 ) :
assert size > 0
self . router = router
self . services = list ( services )
self . size = size