mitogen: Add initial support for importlib ResourceReader

The new classes are modelled closely on their existing Module* counterparts.
For now I've duplicated the code, once it's bedded in I may refactor it. I
didn't replicate the FORWARD_MODULE plumbing, it didn't seem to be necessary
and may be dead code.
pull/1403/head
Alex Willmer 1 week ago
parent 73f60a3123
commit b7eddf2cdb

@ -21,6 +21,11 @@ To avail of fixes in an unreleased version, please download a ZIP file
In progress (unreleased)
------------------------
* :gh:issue:`1398` :mod:`mitogen`: Fix :exc:`FileNotFoundError` during
``import requests`` in a Mitogen child
* :gh:issue:`1403` :mod:`mitogen`: Add initial support for
:py:class:`importlib.resource.abc.ResourceReader` protocol
v0.3.36 (2025-12-01)
--------------------

@ -152,6 +152,8 @@ FORWARD_MODULE = 108
DETACHING = 109
CALL_SERVICE = 110
STUB_CALL_SERVICE = 111
GET_RESOURCE = 112
LOAD_RESOURCE = 113
#: Special value used to signal disconnection or the inability to route a
#: message, when it appears in the `reply_to` field. Usually causes
@ -1751,6 +1753,100 @@ class Importer(object):
return to_text(source)
return source
def get_resource_reader(self, fullname):
"""
Optional :class:`importlib.abc.Loader` method to provide data (files)
bundled with a package.
Introduced in Python 3.7.
"""
return ResourceReader(self._resource_requester, fullname)
class ResourceRequester(object):
"""
Requests Python package resources from upstreams & caches responses.
"""
def __init__(self, router, context):
self._context = context
self._lock = threading.Lock()
self._callbacks = {}
self._cache = {}
router.add_handler(
fn=self._on_load_resource,
handle=LOAD_RESOURCE,
policy=has_parent_authority,
)
def _get_resource(self, fullname, resource):
event = threading.Event()
self._request_resource(fullname, resource, event.set)
event.wait()
content = self._cache[(fullname, resource)]
return content
def _request_resource(self, fullname, resource, callback):
self._lock.acquire()
try:
present = (fullname, resource) in self._cache
if not present:
callbacks = self._callbacks.get((fullname, resource))
if callbacks is not None:
callbacks.append(callback)
else:
self._callbacks[(fullname, resource)] = [callback]
msg = Message.pickled(
(b(fullname), b(resource)),
handle=GET_RESOURCE,
)
self._context.send(msg)
finally:
self._lock.release()
if present:
callback()
def _on_load_resource(self, msg):
if msg.is_dead:
return
tup = msg.unpickle()
fullname, resource, content = tup
self._lock.acquire()
try:
self._cache[(fullname, resource)] = content
callbacks = self._callbacks.pop((fullname, resource), [])
finally:
self._lock.release()
for callback in callbacks:
callback()
class ResourceReader(object):
"""
Implements :class:`importlib.resource.abc.ResourceReader` (Python >= 3.7).
"""
def __init__(self, requester, fullname):
self._requester = requester
self._fullname = fullname
def open_resource(self, resource):
content = self._requester._get_resource(self._fullname, resource)
if content is None:
raise FileNotFoundError
return BytesIO(content)
def resource_path(self, resource):
raise FileNotFoundError
def is_resource(self, name):
content = self._requester._get_resource(self._fullname, name)
return bool(content is not None)
def contents(self):
raise NotImplementedError
class LogHandler(logging.Handler):
"""
@ -4090,6 +4186,10 @@ class ExternalContext(object):
self.router.importer = importer
sys.meta_path.insert(0, self.importer)
def _setup_resource_requester(self):
resource_getter = ResourceRequester(self.router, self.parent)
self.importer._resource_requester = resource_getter
def _setup_package(self):
global mitogen
mitogen = types.ModuleType('mitogen')
@ -4179,6 +4279,7 @@ class ExternalContext(object):
try:
self._setup_logging()
self._setup_importer()
self._setup_resource_requester()
self._reap_first_stage()
if self.config.get('setup_package', True):
self._setup_package()

@ -47,6 +47,9 @@ import threading
import types
import zlib
if sys.version_info >= (3, 7):
import importlib.resources
try:
# Python >= 3.4, PEP 451 ModuleSpec API
import importlib.machinery
@ -1282,6 +1285,48 @@ class ModuleResponder(object):
self._router.broker.defer(self._forward_modules, context, fullnames)
class ResourceResponder(object):
def __init__(self, router):
self._router = router
self._router.add_handler(
self._on_get_resource,
mitogen.core.GET_RESOURCE,
)
def _on_get_resource(self, msg):
if msg.is_dead:
return
stream = self._router.stream_by_id(msg.src_id)
if stream is None:
return
fullname_b, resource_b = msg.unpickle()
fullname, resource = fullname_b.decode(), resource_b.decode()
try:
content = importlib.resources.read_binary(fullname, resource)
except (FileNotFoundError, IsADirectoryError):
content = None
if content is not None:
self._send_resource(stream, fullname, resource, content)
else:
self._send_not_found(stream, fullname, resource)
def _send_resource(self, stream, fullname, resource, content):
msg = mitogen.core.Message.pickled(
(fullname, resource, content),
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_RESOURCE,
)
self._router._async_route(msg)
def _send_not_found(self, stream, fullname, resource):
msg = mitogen.core.Message.pickled(
(fullname, resource, None),
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_RESOURCE,
)
stream.protocol.send(msg)
class Broker(mitogen.core.Broker):
"""
.. note::
@ -1372,6 +1417,7 @@ class Router(mitogen.parent.Router):
def upgrade(self):
self.id_allocator = IdAllocator(self)
self.responder = ModuleResponder(self)
self.resource_responder = ResourceResponder(self)
self.log_forwarder = LogForwarder(self)
self.route_monitor = mitogen.parent.RouteMonitor(router=self)
self.add_handler( # TODO: cutpaste.

@ -2314,6 +2314,11 @@ class Router(mitogen.core.Router):
parent_context=parent,
importer=importer,
)
self.resource_responder = ResourceForwarder(
self,
parent,
importer._resource_requester,
)
self.route_monitor = RouteMonitor(self, parent)
self.add_handler(
fn=self._on_detaching,
@ -2799,3 +2804,45 @@ class ModuleForwarder(object):
handle=mitogen.core.LOAD_MODULE,
)
)
class ResourceForwarder(object):
"""
Handle :data:`mitogen.core.GET_RESOURCE` requests from children by
forwarding the request to our parent, or satisfying the request from
our local :class:`mitogen.core.ResourceRequester` cache.
"""
def __init__(self, router, parent_context, requester):
self.router = router
self.parent_context = parent_context
self.requester = requester
router.add_handler(
fn=self._on_get_resource,
handle=mitogen.core.GET_RESOURCE,
persist=True,
policy=is_immediate_child,
)
def _on_get_resource(self, msg):
if msg.is_dead:
return
fullname_b, resource_b = msg.unpickle()
fullname, resource = fullname_b.decode(), resource_b.decode()
callback = lambda: self._on_cache_callback(msg, fullname, resource)
self.requester._request_resource(fullname, resource, callback)
def _on_cache_callback(self, msg, fullname, resource):
stream = self.router.stream_by_id(msg.src_id)
self._send_resource(stream, fullname, resource)
def _send_resource(self, stream, fullname, resource):
content = self.requester._cache[(fullname, resource)]
msg = mitogen.core.Message.pickled(
(fullname, resource, content),
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_RESOURCE,
)
self.router._async_route(msg)

@ -0,0 +1,2 @@
I'm a nested resource
One layer down

@ -0,0 +1,2 @@
I'm a little resource
Text, not source

@ -0,0 +1,51 @@
import sys
import unittest
import testlib
import resourced_pkg
import resourced_pkg.sub_pkg
@unittest.skipIf(sys.version_info < (3, 7), 'importlib.resources, Python >= 3.7')
class ResourceReaderBaselineTest(testlib.TestCase):
'Assert out-the-box stdlib behaviours to cross validate remote tests.'
def test_is_resource(self):
import importlib.resources
self.assertFalse(importlib.resources.is_resource(resourced_pkg, 'does_not_exist'))
self.assertFalse(importlib.resources.is_resource(resourced_pkg, 'sub_dir'))
self.assertFalse(importlib.resources.is_resource(resourced_pkg.sub_pkg, 'does_not_exist'))
self.assertTrue(importlib.resources.is_resource(resourced_pkg, 'binary'))
self.assertTrue(importlib.resources.is_resource(resourced_pkg, 'empty'))
self.assertTrue(importlib.resources.is_resource(resourced_pkg, 'text.txt'))
self.assertTrue(importlib.resources.is_resource(resourced_pkg, 'sub_dir/empty'))
self.assertTrue(importlib.resources.is_resource(resourced_pkg.sub_pkg, 'text.txt'))
@unittest.skipIf(sys.version_info < (3, 7), 'importlib.resources, Python >= 3.7')
class ResourceReaderTest(testlib.RouterMixin, testlib.TestCase):
def call_is_resource(self, conn):
import importlib.resources
self.assertFalse(conn.call(importlib.resources.is_resource, 'resourced_pkg', 'does_not_exist'))
self.assertFalse(conn.call(importlib.resources.is_resource, 'resourced_pkg', 'sub_dir'))
self.assertFalse(conn.call(importlib.resources.is_resource, 'resourced_pkg.sub_pkg', 'does_not_exist'))
self.assertTrue(conn.call(importlib.resources.is_resource, 'resourced_pkg', 'binary'))
self.assertTrue(conn.call(importlib.resources.is_resource, 'resourced_pkg', 'empty'))
self.assertTrue(conn.call(importlib.resources.is_resource, 'resourced_pkg', 'text.txt'))
self.assertTrue(conn.call(importlib.resources.is_resource, 'resourced_pkg', 'sub_dir/empty'))
self.assertTrue(conn.call(importlib.resources.is_resource, 'resourced_pkg.sub_pkg', 'text.txt'))
def test_is_resource(self):
# Uses the same version of Python so we can be sure importlib.resources is present
# TODO Cross Python version tests
connection = self.router.local(python_path=sys.executable)
self.call_is_resource(conn=connection)
def test_is_resource_2_hops(self):
# Uses the same version of Python so we can be sure importlib.resources is present
# TODO Cross Python version tests
hop_one = self.router.local(python_path=sys.executable)
hop_two = self.router.local(python_path=sys.executable, via=hop_one)
self.call_is_resource(conn=hop_two)
Loading…
Cancel
Save