# Source code for bpack.bs

"""Bitstruct based codec for binary data structures."""

import math
import warnings
import functools
from typing import Optional

try:
    import cbitstruct as bitstruct
except ImportError:
    import bitstruct

    try:
        import bitstruct.c
    except ImportError:
        pass

import bpack
import bpack.utils
import bpack.codecs

from .enums import EBaseUnits, EByteOrder
from .codecs import has_codec, get_codec
from .descriptors import field_descriptors

# Names exported by ``from bpack.bs import *``: the codec class with its
# decoder/encoder aliases, the matching decorators, the backend
# identification constants and the bit packing helper functions.
__all__ = [
    "Decoder",
    "decoder",
    "Encoder",
    "encoder",
    "Codec",
    "codec",
    "BACKEND_NAME",
    "BACKEND_TYPE",
    "packbits",
    "unpackbits",
]


# Name of the package used as backend by this module.
BACKEND_NAME = "bitstruct"
# Base units used by this backend (bits).
BACKEND_TYPE = EBaseUnits.BITS


class BitStruct:
    """Compiled bitstruct format that also remembers its format string.

    The object wraps the compiled format returned by ``bitstruct.compile``
    (or by ``bitstruct.c.compile`` when the ``bitstruct.c`` sub-module is
    available and accepts the format).  Every attribute that is not
    defined here is looked up on the wrapped object.
    """

    @staticmethod
    def _simplified_fmt(format_: str) -> Optional[str]:
        """Return `format_` stripped of ">" markers, None if it has "<"."""
        fmt = format_.replace(">", "")
        if "<" in fmt:
            return None
        else:
            return fmt

    def __init__(self, format_: str, names=None):
        """Compile `format_` (optionally with field `names`)."""
        codec_ = None
        if hasattr(bitstruct, "c"):
            # bitstruct.c is only tried with formats having no order
            # markers left after the simplification
            # (presumably it does not support them -- TODO confirm).
            fmt = self._simplified_fmt(format_)
            if fmt is not None:
                try:
                    codec_ = bitstruct.c.compile(fmt, names)
                except NotImplementedError:
                    # deliberate best-effort: fall back to the generic
                    # implementation below
                    pass

        if codec_ is None:
            codec_ = bitstruct.compile(format_, names)

        self._bitstruct = codec_
        self._format: str = format_

    @property
    def format(self) -> str:  # noqa: A003
        """Format string passed to the constructor (unmodified)."""
        return self._format

    def __getattr__(self, name):
        """Forward the lookup of unknown attributes to the wrapped object."""
        # __getattr__ is also invoked on instances that have not been
        # initialized yet (e.g. while copy/pickle rebuild the object and
        # probe for ``__setstate__``).  In that case ``self._bitstruct``
        # would re-enter this method forever: fail with AttributeError.
        if name == "_bitstruct":
            raise AttributeError(name)
        return getattr(self._bitstruct, name)


# Map from Python type to the type character used in bitstruct format
# strings.  Integers can also be looked up with an ``(int, signed)`` key
# to select explicitly between the unsigned and the signed format.
_TYPE_TO_STR = {
    bool: "b",
    int: "u",  # integers with unspecified sign are treated as unsigned
    (int, False): "u",
    (int, True): "s",
    float: "f",
    bytes: "r",
    str: "t",
    None: "p",  # presumably the type of padding fields -- TODO confirm
}


def _format_string_without_order(fmt: str, order: str) -> str:
    """Drop the last character of `fmt` if `fmt` ends with `order`.

    An empty `order` leaves `fmt` untouched.
    """
    # NOTE: in the current implementation the byte order is handled
    #       externally to _to_fmt
    if order == "" or not fmt.endswith(order):
        return fmt
    return fmt[:-1]


def _to_fmt(
    type_,
    size: int,
    bitorder: str = "",
    byteorder: str = "",
    signed: Optional[bool] = None,
    repeat: Optional[int] = None,
) -> str:
    """Return the bitstruct format string for a single field.

    :param type_: type of the field
    :param size: size of (each item of) the field, must be positive
    :param bitorder: bit order marker prepended to each item
        ("", ">" or "<")
    :param byteorder: byte order character; it is only used to strip the
        trailing byte order marker from the format of nested records
    :param signed: for integer fields, select the signed (True) or the
        unsigned (False) format; None means unspecified
    :param repeat: number of items in the field (None is the same as 1)
    :raises TypeError: if the (effective) type of the field has no
        corresponding format character
    """
    assert size > 0, f"invalid size: {size!r}"
    assert bitorder in ("", ">", "<"), f"invalid order: {bitorder!r}"
    if repeat is None:
        repeat = 1
    assert repeat > 0, f"invalid repeat: {repeat!r}"

    # Nested records: reuse the format string of the codec of the nested
    # type (without its trailing byte order marker).
    # NOTE: size, bitorder, signed and repeat are not used on this path.
    if has_codec(type_, bpack.codecs.Decoder):
        decoder_ = get_codec(type_)
        if isinstance(decoder_, Decoder):
            return _format_string_without_order(decoder_.format, byteorder)
        # codecs of a different kind fall through to the generic path
    elif (
        bpack.is_descriptor(type_)
        and bpack.baseunits(type_) is Decoder.baseunits
    ):
        decoder_ = Decoder(type_)
        return _format_string_without_order(decoder_.format, byteorder)

    etype = bpack.utils.effective_type(type_)
    # the sign is only relevant for integers, and only if specified
    key = (etype, signed) if etype is int and signed is not None else etype

    try:
        fmt = f"{bitorder}{_TYPE_TO_STR[key]}{size}" * repeat
    except KeyError:
        raise TypeError(f"unsupported type: {etype!r}") from None

    # fmt += byteorder  # NOTE: handled externally

    return fmt


def _endianess_to_str(order: EByteOrder) -> str:
    """Return the value of `order`, with NATIVE replaced by the native order."""
    resolved = EByteOrder.get_native() if order is EByteOrder.NATIVE else order
    return resolved.value


class Codec(bpack.codecs.BaseStructCodec):
    """Codec for binary data structures based on bitstruct.

    Default bit-order: MSB.
    """

    baseunits = EBaseUnits.BITS

    @staticmethod
    def _get_base_codec(descriptor):
        """Build the :class:`BitStruct` for the fields of `descriptor`."""
        order_suffix = _endianess_to_str(bpack.byteorder(descriptor))
        bit_order = bpack.bitorder(descriptor).value

        chunks = []
        for descr in field_descriptors(descriptor, pad=True):
            chunks.append(
                _to_fmt(
                    descr.type,
                    size=descr.size,
                    bitorder=bit_order,
                    byteorder=order_suffix,
                    signed=descr.signed,
                    repeat=descr.repeat,
                )
            )

        # the byte order is appended only once, at the end of the format
        return BitStruct("".join(chunks) + order_suffix)

    @staticmethod
    def _get_decode_converters_map(descriptor):
        """Map the type of each enum field to itself."""
        converters = {}
        for descr in field_descriptors(descriptor):
            if bpack.utils.is_enum_type(descr.type):
                converters[descr.type] = descr.type
        return converters

    @staticmethod
    def _get_encode_converters_map(descriptor):
        """Map the type of non-integer enum fields to a ``.value`` getter."""

        def enum_to_value(member):
            return member.value

        converters = {}
        for descr in field_descriptors(descriptor):
            ftype = descr.type
            if not bpack.utils.is_enum_type(ftype):
                continue
            # enums that are also integers are not converted
            if issubclass(ftype, int):
                continue
            converters[ftype] = enum_to_value
        return converters
codec = bpack.codecs.make_codec_decorator(Codec)

# The same class (and decorator) is used for both decoding and encoding.
Decoder = Encoder = Codec
decoder = encoder = codec


@functools.lru_cache
def _get_sequence_codec(
    nsamples: int, bits_per_sample, signed=False, byteorder: str = ""
) -> BitStruct:
    """Return the (cached) codec for `nsamples` integers of equal size.

    The format consists of `nsamples` signed/unsigned items having
    `bits_per_sample` bits each, followed by the padding bits needed to
    reach a whole number of bytes (if any) and by `byteorder`.
    """
    total_bits = nsamples * bits_per_sample
    padding = math.ceil(total_bits / 8) * 8 - total_bits

    type_code = "s" if signed else "u"
    pieces = [f"{type_code}{bits_per_sample:d}" * nsamples]
    if padding > 0:
        pieces.append(f"p{padding:d}")
    pieces.append(byteorder)

    return BitStruct("".join(pieces))
def packbits(
    values, bits_per_sample: int, signed: bool = False, byteorder: str = ""
) -> bytes:
    """Pack integer values using the specified number of bits for each sample.

    Converts a sequence of values into a string of bytes in which each
    sample is stored according to the specified number of bits.

    Example::

                   4 samples                          3 bytes

        [samp_1, samp_2, samp_3, samp_4]  -->  |------|------|------|------|

                                               4 samples (6 bits per sample)

    Please note that no check is performed that the input values
    actually fit in the specified number of bits.

    The function returns a string of bytes including the same number of
    samples of the input plus, possibly, some padding bits (at the end)
    to fill an integer number of bytes.  A warning is issued when padding
    is required.

    If ``signed`` is set to True integers are stored as signed integers.

    :param values: sequence of integers to pack (``len()`` is used on it)
    :param bits_per_sample: number of bits used to store each sample
    :param signed: store the samples as signed integers
    :param byteorder: byte order marker appended as is to the bitstruct
        format string
    :returns: the packed samples
    """
    nsamples = len(values)
    if (nsamples * bits_per_sample) % 8:
        # stacklevel=2 attributes the warning to the code calling
        # packbits instead of to this module
        warnings.warn(
            f"packing {nsamples} with {bits_per_sample} bits per "
            f"sample requires padding",
            stacklevel=2,
        )
    encoder_ = _get_sequence_codec(
        nsamples, bits_per_sample, signed=signed, byteorder=byteorder
    )
    return encoder_.pack(*values)
def unpackbits(
    data: bytes,
    bits_per_sample: int,
    signed: bool = False,
    byteorder: str = "",
):
    """Unpack packed (integer) values from a string of bytes.

    The input is a string of bytes in which (integer) samples have been
    stored using ``bits_per_sample`` bits for each sample; the output is
    the sequence of the corresponding Python integers.

    Example::

                      3 bytes                          4 samples

        |------|------|------|------|  -->  [samp_1, samp_2, samp_3, samp_4]

        4 samples (6 bits per sample)

    The number of samples is the number of whole samples that fit in
    ``data``.

    If ``signed`` is set to True integers are assumed to be stored as
    signed integers.
    """
    available_bits = len(data) * 8
    nsamples = available_bits // bits_per_sample
    sequence_codec = _get_sequence_codec(
        nsamples, bits_per_sample, signed=signed, byteorder=byteorder
    )
    return sequence_codec.unpack(data)