diff --git a/mitogen/core.py b/mitogen/core.py index 10d02e1a..23ecebe7 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -332,26 +332,27 @@ except NameError: return True -def _partition(s, splitter, find): +def _partition(s, sep, find): """ (str|unicode).(partition|rpartition) for Python 2.4/2.5. """ - idx = find(splitter) - if idx == -1: - return type(s)(), type(s)(), s - left = s[0:idx] - mid = s[idx:idx+len(splitter)] - return left, mid, s[len(left)+len(mid):] + idx = find(sep) + if idx != -1: + left = s[0:idx] + return left, sep, s[len(left)+len(sep):] if hasattr(UnicodeType, 'rpartition'): - unicode__partition = UnicodeType.partition - unicode__rpartition = UnicodeType.rpartition - bytes__partition = BytesType.partition + str_partition = UnicodeType.partition + str_rpartition = UnicodeType.rpartition + bytes_partition = BytesType.partition else: - unicode__partition = lambda s, splitter: _partition(s, splitter, s.find) - unicode__rpartition = lambda s, splitter: _partition(s, splitter, s.rfind) - bytes__partition = lambda s, splitter: _partition(s, splitter, s.find) + def str_partition(s, sep): + return _partition(s, sep, s.find) or (s, u'', u'') + def str_rpartition(s, sep): + return _partition(s, sep, s.rfind) or (u'', u'', s) + def bytes_partition(s, sep): + return _partition(s, sep, s.find) or (s, '', '') def has_parent_authority(msg, _stream=None): @@ -1116,7 +1117,7 @@ class Importer(object): if fullname == '__main__': raise ModuleNotFoundError() - parent, _, modname = unicode__rpartition(fullname, '.') + parent, _, modname = str_rpartition(fullname, '.') if parent: path = sys.modules[parent].__path__ else: @@ -1134,7 +1135,7 @@ class Importer(object): try: _v and LOG.debug('%r.find_module(%r)', self, fullname) fullname = to_text(fullname) - pkgname, dot, _ = unicode__rpartition(fullname, '.') + pkgname, dot, _ = str_rpartition(fullname, '.') pkg = sys.modules.get(pkgname) if pkgname and getattr(pkg, '__loader__', None) is not self: LOG.debug('%r: %r is submodule of a package we did not load', @@ -1263,7 +1264,7 @@ class Importer(object): mod.__package__ = fullname self._present[fullname] = pkg_present else: - mod.__package__ = unicode__rpartition(fullname, '.')[0] or None + mod.__package__ = str_rpartition(fullname, '.')[0] or None if mod.__package__ and not PY3: # 2.x requires __package__ to be exactly a string. @@ -2326,7 +2327,7 @@ class IoLogger(BasicStream): def _log_lines(self): while self._buf.find('\n') != -1: - line, _, self._buf = bytes__partition(self._buf, '\n') + line, _, self._buf = str_partition(self._buf, '\n') self._log.info('%s', line.rstrip('\n')) def on_shutdown(self, broker): @@ -2408,7 +2409,7 @@ class Router(object): """ LOG.error('%r._on_del_route() %r', self, msg) if not msg.is_dead: - target_id_s, _, name = bytes__partition(msg.data, b(':')) + target_id_s, _, name = bytes_partition(msg.data, b(':')) target_id = int(target_id_s, 10) if target_id not in self._context_by_id: LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg) diff --git a/mitogen/master.py b/mitogen/master.py index da0812df..c439cbdc 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -59,9 +59,10 @@ import mitogen.minify import mitogen.parent from mitogen.core import b -from mitogen.core import to_text -from mitogen.core import LOG from mitogen.core import IOLOG +from mitogen.core import LOG +from mitogen.core import str_partition +from mitogen.core import to_text imap = getattr(itertools, 'imap', map) izip = getattr(itertools, 'izip', zip) @@ -464,7 +465,7 @@ class ModuleFinder(object): # else we could return junk. return - pkgname, _, modname = fullname.rpartition('.') + pkgname, _, modname = str_partition(fullname, '.') pkg = sys.modules.get(pkgname) if pkg is None or not hasattr(pkg, '__file__'): return @@ -559,7 +560,7 @@ class ModuleFinder(object): def generate_parent_names(self, fullname): while '.' in fullname: - fullname, _, _ = fullname.rpartition('.') + fullname, _, _ = str_partition(fullname, '.') yield fullname def find_related_imports(self, fullname): @@ -784,7 +785,7 @@ class ModuleResponder(object): return for name in tup[4]: # related - parent, _, _ = name.partition('.') + parent, _, _ = str_partition(name, '.') if parent != fullname and parent not in stream.sent_modules: # Parent hasn't been sent, so don't load submodule yet. continue diff --git a/tests/polyfill_functions_test.py b/tests/polyfill_functions_test.py new file mode 100644 index 00000000..ae65eb2f --- /dev/null +++ b/tests/polyfill_functions_test.py @@ -0,0 +1,103 @@ + +import testlib +import unittest2 + +import mitogen.core +from mitogen.core import b + + +class BytesPartitionTest(testlib.TestCase): + func = staticmethod(mitogen.core.bytes_partition) + + def test_no_sep(self): + left, sep, right = self.func(b('dave'), b('x')) + self.assertTrue(isinstance(left, mitogen.core.BytesType)) + self.assertTrue(isinstance(sep, mitogen.core.BytesType)) + self.assertTrue(isinstance(right, mitogen.core.BytesType)) + self.assertEquals(left, b('dave')) + self.assertEquals(sep, b('')) + self.assertEquals(right, b('')) + + def test_one_sep(self): + left, sep, right = self.func(b('davexdave'), b('x')) + self.assertTrue(isinstance(left, mitogen.core.BytesType)) + self.assertTrue(isinstance(sep, mitogen.core.BytesType)) + self.assertTrue(isinstance(right, mitogen.core.BytesType)) + self.assertEquals(left, b('dave')) + self.assertEquals(sep, b('x')) + self.assertEquals(right, b('dave')) + + def test_two_seps(self): + left, sep, right = self.func(b('davexdavexdave'), b('x')) + self.assertTrue(isinstance(left, mitogen.core.BytesType)) + self.assertTrue(isinstance(sep, mitogen.core.BytesType)) + self.assertTrue(isinstance(right, mitogen.core.BytesType)) + self.assertEquals(left, b('dave')) + self.assertEquals(sep, b('x')) + self.assertEquals(right, b('davexdave')) + + +class StrPartitionTest(testlib.TestCase): + func = staticmethod(mitogen.core.str_partition) + + def test_no_sep(self): + left, sep, right = self.func(u'dave', u'x') + self.assertTrue(isinstance(left, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(sep, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(right, mitogen.core.UnicodeType)) + self.assertEquals(left, u'dave') + self.assertEquals(sep, u'') + self.assertEquals(right, u'') + + def test_one_sep(self): + left, sep, right = self.func(u'davexdave', u'x') + self.assertTrue(isinstance(left, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(sep, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(right, mitogen.core.UnicodeType)) + self.assertEquals(left, u'dave') + self.assertEquals(sep, u'x') + self.assertEquals(right, u'dave') + + def test_two_seps(self): + left, sep, right = self.func(u'davexdavexdave', u'x') + self.assertTrue(isinstance(left, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(sep, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(right, mitogen.core.UnicodeType)) + self.assertEquals(left, u'dave') + self.assertEquals(sep, u'x') + self.assertEquals(right, u'davexdave') + + +class StrRpartitionTest(testlib.TestCase): + func = staticmethod(mitogen.core.str_rpartition) + + def test_no_sep(self): + left, sep, right = self.func(u'dave', u'x') + self.assertTrue(isinstance(left, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(sep, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(right, mitogen.core.UnicodeType)) + self.assertEquals(left, u'') + self.assertEquals(sep, u'') + self.assertEquals(right, u'dave') + + def test_one_sep(self): + left, sep, right = self.func(u'davexdave', u'x') + self.assertTrue(isinstance(left, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(sep, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(right, mitogen.core.UnicodeType)) + self.assertEquals(left, u'dave') + self.assertEquals(sep, u'x') + self.assertEquals(right, u'dave') + + def test_two_seps(self): + left, sep, right = self.func(u'davexdavexdave', u'x') + self.assertTrue(isinstance(left, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(sep, mitogen.core.UnicodeType)) + self.assertTrue(isinstance(right, mitogen.core.UnicodeType)) + self.assertEquals(left, u'davexdave') + self.assertEquals(sep, u'x') + self.assertEquals(right, u'dave') + + +if __name__ == '__main__': + unittest2.main()