"""Numpy based codec for binary data structures."""
import functools
import collections
from typing import NamedTuple, Optional
import numpy as np
import bpack
import bpack.utils
import bpack.codecs
from .enums import EBaseUnits
from .descriptors import (
field_descriptors, get_field_descriptor, BinFieldDescriptor,
)
# Names exported by ``from <module> import *``.
__all__ = [
    'Decoder', 'decoder', 'Encoder', 'encoder', 'Codec', 'codec',
    'BACKEND_NAME', 'BACKEND_TYPE',
    'descriptor_to_dtype', 'unpackbits',
]
# Identifier of this backend.
BACKEND_NAME = 'numpy'
# Base units of this backend (bytes, as opposed to bits).
BACKEND_TYPE = EBaseUnits.BYTES
def bin_field_descripor_to_dtype(field_descr: BinFieldDescriptor) -> np.dtype:
    """Convert a field descriptor into a :class:`numpy.dtype`.

    The dtype is built from a type string of the form
    ``"{repeat}{typecode}{size}"`` (e.g. ``"u2"`` or ``"3u2"``), where:

    * *typecode* is the numpy kind of the effective type of the field,
      forced to ``'S'`` for ``bytes``/``str`` fields and to ``'u'`` for
      ``int`` fields that are not signed,
    * *size* is ``field_descr.size``,
    * *repeat* is ``field_descr.repeat`` and is only included when it is
      greater than one.

    :param field_descr:
        the binary field descriptor to convert
    :returns:
        the corresponding :class:`numpy.dtype`
    :raises TypeError:
        if the field type can only be represented as a numpy object
        (kind ``'O'``)

    .. seealso:: :class:`bpack.descriptors.BinFieldDescriptor`.
    """
    # TODO: add byteorder
    size = field_descr.size
    etype = bpack.utils.effective_type(field_descr.type)
    typecode = np.dtype(etype).kind
    if etype in (bytes, str):
        # strings are stored as (utf-8 encoded) byte strings
        typecode = 'S'
    elif etype is int and not field_descr.signed:
        typecode = 'u'

    if typecode == 'O':
        # NOTE: "!r" is a conversion and must not be preceded by ":",
        # otherwise it is taken as a format spec and formatting itself fails.
        raise TypeError(f'unsupported type: {field_descr.type!r}')

    repeat = field_descr.repeat
    repeat = repeat if repeat and repeat > 1 else ''
    return np.dtype(f'{repeat}{typecode}{size}')
def descriptor_to_dtype(descriptor) -> np.dtype:
    """Convert the descriptor of a binary record into a :class:`numpy.dtype`.

    A structured dtype is built with one entry per field of *descriptor*
    (name, format and offset) and an item size equal to
    ``bpack.calcsize(descriptor)``.  Fields whose type is itself a
    descriptor are converted recursively.  If the descriptor specifies a
    byte order, it is applied to the resulting dtype.

    Please note that (unicode) strings are treated as "utf-8" encoded
    byte strings.
    UCS4 encoded strings are not supported.

    Sequences (:class:`typing.Sequence` and :class:`typing.List`) are
    always converted into :class:`numpy.ndarray`.

    .. seealso:: :func:`bpack.descriptors.descriptor`.
    """
    spec = {}  # plain dict: numpy does not accept defaultdict
    for field in bpack.fields(descriptor):
        field_descr = get_field_descriptor(field)
        field_type = field_descr.type
        if not bpack.is_descriptor(field_type):
            fmt = bin_field_descripor_to_dtype(field_descr)
        else:
            # nested record
            fmt = descriptor_to_dtype(field_type)

        spec.setdefault('names', []).append(field.name)
        spec.setdefault('formats', []).append(fmt)
        spec.setdefault('offsets', []).append(field_descr.offset)

    spec['itemsize'] = bpack.calcsize(descriptor)
    record_dtype = np.dtype(spec)

    byteorder = bpack.byteorder(descriptor).value
    return record_dtype.newbyteorder(byteorder) if byteorder else record_dtype
def _decode_converter_factory(type_):
    """Return the converter to apply to a decoded field of type *type_*.

    The returned callable takes the value extracted from the numpy record
    and returns the corresponding Python object:

    * enums are instantiated from the raw value (decoded from "utf-8"
      first, if the effective type of the enum is ``str``),
    * strings are decoded from "utf-8",
    * nested descriptors are instantiated from the unpacked record items.

    ``None`` is returned if no conversion is needed.
    """
    etype = bpack.utils.effective_type(type_)
    is_text = etype is str

    if bpack.utils.is_enum_type(type_):
        if is_text:
            def to_str_enum(x, cls=type_):
                # TODO: harmonize with other backends that use 'ascii'
                return cls(x.tobytes().decode('utf-8'))
            return to_str_enum

        def to_enum(x, cls=type_):
            return cls(x)
        return to_enum

    if is_text:
        def to_str(x):
            # TODO: harmonize with other backends that use 'ascii'
            return x.tobytes().decode('utf-8')
        return to_str

    if bpack.is_descriptor(type_):
        def to_record(x, cls=type_):
            return cls(*x)
        return to_record

    return None
def _encode_converter_factory(type_):
    """Return the converter to apply to a field of type *type_* for encoding.

    The returned callable takes the Python value of the field and returns
    the object to be stored into the numpy record:

    * enums with ``str`` effective type are replaced by their "utf-8"
      encoded value,
    * other enums that are not subclasses of ``int`` are replaced by
      their value,
    * strings are encoded to "utf-8".

    ``None`` is returned if no conversion is needed.  Nested descriptors
    get no converter here: per the note previously kept in this function,
    ``astuple`` works recursively, so they have already been converted
    into sequences.
    """
    etype = bpack.utils.effective_type(type_)
    is_text = etype is str

    if not bpack.utils.is_enum_type(type_):
        if not is_text:
            return None

        def encode_str(x):
            # TODO: harmonize with other backends that use 'ascii'
            return x.encode('utf-8')
        return encode_str

    if is_text:
        def encode_str_enum(x):
            # TODO: harmonize with other backends that use 'ascii'
            return x.value.encode('utf-8')
        return encode_str_enum

    if issubclass(type_, int):
        # int based enums can be stored as they are
        return None

    def enum_value(x):
        return x.value
    return enum_value
class Codec(bpack.codecs.Codec):
    """Numpy based codec.

    (Unicode) strings are treated as "utf-8" encoded byte strings.
    UCS4 encoded strings are not supported.
    """

    baseunits = EBaseUnits.BYTES

    def __init__(self, descriptor):
        """Initializer.

        The *descriptor* parameter is a bpack record descriptor.
        The descriptor is expected to have no bit order set.
        """
        super().__init__(descriptor)
        assert bpack.bitorder(descriptor) is None

        def active_converters(factory):
            # (field index, converter) pairs for the fields needing one
            pairs = []
            for idx, field_descr in enumerate(field_descriptors(descriptor)):
                func = factory(field_descr.type)
                if func:
                    pairs.append((idx, func))
            return pairs

        decode_converters = active_converters(_decode_converter_factory)
        encode_converters = active_converters(_encode_converter_factory)

        self._dtype = descriptor_to_dtype(descriptor)
        self._decode_converters = decode_converters
        self._encode_converters = encode_converters

    @property
    def dtype(self):
        """Numpy dtype built from the record descriptor."""
        return self._dtype

    def decode(self, data: bytes, count: int = 1):
        """Decode binary data and return a record object.

        *count* records are read from *data*.  A single record object is
        returned if exactly one record has been read, otherwise the list
        of decoded records is returned.
        """
        records = np.frombuffer(data, dtype=self._dtype, count=count)
        converters = self._decode_converters

        decoded = []
        for raw in records:
            if converters:
                values = list(raw)  # fields of the np record
                for idx, func in converters:
                    values[idx] = func(values[idx])
                decoded.append(self.descriptor(*values))
            else:
                decoded.append(self.descriptor(*raw))

        return decoded[0] if len(records) == 1 else decoded

    def encode(self, record):
        """Encode a record object and return the corresponding bytes."""
        # exploit the recursive behaviour of astuple: only the top level
        # container is made mutable, nested records and sequences stay tuples
        values = list(bpack.astuple(record))
        for idx, func in self._encode_converters:
            values[idx] = func(values[idx])
        packed = np.array(tuple(values), dtype=self.dtype)
        return packed.tobytes()
# Decorator built by bpack.codecs.make_codec_decorator for the Codec class.
codec = bpack.codecs.make_codec_decorator(Codec)
# The same class implements both decoding and encoding: Decoder and
# Encoder are aliases of Codec.
Decoder = Encoder = Codec
# Likewise, the decoder and encoder decorators are aliases of codec.
decoder = encoder = codec
# --- bits packing/unpacking --------------------------------------------------
def _get_item_size(bits_per_sample: int) -> int:
"""Item size of the integer type that can take requested bits."""
if bits_per_sample > 64 or bits_per_sample < 1:
raise ValueError(f'bits_per_sample: {bits_per_sample}')
elif bits_per_sample <= 8:
return 1
else:
return 2**int(np.ceil(np.log2(bits_per_sample))-3)
def _get_buffer_size(bits_per_sample: int) -> int:
    """Item size of the integer type that can take requested bits and shift.

    Seven extra bits are requested on top of *bits_per_sample* so that the
    sample fits in the item even if it does not start on a byte boundary.
    """
    bits_with_shift = bits_per_sample + 7
    return _get_item_size(bits_with_shift)
def _get_mask(nbits: int, dtype: str) -> np.ndarray:
"""Returns a mask for dtype to select the nbits least significant bits."""
shift = np.array(64 - nbits, dtype=np.uint32)
mask = np.array(0xffffffffffffffff) >> shift
return mask.astype(dtype)
class _BitUnpackParams(NamedTuple):
    """Pre-computed parameters used to unpack bit-packed samples."""

    # number of samples to extract
    samples: int
    # numpy type string of the output samples
    dtype: str
    # size (in bytes) of each item of the working buffer
    buf_itemsize: int
    # numpy type string of the working buffer
    buf_dtype: str
    # indices of the input bytes to load into each item of the working buffer
    index_map: np.ndarray
    # right shift to apply to each item of the working buffer
    shifts: np.ndarray
    # bit mask to apply after the shift
    mask: np.ndarray
@functools.lru_cache() # COMPATIBILITY with Python3.7
def _unpackbits_params(nbits: int, bits_per_sample: int,
                       samples_per_block: int, bit_offset: int,
                       blockstride: int, signed: bool = False,
                       byteorder: str = '>') -> _BitUnpackParams:
    """Compute the parameters needed to unpack bit-packed samples.

    Results are cached (see :func:`functools.lru_cache`), so all the
    arguments must be hashable.

    :param nbits:
        total number of bits of the packed data
    :param bits_per_sample:
        number of bits used by each sample
    :param samples_per_block:
        number of consecutive samples in each block; if ``None`` it is
        computed as the number of samples fitting in the bits following
        *bit_offset* (only allowed if *blockstride* is ``None``)
    :param bit_offset:
        position (in bits) of the first sample
    :param blockstride:
        distance (in bits) between the start of two consecutive blocks;
        if ``None`` it is set to the size of a block
    :param signed:
        if True the "i" kind is used for the generated type strings
        instead of "u"
    :param byteorder:
        byte order character used as prefix of the generated type strings
    :returns:
        a :class:`_BitUnpackParams` instance
    :raises ValueError:
        if *blockstride* is provided and *samples_per_block* is ``None``
    """
    assert nbits >= bit_offset
    if samples_per_block is None:
        if blockstride is not None:
            raise ValueError(
                '"samples_per_block" cannot be computed automatically '
                'when "blockstride" is provided')
        # all the samples following the offset belong to a single block
        samples_per_block = (nbits - bit_offset) // bits_per_sample

    # number of bits occupied by the samples of one block
    blocksize = bits_per_sample * samples_per_block
    if blockstride is None:
        blockstride = blocksize
    else:
        assert blockstride >= blocksize

    # number of complete strides available and number of bits left over
    nstrides = (nbits - bit_offset) // blockstride
    extrabits = nbits - bit_offset - nstrides * blockstride
    if extrabits >= blocksize:
        # the left-over bits hold one more complete block
        nblocks = nstrides + 1
        extra_samples = 0
    else:
        # the left-over bits hold only some samples of an incomplete block
        nblocks = nstrides
        extra_samples = extrabits // bits_per_sample
    assert nblocks >= 0

    # number of bits between the last sample of a block and the next block
    pad = blockstride - blocksize

    # "sizes" holds the position of the first sample followed by the
    # distance between the start of each sample and the start of the
    # previous one: its cumulative sum is the start bit of each sample.
    # NOTE(review): "sizes" always starts with one entry, so at least one
    # sample is generated even if nblocks == 0 -- confirm this is intended.
    sizes = [bit_offset]
    if nblocks > 0:
        # remaining samples of the first block
        sizes.extend([bits_per_sample] * (samples_per_block - 1))
        # following blocks: the first sample also skips the padding bits
        block_sizes = (
            [bits_per_sample + pad] +
            [bits_per_sample] * (samples_per_block - 1)
        )
        sizes.extend(block_sizes * (nblocks - 1))

    if extra_samples:
        # samples of the trailing incomplete block
        sizes.append(bits_per_sample + pad)
        sizes.extend([bits_per_sample] * (extra_samples - 1))

    bit_offsets = np.cumsum(sizes)
    # index of the byte in which each sample starts
    byte_offsets = bit_offsets // 8
    samples = len(bit_offsets)

    itemsize = _get_item_size(bits_per_sample)
    buf_itemsize = _get_buffer_size(bits_per_sample)
    dtype = f'{byteorder}{"i" if signed else "u"}{itemsize}'
    buf_dtype = f'{byteorder}{"i" if signed else "u"}{buf_itemsize}'

    # for each sample: indices of the "buf_itemsize" consecutive bytes
    # starting from the one in which the sample starts
    index = np.arange(buf_itemsize) + byte_offsets[:, None]
    # keep the indices within the range of the available bytes
    index = np.clip(index, 0, nbits // 8 - 1)

    mask = _get_mask(bits_per_sample, buf_dtype)

    # position of the end of each sample relative to the start of its
    # first byte ...
    shifts = (bit_offsets - byte_offsets * 8 + bits_per_sample)
    # ... converted to the right shift that moves the sample to the least
    # significant bits of the buffer item.
    # NOTE(review): this looks like it assumes the first byte of a buffer
    # item is the most significant one -- confirm the behaviour for
    # byteorder values other than '>'.
    shifts = buf_itemsize * 8 - shifts

    return _BitUnpackParams(samples=samples,
                            dtype=dtype,
                            buf_itemsize=buf_itemsize,
                            buf_dtype=buf_dtype,
                            index_map=index,
                            shifts=shifts,
                            mask=mask)
def unpackbits(data: bytes, bits_per_sample: int,
               samples_per_block: Optional[int] = None, bit_offset: int = 0,
               blockstride: Optional[int] = None, signed: bool = False,
               byteorder: str = '>') -> np.ndarray:
    """Unpack packed (integer) values from a string of bytes.

    Takes in input a string of bytes in which (integer) samples have been
    stored using ``bits_per_sample`` bit for each sample, and returns
    the sequence of corresponding Python integers.

    Example::

        3 bytes                            4 samples

        |------|------|------|------| --> [samp_1, samp_2, samp_3, samp_4]

        4 samples (6 bits per sample)

    If ``signed`` is set to True integers are assumed to be stored as
    signed (two's complement) integers.

    When ``bit_offset`` is 0 and no ``blockstride`` is specified, data
    having 1, 8, 16, 32 or 64 bits per sample are directly re-interpreted
    using numpy (``samples_per_block`` is not used in this case, and
    single bit samples are always returned as unsigned 8 bit integers).

    :param data:
        the string of bytes containing the packed samples
    :param bits_per_sample:
        number of bits used to store each sample
    :param samples_per_block:
        number of consecutive samples in each block (optional)
    :param bit_offset:
        position (in bits) of the first sample
    :param blockstride:
        distance (in bits) between the start of two consecutive blocks
        (optional)
    :param signed:
        if True samples are interpreted as signed integers
    :param byteorder:
        byte order character used to build the numpy type strings
    :returns:
        the array of unpacked samples
    """
    if bit_offset == 0 and blockstride is None:
        # fast paths: the samples are aligned to standard integer types
        if bits_per_sample == 1:
            return np.unpackbits(np.frombuffer(data, dtype='uint8'))
        elif bits_per_sample in {8, 16, 32, 64}:
            size = bits_per_sample // 8
            kind = "i" if signed else "u"
            typestr = f'{byteorder}{kind}{size}'
            return np.frombuffer(data, dtype=np.dtype(typestr))

    nbits = len(data) * 8
    params = _unpackbits_params(nbits, bits_per_sample, samples_per_block,
                                bit_offset, blockstride, signed, byteorder)
    samples, dtype, buf_itemsize, buf_dtype, index_map, shifts, mask = params

    # load, for each sample, the bytes containing it into a working buffer
    # item, then shift the sample to the least significant bits and mask
    # out the bits belonging to other samples
    npdata = np.frombuffer(data, dtype='u1')
    buf = np.empty(samples, dtype=buf_dtype)
    bytesview = buf.view(dtype='u1').reshape(samples, buf_itemsize)
    bytesview[...] = npdata[index_map]
    outdata = ((buf >> shifts) & mask).astype(dtype)

    if signed:
        # The masked value is always non-negative: when the sample is
        # narrower than the output integer type the sign bit has to be
        # propagated explicitly (when the widths match, the conversion to
        # the signed type above already yields the correct value).
        # Single bit samples are left as 0/1, like in the fast path.
        item_bits = 8 * outdata.dtype.itemsize
        if 1 < bits_per_sample < item_bits:
            sign_bit = 1 << (bits_per_sample - 1)
            outdata = ((outdata ^ sign_bit) - sign_bit).astype(dtype)

    return outdata