issue #600: /etc/environment may be non-ASCII in an unknown encoding

pull/612/head
David Wilson 5 years ago
parent e87e41e69e
commit e4321f81a0

@ -52,7 +52,6 @@ import mitogen.core
import ansible_mitogen.target # TODO: circular import import ansible_mitogen.target # TODO: circular import
from mitogen.core import b from mitogen.core import b
from mitogen.core import bytes_partition from mitogen.core import bytes_partition
from mitogen.core import str_partition
from mitogen.core import str_rpartition from mitogen.core import str_rpartition
from mitogen.core import to_text from mitogen.core import to_text
@ -104,12 +103,20 @@ iteritems = getattr(dict, 'iteritems', dict.items)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def shlex_split_b(s):
"""
Use shlex.split() to split characters in some single-byte encoding, without
knowing what that encoding is. The input is bytes, the output is a list of
bytes.
"""
assert isinstance(s, mitogen.core.BytesType)
if mitogen.core.PY3: if mitogen.core.PY3:
shlex_split = shlex.split return [
else: t.encode('latin1')
def shlex_split(s, comments=False): for t in shlex.split(s.decode('latin1'), comments=True)
return [mitogen.core.to_text(token) ]
for token in shlex.split(str(s), comments=comments)]
return [t for t in shlex.split(s, comments=True)]
class TempFileWatcher(object): class TempFileWatcher(object):
@ -165,13 +172,19 @@ class EnvironmentFileWatcher(object):
A more robust future approach may simply be to arrange for the persistent A more robust future approach may simply be to arrange for the persistent
interpreter to restart when a change is detected. interpreter to restart when a change is detected.
""" """
# We know nothing about the character set of /etc/environment or the
# process environment.
environ = getattr(os, 'environb', os.environ)
def __init__(self, path): def __init__(self, path):
self.path = os.path.expanduser(path) self.path = os.path.expanduser(path)
#: Inode data at time of last check. #: Inode data at time of last check.
self._st = self._stat() self._st = self._stat()
#: List of inherited keys appearing to originated from this file. #: List of inherited keys appearing to originated from this file.
self._keys = [key for key, value in self._load() self._keys = [
if value == os.environ.get(key)] key for key, value in self._load()
if value == self.environ.get(key)
]
LOG.debug('%r installed; existing keys: %r', self, self._keys) LOG.debug('%r installed; existing keys: %r', self, self._keys)
def __repr__(self): def __repr__(self):
@ -185,7 +198,7 @@ class EnvironmentFileWatcher(object):
def _load(self): def _load(self):
try: try:
fp = codecs.open(self.path, 'r', encoding='utf-8') fp = open(self.path, 'rb')
try: try:
return list(self._parse(fp)) return list(self._parse(fp))
finally: finally:
@ -199,36 +212,36 @@ class EnvironmentFileWatcher(object):
""" """
for line in fp: for line in fp:
# ' #export foo=some var ' -> ['#export', 'foo=some var '] # ' #export foo=some var ' -> ['#export', 'foo=some var ']
bits = shlex_split(line, comments=True) bits = shlex_split_b(line)
if (not bits) or bits[0].startswith('#'): if (not bits) or bits[0].startswith(b('#')):
continue continue
if bits[0] == u'export': if bits[0] == b('export'):
bits.pop(0) bits.pop(0)
key, sep, value = str_partition(u' '.join(bits), u'=') key, sep, value = bytes_partition(b(' ').join(bits), b('='))
if key and sep: if key and sep:
yield key, value yield key, value
def _on_file_changed(self): def _on_file_changed(self):
LOG.debug('%r: file changed, reloading', self) LOG.debug('%r: file changed, reloading', self)
for key, value in self._load(): for key, value in self._load():
if key in os.environ: if key in self.environ:
LOG.debug('%r: existing key %r=%r exists, not setting %r', LOG.debug('%r: existing key %r=%r exists, not setting %r',
self, key, os.environ[key], value) self, key, self.environ[key], value)
else: else:
LOG.debug('%r: setting key %r to %r', self, key, value) LOG.debug('%r: setting key %r to %r', self, key, value)
self._keys.append(key) self._keys.append(key)
os.environ[key] = value self.environ[key] = value
def _remove_existing(self): def _remove_existing(self):
""" """
When a change is detected, remove keys that existed in the old file. When a change is detected, remove keys that existed in the old file.
""" """
for key in self._keys: for key in self._keys:
if key in os.environ: if key in self.environ:
LOG.debug('%r: removing old key %r', self, key) LOG.debug('%r: removing old key %r', self, key)
del os.environ[key] del self.environ[key]
self._keys = [] self._keys = []
def check(self): def check(self):

@ -0,0 +1,74 @@
import os
import sys
import tempfile
import mock
import unittest2
import testlib
from mitogen.core import b
import ansible_mitogen.runner
klass = ansible_mitogen.runner.EnvironmentFileWatcher
environb = getattr(os, 'environb', os.environ)
class WatcherTest(testlib.TestCase):
def setUp(self):
self.original_env = environb.copy()
self.tf = tempfile.NamedTemporaryFile()
def tearDown(self):
self.tf.close()
environb.clear()
environb.update(self.original_env)
def test_missing_file(self):
# just ensure it doesn't crash
watcher = klass('/nonexistent')
watcher.check()
def test_file_becomes_missing(self):
# just ensure it doesn't crash
watcher = klass(self.tf.name)
watcher.check()
os.unlink(self.tf.name)
watcher.check()
open(self.tf.name,'wb').close()
def test_key_deleted(self):
environb[b('SOMEKEY')] = b('123')
self.tf.write(b('SOMEKEY=123\n'))
self.tf.flush()
watcher = klass(self.tf.name)
self.tf.seek(0)
self.tf.truncate(0)
watcher.check()
self.assertTrue(b('SOMEKEY') not in environb)
def test_key_added(self):
watcher = klass(self.tf.name)
self.tf.write(b('SOMEKEY=123\n'))
self.tf.flush()
watcher.check()
self.assertEqual(environb[b('SOMEKEY')], b('123'))
def test_key_shadowed_nuchange(self):
environb[b('SOMEKEY')] = b('234')
self.tf.write(b('SOMEKEY=123\n'))
self.tf.flush()
watcher = klass(self.tf.name)
watcher.check()
self.assertEqual(environb[b('SOMEKEY')], b('234'))
def test_binary_key_added(self):
watcher = klass(self.tf.name)
self.tf.write(b('SOMEKEY=\xff\xff\xff\n'))
self.tf.flush()
watcher.check()
self.assertEqual(environb[b('SOMEKEY')], b('\xff\xff\xff'))
if __name__ == '__main__':
unittest2.main()

@ -91,6 +91,11 @@
shell: locale-gen shell: locale-gen
when: distro == "Debian" when: distro == "Debian"
- name: Write Unicode into /etc/environment
copy:
dest: /etc/environment
content: "UNICODE_SNOWMAN=\u2603\n"
- name: Install prebuilt 'doas' binary - name: Install prebuilt 'doas' binary
unarchive: unarchive:
dest: / dest: /

Loading…
Cancel
Save