You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
280 lines
12 KiB
Python
280 lines
12 KiB
Python
|
|
#
|
|
# This program is a stand-in for good intro docs. It just documents various
|
|
# basics of using Mitogen.
|
|
#
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import print_function
|
|
|
|
import hashlib
|
|
import io
|
|
import os
|
|
|
|
import mitogen.core
|
|
import mitogen.master
|
|
import mitogen.service
|
|
import mitogen.utils
|
|
|
|
|
|
|
|
def get_file_contents(path):
|
|
"""
|
|
Get the contents of a file.
|
|
"""
|
|
with open(path, 'rb') as fp:
|
|
# mitogen.core.Blob() is a bytes subclass with a repr() that returns a
|
|
# summary of the blob, rather than the raw blob data. This makes
|
|
# logging output *much* nicer. Unlike most custom types, blobs can be
|
|
# serialized.
|
|
return mitogen.core.Blob(fp.read())
|
|
|
|
|
|
def put_file_contents(path, s):
|
|
"""
|
|
Write the contents of a file.
|
|
"""
|
|
with open(path, 'wb') as fp:
|
|
fp.write(s)
|
|
|
|
|
|
def streamy_download_file(context, path):
|
|
"""
|
|
Fetch a file from the FileService hosted by `context`.
|
|
"""
|
|
bio = io.BytesIO()
|
|
|
|
# FileService.get() is not actually an exposed service method, it's just a
|
|
# classmethod that wraps up the complicated dance of implementing the
|
|
# transfer.
|
|
ok, metadata = mitogen.service.FileService.get(context, path, bio)
|
|
|
|
return {
|
|
'success': ok,
|
|
'metadata': metadata,
|
|
'size': len(bio.getvalue()),
|
|
}
|
|
|
|
|
|
def md5sum(path):
|
|
"""
|
|
Return the MD5 checksum for a file.
|
|
"""
|
|
return hashlib.md5(get_file_contents(path)).hexdigest()
|
|
|
|
|
|
|
|
def work_on_machine(context):
|
|
"""
|
|
Do stuff to a remote context.
|
|
"""
|
|
print("Created context. Context ID is", context.context_id)
|
|
|
|
# You don't need to understand any/all of this, but it's helpful to grok
|
|
# the whole chain:
|
|
|
|
# - Context.call() is a light wrapper around .call_async(), the wrapper
|
|
# simply blocks the caller until a reply arrives.
|
|
# - .call_async() serializes the call signature into a message and passes
|
|
# it to .send_async()
|
|
# - .send_async() creates a mitogen.core.Receiver() on the local router.
|
|
# The receiver constructor uses Router.add_handle() to allocate a
|
|
# 'reply_to' handle and install a callback function that wakes the
|
|
# receiver when a reply message arrives.
|
|
# - .send_async() puts the reply handle in Message.reply_to field and
|
|
# passes it to .send()
|
|
# - Context.send() stamps the destination context ID into the
|
|
# Message.dst_id field and passes it to Router.route()
|
|
# - Router.route() uses Broker.defer() to schedule _async_route(msg)
|
|
# on the Broker thread.
|
|
# [broker thread]
|
|
# - The broker thread wakes and calls _async_route(msg)
|
|
# - Router._async_route() notices 'dst_id' is for a remote context and
|
|
# looks up the stream on which messages for dst_id should be sent (may be
|
|
# direct connection or not), and calls Stream.send()
|
|
# - Stream.send() packs the message into a bytestring, appends it to
|
|
# Stream._output_buf, and calls Broker.start_transmit()
|
|
# - Broker finishes work, reenters IO loop. IO loop wakes due to writeable
|
|
# stream.
|
|
# - Stream.on_transmit() writes the full/partial buffer to SSH, calls
|
|
# stop_transmit() to mark the stream unwriteable once _output_buf is
|
|
# empty.
|
|
# - Broker IO loop sleeps, no readers/writers.
|
|
# - Broker wakes due to SSH stream readable.
|
|
# - Stream.on_receive() called, reads the reply message, converts it to a
|
|
# Message and passes it to Router._async_route().
|
|
# - Router._async_route() notices message is for local context, looks up
|
|
# target handle in the .add_handle() registry.
|
|
# - Receiver._on_receive() called, appends message to receiver queue.
|
|
# [main thread]
|
|
# - Receiver.get() used to block the original Context.call() wakes and pops
|
|
# the message from the queue.
|
|
# - Message data (pickled return value) is deserialized and returned to the
|
|
# caller.
|
|
print("It's running on the local machine. Its PID is",
|
|
context.call(os.getpid))
|
|
|
|
# Now let's call a function defined in this module. On receiving the
|
|
# function call request, the child attempts to import __main__, which is
|
|
# initially missing, causing the importer in the child to request it from
|
|
# its parent. That causes _this script_ to be sent as the module source
|
|
# over the wire.
|
|
print("Calling md5sum(/etc/passwd) in the child:",
|
|
context.call(md5sum, '/etc/passwd'))
|
|
|
|
# Now let's "transfer" a file. The simplest way to do this is calling a
|
|
# function that returns the file data, which is totally fine for small
|
|
# files.
|
|
print("Download /etc/passwd via function call: %d bytes" % (
|
|
len(context.call(get_file_contents, '/etc/passwd'))
|
|
))
|
|
|
|
# And using function calls, in the other direction:
|
|
print("Upload /tmp/blah via function call: %s" % (
|
|
context.call(put_file_contents, '/tmp/blah', b'blah!'),
|
|
))
|
|
|
|
# Now lets transfer what might be a big files. The problem with big files
|
|
# is that they may not fit in RAM. This uses mitogen.services.FileService
|
|
# to implement streamy file transfer instead. The sender must have a
|
|
# 'service pool' running that will host FileService. First let's do the
|
|
# 'upload' direction, where the master hosts FileService.
|
|
|
|
# Steals the 'Router' reference from the context object. In a real app the
|
|
# pool would be constructed once at startup, this is just demo code.
|
|
file_service = mitogen.service.FileService(context.router)
|
|
|
|
# Start the pool.
|
|
pool = mitogen.service.Pool(context.router, services=[file_service])
|
|
|
|
# Grant access to a file on the local disk from unprivileged contexts.
|
|
# .register() is also exposed as a service method -- you can call it on a
|
|
# child context from any more privileged context.
|
|
file_service.register('/etc/passwd')
|
|
|
|
# Now call our wrapper function that knows how to handle the transfer. In a
|
|
# real app, this wrapper might also set ownership/modes or do any other
|
|
# app-specific stuff relating to the file that was transferred.
|
|
print("Streamy upload /etc/passwd: remote result: %s" % (
|
|
context.call(
|
|
streamy_download_file,
|
|
# To avoid hard-wiring streamy_download_file(), we want to pass it
|
|
# a Context object that hosts the file service it should request
|
|
# files from. Router.myself() returns a Context referring to this
|
|
# process.
|
|
context=router.myself(),
|
|
path='/etc/passwd',
|
|
),
|
|
))
|
|
|
|
# Shut down the pool now we're done with it, else app will hang at exit.
|
|
# Once again, this should only happen once at app startup/exit, not for
|
|
# every file transfer!
|
|
pool.stop(join=True)
|
|
|
|
# Now let's do the same thing but in reverse: we use FileService on the
|
|
# remote download a file. This uses context.call_service(), which invokes a
|
|
# special code path that causes auto-initialization of a thread pool in the
|
|
# target, and auto-construction of the target service, but only if the
|
|
# service call was made by a more privileged context. We could write a
|
|
# helper function that runs in the remote to do all that by hand, but the
|
|
# library handles it for us.
|
|
|
|
# Make the file accessible. A future FileService could avoid the need for
|
|
# this for privileged contexts.
|
|
context.call_service(
|
|
service_name=mitogen.service.FileService,
|
|
method_name='register',
|
|
path='/etc/passwd'
|
|
)
|
|
|
|
# Now we can use our streamy_download_file() function in reverse -- running
|
|
# it from this process and having it fetch from the remote process:
|
|
print("Streamy download /etc/passwd: result: %s" % (
|
|
streamy_download_file(context, '/etc/passwd'),
|
|
))
|
|
|
|
|
|
def main():
|
|
# Setup logging. Mitogen produces a LOT of logging. Over the course of the
|
|
# stable series, Mitogen's loggers will be carved up so more selective /
|
|
# user-friendly logging is possible. mitogen.log_to_file() just sets up
|
|
# something basic, defaulting to INFO level, but you can override from the
|
|
# command-line by passing MITOGEN_LOG_LEVEL=debug or MITOGEN_LOG_LEVEL=io.
|
|
# IO logging is sometimes useful for hangs, but it is often creates more
|
|
# confusion than it solves.
|
|
mitogen.utils.log_to_file()
|
|
|
|
# Construct the Broker thread. It manages an async IO loop listening for
|
|
# reads from any active connection, or wakes from any non-Broker thread.
|
|
# Because Mitogen uses a background worker thread, it is extremely
|
|
# important to pay attention to the use of UNIX fork in your code --
|
|
# forking entails making a snapshot of the state of all locks in the
|
|
# program, including those in the logging module, and thus can create code
|
|
# that appears to work for a long time, before deadlocking randomly.
|
|
# Forking in a Mitogen app requires significant upfront planning!
|
|
broker = mitogen.master.Broker()
|
|
|
|
# Construct a Router. This accepts messages (mitogen.core.Message) and
|
|
# either dispatches locally addressed messages to local handlers (added via
|
|
# Router.add_handle()) on the broker thread, or forwards the message
|
|
# towards the target context.
|
|
|
|
# The router also acts as an uglyish God object for creating new
|
|
# connections. This was a design mistake, really those methods should be
|
|
# directly imported from e.g. 'mitogen.ssh'.
|
|
router = mitogen.master.Router(broker)
|
|
|
|
# Router can act like a context manager. It simply ensures
|
|
# Broker.shutdown() is called on exception / exit. That prevents the app
|
|
# hanging due to a forgotten background thread. For throwaway scripts,
|
|
# there are also decorator versions "@mitogen.main()" and
|
|
# "@mitogen.utils.with_router" that do the same thing with less typing.
|
|
with router:
|
|
# Now let's construct a context. The '.local()' constructor just creates
|
|
# the context as a subprocess, the simplest possible case.
|
|
child = router.local()
|
|
print("Created a context:", child)
|
|
print()
|
|
|
|
# This demonstrates the standard IO redirection. We call the print
|
|
# function in the remote context, that should cause a log message to be
|
|
# emitted. Any subprocesses started by the remote also get the same
|
|
# treatment, so it's very easy to spot otherwise discarded errors/etc.
|
|
# from remote tools.
|
|
child.call(print, "Hello from child.")
|
|
|
|
# Context objects make it semi-convenient to treat the local machine the
|
|
# same as a remote machine.
|
|
work_on_machine(child)
|
|
|
|
# Now let's construct a proxied context. We'll simply use the .local()
|
|
# constructor again, but construct it via 'child'. In effect we are
|
|
# constructing a sub-sub-process. Instead of .local() here, we could
|
|
# have used .sudo() or .ssh() or anything else.
|
|
subchild = router.local(via=child)
|
|
print()
|
|
print()
|
|
print()
|
|
print("Created a context as a child of another context:", subchild)
|
|
|
|
# Do everything again with the new child.
|
|
work_on_machine(subchild)
|
|
|
|
# We can selectively shut down individual children if we want:
|
|
subchild.shutdown(wait=True)
|
|
|
|
# Or we can simply fall off the end of the scope, effectively calling
|
|
# Broker.shutdown(), which causes all children to die as part of
|
|
# shutdown.
|
|
|
|
|
|
# The child module importer detects the execution guard below and removes any
|
|
# code appearing after it, and refuses to execute "__main__" if it is absent.
|
|
# This is necessary to prevent a common problem where people try to call
|
|
# functions defined in __main__ without first wrapping it up to be importable
|
|
# as a module, which previously hung the target, or caused bizarre recursive
|
|
# script runs.
|
|
if __name__ == '__main__':
|
|
main()
|