"""Bitstruct based codec for binary data structures."""
import math
import warnings
import functools
from typing import Optional
try:
import cbitstruct as bitstruct
except ImportError:
import bitstruct
try:
import bitstruct.c
except ImportError:
pass
import bpack
import bpack.utils
import bpack.codecs
from .enums import EBaseUnits, EByteOrder
from .codecs import has_codec, get_codec
from .descriptors import field_descriptors
# Public names exported by this module.
__all__ = [
    'Decoder', 'decoder', 'Encoder', 'encoder', 'Codec', 'codec',
    'BACKEND_NAME', 'BACKEND_TYPE',
    'packbits', 'unpackbits',
]
# Identifier of this backend.
BACKEND_NAME = 'bitstruct'
# Base units handled by this backend: sizes are expressed in bits.
BACKEND_TYPE = EBaseUnits.BITS
class BitStruct:
    """Wrapper around a compiled bitstruct format.

    The wrapper keeps the original format string (see :attr:`format`) and
    forwards every other attribute access to the underlying compiled
    object.  When the ``bitstruct.c`` accelerator is available and the
    format can be simplified for it, the accelerated implementation is
    used; otherwise the standard ``bitstruct.compile`` one is used.
    """

    @staticmethod
    def _simplified_fmt(format: str) -> Optional[str]:  # noqa
        """Return ``format`` without '>' markers, or None if it contains '<'.

        The simplified string is the one handed to ``bitstruct.c``; a
        ``None`` result tells the caller not to use the accelerator.
        """
        fmt = format.replace('>', '')
        return None if '<' in fmt else fmt

    def __init__(self, format: str, names=None):  # noqa
        """Compile ``format`` (optionally with field ``names``)."""
        codec_ = None
        if hasattr(bitstruct, 'c'):
            fmt = self._simplified_fmt(format)
            if fmt is not None:
                try:
                    codec_ = bitstruct.c.compile(fmt, names)
                except NotImplementedError:
                    # Deliberate best effort: fall back to the generic
                    # implementation below.
                    pass
        if codec_ is None:
            codec_ = bitstruct.compile(format, names)
        self._bitstruct = codec_
        self._format: str = format

    @property
    def format(self) -> str:
        """The format string originally passed to the constructor."""
        return self._format

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped compiled object."""
        # __getattr__ is only called when normal lookup fails, so reaching
        # this point with name == '_bitstruct' means the instance is not
        # initialised (e.g. during copy/unpickling).  Without this guard the
        # getattr below would recurse until RecursionError.
        if name == '_bitstruct':
            raise AttributeError(name)
        return getattr(self._bitstruct, name)
# Lookup table used by _to_fmt to get the bitstruct type character of a field.
# Keys are either a Python type or, for integers with explicit signedness,
# an ``(int, signed)`` tuple.
# NOTE(review): the None -> 'p' entry is presumably used for padding
# fields (see field_descriptors(..., pad=True)) -- confirm.
_TYPE_TO_STR = {
    bool: 'b',
    int: 'u',
    (int, False): 'u',
    (int, True): 's',
    float: 'f',
    bytes: 'r',
    str: 't',
    None: 'p',
}
def _format_string_without_order(fmt: str, order: str) -> str:
# NOTE: in the current implementation the byte order is handled
# externally to _to_fmt
if order != '':
fmt = fmt[:-1] if fmt.endswith(order) else fmt
return fmt
def _to_fmt(type_, size: int, bitorder: str = '', byteorder: str = '',
signed: Optional[bool] = None,
repeat: Optional[int] = None) -> str:
assert size > 0, f'invalid size: {size:r}'
assert bitorder in ('', '>', '<'), f'invalid order: {bitorder:r}'
if repeat is None:
repeat = 1
assert repeat > 0, f'invalid repeat: {repeat:r}'
if has_codec(type_, bpack.codecs.Decoder):
decoder_ = get_codec(type_)
if isinstance(decoder_, Decoder):
return _format_string_without_order(decoder_.format, byteorder)
elif (bpack.is_descriptor(type_) and
bpack.baseunits(type_) is Decoder.baseunits):
decoder_ = Decoder(type_)
return _format_string_without_order(decoder_.format, byteorder)
etype = bpack.utils.effective_type(type_)
key = (etype, signed) if etype is int and signed is not None else etype
try:
fmt = f'{bitorder}{_TYPE_TO_STR[key]}{size}' * repeat # noqa
except KeyError:
raise TypeError(f'unsupported type: {etype:!r}')
# fmt += byteorder # NOTE: handled externally
return fmt
def _endianess_to_str(order: EByteOrder) -> str:
    """Return the string value of ``order``.

    ``EByteOrder.NATIVE`` is first replaced by the byte order returned by
    ``EByteOrder.get_native()``.
    """
    resolved = EByteOrder.get_native() if order is EByteOrder.NATIVE else order
    return resolved.value
class Codec(bpack.codecs.BaseStructCodec):
    """Bitstruct based codec.

    Default bit-order: MSB.
    """

    baseunits = EBaseUnits.BITS

    @staticmethod
    def _get_base_codec(descriptor):
        """Return the :class:`BitStruct` for the fields of ``descriptor``.

        The format is the concatenation of the formats of all the fields
        (padding included), followed by the byte order marker of the
        descriptor.
        """
        byteorder = bpack.byteorder(descriptor)
        byteorder = _endianess_to_str(byteorder)
        bitorder = bpack.bitorder(descriptor).value

        fmt = ''.join(
            _to_fmt(field_descr.type, size=field_descr.size,
                    bitorder=bitorder, byteorder=byteorder,
                    signed=field_descr.signed, repeat=field_descr.repeat)
            for field_descr in field_descriptors(descriptor, pad=True)
        )
        # The byte order is specified only once, at the end of the format.
        fmt = fmt + byteorder

        return BitStruct(fmt)

    @staticmethod
    def _get_decode_converters_map(descriptor):
        """Map each enum field type of ``descriptor`` to itself.

        NOTE(review): presumably the base class calls the mapped enum
        class on the decoded raw value -- confirm against
        ``BaseStructCodec``.
        """
        return {
            field_descr.type: field_descr.type
            for field_descr in field_descriptors(descriptor)
            if bpack.utils.is_enum_type(field_descr.type)
        }

    @staticmethod
    def _get_encode_converters_map(descriptor):
        """Map enum field types to a function returning the member value.

        Enum types that are also subclasses of :class:`int` are not
        included in the map.
        """
        def from_enum(x):
            return x.value

        converters_map = {
            field_descr.type: from_enum
            for field_descr in field_descriptors(descriptor)
            if (bpack.utils.is_enum_type(field_descr.type) and
                not issubclass(field_descr.type, int))
        }
        return converters_map
# Decorator built by bpack.codecs.make_codec_decorator for the Codec class.
codec = bpack.codecs.make_codec_decorator(Codec)
# Codec is exposed under the Decoder and Encoder names as well ...
Decoder = Encoder = Codec
# ... and so is the corresponding decorator.
decoder = encoder = codec
@functools.lru_cache()
def _get_sequence_codec(nsamples: int, bits_per_sample, signed=False,
                        byteorder: str = '') -> BitStruct:
    """Return a (cached) codec for ``nsamples`` equally sized integers.

    The format consists of ``nsamples`` items of ``bits_per_sample`` bits
    each (signed or unsigned according to ``signed``), followed by the
    padding needed to reach a whole number of bytes (if any) and by the
    ``byteorder`` marker.
    """
    total_bits = nsamples * bits_per_sample
    # Number of bits missing to fill the last byte.
    padding = math.ceil(total_bits / 8) * 8 - total_bits

    type_code = 's' if signed else 'u'
    fmt = f'{type_code}{bits_per_sample:d}' * nsamples
    if padding > 0:
        fmt = fmt + f'p{padding:d}'

    return BitStruct(fmt + byteorder)
def packbits(values, bits_per_sample: int, signed: bool = False,
             byteorder: str = '') -> bytes:
    """Pack integer values using the specified number of bits for each sample.

    Converts a sequence of values into a string of bytes in which each
    sample is stored according to the specified number of bits.

    Example::

                4 samples                             3 bytes
      [samp_1, samp_2, samp_3, samp_4]  -->  |------|------|------|------|
                                             4 samples (6 bits per sample)

    Please note that no check is performed that the input values actually
    fit in the specified number of bits.

    The function returns a string of bytes including the same number of
    samples of the input plus possibly some padding bits (at the end) to
    fill an integer number of bytes.  A warning is emitted when padding is
    required.

    If ``signed`` is set to True integers are stored as signed integers.

    :param values: iterable of the integers to pack.
    :param bits_per_sample: number of bits of each sample (positive).
    :param signed: store the samples as signed integers.
    :param byteorder: byte order marker appended to the format string.
    :raises ValueError: if ``bits_per_sample`` is not positive.
    """
    if bits_per_sample < 1:
        raise ValueError(
            f'bits_per_sample must be a positive integer, '
            f'got {bits_per_sample!r}')

    # Materialise the input so that iterables without len() are accepted.
    values = tuple(values)
    nsamples = len(values)
    if (nsamples * bits_per_sample) % 8:
        # stacklevel=2: report the warning at the caller's location.
        warnings.warn(f'packing {nsamples} with {bits_per_sample} bits per '
                      f'sample requires padding', stacklevel=2)
    encoder_ = _get_sequence_codec(nsamples, bits_per_sample,
                                   signed=signed, byteorder=byteorder)
    return encoder_.pack(*values)
def unpackbits(data: bytes, bits_per_sample: int, signed: bool = False,
               byteorder: str = ''):
    """Unpack packed (integer) values from a string of bytes.

    Takes in input a string of bytes in which (integer) samples have been
    stored using ``bits_per_sample`` bits for each sample, and returns
    the sequence of corresponding Python integers.

    Example::

                  3 bytes                               4 samples
      |------|------|------|------|  -->  [samp_1, samp_2, samp_3, samp_4]
      4 samples (6 bits per sample)

    The number of samples is the number of whole ``bits_per_sample``
    sized items that fit in ``data``; remaining bits are treated as
    padding.

    If ``signed`` is set to True integers are assumed to be stored as
    signed integers.

    :param data: the string of bytes to unpack.
    :param bits_per_sample: number of bits of each sample (positive).
    :param signed: interpret the samples as signed integers.
    :param byteorder: byte order marker appended to the format string.
    :raises ValueError: if ``bits_per_sample`` is not positive.
    """
    # Explicit check: 0 would otherwise raise a bare ZeroDivisionError and
    # negative values would silently produce an empty result.
    if bits_per_sample < 1:
        raise ValueError(
            f'bits_per_sample must be a positive integer, '
            f'got {bits_per_sample!r}')

    nsamples = len(data) * 8 // bits_per_sample
    decoder_ = _get_sequence_codec(nsamples, bits_per_sample,
                                   signed=signed, byteorder=byteorder)
    return decoder_.unpack(data)