core: split out iter_split() for use in parent.py.

pull/607/head
David Wilson 5 years ago
parent f43f886c37
commit 5aca9d6c3f

@ -596,6 +596,23 @@ def import_module(modname):
return __import__(modname, None, None, [''])
def iter_split(buf, delim, func):
"""
Invoke `func(s)` for each `delim`-delimited chunk in the potentially large
`buf`, avoiding intermediate lists and quadratic copies. Return the
trailing undelimited portion of `buf`.
"""
start = 0
while True:
nl = buf.find(delim, start)
if nl == -1:
break
func(buf[start:nl])
start = nl + 1
return buf[start:]
class Py24Pickler(py_pickle.Pickler):
"""
Exceptions were classic classes until Python 2.5. Sadly for 2.4, cPickle
@ -2427,18 +2444,6 @@ class IoLogger(BasicStream):
def __repr__(self):
return '<IoLogger %s>' % (self._name,)
def _log_lines(self, buf):
start = 0
while True:
nl = min(buf.find('\n', start), start+1024)
if nl == -1:
break
self._log.info('%s', buf[start:nl])
start = nl + 1
if start:
self._trailer = buf[start:]
def on_shutdown(self, broker):
"""Shut down the write end of the logging socket."""
_v and LOG.debug('%r.on_shutdown()', self)
@ -2454,7 +2459,11 @@ class IoLogger(BasicStream):
if not buf:
return self.on_disconnect(broker)
self._log_lines(self._trailer + buf.decode('latin1'))
self._trailer = iter_split(
buf=self._trailer + buf.decode('latin1'),
delim='\n',
func=lambda s: self._log.info('%s', s)
)
class Router(object):

@ -0,0 +1,41 @@
import mock
import unittest2
import mitogen.core
import testlib
class IterSplitTest(unittest2.TestCase):
func = staticmethod(mitogen.core.iter_split)
def test_empty_buffer(self):
lst = []
trailer = self.func(buf='', delim='\n', func=lst.append)
self.assertEquals('', trailer)
self.assertEquals([], lst)
def test_empty_line(self):
lst = []
trailer = self.func(buf='\n', delim='\n', func=lst.append)
self.assertEquals('', trailer)
self.assertEquals([''], lst)
def test_one_line(self):
buf = 'xxxx\n'
lst = []
trailer = self.func(buf=buf, delim='\n', func=lst.append)
self.assertEquals('', trailer)
self.assertEquals(lst, ['xxxx'])
def test_one_incomplete(self):
buf = 'xxxx\nyy'
lst = []
trailer = self.func(buf=buf, delim='\n', func=lst.append)
self.assertEquals('yy', trailer)
self.assertEquals(lst, ['xxxx'])
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save