|
|
|
@ -74,6 +74,7 @@ import logging
|
|
|
|
|
import os
|
|
|
|
|
import pickle as py_pickle
|
|
|
|
|
import pstats
|
|
|
|
|
import re
|
|
|
|
|
import signal
|
|
|
|
|
import socket
|
|
|
|
|
import struct
|
|
|
|
@ -183,12 +184,14 @@ if PY3:
|
|
|
|
|
FsPathTypes = (str,)
|
|
|
|
|
BufferType = lambda buf, start: memoryview(buf)[start:]
|
|
|
|
|
long = int
|
|
|
|
|
from types import SimpleNamespace
|
|
|
|
|
else:
|
|
|
|
|
b = str
|
|
|
|
|
BytesType = str
|
|
|
|
|
FsPathTypes = (str, unicode)
|
|
|
|
|
BufferType = buffer
|
|
|
|
|
UnicodeType = unicode
|
|
|
|
|
SimpleNamespace = None
|
|
|
|
|
|
|
|
|
|
AnyTextType = (BytesType, UnicodeType)
|
|
|
|
|
|
|
|
|
@ -799,6 +802,45 @@ else:
|
|
|
|
|
_Unpickler = pickle.Unpickler
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#: A list of compiled regex patterns which allow end-users to selectively opt
|
|
|
|
|
#: into deserializing certain globals.
|
|
|
|
|
_PICKLE_GLOBAL_WHITELIST_PATTERNS = None
|
|
|
|
|
_PICKLE_GLOBAL_WHITELIST = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_pickle_whitelist(pattern_strings):
|
|
|
|
|
"""
|
|
|
|
|
Specify regex patterns that control allowable global unpickling functions.
|
|
|
|
|
|
|
|
|
|
`pattern_strings` is sequence of pattern strings that will be fed into
|
|
|
|
|
`re.compile` and then used to authenticate pickle calls. In order for a
|
|
|
|
|
non-trivially typed message to unpickle, one of these patterns must
|
|
|
|
|
match against a complete [module].[function] string.
|
|
|
|
|
"""
|
|
|
|
|
if not isinstance(pattern_strings, (tuple, list, set)):
|
|
|
|
|
pattern_strings = (pattern_strings,)
|
|
|
|
|
|
|
|
|
|
global _PICKLE_GLOBAL_WHITELIST
|
|
|
|
|
global _PICKLE_GLOBAL_WHITELIST_PATTERNS
|
|
|
|
|
|
|
|
|
|
_PICKLE_GLOBAL_WHITELIST = pattern_strings
|
|
|
|
|
_PICKLE_GLOBAL_WHITELIST_PATTERNS = []
|
|
|
|
|
|
|
|
|
|
for patt_str in pattern_strings:
|
|
|
|
|
if not patt_str.endswith('$'):
|
|
|
|
|
patt_str += '$'
|
|
|
|
|
_PICKLE_GLOBAL_WHITELIST_PATTERNS.append(re.compile(patt_str))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _test_pickle_whitelist_accept(module, func):
|
|
|
|
|
if not _PICKLE_GLOBAL_WHITELIST_PATTERNS:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
test_str = "{}.{}".format(module, func)
|
|
|
|
|
return bool(any(
|
|
|
|
|
patt.match(test_str) for patt in _PICKLE_GLOBAL_WHITELIST_PATTERNS))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Message(object):
|
|
|
|
|
"""
|
|
|
|
|
Messages are the fundamental unit of communication, comprising fields from
|
|
|
|
@ -896,7 +938,16 @@ class Message(object):
|
|
|
|
|
return self._unpickle_bytes
|
|
|
|
|
elif module == '__builtin__' and func == 'bytes':
|
|
|
|
|
return BytesType
|
|
|
|
|
raise StreamError('cannot unpickle %r/%r', module, func)
|
|
|
|
|
elif SimpleNamespace and module == 'types' and func == 'SimpleNamespace':
|
|
|
|
|
return SimpleNamespace
|
|
|
|
|
elif _test_pickle_whitelist_accept(module, func):
|
|
|
|
|
try:
|
|
|
|
|
return getattr(import_module(module), func)
|
|
|
|
|
except AttributeError as e:
|
|
|
|
|
LOG.info(str(e))
|
|
|
|
|
raise StreamError(
|
|
|
|
|
'cannot unpickle %r/%r - try using `set_pickle_whitelist`',
|
|
|
|
|
module, func)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_dead(self):
|
|
|
|
@ -996,8 +1047,8 @@ class Message(object):
|
|
|
|
|
# Must occur off the broker thread.
|
|
|
|
|
try:
|
|
|
|
|
obj = unpickler.load()
|
|
|
|
|
except:
|
|
|
|
|
LOG.error('raw pickle was: %r', self.data)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
LOG.error('raw pickle was: %r (exc: %r)', self.data, e)
|
|
|
|
|
raise
|
|
|
|
|
self._unpickled = obj
|
|
|
|
|
except (TypeError, ValueError):
|
|
|
|
@ -3302,7 +3353,7 @@ class Router(object):
|
|
|
|
|
This can be used from any thread, but its output is only meaningful
|
|
|
|
|
from the context of the :class:`Broker` thread, as disconnection or
|
|
|
|
|
replacement could happen in parallel on the broker thread at any
|
|
|
|
|
moment.
|
|
|
|
|
moment.
|
|
|
|
|
"""
|
|
|
|
|
return (
|
|
|
|
|
self._stream_by_id.get(dst_id) or
|
|
|
|
@ -4011,6 +4062,9 @@ class ExternalContext(object):
|
|
|
|
|
Router.max_message_size = self.config['max_message_size']
|
|
|
|
|
if self.config['profiling']:
|
|
|
|
|
enable_profiling()
|
|
|
|
|
if self.config['pickle_whitelist_patterns']:
|
|
|
|
|
set_pickle_whitelist(self.config['pickle_whitelist_patterns'])
|
|
|
|
|
|
|
|
|
|
self.broker = Broker(activate_compat=False)
|
|
|
|
|
self.router = Router(self.broker)
|
|
|
|
|
self.router.debug = self.config.get('debug', False)
|
|
|
|
|