from __future__ import annotations
import datetime
import isodate
import ormsgpack
from decimal import Decimal
from markupsafe import Markup
from sqlalchemy.util import lightweight_named_tuple
from sqlalchemy.util._collections import AbstractKeyedTuple
from types import GeneratorType
from uuid import UUID
from typing import Any, ClassVar, Generic, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable, Collection, Iterable
# Type of the objects a serializer handles (the parameter of `Serializer`).
_T = TypeVar('_T')
# A class object; lets `make_serializable` return the class it was given.
_TypeT = TypeVar('_TypeT', bound=type[object])
class Serializer(Generic[_T]):
    """ Provides a way to encode all objects of a given class to and from
    MessagePack using extension types.

    This is the abstract base: subclasses implement :meth:`encode` and
    :meth:`decode`.

    :param tag: The extension type code, needs to be between 0 and 127.
    :param target: The class handled by this serializer.

    """

    def __init__(self, tag: int, target: type[_T]):
        assert isinstance(target, type), 'expects a class'
        assert 0 <= tag <= 127, 'needs to be between 0 and 127'

        # both attributes are read back by `Serializers.register`
        self.tag = tag
        self.target = target

    def encode(self, obj: _T) -> bytes:
        """ Returns the extension payload for the given object. """
        raise NotImplementedError

    def decode(self, value: bytes) -> _T:
        """ Rebuilds the object from the given extension payload. """
        raise NotImplementedError
class BytesSerializer(Serializer[_T]):
    """ Serializes objects to a byte string.

    :param tag: The extension type code, passed on to the base class.
    :param target: The class handled by this serializer.
    :param encode: Callable turning an object into bytes.
    :param decode: Callable turning those bytes back into an object.

    """

    def __init__(
        self,
        tag: int,
        target: type[_T],
        encode: Callable[[_T], bytes],
        decode: Callable[[bytes], _T],
    ):
        super().__init__(tag, target)

        # kept private, the public `encode`/`decode` methods delegate to them
        self._encode = encode
        self._decode = decode

    def encode(self, obj: _T) -> bytes:
        """ Returns the bytes produced by the configured encoder. """
        return self._encode(obj)

    def decode(self, value: bytes) -> _T:
        """ Returns the object produced by the configured decoder. """
        return self._decode(value)
class StringSerializer(Serializer[_T]):
    """ Serializes objects to a string.

    The string is stored UTF-8 encoded, since extension payloads are bytes.

    :param tag: The extension type code, passed on to the base class.
    :param target: The class handled by this serializer.
    :param encode: Callable turning an object into a string.
    :param decode: Callable turning that string back into an object.

    """

    def __init__(
        self,
        tag: int,
        target: type[_T],
        encode: Callable[[_T], str],
        decode: Callable[[str], _T],
    ):
        super().__init__(tag, target)

        # kept private, the public `encode`/`decode` methods delegate to them
        self._encode = encode
        self._decode = decode

    def encode(self, obj: _T) -> bytes:
        """ Returns the encoder's string as UTF-8 bytes. """
        return self._encode(obj).encode('utf-8')

    def decode(self, value: bytes) -> _T:
        """ Decodes the UTF-8 bytes and hands the string to the decoder. """
        return self._decode(value.decode('utf-8'))
class DictionarySerializer(Serializer[_T]):
    """ Serializes objects that can be built with keyword arguments.

    For example::

        class Point:

            def __init__(self, x, y):
                self.x = x
                self.y = y

    Can be serialised using::

        DictionarySerializer(tag, Point, ('x', 'y'))

    Only the attribute values are stored, as a list in the order of the
    given keys. When decoding, the values are paired with the keys again
    and passed as keyword arguments to the target's ``from_dict`` attribute
    if it has one, or to the target class itself otherwise.

    As the internal __dict__ representation is of no concern, __slots__ may
    be used::

        class Point:
            __slots__ = ('x', 'y')

            def __init__(self, x, y):
                self.x = x
                self.y = y

    """

    def __init__(self, tag: int, target: type[_T], keys: Iterable[str]):
        super().__init__(tag, target)

        # materialise once, the iterable may be a one-shot generator
        self.keys = tuple(keys)
        assert len(set(self.keys)) == len(self.keys), 'duplicate keys given'

        self.constructor = getattr(target, 'from_dict', target)

    def encode(self, obj: _T) -> bytes:
        """ Packs the values of all keys, in key order. """
        values = []
        for key in self.keys:
            values.append(getattr(obj, key))
        return packb(values)

    def decode(self, value: bytes) -> _T:
        """ Unpacks the values and calls the constructor with them. """
        values = unpackb(value)
        assert isinstance(values, list)

        # strict: a payload with a different number of values is an error
        arguments = dict(zip(self.keys, values, strict=True))
        return self.constructor(**arguments)
class Serializers:
    """ Organises the different serializer implementations under a unifying
    interface. This allows the actual encoder/decoder to call a single class
    without having to worry how the various serializers need to be looked up
    and called.

    """

    # registered serializers, keyed by their extension tag
    by_tag: dict[int, Serializer[Any]]
    # (tag, serializer) pairs, keyed by the exact class they handle
    by_type: dict[type[object], tuple[int, Serializer[Any]]]

    def __init__(self) -> None:
        self.by_tag = {}
        self.by_type = {}

    def register(
        self,
        serializer: Serializer[Any]
    ) -> None:
        """ Adds the serializer to both lookup tables.

        Raises an AssertionError if the tag or the target class is already
        taken. Both conditions are checked before anything is stored, so a
        rejected serializer leaves the registry untouched.

        """
        tag = serializer.tag
        target = serializer.target

        assert tag not in self.by_tag, f'tag {tag!r} is already registered'
        assert target not in self.by_type, (
            f'{target!r} is already registered'
        )

        self.by_tag[tag] = serializer
        self.by_type[target] = (tag, serializer)

    def encode(self, value: object) -> object:
        """ Turns a value into something that can be packed.

        Intended as the ``default`` hook of the packer. Returns either an
        extension object or, for dictionary subclasses, a plain dictionary.

        Raises a TypeError if no serializer applies to the value.

        """
        # the lookup is by exact class, subclasses only match via the
        # explicit fallbacks below
        tag_and_serializer = self.by_type.get(value.__class__)
        if tag_and_serializer is None:
            if isinstance(value, dict):
                return dict(value)
            elif isinstance(value, AbstractKeyedTuple):
                tag_and_serializer = self.by_type[AbstractKeyedTuple]
            elif isinstance(value, GeneratorType):
                # generators are stored using the tuple serializer
                tag_and_serializer = self.by_type[tuple]
            else:
                raise TypeError(f'{value!r} is not MessagePack serializable')

        tag, serializer = tag_and_serializer
        return ormsgpack.Ext(tag, serializer.encode(value))

    def decode(self, tag: int, value: bytes) -> Any:
        """ Decodes an extension payload with the serializer for the tag.

        Intended as the ``ext_hook`` of the unpacker. Payloads with an
        unknown tag are returned unchanged.

        """
        serializer = self.by_tag.get(tag)
        if serializer is None:
            return value
        return serializer.decode(value)
# The builtin serializers
default_serializers = Serializers()

# date and time values are stored as ISO 8601 strings
default_serializers.register(
    StringSerializer(
        tag=0,
        target=datetime.datetime,
        encode=isodate.datetime_isoformat,
        decode=isodate.parse_datetime,
    )
)
default_serializers.register(
    StringSerializer(
        tag=1,
        target=datetime.time,
        encode=isodate.time_isoformat,
        decode=isodate.parse_time,
    )
)
default_serializers.register(
    StringSerializer(
        tag=2,
        target=datetime.date,
        encode=isodate.date_isoformat,
        decode=isodate.parse_date,
    )
)

# decimals are stored using their string representation
default_serializers.register(
    StringSerializer(
        tag=3,
        target=Decimal,
        encode=str,
        decode=Decimal,
    )
)

# UUIDs are stored using their raw bytes
default_serializers.register(
    BytesSerializer(
        tag=4,
        target=UUID,
        encode=lambda value: value.bytes,
        decode=lambda raw: UUID(bytes=raw),
    )
)

# NOTE: This might not be worth the cost in performance
#       maybe we're fine with tuples turning into lists
#       remove `OPT_PASSTHROUGH_TUPLE` when removing this
default_serializers.register(
    BytesSerializer(
        tag=5,
        target=tuple,
        encode=lambda items: packb([*items]),
        decode=lambda raw: tuple(unpackb(raw)),
    )
)

# NOTE: We currently only use this to serialize the groupids
#       for the current identity, we could consider replacing
#       this with a serializer for morepath.Identity instead
default_serializers.register(
    BytesSerializer(
        tag=6,
        target=frozenset,
        encode=lambda items: packb([*items]),
        decode=lambda raw: frozenset(unpackb(raw)),
    )
)
# NOTE: SQLAlchemy result support
def load_keyed_tuple(b: bytes) -> AbstractKeyedTuple[Any]:
    """ Rebuilds a keyed tuple from its packed form.

    The payload is expected to unpack into a pair consisting of the class
    name and a mapping of field names to values.

    """
    class_name, fields = unpackb(b)
    tuple_class = lightweight_named_tuple(class_name, fields.keys())
    # NOTE(review): this relies on the generated class accepting its fields
    #               as keyword arguments - confirm against the SQLAlchemy
    #               version in use
    return tuple_class(**fields)
default_serializers.register(
    BytesSerializer(
        tag=7,
        target=AbstractKeyedTuple,
        # packed as a pair: the class name and a field/value mapping
        encode=lambda row: packb([
            row.__class__.__name__,
            {field: getattr(row, field) for field in row.keys()},
        ]),
        decode=load_keyed_tuple,
    )
)

# markup is stored as a plain string and wrapped again when decoding
default_serializers.register(
    StringSerializer(
        tag=8,
        target=Markup,
        encode=str,
        decode=Markup,
    )
)
class Serializable:
    """ Classes inheriting from this base are serialised using the
    :class:`DictionarySerializer` class.

    The extension tag and the keys that should be used need to be
    specified as class keyword arguments::

        class Point(Serializable, tag=42, keys=('x', 'y')):

            def __init__(self, x, y):
                self.x = x
                self.y = y

    """

    # the keys passed to the subclass definition
    serialized_keys: ClassVar[Collection[str]]

    @classmethod
    def serializers(cls) -> Serializers:
        """ Returns the registry new subclasses are registered with. """
        return default_serializers  # for testing

    def __init_subclass__(
        cls,
        tag: int,
        keys: Collection[str],
        **kwargs: Any
    ):
        """ Registers every new subclass with a dictionary serializer. """
        super().__init_subclass__(**kwargs)

        cls.serialized_keys = keys

        registry = cls.serializers()
        registry.register(DictionarySerializer(tag=tag, target=cls, keys=keys))
def make_serializable(
    *,
    tag: int,
    serializers: Serializers = default_serializers
) -> Callable[[_TypeT], _TypeT]:
    """ Returns a class decorator that registers the decorated class with
    a :class:`DictionarySerializer`.

    The keys are taken from the ``_fields`` attribute of the class, or from
    its ``__slots__`` if there is no such attribute.

    :param tag: The extension type code to register the class with.
    :param serializers: The registry to use, the default one if omitted.

    The decorator raises an AssertionError if no keys can be found.

    """

    def decorator(cls: _TypeT) -> _TypeT:
        slots = getattr(cls, '__slots__', ())
        if isinstance(slots, str):
            # a plain string declares a single slot, iterating over it
            # would yield its characters instead of the slot name
            slots = (slots,)

        keys = getattr(cls, '_fields', slots)
        assert keys, f'{cls!r} is not serializable'

        serializers.register(DictionarySerializer(
            tag=tag,
            target=cls,
            keys=keys
        ))
        return cls

    return decorator
# Options passed to every `ormsgpack.packb` call made by `packb`.
PACK_OPTIONS = (
    # NOTE: Strictly speaking only one place needs this (it uses `None`
    #       as a key). Allowing other key types too should be harmless,
    #       as only the basic types are accepted and no Ext types.
    ormsgpack.OPT_NON_STR_KEYS |
    # NOTE: Serialization is meant to be fully reversible, so these
    #       types are passed through to our own serializers instead
    ormsgpack.OPT_PASSTHROUGH_SUBCLASS |
    ormsgpack.OPT_PASSTHROUGH_DATACLASS |
    ormsgpack.OPT_PASSTHROUGH_DATETIME |
    ormsgpack.OPT_PASSTHROUGH_TUPLE |
    ormsgpack.OPT_PASSTHROUGH_UUID
)
def packb(obj: Any) -> bytes:
    """ Packs the object into MessagePack bytes.

    Values the packer hands to its ``default`` hook are encoded by
    the default serializers.

    """
    encode_fallback = default_serializers.encode
    return ormsgpack.packb(obj, default=encode_fallback, option=PACK_OPTIONS)
def unpackb(value: bytes) -> Any:
    """ Unpacks MessagePack bytes produced by :func:`packb`.

    Extension types are decoded by the default serializers and keys other
    than strings are accepted.

    """
    decode_extension = default_serializers.decode
    return ormsgpack.unpackb(
        value,
        ext_hook=decode_extension,
        option=ormsgpack.OPT_NON_STR_KEYS,
    )