Source code for speasy.core.data_containers

from copy import deepcopy, copy
from datetime import datetime, timezone
from sys import getsizeof
from typing import Dict, List, Protocol, TypeVar, Union, Any

import astropy.units
import numpy as np


def _name(input_p: Any) -> str:
    if hasattr(input_p, "name"):
        return input_p.name
    return str(input)


[docs] def np_build_result_name(func, *args, **kwargs): return f"{func.__name__}({', '.join(map(_name, args))}{', ' * bool(kwargs)}{', '.join([f'{k}={_name(v)}' for k, v in kwargs.items()])})"
def _values(input_p: Any) -> Any: if hasattr(input_p, "values"): return input_p.values return input_p def _data_container(input_p: Any) -> 'DataContainer' or None: if hasattr(input_p, "data_container"): return input_p.data_container if isinstance(input_p, DataContainer): return input_p return input_p def _to_index(key, time): if key is None: return None if type(key) in (int, np.int64, np.int32, np.uint64, np.uint32): return key if isinstance(key, float): return np.searchsorted(time, np.datetime64(int(key * 1e9), 'ns'), side='left') if isinstance(key, datetime): without_tz = key.astimezone(timezone.utc).replace(tzinfo=None) return np.searchsorted(time, np.datetime64(without_tz, 'ns'), side='left') if isinstance(key, np.datetime64): return np.searchsorted(time, key, side='left') T = TypeVar("T") # keep until we drop python 3.11 support
[docs] def numpy_supported(cls): def __ge__(self, other:Any) -> Union[bool, np.ndarray]: return np.greater_equal(self, other) def __gt__(self, other:Any) -> Union[bool, np.ndarray]: return np.greater(self, other) def __le__(self, other:Any) -> Union[bool, np.ndarray]: return np.less_equal(self, other) def __lt__(self, other:Any) -> Union[bool, np.ndarray]: return np.less(self, other) def __add__(self, other:Any) -> 'DataContainer': return np.add(self, other) def __radd__(self, other:Any) -> 'DataContainer': return np.add(other, self) def __sub__(self, other:Any) -> 'DataContainer': return np.subtract(self, other) def __rsub__(self, other:Any) -> 'DataContainer': return np.subtract(other, self) def __mul__(self, other:Any) -> 'DataContainer': return np.multiply(self, other) def __rmul__(self, other:Any) -> 'DataContainer': return np.multiply(other, self) def __truediv__(self, other:Any) -> 'DataContainer': return np.divide(self, other) def __rtruediv__(self, other:Any) -> 'DataContainer': return np.divide(other, self) def __pow__(self, other:Any) -> 'DataContainer': return np.power(self, other) cls.__ge__ = __ge__ cls.__gt__ = __gt__ cls.__le__ = __le__ cls.__lt__ = __lt__ cls.__add__ = __add__ cls.__radd__ = __radd__ cls.__sub__ = __sub__ cls.__rsub__ = __rsub__ cls.__mul__ = __mul__ cls.__rmul__ = __rmul__ cls.__truediv__ = __truediv__ cls.__rtruediv__ = __rtruediv__ cls.__pow__ = __pow__ return cls
[docs] class DataContainerProtocol(Protocol[T]):
[docs] def select(self, indices, inplace=False) -> T: ...
[docs] def to_dictionary(self, array_to_list=False) -> Dict[str, object]: ...
[docs] @staticmethod def from_dictionary(dictionary: Dict[str, Union[str, Dict[str, str], List]], dtype=np.float64) -> T: ...
[docs] @staticmethod def reserve_like(other: T, length: int = 0) -> T: ...
def __getitem__(self, key) -> T: ... def __setitem__(self, k, v: Union[T, float, int]): ... def __len__(self) -> int: ... def __eq__(self, other: Union[T, float, int]) -> Union[bool, np.ndarray]: ... @property def unit(self) -> str: ... @property def is_time_dependent(self) -> bool: ... @property def values(self) -> np.array: ... @property def shape(self): ... @property def name(self) -> str: ... @property def nbytes(self) -> int: ... @property def dtype(self)-> np.dtype: ...
[docs] def view(self, index_range: Union[slice, np.ndarray]) -> T: ...
[docs] def astype(self, dtype) -> T: ...
[docs] @numpy_supported class DataContainer(DataContainerProtocol['DataContainer']): __slots__ = ['__values', '__name', '__meta', '__is_time_dependent'] __LIKE_NP_FUNCTIONS__ = {'zeros_like', 'empty_like', 'ones_like'} def __init__(self, values: np.array, meta: Dict = None, name: str = None, is_time_dependent: bool = True): if not isinstance(values, np.ndarray): raise ValueError(f"values must be a numpy array, got {type(values)}") self.__values = values self.__is_time_dependent = is_time_dependent self.__name = name or "" self.__meta = meta or {}
[docs] def reshape(self, new_shape) -> "DataContainer": self.__values = self.__values.reshape(new_shape) return self
[docs] def select(self, indices, inplace=False) -> "DataContainer": if inplace: res = self else: res = deepcopy(self) res.__values = res.__values[indices]
@property def is_time_dependent(self) -> bool: return self.__is_time_dependent @property def values(self) -> np.array: return self.__values @property def shape(self): return self.__values.shape @property def ndim(self): return self.__values.ndim @property def dtype(self): return self.__values.dtype
[docs] def astype(self, dtype) -> "DataContainer": return DataContainer(values=self.__values.astype(dtype), meta=copy(self.__meta), name=self.__name, is_time_dependent=self.__is_time_dependent)
@property def unit(self) -> str: return self.__meta.get('UNITS') @property def nbytes(self) -> int: return self.__values.nbytes + getsizeof(self.__meta) + getsizeof(self.__name)
[docs] def view(self, index_range: Union[slice, np.ndarray]): return DataContainer(name=self.__name, meta=self.__meta, values=self.__values[index_range], is_time_dependent=self.__is_time_dependent)
[docs] def unit_applied(self, unit: str or None = None) -> "DataContainer": try: u = astropy.units.Unit(unit or self.unit) except (ValueError, KeyError): u = astropy.units.Unit("") return DataContainer(values=self.__values * u, meta=self.__meta, name=self.__name, is_time_dependent=self.__is_time_dependent)
[docs] def to_dictionary(self, array_to_list=False) -> Dict[str, object]: return { "values": self.__values.tolist() if array_to_list else self.__values.copy(), "meta": self.__meta.copy(), "name": self.__name, "is_time_dependent": self.is_time_dependent, "values_type": str(self.__values.dtype) }
[docs] @staticmethod def from_dictionary(dictionary: Dict[str, Union[str, Dict[str, str], List]], dtype=np.float64) -> "DataContainer": try: return DataContainer( values=np.array(dictionary["values"], dtype=dictionary.get("values_type", dtype)), meta=dictionary["meta"], name=dictionary["name"], is_time_dependent=dictionary["is_time_dependent"] ) except ValueError: return DataContainer( values=np.array(dictionary["values"]), meta=dictionary["meta"], name=dictionary["name"], is_time_dependent=dictionary["is_time_dependent"] )
[docs] @staticmethod def reserve_like(other: 'DataContainer', length: int = 0) -> 'DataContainer': return DataContainer(name=other.__name, meta=other.__meta, values=np.empty( (length,) + other.shape[1:], dtype=other.__values.dtype), is_time_dependent=other.__is_time_dependent )
[docs] @staticmethod def zeros_like(other: 'DataContainer') -> 'DataContainer': return DataContainer(name=other.__name, meta=other.__meta, values=np.zeros_like(other.__values, dtype=other.__values.dtype), is_time_dependent=other.__is_time_dependent )
[docs] @staticmethod def ones_like(other: 'DataContainer') -> 'DataContainer': return DataContainer(name=other.__name, meta=other.__meta, values=np.ones_like(other.__values, dtype=other.__values.dtype), is_time_dependent=other.__is_time_dependent )
[docs] @staticmethod def empty_like(other: 'DataContainer') -> 'DataContainer': return DataContainer(name=other.__name, meta=other.__meta, values=np.empty_like(other.__values, dtype=other.__values.dtype), is_time_dependent=other.__is_time_dependent )
[docs] def copy(self, name=None): return DataContainer(name=name or self.__name, meta=deepcopy(self.__meta), values=deepcopy(self.__values), is_time_dependent=self.__is_time_dependent)
def __len__(self): return len(self.__values) def __getitem__(self, key): return self.view(key) def __setitem__(self, k, v: Union['DataContainer', float, int]): if type(v) is DataContainer: self.__values[k] = v.__values else: self.__values[k] = v def __eq__(self, other: Union['DataContainer', float, int]) -> Union[bool, np.ndarray]: if type(other) is DataContainer: return self.__meta == other.__meta and \ self.__name == other.__name and \ self.is_time_dependent == other.is_time_dependent and \ np.all(self.__values.shape == other.__values.shape) and \ np.array_equal(self.__values, other.__values, equal_nan=np.issubdtype(self.__values.dtype, np.floating)) else: return self.__values.__eq__(other) def __array_function__(self, func, types, args, kwargs): if func.__name__ in DataContainer.__LIKE_NP_FUNCTIONS__: return DataContainer.__dict__[func.__name__].__func__(self) if 'out' in kwargs: raise ValueError("out parameter is not supported") f_args = [_values(arg) for arg in args] f_kwargs = {name: _values(value) for name, value in kwargs.items()} res = func(*f_args, **f_kwargs) if np.isscalar(res): return res else: return DataContainer(values=res, meta=deepcopy(self.__meta), name=np_build_result_name(func, *args, **kwargs), is_time_dependent=self.__is_time_dependent) def __array_ufunc__(self, ufunc, method, *inputs, out: 'DataContainer' or None = None, **kwargs): if out is not None: _out = out[0].values else: _out = None res = ufunc(*list(map(_values, inputs)), **{name: _values(value) for name, value in kwargs}, out=_out) if _out is not None: return _out else: return DataContainer(values=res, meta=deepcopy(self.__meta), name=self.__name, is_time_dependent=self.__is_time_dependent) @property def meta(self): return self.__meta @property def name(self): return self.__name
[docs] class VariableAxis(DataContainerProtocol['VariableAxis']): __slots__ = ['__data'] __LIKE_NP_FUNCTIONS__ = {'zeros_like', 'empty_like', 'ones_like'} def __init__(self, values: np.array = None, meta: Dict = None, name: str = "", is_time_dependent: bool = False, data: DataContainer = None): if data is not None: self.__data = data else: self.__data = DataContainer( values=values, name=name, meta=meta, is_time_dependent=is_time_dependent)
[docs] def zeros_like(self) -> "VariableAxis": return VariableAxis(data=DataContainer.zeros_like(self.__data))
[docs] def ones_like(self) -> "VariableAxis": return VariableAxis(data=DataContainer.ones_like(self.__data))
[docs] def empty_like(self) -> "VariableAxis": return VariableAxis(data=DataContainer.empty_like(self.__data))
[docs] def to_dictionary(self, array_to_list=False) -> Dict[str, object]: d = self.__data.to_dictionary(array_to_list=array_to_list) d.update({"type": "VariableAxis"}) return d
[docs] def select(self, indices, inplace=False) -> "VariableAxis": if inplace: res = self else: res = deepcopy(self) res.__data.select(indices, inplace=True) return res
[docs] @staticmethod def from_dictionary(dictionary: Dict[str, Union[str, Dict[str, str], List]], time=None) -> "VariableAxis": assert dictionary['type'] == "VariableAxis" return VariableAxis(data=DataContainer.from_dictionary(dictionary))
[docs] @staticmethod def reserve_like(other: 'VariableAxis', length: int = 0) -> 'VariableAxis': return VariableAxis(data=DataContainer.reserve_like(other.__data, length))
def __getitem__(self, key): if isinstance(key, slice): return self.view(slice(_to_index(key.start, self.__data.values), _to_index(key.stop, self.__data.values))) else: return self.view(key) def __setitem__(self, k, v: Union['VariableAxis', float, int]): if type(v) is VariableAxis: self.__data[k] = v.__data else: self.__data[k] = v def __len__(self): return len(self.__data)
[docs] def view(self, index_range: Union[slice, np.ndarray]) -> 'VariableAxis': return VariableAxis(data=self.__data[index_range])
def __eq__(self, other: 'VariableAxis') -> bool: return type(other) is VariableAxis and self.__data == other.__data def __array_function__(self, func, types, args, kwargs): if func.__name__ in VariableAxis.__LIKE_NP_FUNCTIONS__: return VariableAxis.__dict__[func.__name__].__func__(self) if 'out' in kwargs: raise ValueError("out parameter is not supported") f_args = [_data_container(arg) for arg in args] f_kwargs = {name: _data_container(value) for name, value in kwargs.items()} r = func(*f_args, **f_kwargs) if isinstance(r, DataContainer): return VariableAxis(data=r) return r def __array_ufunc__(self, ufunc, method, *inputs, out: 'VariableAxis' or None = None, **kwargs): if out is not None: _out = out[0].__data else: _out = None res = self.__data.__array_ufunc__(ufunc, method, *inputs, out=_out, **kwargs) if _out is not None: return out[0] if isinstance(res, DataContainer): return VariableAxis(data=res) return res
[docs] def astype(self, dtype) -> 'VariableAxis': return VariableAxis(data=self.__data.astype(dtype))
@property def unit(self) -> str: return self.__data.unit @property def is_time_dependent(self) -> bool: return self.__data.is_time_dependent @property def values(self) -> np.array: return self.__data.values @property def data_container(self) -> DataContainer: return self.__data @property def shape(self): return self.__data.shape @property def name(self) -> str: return self.__data.name @property def nbytes(self) -> int: return self.__data.nbytes @property def meta(self) -> Dict: return self.__data.meta @property def dtype(self)-> np.dtype: return self.__data.dtype
[docs] @numpy_supported class VariableTimeAxis(DataContainerProtocol['VariableTimeAxis']): __slots__ = ['__data'] __LIKE_NP_FUNCTIONS__ = {'zeros_like', 'empty_like', 'ones_like'} def __init__(self, values: np.array = None, meta: Dict = None, name: str = "time", data: DataContainer = None): if data is not None: self.__data = data else: if values.dtype != np.dtype('datetime64[ns]'): raise ValueError( f"Please provide datetime64[ns] for time axis, got {values.dtype}") self.__data = DataContainer( values=values, name=name, meta=meta, is_time_dependent=True)
[docs] def to_dictionary(self, array_to_list=False) -> Dict[str, object]: d = self.__data.to_dictionary(array_to_list=array_to_list) d.update({"type": "VariableTimeAxis"}) return d
[docs] def select(self, indices, inplace=False) -> "VariableTimeAxis": if inplace: res = self else: res = deepcopy(self) res.__data.select(indices, inplace=True) return res
[docs] def zeros_like(self) -> "VariableTimeAxis": return VariableTimeAxis(data=DataContainer.zeros_like(self.__data))
[docs] def ones_like(self) -> "VariableTimeAxis": return VariableTimeAxis(data=DataContainer.ones_like(self.__data))
[docs] def empty_like(self) -> "VariableTimeAxis": return VariableTimeAxis(data=DataContainer.empty_like(self.__data))
@property def shape(self): return self.__data.shape
[docs] def astype(self, dtype) -> 'VariableTimeAxis': return VariableTimeAxis(data=self.__data.astype(dtype))
[docs] @staticmethod def from_dictionary(dictionary: Dict[str, Union[str, Dict[str, str], List]], time=None) -> "VariableTimeAxis": assert dictionary['type'] == "VariableTimeAxis" return VariableTimeAxis(data=DataContainer.from_dictionary(dictionary, dtype=np.dtype('datetime64[ns]')))
[docs] @staticmethod def reserve_like(other: 'VariableTimeAxis', length: int = 0) -> 'VariableTimeAxis': return VariableTimeAxis(data=DataContainer.reserve_like(other.__data, length))
def __getitem__(self, key): return self.view(key) def __setitem__(self, k, v: Union['VariableTimeAxis', float, int]): if type(v) is VariableTimeAxis: self.__data[k] = v.__data else: self.__data[k] = v def __len__(self): return len(self.__data) def __array_function__(self, func, types, args, kwargs): if func.__name__ in VariableTimeAxis.__LIKE_NP_FUNCTIONS__: return VariableTimeAxis.__dict__[func.__name__].__func__(self) if 'out' in kwargs: raise ValueError("out parameter is not supported") f_args = [_data_container(arg) for arg in args] f_kwargs = {name: _data_container(value) for name, value in kwargs.items()} res = func(*f_args, **f_kwargs) if isinstance(res, DataContainer): return VariableTimeAxis(data=res) return res def __array_ufunc__(self, ufunc, method, *inputs, out: 'VariableTimeAxis' or None = None, **kwargs): if out is not None: _out = _data_container(out[0]) else: _out = None res = self.__data.__array_ufunc__(ufunc, method, *inputs, out=_out, **kwargs) if _out is not None: return out[0] if isinstance(res, DataContainer): return VariableTimeAxis(data=res) return res
[docs] def view(self, index_range: Union[slice, np.ndarray]) -> 'VariableTimeAxis': return VariableTimeAxis(data=self.__data[index_range])
def __eq__(self, other: 'VariableTimeAxis') -> bool: return type(other) is VariableTimeAxis and self.__data == other.__data @property def is_time_dependent(self) -> bool: return True @property def values(self) -> np.array: return self.__data.values @property def data_container(self) -> DataContainer: return self.__data @property def unit(self) -> str: return 'ns' @property def name(self) -> str: return self.__data.name @property def nbytes(self) -> int: return self.__data.nbytes @property def meta(self) -> Dict: return self.__data.meta @property def dtype(self) -> np.dtype: return self.__data.dtype