fakessh: More (incomplete) work on graceful exit.

pull/35/head
David Wilson 7 years ago
parent 58a5af5be0
commit 168498198c

@ -60,6 +60,7 @@ import socket
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import threading
import econtext.core import econtext.core
import econtext.master import econtext.master
@ -77,17 +78,25 @@ _econtext = None
class IoPump(econtext.core.BasicStream): class IoPump(econtext.core.BasicStream):
_output_buf = '' _output_buf = ''
_closed = False
def __init__(self, process, broker, stdin_fd, stdout_fd): def __init__(self, process, broker, stdin_fd, stdout_fd):
self.process = process self.process = process
self._broker = broker self._broker = broker
self.transmit_side = econtext.core.Side(self, stdin_fd)
self.receive_side = econtext.core.Side(self, stdout_fd) self.receive_side = econtext.core.Side(self, stdout_fd)
self.transmit_side = econtext.core.Side(self, stdin_fd)
def write(self, s): def write(self, s):
self._output_buf += s self._output_buf += s
self._broker.start_transmit(self) self._broker.start_transmit(self)
def close(self):
self._closed = True
self._broker.start_transmit(self)
def on_shutdown(self, broker):
self.close()
def on_transmit(self, broker): def on_transmit(self, broker):
written = self.transmit_side.write(self._output_buf) written = self.transmit_side.write(self._output_buf)
IOLOG.debug('%r.on_transmit() -> len %r', self, written) IOLOG.debug('%r.on_transmit() -> len %r', self, written)
@ -98,18 +107,16 @@ class IoPump(econtext.core.BasicStream):
if not self._output_buf: if not self._output_buf:
broker.stop_transmit(self) broker.stop_transmit(self)
if self._closed:
self.on_disconnect(broker)
def on_receive(self, broker): def on_receive(self, broker):
s = self.receive_side.read() s = self.receive_side.read()
IOLOG.debug('%r.on_receive() -> len %r', self, len(s)) IOLOG.debug('%r.on_receive() -> len %r', self, len(s))
if not s: if s:
self.on_disconnect(broker) econtext.core.fire(self, 'receive', s)
else: else:
self.process.stdin.put(s) self.on_disconnect(broker)
def on_disconnect(self, broker):
super(IoPump, self).on_disconnect(broker)
self.process.stdin.close()
def __repr__(self): def __repr__(self):
return 'IoPump(%r)' % ( return 'IoPump(%r)' % (
@ -122,23 +129,31 @@ class Process(object):
Manages the lifetime and pipe connections of the SSH command running in the Manages the lifetime and pipe connections of the SSH command running in the
slave. slave.
""" """
def __init__(self, router, stdin_fd, stdout_fd, pid=None): def __init__(self, router, stdin_fd, stdout_fd, proc=None):
self.router = router self.router = router
self.stdin_fd = stdin_fd self.stdin_fd = stdin_fd
self.stdout_fd = stdout_fd self.stdout_fd = stdout_fd
self.pid = None self.proc = proc
self.control_handle = router.add_handler(self._on_control) self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin) self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump(self, router.broker, stdin_fd, stdout_fd) self.pump = IoPump(self, router.broker, stdin_fd, stdout_fd)
self.stdin = None self.stdin = None
self.control = None self.control = None
self.wake_event = threading.Event()
econtext.core.listen(self.pump, 'disconnect', self._on_pump_disconnect)
econtext.core.listen(self.pump, 'receive', self._on_pump_receive)
if proc:
pmon = econtext.master.ProcessMonitor.instance()
pmon.add(proc.pid, self._on_proc_exit)
def __repr__(self): def __repr__(self):
return 'Process(%r, %r, %r)' % ( return 'Process(%r, %r)' % (self.stdin_fd, self.stdout_fd)
self.stdin_fd,
self.stdout_fd, def _on_proc_exit(self, status):
self.pid, LOG.debug('%r._on_proc_exit(%r)', self, status)
) self.control.put(('exit', status))
def _on_stdin(self, msg): def _on_stdin(self, msg):
if msg == econtext.core._DEAD: if msg == econtext.core._DEAD:
@ -148,28 +163,44 @@ class Process(object):
IOLOG.debug('%r._on_stdin(%r)', self, data) IOLOG.debug('%r._on_stdin(%r)', self, data)
if data == econtext.core._DEAD: if data == econtext.core._DEAD:
self.pump.transmit_side.close() self.pump.close()
else: else:
self.pump.write(data) self.pump.write(data)
def _on_control(self, msg): def _on_control(self, msg):
if msg == econtext.core._DEAD: if msg != econtext.core._DEAD:
return command, arg = msg.unpickle()
LOG.debug('%r._on_control(%r, %s)', self, command, arg)
func = getattr(self, '_on_%s' % (command,), None)
if func:
return func(msg, arg)
command, arg = msg.unpickle()
LOG.debug('%r._on_control(%r, %s)', self, command, arg)
if command == 'start':
dest = econtext.core.Context(self.router, msg.src_id)
self.control = econtext.core.Sender(dest, arg[0])
self.stdin = econtext.core.Sender(dest, arg[1])
self.router.broker.start_receive(self.pump)
elif command == 'kill':
if self.pid is not None:
os.kill(self.pid, signal.SIGTERM)
else:
LOG.warning('%r: unknown command %r', self, command) LOG.warning('%r: unknown command %r', self, command)
def _on_start(self, msg, arg):
dest = econtext.core.Context(self.router, msg.src_id)
self.control = econtext.core.Sender(dest, arg[0])
self.stdin = econtext.core.Sender(dest, arg[1])
self.router.broker.start_receive(self.pump)
def _on_exit(self, msg, arg):
LOG.info('on_exit: proc = %r', self.proc)
if self.proc:
self.proc.terminate()
else:
self.router.broker.shutdown()
def _on_pump_receive(self, s):
IOLOG.info('%r._on_pump_receive()', self)
self.stdin.put(s)
def _on_pump_disconnect(self):
LOG.info('%r._on_pump_disconnect()', self)
econtext.core.fire(self, 'disconnect')
self.stdin.close()
self.wake_event.set()
def start_master(self, stdin, control): def start_master(self, stdin, control):
self.stdin = stdin self.stdin = stdin
self.control = control self.control = control
@ -177,8 +208,8 @@ class Process(object):
self.router.broker.start_receive(self.pump) self.router.broker.start_receive(self.pump)
def wait(self): def wait(self):
import time while not self.wake_event.wait(0.1):
time.sleep(3) pass
def _start_slave(econtext_, src_id, args): def _start_slave(econtext_, src_id, args):
@ -198,8 +229,10 @@ def _start_slave(econtext_, src_id, args):
process = Process(econtext_.router, process = Process(econtext_.router,
proc.stdin.fileno(), proc.stdin.fileno(),
proc.stdout.fileno(), proc.stdout.fileno(),
proc.pid, proc,
) )
#process.control.put(('exit', None))
return process.control_handle, process.stdin_handle return process.control_handle, process.stdin_handle
@ -252,9 +285,9 @@ def _fakessh_main(econtext_, dest_context_id):
else: else:
LOG.debug('Warning option %s %s is ignored.', opt, optarg) LOG.debug('Warning option %s %s is ignored.', opt, optarg)
LOG.info('hostname: %r', hostname) LOG.debug('hostname: %r', hostname)
LOG.info('opts: %r', opts) LOG.debug('opts: %r', opts)
LOG.info('args: %r', args) LOG.debug('args: %r', args)
dest = econtext.master.Context(econtext_.router, dest_context_id) dest = econtext.master.Context(econtext_.router, dest_context_id)
control_handle, stdin_handle = dest.call_with_deadline(None, True, control_handle, stdin_handle = dest.call_with_deadline(None, True,
@ -269,7 +302,7 @@ def _fakessh_main(econtext_, dest_context_id):
control=econtext.core.Sender(dest, control_handle), control=econtext.core.Sender(dest, control_handle),
) )
process.wait() process.wait()
process.control.put(('kill', None)) process.control.put(('exit', None))
# #
@ -299,6 +332,7 @@ def run_with_fake_ssh(dest, router, args, deadline=None):
econtext.core.set_cloexec(sock1.fileno()) econtext.core.set_cloexec(sock1.fileno())
stream = econtext.core.Stream(router, context_id, fakessh.key) stream = econtext.core.Stream(router, context_id, fakessh.key)
stream.name = 'fakessh'
stream.accept(sock1.fileno(), sock1.fileno()) stream.accept(sock1.fileno(), sock1.fileno())
router.register(fakessh, stream) router.register(fakessh, stream)

Loading…
Cancel
Save