Source code for bpack.codecs

"""Base classes and utility functions for codecs."""

import abc
from typing import Callable, NamedTuple, Optional, Type, Union

import bpack.utils
import bpack.descriptors

from .enums import EBaseUnits
from .descriptors import field_descriptors


CODEC_ATTR_NAME = '__bpack_decoder__'


[docs]class BaseCodec: baseunits: EBaseUnits @classmethod def _check_descriptor(cls, descriptor): if bpack.baseunits(descriptor) is not cls.baseunits: raise ValueError( f'{cls.__module__}.{cls.__name__} ' f'only accepts descriptors with base units ' f'"{cls.baseunits.value}"') def __init__(self, descriptor): self._check_descriptor(descriptor) self._descriptor = descriptor @property def descriptor(self): return self._descriptor
[docs]class Decoder(BaseCodec, abc.ABC):
[docs] @abc.abstractmethod def decode(self, data: bytes): pass
[docs]class Encoder(BaseCodec, abc.ABC):
[docs] @abc.abstractmethod def encode(self, record) -> bytes: pass
[docs]class Codec(Decoder, Encoder, abc.ABC): pass
CodecType = Union[Decoder, Encoder, Codec]
[docs]def make_codec_decorator(codec_type: Type[CodecType]): """Generate a codec decorator for the input decoder class.""" @bpack.utils.classdecorator def codec(cls): """Class decorator to add (de)coding methods to a descriptor class. The decorator automatically generates a *Codec* object form the input descriptor class and attach to it methods for conversion form/to bytes. """ codec_ = codec_type(descriptor=cls) bpack.utils.set_new_attribute(cls, CODEC_ATTR_NAME, codec_) if isinstance(codec_, Decoder): decode_func = bpack.utils.create_fn( name='frombytes', args=('cls', 'data'), body=[f'return cls.{CODEC_ATTR_NAME}.decode(data)'], ) decode_func = classmethod(decode_func) bpack.utils.set_new_attribute(cls, 'frombytes', decode_func) if isinstance(codec_, Encoder): encode_func = bpack.utils.create_fn( name='tobytes', args=('self',), body=[f'return self.{CODEC_ATTR_NAME}.encode(self)'], ) bpack.utils.set_new_attribute(cls, 'tobytes', encode_func) return cls return codec
[docs]def has_codec(descriptor, codec_type: Optional[Type[CodecType]] = None) -> bool: """Return True if the input descriptor has a codec attached. A descriptor decorated with a *codec* decorator has an attached codec instance and "frombytes"/"tobytes" methods (depending on the kind of codec). The *codec_type* parameter can be used to query for specific codec features: * codec_type=None: return True for any king of codec * codec_type=:class:`Decoder`: return True if the attached coded has decoding capabilities * codec_type=:class:`Encoder`: return True if the attached coded has encoding capabilities * codec_type=:class:`Codec`: return True if the attached coded has both encoding and decoding capabilities """ if hasattr(descriptor, CODEC_ATTR_NAME): assert isinstance(get_codec(descriptor), (Codec, Decoder, Encoder)) if codec_type is None: return True elif issubclass(codec_type, Codec): return (hasattr(descriptor, 'frombytes') and hasattr(descriptor, 'tobytes')) elif issubclass(codec_type, Decoder): return hasattr(descriptor, 'frombytes') elif issubclass(codec_type, Encoder): return hasattr(descriptor, 'tobytes') return False
[docs]def get_codec(descriptor) -> CodecType: """Return the codec instance attached to the input descriptor.""" return getattr(descriptor, CODEC_ATTR_NAME, None)
# TODO: remove
[docs]def get_codec_type(descriptor) -> Type[CodecType]: """Return the type of the codec attached to the input descriptor.""" codec_ = getattr(descriptor, CODEC_ATTR_NAME, None) if codec_ is not None: return type(codec_)
def _get_flat_len(descriptor): count = 0 for field_descr in field_descriptors(descriptor): if bpack.is_descriptor(field_descr.type): count += _get_flat_len(field_descr.type) elif field_descr.repeat is not None: count += field_descr.repeat else: count += 1 return count
[docs]class ConverterInfo(NamedTuple): func: Callable src: Union[int, slice] dst: Union[int, slice]
[docs]class BaseStructCodec(Codec): @staticmethod @abc.abstractmethod def _get_base_codec(descriptor): pass def __init__(self, descriptor, codec=None, decode_converters=None, encode_converters=None): """Initializer. The *descriptor* parameter* is a bpack record descriptor. """ super().__init__(descriptor) if codec is None: codec = self._get_base_codec(descriptor) if decode_converters is None: decode_converters = self._get_decode_converters(descriptor) if encode_converters is None: encode_converters = self._get_encode_converters(descriptor) self._codec = codec self._decode_converters = decode_converters self._encode_converters = encode_converters self._flat_len = _get_flat_len(descriptor) @property def format(self) -> str: """Return the format string.""" return self._codec.format @classmethod def _get_decoder(cls, descr): assert (bpack.is_descriptor(descr) and bpack.baseunits(descr) is cls.baseunits) if has_codec(descr, Decoder): decoder_ = get_codec(descr) return decoder_ decoder_ = cls(descr) # noqa return decoder_ @staticmethod def _get_decode_converters_map(descriptor): return {} @classmethod def _get_decode_converters(cls, descriptor): converters_map = cls._get_decode_converters_map(descriptor) converters = [] for idx, field_descr in enumerate(field_descriptors(descriptor)): if field_descr.type in converters_map: func = converters_map[field_descr.type] converters.append(ConverterInfo(func, idx, idx)) elif bpack.is_descriptor(field_descr.type): decoder_ = cls._get_decoder(field_descr.type) n_items = decoder_._flat_len src = slice(idx, idx + n_items) func = decoder_._from_flat_list converters.append(ConverterInfo(func, src, idx)) elif field_descr.repeat is not None: sequence_type = bpack.utils.sequence_type(field_descr.type, error=True) src = slice(idx, idx + field_descr.repeat) converters.append(ConverterInfo(sequence_type, src, idx)) return converters def _from_flat_list(self, values): for func, src, dst in self._decode_converters: if isinstance(src, int): values[dst] = func(values[src]) else: value = func(values[src]) del values[src] values.insert(dst, value) return self.descriptor(*values)
[docs] def decode(self, data: bytes): """Decode binary data and return a record object.""" values = list(self._codec.unpack(data)) return self._from_flat_list(values)
@classmethod def _get_encoder(cls, descr): assert (bpack.is_descriptor(descr) and bpack.baseunits(descr) is cls.baseunits) if has_codec(descr, Encoder): encoder_ = get_codec(descr) return encoder_ encoder_ = cls(descr) # noqa return encoder_ @staticmethod def _get_encode_converters_map(descriptor): return {} @classmethod def _get_encode_converters(cls, descriptor): converters_map = cls._get_encode_converters_map(descriptor) def nullop(x): return x converters = [] for idx, field_descr in enumerate(field_descriptors(descriptor)): if field_descr.type in converters_map: func = converters_map[field_descr.type] converters.append(ConverterInfo(func, idx, idx)) elif bpack.is_descriptor(field_descr.type): encoder_ = cls._get_encoder(field_descr.type) func = encoder_._to_flat_list slice_ = slice(idx, idx + 1) converters.append(ConverterInfo(func, idx, slice_)) elif field_descr.repeat is not None: slice_ = slice(idx, idx + 1) converters.append(ConverterInfo(nullop, idx, slice_)) return converters def _to_flat_list(self, record): values = [ getattr(record, field.name) for field in bpack.fields(record) ] for func, src, dst in self._encode_converters[::-1]: values[dst] = func(values[src]) return values
[docs] def encode(self, record) -> bytes: """Encode a record object into binary data.""" values = self._to_flat_list(record) return self._codec.pack(*values)