Merge remote-tracking branch 'origin/dmw'

- Closes #364.
issue510
David Wilson 7 years ago
commit 9a83b1a82b

@ -164,6 +164,10 @@ Fixes
environment variable if it is set, causing behaviour to diverge when Ansible environment variable if it is set, causing behaviour to diverge when Ansible
was invoked across user accounts via ``sudo``. was invoked across user accounts via ``sudo``.
* `#364 <https://github.com/dw/mitogen/issues/364>`_: file transfers from
controllers running Python 2.7.2 or earlier could be interrupted due to a
forking bug in the :mod:`tempfile` module.
* `#370 <https://github.com/dw/mitogen/issues/370>`_: the Ansible * `#370 <https://github.com/dw/mitogen/issues/370>`_: the Ansible
`reboot <https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_ `reboot <https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_
module is supported. module is supported.

@ -721,13 +721,19 @@ class Sender(object):
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle)) self.context.send(Message.pickled(data, handle=self.dst_handle))
explicit_close_msg = 'Sender was explicitly closed'
def close(self): def close(self):
""" """
Send a dead message to the remote, causing :meth:`ChannelError` to be Send a dead message to the remote, causing :meth:`ChannelError` to be
raised in any waiting thread. raised in any waiting thread.
""" """
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
self.context.send(Message.dead(handle=self.dst_handle)) self.context.send(
Message.dead(
reason=self.explicit_close_msg,
handle=self.dst_handle)
)
def __repr__(self): def __repr__(self):
return 'Sender(%r, %r)' % (self.context, self.dst_handle) return 'Sender(%r, %r)' % (self.context, self.dst_handle)

@ -754,8 +754,8 @@ class FileService(Service):
def __init__(self, router): def __init__(self, router):
super(FileService, self).__init__(router) super(FileService, self).__init__(router)
#: Mapping of registered path -> file size. #: Set of registered paths.
self._metadata_by_path = {} self._paths = set()
#: Mapping of Stream->FileStreamState. #: Mapping of Stream->FileStreamState.
self._state_by_stream = {} self._state_by_stream = {}
@ -772,20 +772,21 @@ class FileService(Service):
def register(self, path): def register(self, path):
""" """
Authorize a path for access by children. Repeat calls with the same Authorize a path for access by children. Repeat calls with the same
path is harmless. path has no effect.
:param str path: :param str path:
File path. File path.
""" """
if path in self._metadata_by_path: if path not in self._paths:
return LOG.debug('%r: registering %r', self, path)
self._paths.add(path)
def _generate_stat(self, path):
st = os.stat(path) st = os.stat(path)
if not stat.S_ISREG(st.st_mode): if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (path,)) raise IOError('%r is not a regular file.' % (path,))
LOG.debug('%r: registering %r', self, path) return {
self._metadata_by_path[path] = {
'size': st.st_size, 'size': st.st_size,
'mode': st.st_mode, 'mode': st.st_mode,
'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'), 'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
@ -869,26 +870,26 @@ class FileService(Service):
:raises Error: :raises Error:
Unregistered path, or Sender did not match requestee context. Unregistered path, or Sender did not match requestee context.
""" """
if path not in self._metadata_by_path: if path not in self._paths:
raise Error(self.unregistered_msg) raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id: if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg) raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path) LOG.debug('Serving %r', path)
# Response must arrive first so requestee can begin receive loop,
# otherwise first ack won't arrive until all pending chunks were
# delivered. In that case max BDP would always be 128KiB, aka. max
# ~10Mbit/sec over a 100ms link.
try: try:
fp = open(path, 'rb', self.IO_SIZE) fp = open(path, 'rb', self.IO_SIZE)
msg.reply(self._generate_stat(path))
except IOError: except IOError:
msg.reply(mitogen.core.CallError( msg.reply(mitogen.core.CallError(
sys.exc_info()[1] sys.exc_info()[1]
)) ))
return return
# Response must arrive first so requestee can begin receive loop,
# otherwise first ack won't arrive until all pending chunks were
# delivered. In that case max BDP would always be 128KiB, aka. max
# ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id) stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, FileStreamState()) state = self._state_by_stream.setdefault(stream, FileStreamState())
state.lock.acquire() state.lock.acquire()
@ -949,6 +950,7 @@ class FileService(Service):
sender=recv.to_sender(), sender=recv.to_sender(),
) )
received_bytes = 0
for chunk in recv: for chunk in recv:
s = chunk.unpickle() s = chunk.unpickle()
LOG.debug('get_file(%r): received %d bytes', path, len(s)) LOG.debug('get_file(%r): received %d bytes', path, len(s))
@ -958,11 +960,19 @@ class FileService(Service):
size=len(s), size=len(s),
).close() ).close()
out_fp.write(s) out_fp.write(s)
received_bytes += len(s)
ok = out_fp.tell() == metadata['size'] ok = received_bytes == metadata['size']
if not ok: if received_bytes < metadata['size']:
LOG.error('get_file(%r): receiver was closed early, controller ' LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path) 'may be shutting down, or the file was truncated '
'during transfer. Expected %d bytes, received %d.',
path, metadata['size'], received_bytes)
elif received_bytes > metadata['size']:
LOG.error('get_file(%r): the file appears to have grown '
'while transfer was in progress. Expected %d '
'bytes, received %d.',
path, metadata['size'], received_bytes)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0)) metadata['size'], path, context, 1000 * (time.time() - t0))

@ -1,6 +1,6 @@
# Verify copy module for small and large files, and inline content. # Verify copy module for small and large files, and inline content.
- name: integration/action/synchronize.yml - name: integration/action/copy.yml
hosts: test-targets hosts: test-targets
any_errors_fatal: true any_errors_fatal: true
tasks: tasks:

Loading…
Cancel
Save