# Source code for rlp.codec

import collections
import sys

try:
    from collections.abc import Sequence
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Sequence

from .exceptions import EncodingError, DecodingError

from .utils import (Atomic, str_to_bytes, is_integer, ascii_chr, safe_ord, big_endian_to_int,
                    int_to_big_endian)
from .sedes.binary import Binary as BinaryClass
from .sedes import big_endian_int, binary
from .sedes.lists import List, Serializable, is_sedes


if sys.version_info.major == 2:
    from itertools import imap as map


def encode(obj, sedes=None, infer_serializer=True, cache=False):
    """Encode a Python object in RLP format.

    By default, the object is serialized in a suitable way first (using
    :func:`rlp.infer_sedes`) and then encoded. Serialization can be explicitly
    suppressed by setting `infer_serializer` to ``False`` and not passing an
    alternative as `sedes`.

    If `obj` has an attribute :attr:`_cached_rlp` (as, notably,
    :class:`rlp.Serializable`) and its value is not `None`, this value is
    returned bypassing serialization and encoding, unless `sedes` is given (as
    the cache is assumed to refer to the standard serialization which can be
    replaced by specifying `sedes`).

    If `obj` is a :class:`rlp.Serializable` and `cache` is true, the result of
    the encoding will be stored in :attr:`_cached_rlp` if it is empty and
    :meth:`rlp.Serializable.make_immutable` will be invoked on `obj`.

    :param sedes: an object implementing a function ``serialize(obj)`` which
                  will be used to serialize ``obj`` before encoding, or
                  ``None`` to use the inferred one (if any)
    :param infer_serializer: if ``True`` an appropriate serializer will be
                             selected using :func:`rlp.infer_sedes` to
                             serialize `obj` before encoding
    :param cache: cache the return value in `obj._cached_rlp` if possible and
                  make `obj` immutable (default `False`)
    :returns: the RLP encoded item
    :raises: :exc:`rlp.EncodingError` in the rather unlikely case that the item
             is too big to encode (will not happen)
    :raises: :exc:`rlp.SerializationError` if the serialization fails
    """
    if isinstance(obj, Serializable):
        if obj._cached_rlp and sedes is None:
            return obj._cached_rlp
        # The cache only describes the standard serialization, so it must not
        # be filled when the caller supplied an explicit sedes.
        really_cache = cache if sedes is None else False
    else:
        really_cache = False

    # Test against ``None`` (not truthiness) so that the serializer choice
    # agrees with the cache checks above: a falsy sedes object, e.g. an empty
    # list-like one, must still be honoured.
    if sedes is not None:
        item = sedes.serialize(obj)
    elif infer_serializer:
        item = infer_sedes(obj).serialize(obj)
    else:
        item = obj

    result = encode_raw(item)
    if really_cache:
        obj._cached_rlp = result
        obj.make_immutable()
    return result
class RLPData(str):
    """Wrapper marking data that is already RLP serialized."""


def encode_raw(item):
    """RLP encode (a nested sequence of) :class:`Atomic` objects.

    :param item: an :class:`RLPData` (returned unchanged), an :class:`Atomic`
                 or a sequence of encodable items
    :returns: the RLP encoding of `item`
    :raises: :exc:`EncodingError` if `item` is of an unsupported type or its
             payload is too long for a length prefix
    """
    if isinstance(item, RLPData):
        return item
    elif isinstance(item, Atomic):
        # A single byte below 0x80 is its own encoding.
        if len(item) == 1 and safe_ord(item[0]) < 128:
            return str_to_bytes(item)
        payload = str_to_bytes(item)
        prefix_offset = 128  # string
    elif isinstance(item, Sequence):
        payload = b''.join(encode_raw(x) for x in item)
        prefix_offset = 192  # list
    else:
        msg = 'Cannot encode object of type {0}'.format(type(item).__name__)
        raise EncodingError(msg, item)

    try:
        prefix = length_prefix(len(payload), prefix_offset)
    except ValueError:
        raise EncodingError('Item too big to encode', item)
    return prefix + payload


def length_prefix(length, offset):
    """Construct the prefix to lists or strings denoting their length.

    :param length: the length of the item in bytes
    :param offset: ``0x80`` when encoding raw bytes, ``0xc0`` when encoding a
                   list
    :returns: the prefix to put in front of the payload
    :raises: :exc:`ValueError` if `length` is ``256**8`` or more
    """
    if length < 56:
        return ascii_chr(offset + length)
    elif length < 256**8:
        length_string = int_to_big_endian(length)
        return ascii_chr(offset + 56 - 1 + len(length_string)) + length_string
    else:
        raise ValueError('Length greater than 256**8')


def consume_length_prefix(rlp, start):
    """Read a length prefix from an RLP string.

    :param rlp: the rlp string to read from
    :param start: the position at which to start reading
    :returns: a tuple ``(type, length, end)``, where ``type`` is either ``str``
              or ``list`` depending on the type of the following payload,
              ``length`` is the length of the payload in bytes, and ``end`` is
              the position of the first payload byte in the rlp string
    :raises: :exc:`DecodingError` if the prefix is not in its shortest form
    """
    b0 = safe_ord(rlp[start])
    if b0 < 128:  # single byte: the byte itself is the payload
        return (str, 1, start)
    elif b0 < 128 + 56:  # short string
        if b0 - 128 == 1 and safe_ord(rlp[start + 1]) < 128:
            raise DecodingError('Encoded as short string although single byte was possible', rlp)
        return (str, b0 - 128, start + 1)
    elif b0 < 192:  # long string
        length_size = b0 - 128 - 56 + 1
        if rlp[start + 1:start + 2] == b'\x00':
            raise DecodingError('Length starts with zero bytes', rlp)
        length = big_endian_to_int(rlp[start + 1:start + 1 + length_size])
        if length < 56:
            raise DecodingError('Long string prefix used for short string', rlp)
        return (str, length, start + 1 + length_size)
    elif b0 < 192 + 56:  # short list
        return (list, b0 - 192, start + 1)
    else:  # long list
        length_size = b0 - 192 - 56 + 1
        if rlp[start + 1:start + 2] == b'\x00':
            raise DecodingError('Length starts with zero bytes', rlp)
        length = big_endian_to_int(rlp[start + 1:start + 1 + length_size])
        if length < 56:
            raise DecodingError('Long list prefix used for short list', rlp)
        return (list, length, start + 1 + length_size)


def consume_payload(rlp, start, type_, length):
    """Read the payload of an item from an RLP string.

    :param rlp: the rlp string to read from
    :param type_: the type of the payload (``str`` or ``list``)
    :param start: the position at which to start reading
    :param length: the length of the payload in bytes
    :returns: a tuple ``(item, end)``, where ``item`` is the read item and
              ``end`` is the position of the first unprocessed byte
    :raises: :exc:`DecodingError` if a string payload extends past the end of
             `rlp` or a list's items overrun the announced list length
    :raises: :exc:`TypeError` if `type_` is neither ``str`` nor ``list``
    """
    if type_ == str:
        end = start + length
        # Slicing never raises, so a truncated payload has to be detected
        # explicitly instead of being returned shortened.
        if end > len(rlp):
            raise DecodingError('Payload extends beyond the end of the RLP string', rlp)
        return (rlp[start:end], end)
    elif type_ == list:
        items = []
        next_item_start = start
        end = next_item_start + length
        while next_item_start < end:
            item_type, item_length, payload_start = consume_length_prefix(rlp, next_item_start)
            item, next_item_start = consume_payload(rlp, payload_start, item_type, item_length)
            items.append(item)
        if next_item_start > end:
            raise DecodingError('List length prefix announced a too small '
                                'length', rlp)
        return (items, next_item_start)
    else:
        raise TypeError('Type must be either list or str')


def consume_item(rlp, start):
    """Read an item from an RLP string.

    :param rlp: the rlp string to read from
    :param start: the position at which to start reading
    :returns: a tuple ``(item, end)`` where ``item`` is the read item and
              ``end`` is the position of the first unprocessed byte
    """
    item_type, length, payload_start = consume_length_prefix(rlp, start)
    return consume_payload(rlp, payload_start, item_type, length)
def decode(rlp, sedes=None, strict=True, **kwargs):
    """Decode an RLP encoded object.

    If the deserialized result `obj` has an attribute :attr:`_cached_rlp` (e.g.
    if `sedes` is a subclass of :class:`rlp.Serializable`) it will be set to
    `rlp`, which will improve performance on subsequent :func:`rlp.encode`
    calls. Bear in mind however that `obj` needs to make sure that this value
    is updated whenever one of its fields changes or prevent such changes
    entirely (:class:`rlp.sedes.Serializable` does the latter).

    :param sedes: an object implementing a function ``deserialize(code)`` which
                  will be applied after decoding, or ``None`` if no
                  deserialization should be performed
    :param kwargs: additional keyword arguments that will be passed to the
                   deserializer
    :param strict: if false inputs that are longer than necessary don't cause
                   an exception
    :returns: the decoded and maybe deserialized Python object
    :raises: :exc:`rlp.DecodingError` if the input string is shorter than its
             length prefixes announce, or if it does not end after the root
             item and `strict` is true
    :raises: :exc:`rlp.DeserializationError` if the deserialization fails
    """
    rlp = str_to_bytes(rlp)
    try:
        item, end = consume_item(rlp, 0)
    except IndexError:
        raise DecodingError('RLP string to short', rlp)
    # Payload slicing does not raise on truncated input, so an end position
    # past the data is the only sign that bytes are missing. This is an error
    # regardless of `strict`, which only concerns *extra* bytes.
    if end > len(rlp):
        raise DecodingError('RLP string to short', rlp)
    if end != len(rlp) and strict:
        msg = 'RLP string ends with {} superfluous bytes'.format(len(rlp) - end)
        raise DecodingError(msg, rlp)

    # Compare against ``None`` rather than relying on truthiness so that a
    # falsy sedes object is still applied, as the parameter docs promise.
    if sedes is not None:
        obj = sedes.deserialize(item, **kwargs)
        if hasattr(obj, '_cached_rlp'):
            obj._cached_rlp = rlp
            assert not isinstance(obj, Serializable) or not obj.is_mutable()
        return obj
    else:
        return item
def descend(rlp, *path):
    """Walk into nested RLP lists without decoding them.

    For every index in `path` the item at that position of the current list
    is selected; the selected item (length prefix included) becomes the
    input for the next step.

    :param rlp: the rlp string to navigate
    :param path: list indices to follow, outermost first
    :returns: the still encoded item reached at the end of `path`
    :raises: :exc:`DecodingError` if a step has to go through something that
             is not a list
    """
    rlp = str_to_bytes(rlp)
    for index in path:
        item_type, _, cursor = consume_length_prefix(rlp, 0)
        if item_type != list:
            raise DecodingError('Trying to descend through a non-list!', rlp)
        # Jump over the `index` items preceding the wanted one.
        for _ in range(index):
            _, skipped_length, skipped_start = consume_length_prefix(rlp, cursor)
            cursor = skipped_length + skipped_start
        _, target_length, target_start = consume_length_prefix(rlp, cursor)
        rlp = rlp[cursor:target_start + target_length]
    return rlp
def infer_sedes(obj):
    """Try to find a sedes objects suitable for a given Python object.

    The sedes objects considered are `obj`'s class, `big_endian_int` and
    `binary`. If `obj` is a sequence, a :class:`rlp.sedes.List` will be
    constructed recursively.

    :param obj: the python object for which to find a sedes object
    :returns: the sedes object found for `obj`
    :raises: :exc:`TypeError` if no appropriate sedes could be found
    """
    if is_sedes(obj.__class__):
        return obj.__class__
    if is_integer(obj) and obj >= 0:
        return big_endian_int
    if BinaryClass.is_valid_type(obj):
        return binary
    # ``collections.Sequence`` no longer exists on Python 3.10+, hence the
    # ABC is taken from the module level compatibility import.
    if isinstance(obj, Sequence):
        return List(map(infer_sedes, obj))
    msg = 'Did not find sedes handling type {}'.format(type(obj).__name__)
    raise TypeError(msg)
def append(rlpdata, obj):
    """Append `obj` to an RLP encoded list without decoding the list.

    :param rlpdata: the RLP encoding of a list
    :param obj: the object to encode (via :func:`encode`) and add at the end
    :returns: the RLP encoding of the extended list
    """
    _typ, _len, _pos = consume_length_prefix(rlpdata, 0)
    assert _typ is list
    # Drop the old list prefix, extend the payload and build a fresh prefix.
    payload = rlpdata[_pos:] + encode(obj)
    return length_prefix(len(payload), 192) + payload


def insert(rlpdata, index, obj):
    """Insert `obj` into an RLP encoded list without decoding the list.

    :param rlpdata: the RLP encoding of a list
    :param index: position the new item should get; an index past the last
                  item appends to the list
    :param obj: the object to encode (via :func:`encode`) and insert
    :returns: the RLP encoding of the extended list
    """
    _typ, _len, _pos = consume_length_prefix(rlpdata, 0)
    _beginpos = _pos
    assert _typ is list
    for _ in range(index):
        # Stop at the end of the data. Checking before reading also covers
        # the empty list, which has no item prefix to read at all.
        if _pos >= len(rlpdata):
            break
        _, _l, _p = consume_length_prefix(rlpdata, _pos)
        _pos = _l + _p
    payload = rlpdata[_beginpos:_pos] + encode(obj) + rlpdata[_pos:]
    return length_prefix(len(payload), 192) + payload


def pop(rlpdata, index=2**50):
    """Remove one item from an RLP encoded list without decoding the list.

    :param rlpdata: the RLP encoding of a (non-empty) list
    :param index: position of the item to remove; an index past the last item
                  removes the last item, which is what the huge default does
    :returns: the RLP encoding of the list without the removed item
    """
    _typ, _len, _pos = consume_length_prefix(rlpdata, 0)
    _initpos = _pos
    assert _typ is list
    while index > 0:
        _, _l, _p = consume_length_prefix(rlpdata, _pos)
        if _l + _p >= len(rlpdata):
            # Already at the last item: keep pointing at it.
            break
        _pos = _l + _p
        index -= 1
    _, _l, _p = consume_length_prefix(rlpdata, _pos)
    newdata = rlpdata[_initpos:_pos] + rlpdata[_l + _p:]
    return length_prefix(len(newdata), 192) + newdata


EMPTYLIST = encode([])


def compare_length(rlpdata, length):
    """Compare the item count of an RLP encoded list with `length`.

    Items are only counted until the answer is known, the list is not decoded.

    :param rlpdata: the RLP encoding of a list
    :param length: the number of items to compare against
    :returns: ``1`` if the list has more than `length` items, ``-1`` if it has
              fewer and ``0`` if it has exactly `length` items
    """
    _typ, _len, _pos = consume_length_prefix(rlpdata, 0)
    assert _typ is list
    if rlpdata == EMPTYLIST:
        return -1 if length > 0 else 1 if length < 0 else 0
    lenlist = 0
    while True:
        if lenlist > length:
            return 1
        _, _l, _p = consume_length_prefix(rlpdata, _pos)
        lenlist += 1
        if _l + _p >= len(rlpdata):
            break
        _pos = _l + _p
    # The last item may be the one that pushes the count past `length`, so
    # all three outcomes have to be told apart here.
    if lenlist > length:
        return 1
    return 0 if lenlist == length else -1