"""Bitstruct based codec for binary data structures."""
import math
import warnings
import functools
from typing import Optional
try:
import cbitstruct as bitstruct
except ImportError:
import bitstruct
try:
import bitstruct.c
except ImportError:
pass
import bpack
import bpack.utils
import bpack.codecs
from .enums import EBaseUnits, EByteOrder
from .codecs import has_codec, get_codec
from .descriptors import field_descriptors
# Names exported by ``from <this module> import *``.
__all__ = [
"Decoder",
"decoder",
"Encoder",
"encoder",
"Codec",
"codec",
"BACKEND_NAME",
"BACKEND_TYPE",
"packbits",
"unpackbits",
]
# Identification of this backend: its name and the base units (bits) in
# which field sizes are expressed.
BACKEND_NAME = "bitstruct"
BACKEND_TYPE = EBaseUnits.BITS
class BitStruct:
    """Thin wrapper around a compiled bitstruct format.

    The compiled object is built with ``bitstruct.c.compile`` when the
    ``bitstruct.c`` sub-module is available and accepts the format,
    otherwise with ``bitstruct.compile``.  The original format string is
    kept and exposed through :attr:`format`; every other attribute access
    is forwarded to the wrapped compiled object.
    """

    @staticmethod
    def _simplified_fmt(format_: str) -> Optional[str]:
        """Return *format_* without ``">"`` characters, or ``None``.

        ``None`` is returned when the format contains a ``"<"`` character.
        The result is only used to feed ``bitstruct.c.compile``.
        """
        # NOTE(review): presumably ``bitstruct.c`` does not accept order
        # specifiers, hence ">" (assumed to be the default) is dropped and
        # "<" disables the C code path - confirm against bitstruct docs.
        fmt = format_.replace(">", "")
        return None if "<" in fmt else fmt

    def __init__(self, format_: str, names=None):
        """Compile *format_*, preferring the C implementation.

        :param format_: bitstruct format string.
        :param names: forwarded unchanged to the ``compile`` function.
        """
        codec_ = None
        if hasattr(bitstruct, "c"):
            fmt = self._simplified_fmt(format_)
            if fmt is not None:
                try:
                    codec_ = bitstruct.c.compile(fmt, names)
                except NotImplementedError:
                    # Format not supported by the C implementation: fall
                    # back to the generic one below.
                    pass
        if codec_ is None:
            codec_ = bitstruct.compile(format_, names)
        self._bitstruct = codec_
        self._format: str = format_

    @property
    def format(self) -> str:  # noqa: A003
        """The format string passed to the constructor."""
        return self._format

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped compiled object."""
        # ``__getattr__`` is only called when normal lookup fails.  If
        # ``_bitstruct`` itself is missing (instance created via ``__new__``
        # e.g. by ``copy`` or ``pickle``), looking it up again through
        # ``self`` would recurse forever: fail with AttributeError instead.
        if name == "_bitstruct":
            raise AttributeError(name)
        return getattr(self._bitstruct, name)
# Map a Python type to the type character used in the format string.
# For integers the key may also be an ``(int, signed)`` tuple (see
# ``_to_fmt``), and ``None`` is the key used for the "p" code.
# NOTE(review): the characters are presumably the bitstruct type codes
# (bool, unsigned, signed, float, raw bytes, text, padding) - confirm
# against the bitstruct documentation.
_TYPE_TO_STR = {
bool: "b",
int: "u",
(int, False): "u",
(int, True): "s",
float: "f",
bytes: "r",
str: "t",
None: "p",
}
def _format_string_without_order(fmt: str, order: str) -> str:
# NOTE: in the current implementation the byte order is handled
# externally to _to_fmt
if order != "":
fmt = fmt[:-1] if fmt.endswith(order) else fmt
return fmt
def _to_fmt(
type_,
size: int,
bitorder: str = "",
byteorder: str = "",
signed: Optional[bool] = None,
repeat: Optional[int] = None,
) -> str:
assert size > 0, f"invalid size: {size:r}"
assert bitorder in ("", ">", "<"), f"invalid order: {bitorder:r}"
if repeat is None:
repeat = 1
assert repeat > 0, f"invalid repeat: {repeat:r}"
if has_codec(type_, bpack.codecs.Decoder):
decoder_ = get_codec(type_)
if isinstance(decoder_, Decoder):
return _format_string_without_order(decoder_.format, byteorder)
elif (
bpack.is_descriptor(type_)
and bpack.baseunits(type_) is Decoder.baseunits
):
decoder_ = Decoder(type_)
return _format_string_without_order(decoder_.format, byteorder)
etype = bpack.utils.effective_type(type_)
key = (etype, signed) if etype is int and signed is not None else etype
try:
fmt = f"{bitorder}{_TYPE_TO_STR[key]}{size}" * repeat
except KeyError:
raise TypeError(f"unsupported type: {etype:!r}")
# fmt += byteorder # NOTE: handled externally
return fmt
def _endianess_to_str(order: EByteOrder) -> str:
    """Return the string value of *order*, resolving ``NATIVE`` first.

    ``EByteOrder.NATIVE`` is replaced by the result of
    ``EByteOrder.get_native()`` before its ``value`` is taken.
    """
    resolved = EByteOrder.get_native() if order is EByteOrder.NATIVE else order
    return resolved.value
class Codec(bpack.codecs.BaseStructCodec):
    """Bitstruct based codec.

    Default bit-order: MSB.
    """

    baseunits = EBaseUnits.BITS

    @staticmethod
    def _get_base_codec(descriptor):
        """Build the :class:`BitStruct` for the fields of *descriptor*.

        The format string is the concatenation of the format of each field
        (fields are requested with ``pad=True``) followed by the byte order
        string, which is appended only once at the very end.
        """
        byteorder = _endianess_to_str(bpack.byteorder(descriptor))
        bitorder = bpack.bitorder(descriptor).value

        fmt = "".join(
            _to_fmt(
                field_descr.type,
                size=field_descr.size,
                bitorder=bitorder,
                byteorder=byteorder,
                signed=field_descr.signed,
                repeat=field_descr.repeat,
            )
            for field_descr in field_descriptors(descriptor, pad=True)
        )
        fmt = fmt + byteorder  # byte order

        return BitStruct(fmt)

    @staticmethod
    def _get_decode_converters_map(descriptor):
        """Map each enum field type of *descriptor* to itself.

        NOTE(review): presumably the base class calls the mapped object on
        the decoded value to rebuild the enum member - confirm against
        ``bpack.codecs.BaseStructCodec``.
        """
        return {
            field_descr.type: field_descr.type
            for field_descr in field_descriptors(descriptor)
            if bpack.utils.is_enum_type(field_descr.type)
        }

    @staticmethod
    def _get_encode_converters_map(descriptor):
        """Map each non-``int`` enum field type to a ``.value`` getter.

        Enum types that are sub-classes of ``int`` are left out of the map.
        """

        def from_enum(x):
            return x.value

        converters_map = {
            field_descr.type: from_enum
            for field_descr in field_descriptors(descriptor)
            if (
                bpack.utils.is_enum_type(field_descr.type)
                and not issubclass(field_descr.type, int)
            )
        }

        return converters_map
# Decorator for :class:`Codec`, built by ``bpack.codecs.make_codec_decorator``.
codec = bpack.codecs.make_codec_decorator(Codec)
# The same class (and the same decorator) is used for both decoding and
# encoding: the names below are plain aliases.
Decoder = Encoder = Codec
decoder = encoder = codec
@functools.lru_cache()  # @COMPATIBILITY with Python 3.7
def _get_sequence_codec(
    nsamples: int, bits_per_sample, signed=False, byteorder: str = ""
) -> BitStruct:
    """Return a (cached) :class:`BitStruct` for a sequence of integers.

    The format describes *nsamples* items of *bits_per_sample* bits each
    (code ``"s"`` if *signed* is true, ``"u"`` otherwise), followed by the
    padding needed to reach a whole number of bytes (if any) and finally by
    the *byteorder* string.
    """
    total_bits = nsamples * bits_per_sample
    # number of bits missing to fill the last byte
    padding = math.ceil(total_bits / 8) * 8 - total_bits

    type_code = "s" if signed else "u"
    samples_fmt = f"{type_code}{bits_per_sample:d}" * nsamples
    padding_fmt = f"p{padding:d}" if padding > 0 else ""

    return BitStruct(samples_fmt + padding_fmt + byteorder)
def packbits(
    values, bits_per_sample: int, signed: bool = False, byteorder: str = ""
) -> bytes:
    """Pack integer values using the specified number of bits for each sample.

    Converts a sequence of values into a string of bytes in which each
    sample is stored according to the specified number of bits.

    Example::

                      4 samples                          3 bytes
        [samp_1, samp_2, samp_3, samp_4]  -->  |------|------|------|------|
                                               4 samples (6 bits per sample)

    Please note that no check is performed on whether the input values
    actually fit in the specified number of bits.

    The function returns a string of bytes including the same number of
    samples of the input plus possibly some padding bits (at the end) to
    fill an integer number of bytes.  A warning is issued when padding is
    necessary.

    If ``signed`` is set to True integers are stored as signed integers.

    :param values: sequence of integers to pack (``len()`` is called on it).
    :param bits_per_sample: number of bits used to store each sample.
    :param signed: if True samples are stored as signed integers.
    :param byteorder: byte order string appended to the format string.
    :returns: the packed data.
    """
    nsamples = len(values)
    if (nsamples * bits_per_sample) % 8:
        warnings.warn(
            f"packing {nsamples} with {bits_per_sample} bits per "
            f"sample requires padding",
            # attribute the warning to the caller, not to this module
            stacklevel=2,
        )
    encoder_ = _get_sequence_codec(
        nsamples, bits_per_sample, signed=signed, byteorder=byteorder
    )
    return encoder_.pack(*values)
def unpackbits(
    data: bytes,
    bits_per_sample: int,
    signed: bool = False,
    byteorder: str = "",
):
    """Unpack packed (integer) values from a string of bytes.

    Takes in input a string of bytes in which (integer) samples have been
    stored using ``bits_per_sample`` bits for each sample, and returns
    the sequence of corresponding Python integers.

    Example::

                   3 bytes                               4 samples
        |------|------|------|------|  -->  [samp_1, samp_2, samp_3, samp_4]
        4 samples (6 bits per sample)

    If ``signed`` is set to True integers are assumed to be stored as
    signed integers.

    The number of samples is computed as the number of whole samples that
    fit in ``data`` (``len(data) * 8 // bits_per_sample``).  As a
    consequence, if ``data`` ends with ``bits_per_sample`` or more padding
    bits, those bits are decoded as additional samples.

    :param data: the packed data.
    :param bits_per_sample: number of bits used to store each sample.
    :param signed: if True samples are decoded as signed integers.
    :param byteorder: byte order string appended to the format string.
    :returns: the result of the ``unpack`` method of the underlying codec.
    """
    nsamples = len(data) * 8 // bits_per_sample
    decoder_ = _get_sequence_codec(
        nsamples, bits_per_sample, signed=signed, byteorder=byteorder
    )
    return decoder_.unpack(data)