issue #477: rename and add tests for polyfill functions.

issue510
David Wilson 6 years ago
parent da13415b00
commit 97a96f5dd8

@ -332,26 +332,27 @@ except NameError:
return True return True
def _partition(s, splitter, find): def _partition(s, sep, find):
""" """
(str|unicode).(partition|rpartition) for Python 2.4/2.5. (str|unicode).(partition|rpartition) for Python 2.4/2.5.
""" """
idx = find(splitter) idx = find(sep)
if idx == -1: if idx != -1:
return type(s)(), type(s)(), s left = s[0:idx]
left = s[0:idx] return left, sep, s[len(left)+len(sep):]
mid = s[idx:idx+len(splitter)]
return left, mid, s[len(left)+len(mid):]
if hasattr(UnicodeType, 'rpartition'): if hasattr(UnicodeType, 'rpartition'):
unicode__partition = UnicodeType.partition str_partition = UnicodeType.partition
unicode__rpartition = UnicodeType.rpartition str_rpartition = UnicodeType.rpartition
bytes__partition = BytesType.partition bytes_partition = BytesType.partition
else: else:
unicode__partition = lambda s, splitter: _partition(s, splitter, s.find) def str_partition(s, sep):
unicode__rpartition = lambda s, splitter: _partition(s, splitter, s.rfind) return _partition(s, sep, s.find) or (s, u'', u'')
bytes__partition = lambda s, splitter: _partition(s, splitter, s.find) def str_rpartition(s, sep):
return _partition(s, sep, s.rfind) or (u'', u'', s)
def bytes_partition(s, sep):
return _partition(s, sep, s.find) or (s, '', '')
def has_parent_authority(msg, _stream=None): def has_parent_authority(msg, _stream=None):
@ -1116,7 +1117,7 @@ class Importer(object):
if fullname == '__main__': if fullname == '__main__':
raise ModuleNotFoundError() raise ModuleNotFoundError()
parent, _, modname = unicode__rpartition(fullname, '.') parent, _, modname = str_rpartition(fullname, '.')
if parent: if parent:
path = sys.modules[parent].__path__ path = sys.modules[parent].__path__
else: else:
@ -1134,7 +1135,7 @@ class Importer(object):
try: try:
_v and LOG.debug('%r.find_module(%r)', self, fullname) _v and LOG.debug('%r.find_module(%r)', self, fullname)
fullname = to_text(fullname) fullname = to_text(fullname)
pkgname, dot, _ = unicode__rpartition(fullname, '.') pkgname, dot, _ = str_rpartition(fullname, '.')
pkg = sys.modules.get(pkgname) pkg = sys.modules.get(pkgname)
if pkgname and getattr(pkg, '__loader__', None) is not self: if pkgname and getattr(pkg, '__loader__', None) is not self:
LOG.debug('%r: %r is submodule of a package we did not load', LOG.debug('%r: %r is submodule of a package we did not load',
@ -1263,7 +1264,7 @@ class Importer(object):
mod.__package__ = fullname mod.__package__ = fullname
self._present[fullname] = pkg_present self._present[fullname] = pkg_present
else: else:
mod.__package__ = unicode__rpartition(fullname, '.')[0] or None mod.__package__ = str_rpartition(fullname, '.')[0] or None
if mod.__package__ and not PY3: if mod.__package__ and not PY3:
# 2.x requires __package__ to be exactly a string. # 2.x requires __package__ to be exactly a string.
@ -2326,7 +2327,7 @@ class IoLogger(BasicStream):
def _log_lines(self): def _log_lines(self):
while self._buf.find('\n') != -1: while self._buf.find('\n') != -1:
line, _, self._buf = bytes__partition(self._buf, '\n') line, _, self._buf = str_partition(self._buf, '\n')
self._log.info('%s', line.rstrip('\n')) self._log.info('%s', line.rstrip('\n'))
def on_shutdown(self, broker): def on_shutdown(self, broker):
@ -2408,7 +2409,7 @@ class Router(object):
""" """
LOG.error('%r._on_del_route() %r', self, msg) LOG.error('%r._on_del_route() %r', self, msg)
if not msg.is_dead: if not msg.is_dead:
target_id_s, _, name = bytes__partition(msg.data, b(':')) target_id_s, _, name = bytes_partition(msg.data, b(':'))
target_id = int(target_id_s, 10) target_id = int(target_id_s, 10)
if target_id not in self._context_by_id: if target_id not in self._context_by_id:
LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg) LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg)

@ -59,9 +59,10 @@ import mitogen.minify
import mitogen.parent import mitogen.parent
from mitogen.core import b from mitogen.core import b
from mitogen.core import to_text
from mitogen.core import LOG
from mitogen.core import IOLOG from mitogen.core import IOLOG
from mitogen.core import LOG
from mitogen.core import str_partition
from mitogen.core import to_text
imap = getattr(itertools, 'imap', map) imap = getattr(itertools, 'imap', map)
izip = getattr(itertools, 'izip', zip) izip = getattr(itertools, 'izip', zip)
@ -464,7 +465,7 @@ class ModuleFinder(object):
# else we could return junk. # else we could return junk.
return return
pkgname, _, modname = fullname.rpartition('.') pkgname, _, modname = str_partition(fullname, '.')
pkg = sys.modules.get(pkgname) pkg = sys.modules.get(pkgname)
if pkg is None or not hasattr(pkg, '__file__'): if pkg is None or not hasattr(pkg, '__file__'):
return return
@ -559,7 +560,7 @@ class ModuleFinder(object):
def generate_parent_names(self, fullname): def generate_parent_names(self, fullname):
while '.' in fullname: while '.' in fullname:
fullname, _, _ = fullname.rpartition('.') fullname, _, _ = str_partition(fullname, '.')
yield fullname yield fullname
def find_related_imports(self, fullname): def find_related_imports(self, fullname):
@ -784,7 +785,7 @@ class ModuleResponder(object):
return return
for name in tup[4]: # related for name in tup[4]: # related
parent, _, _ = name.partition('.') parent, _, _ = str_partition(name, '.')
if parent != fullname and parent not in stream.sent_modules: if parent != fullname and parent not in stream.sent_modules:
# Parent hasn't been sent, so don't load submodule yet. # Parent hasn't been sent, so don't load submodule yet.
continue continue

@ -0,0 +1,103 @@
import testlib
import unittest2
import mitogen.core
from mitogen.core import b
class BytesPartitionTest(testlib.TestCase):
func = staticmethod(mitogen.core.bytes_partition)
def test_no_sep(self):
left, sep, right = self.func(b('dave'), b('x'))
self.assertTrue(isinstance(left, mitogen.core.BytesType))
self.assertTrue(isinstance(sep, mitogen.core.BytesType))
self.assertTrue(isinstance(right, mitogen.core.BytesType))
self.assertEquals(left, b('dave'))
self.assertEquals(sep, b(''))
self.assertEquals(right, b(''))
def test_one_sep(self):
left, sep, right = self.func(b('davexdave'), b('x'))
self.assertTrue(isinstance(left, mitogen.core.BytesType))
self.assertTrue(isinstance(sep, mitogen.core.BytesType))
self.assertTrue(isinstance(right, mitogen.core.BytesType))
self.assertEquals(left, b('dave'))
self.assertEquals(sep, b('x'))
self.assertEquals(right, b('dave'))
def test_two_seps(self):
left, sep, right = self.func(b('davexdavexdave'), b('x'))
self.assertTrue(isinstance(left, mitogen.core.BytesType))
self.assertTrue(isinstance(sep, mitogen.core.BytesType))
self.assertTrue(isinstance(right, mitogen.core.BytesType))
self.assertEquals(left, b('dave'))
self.assertEquals(sep, b('x'))
self.assertEquals(right, b('davexdave'))
class StrPartitionTest(testlib.TestCase):
func = staticmethod(mitogen.core.str_partition)
def test_no_sep(self):
left, sep, right = self.func(u'dave', u'x')
self.assertTrue(isinstance(left, mitogen.core.UnicodeType))
self.assertTrue(isinstance(sep, mitogen.core.UnicodeType))
self.assertTrue(isinstance(right, mitogen.core.UnicodeType))
self.assertEquals(left, u'dave')
self.assertEquals(sep, u'')
self.assertEquals(right, u'')
def test_one_sep(self):
left, sep, right = self.func(u'davexdave', u'x')
self.assertTrue(isinstance(left, mitogen.core.UnicodeType))
self.assertTrue(isinstance(sep, mitogen.core.UnicodeType))
self.assertTrue(isinstance(right, mitogen.core.UnicodeType))
self.assertEquals(left, u'dave')
self.assertEquals(sep, u'x')
self.assertEquals(right, u'dave')
def test_two_seps(self):
left, sep, right = self.func(u'davexdavexdave', u'x')
self.assertTrue(isinstance(left, mitogen.core.UnicodeType))
self.assertTrue(isinstance(sep, mitogen.core.UnicodeType))
self.assertTrue(isinstance(right, mitogen.core.UnicodeType))
self.assertEquals(left, u'dave')
self.assertEquals(sep, u'x')
self.assertEquals(right, u'davexdave')
class StrRpartitionTest(testlib.TestCase):
func = staticmethod(mitogen.core.str_rpartition)
def test_no_sep(self):
left, sep, right = self.func(u'dave', u'x')
self.assertTrue(isinstance(left, mitogen.core.UnicodeType))
self.assertTrue(isinstance(sep, mitogen.core.UnicodeType))
self.assertTrue(isinstance(right, mitogen.core.UnicodeType))
self.assertEquals(left, u'')
self.assertEquals(sep, u'')
self.assertEquals(right, u'dave')
def test_one_sep(self):
left, sep, right = self.func(u'davexdave', u'x')
self.assertTrue(isinstance(left, mitogen.core.UnicodeType))
self.assertTrue(isinstance(sep, mitogen.core.UnicodeType))
self.assertTrue(isinstance(right, mitogen.core.UnicodeType))
self.assertEquals(left, u'dave')
self.assertEquals(sep, u'x')
self.assertEquals(right, u'dave')
def test_two_seps(self):
left, sep, right = self.func(u'davexdavexdave', u'x')
self.assertTrue(isinstance(left, mitogen.core.UnicodeType))
self.assertTrue(isinstance(sep, mitogen.core.UnicodeType))
self.assertTrue(isinstance(right, mitogen.core.UnicodeType))
self.assertEquals(left, u'davexdave')
self.assertEquals(sep, u'x')
self.assertEquals(right, u'dave')
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save