Source code for scistag.filestag.bundle

"""
Helps to bundle a list of common data types such as bytes, dictionaries
and numpy arrays in a zip file.
"""
from __future__ import annotations

import json
import zipfile
from dataclasses import dataclass
from typing import Any, Callable, Literal, Optional

from pydantic import BaseModel

from scistag.common.mt.stag_lock import StagLock
from scistag.filestag.memory_zip import MemoryZip

# Key used inside the extra-info dictionary of a simple element.
# Bundle.bundle stores {IS_TUPLE_FLAG: True} for every tuple it stores as a
# simple element and Bundle.unpack converts the loaded list back to a tuple
# when the flag is set.
IS_TUPLE_FLAG = "is_tuple"
"""
A flag which is attached to a list property if it was originally a tuple and
need to be backconverted upon reading.
"""

# Key prefix: the extra info of the simple element stored under ``key`` is
# stored in the same simple_elements dictionary under
# ``BI_SIMPLE_ELEMENT_FLAG + key``.
BI_SIMPLE_ELEMENT_FLAG = "__bi_"
"""
Additional entry which can be stored in the simple_elements list to further 
specify details about a simple element if required
"""

# The three values below are the container kinds Bundle.bundle records as
# BundleInfo.source_type and Bundle.unpack uses to rebuild the result.
TUPLE = "tuple"
"Defines the input/output type tuple"
LIST = "list"
"Defines the input/output type list"
DICT = "dict"
"Defines the input/output type dict"

# Archive member holding the serialized BundleInfo; Bundle.unpack refuses
# archives which do not contain it.
BUNDLE_INFO_NAME = "__bundle_info.json"
"Filename of the bundle info within the zip archive"


class BundleElementInfo(BaseModel):
    """
    Descriptor of one element stored as its own file inside a bundle.
    """

    version: int = 1
    "Protocol version of this descriptor"

    data_type: Optional[str] = None
    """
    Name of the element's data type.

    Only required for advanced types, i.e. everything which is not "just"
    an integer, a string or the like. For those the type is derived from
    the data itself.
    """

    properties: Optional[dict] = None
    "Advanced, optional properties of the element"
# Bundle.bundle tests ``type(element) in _simple_types``, i.e. the exact type
# has to match; instances of subclasses of these types are not considered
# simple and are handed to a registered bundler instead.
_simple_types = [str, float, int, bool, dict, list, tuple]
"The simple data types which do not need an own file to be stored in"
class BundleInfo(BaseModel):
    """
    Defines the bundle information.

    Describes the content of a bundle: the kind of container which was
    bundled, the order of its elements and where each element's value can
    be found.
    """

    source_type: Literal["dict", "tuple", "list"]
    "The source type"

    keys: list[str]
    "The list of keys in their original order"

    adv_elements: dict[str, BundleElementInfo]
    "The advanced elements and their descriptor"

    simple_elements: dict[str, Any]
    "The simple elements which can make use of direct key value storage"

    version: int = 1
    "The protocol version"

    # The explicit annotation makes this a regular model field. Without it
    # pydantic v1 only infers the field and pydantic v2 rejects the class.
    recursive: bool = False
    """
    Defines if the elements of the first level e.g. dictionaries and lists
    may contain further, bundled advanced types such as np.arrays.
    """
@dataclass
class BundlingOptions:
    """
    Options for bundling the data
    """

    # The annotation is required: @dataclass only turns annotated class
    # attributes into fields (constructor argument, repr, comparison).
    recursive: bool = False
    "Defines if the data shall be bundled recursive (not supported yet)"
@dataclass
class UnpackOptions:
    """
    Options for unpacking the data
    """

    # The annotation is required: @dataclass only turns annotated class
    # attributes into fields (constructor argument, repr, comparison).
    recursive: bool = False
    "Defines if the data shall be unpacked recursive (not supported yet)"
# Signature of a bundler: receives the element and the bundling options and
# returns a tuple of (data type name, serialized bytes). The data type name
# is stored in the bundle and later used to select the matching unpacker.
BundleToBytesCallback = Callable[[Any, BundlingOptions], tuple[str, bytes]]
""" Function definition for plugins which help converting data types of various kinds to bytes. """
# Signature of an unpacker: receives the serialized bytes and the unpack
# options and returns the reconstructed object.
UnpackFromBytesCallback = Callable[[bytes, UnpackOptions], Any]
""" Helper function for converting data back from bytes to their original type """
class Bundle:
    """
    The bundle class is able to store a list, a tuple or a dictionary in
    a zip file. In case of a dictionary only the "first" level can be used
    for the storage of "advanced" data types such as images, DataFrames, numpy
    arrays etc.

    With help of the Bundle class function parameters can easily be passed
    between different processes, clients and servers without the need to pickle
    them and in consequence without being bound to fully compatible module
    versions by using common interchange formats such as parquet files or
    json files.
    """

    _access_lock = StagLock()
    "Configuration access lock"
    _base_types_registered = False
    "Defines if the base types were registered"
    _bundlers: dict[type, BundleToBytesCallback] = {}
    """
    Registered functions to convert an object of type key to its bytes
    representation
    """
    _unpackers: dict[str, UnpackFromBytesCallback] = {}
    """
    Registered functions to convert an object from a known type and its
    bytes representation back to its original form such as a dictionary,
    numpy array or pandas DataFrame
    """

    @classmethod
    def bundle(cls, elements: dict[str, Any] | list[Any] | tuple,
               compression: int | None = None) -> bytes:
        """
        Stores a dictionary, a list or a tuple in a zip file in memory and
        returns its bytes string representation which can then for example be
        passed to another process, client or server and extracted there or
        just be dumped to a database or a disk.

        :param elements: The elements to be stored. Supported data types are
            (w/o extensions which may be registered) dictionaries, lists,
            tuples, strings, floats, booleans, Pandas DataFrames, DataSeries,
            numpy arrays and byte strings.
        :param compression: The compression level (from 0 to 99)
            (fast to small). The value is divided by ten and clamped to the
            zip levels 0..9, level 0 stores the data uncompressed.
            Defaults to 10.
        :return: The bytes dump of the zip archive.
        :raises ValueError: If elements is neither a dict, a list nor a tuple
        :raises NotImplementedError: If no bundler is registered for the
            type of one of the elements
        """
        cls._ensure_base_types()
        options = BundlingOptions()
        if compression is None:
            compression = 10
        comp_level = min(max(compression // 10, 0), 9)
        comp_method = (zipfile.ZIP_STORED if comp_level == 0
                       else zipfile.ZIP_DEFLATED)
        if isinstance(elements, dict):
            source_type = DICT
        elif isinstance(elements, list):
            source_type = LIST
        elif isinstance(elements, tuple):
            source_type = TUPLE
        else:
            raise ValueError("Unsupported source type")
        if not isinstance(elements, dict):
            # lists and tuples are stored like a dictionary with synthetic,
            # index based keys
            elements = {f"__{index:04d}__": data
                        for index, data in enumerate(elements)}
        keys = list(elements)
        simple = {}
        advanced = {}
        with MemoryZip(compresslevel=comp_level,
                       compression=comp_method) as mem_zip:
            for key, element in elements.items():
                if type(element) in _simple_types:
                    # basic types are stored directly in the bundle info
                    simple[key] = element
                    if isinstance(element, tuple):
                        # JSON has no tuples, remember the original type
                        simple[BI_SIMPLE_ELEMENT_FLAG + key] = {
                            IS_TUPLE_FLAG: True}
                    continue
                # everything else becomes an own file within the archive
                data_type, byte_data = cls.to_bytes(element, options=options)
                advanced[key] = BundleElementInfo(data_type=data_type)
                mem_zip.writestr(key, byte_data)
            bundle_info = BundleInfo(source_type=source_type,
                                     keys=keys,
                                     simple_elements=simple,
                                     adv_elements=advanced)
            # The model's JSON text is wrapped in a second JSON string.
            # unpack() reverses exactly this, so the format has to stay as is.
            mem_zip.writestr(BUNDLE_INFO_NAME,
                             json.dumps(bundle_info.json()).encode("utf-8"))
        # NOTE(review): assumes MemoryZip.to_bytes() may be called once the
        # context manager has closed the archive - confirm.
        return mem_zip.to_bytes()

    @classmethod
    def unpack(cls, data: bytes) -> dict[str, Any] | list[Any] | tuple:
        """
        Unpacks a previously bundled data package to its original form

        :param data: The data to unpack (as returned by :meth:`bundle`)
        :return: The dictionary, tuple or list containing the bundled objects
        :raises ValueError: If data is None
        :raises AssertionError: If the archive contains no bundle info
        :raises NotImplementedError: If an element's data type has no
            registered unpacker or the bundle's source type is unknown
        """
        if data is None:
            raise ValueError("data is None")
        cls._ensure_base_types()
        options = UnpackOptions()
        with MemoryZip(data) as mem_zip:
            if BUNDLE_INFO_NAME not in mem_zip.NameToInfo:
                raise AssertionError("Could not find bundle info")
            # bundle() stored a JSON string containing the model's JSON text,
            # so decode the outer layer first and parse the inner one after
            info_text = json.loads(
                mem_zip.read(BUNDLE_INFO_NAME).decode("utf-8"))
            bundle_info: BundleInfo = BundleInfo.parse_raw(info_text)
            simple = bundle_info.simple_elements
            adv = bundle_info.adv_elements
            result = {}
            result_elements = []
            # reconstruct all objects in their original order
            for key in bundle_info.keys:
                if key in simple:
                    value = simple[key]
                    if isinstance(value, list):
                        # check special flags, e.g. list to tuple conversion
                        add_info_name = BI_SIMPLE_ELEMENT_FLAG + key
                        if (add_info_name in simple and
                                simple[add_info_name].get(IS_TUPLE_FLAG,
                                                          False)):
                            value = tuple(value)
                else:
                    value = cls.from_bytes(data_type=adv[key].data_type,
                                           data=mem_zip.read(key),
                                           options=options)
                result[key] = value
                result_elements.append(value)
        source_type = bundle_info.source_type
        if source_type == DICT:  # just a dict? we're done
            return result
        if source_type == LIST:
            return result_elements
        if source_type == TUPLE:
            return tuple(result_elements)
        raise NotImplementedError(
            f"The return type {source_type} is not supported")

    @classmethod
    def to_bytes(cls, element: Any, options: BundlingOptions) -> \
            tuple[str, bytes]:
        """
        Converts an element to its storable bytes representation

        A bundler registered for the element's exact type is preferred.
        Otherwise the first registered bundler whose type the element is an
        instance of is used.

        :param element: The element to convert to bytes
        :param options: Advanced bundling options
        :return: The data type to be stored in the bundle and the bytes string
        :raises NotImplementedError: If no matching bundler is registered
        """
        el_type = type(element)
        with cls._access_lock:
            bundler = cls._bundlers.get(el_type)
            if bundler is None:
                bundler = next(
                    (cur_bundler
                     for cur_type, cur_bundler in cls._bundlers.items()
                     if isinstance(element, cur_type)),
                    None)
        if bundler is None:
            raise NotImplementedError(
                f"No bundler found for data type {str(el_type)}")
        return bundler(element, options)

    @classmethod
    def from_bytes(cls, data_type: str, data: bytes,
                   options: UnpackOptions) -> Any:
        """
        Converts an object from its byte representation back to its normal
        form.

        :param data_type: The data type as returned from to_bytes
        :param data: The data
        :param options: Advanced unpacking options
        :return: The reconstructed object
        :raises NotImplementedError: If no unpacker is registered for
            data_type
        """
        with cls._access_lock:
            unpacker = cls._unpackers.get(data_type)
        if unpacker is None:
            # has to be raised: returning the exception object would hand it
            # to the caller as if it were the unpacked element
            raise NotImplementedError(
                f"No unpacker found for data type {data_type}")
        return unpacker(data, options)

    @classmethod
    def register_bundler(cls, data_type: type,
                         callback: BundleToBytesCallback):
        """
        Registers a new bundling helper function

        An already registered bundler for the same type is replaced.

        :param data_type: The data type to look out for
        :param callback: The function be called to bundle the data
        """
        with cls._access_lock:
            cls._bundlers[data_type] = callback

    @classmethod
    def register_unpacker(cls, data_type: str,
                          callback: UnpackFromBytesCallback):
        """
        Registers a new unpacking helper function

        An already registered unpacker for the same name is replaced.

        :param data_type: The data type as string to look out for (as stored
            in the dictionary of the bundled data and returned by the
            corresponding BundleToBytesCallback)
        :param callback: The function be called to unpack the data
        """
        with cls._access_lock:
            cls._unpackers[data_type] = callback

    @classmethod
    def _ensure_base_types(cls):
        """
        Ensures that all base types are registered
        """
        with cls._access_lock:
            if not cls._base_types_registered:
                # The flag is set before the registration runs, so a failed
                # registration is not attempted again.
                cls._base_types_registered = True
                # NOTE(review): the registration calls register_bundler and
                # register_unpacker which acquire the same lock again -
                # presumably StagLock is reentrant, confirm.
                _register_base_types()
def _register_base_types():
    """
    Registers the bundlers and unpackers of all types supported out of the
    box: bytes, numpy arrays, pandas DataFrames and pandas Series.
    """

    def bundle_bytes(data, options):
        # bytes are stored as they are
        return "bytes", data

    def unpack_bytes(data, options):
        return data

    Bundle.register_bundler(bytes, bundle_bytes)
    Bundle.register_unpacker("bytes", unpack_bytes)

    # imported here rather than at module level, i.e. only once the base
    # types are registered
    from .bundlers.numpy_bundler import NumpyBundler
    from .bundlers.dataframe_bundler import DataFrameBundler, \
        DataSeriesBundler
    import numpy as np
    import pandas as pd

    # each helper class is registered as bundler for its data type and, under
    # its own class name, as the corresponding unpacker
    helpers = (
        (np.ndarray, NumpyBundler),
        (pd.DataFrame, DataFrameBundler),
        (pd.Series, DataSeriesBundler),
    )
    for handled_type, helper in helpers:
        Bundle.register_bundler(handled_type, helper.bundle)
        Bundle.register_unpacker(helper.__name__, helper.unpack)