# Source code for speasy.core.codecs.bundled_codecs.istp_cdf

from typing import List, AnyStr, Optional, Mapping, Union
import io
import logging
import os
import re

from datetime import timedelta
import numpy as np

import pyistp
from pyistp.support_data_variable import SupportDataVariable
import pycdfpp

from speasy.core.codecs import CodecInterface, register_codec, Buffer
from speasy.core.any_files import any_loc_open
from speasy.core.url_utils import urlparse, is_local_file
from speasy.core.cache import CacheCall
from speasy.products import SpeasyVariable, VariableAxis, VariableTimeAxis, DataContainer

log = logging.getLogger(__name__)
_PTR_rx = re.compile(r".*_PTR(_\d+)?")


def _fix_value_type(value):
    if type(value) in (str, int, float):
        return value
    if type(value) is list:
        return [_fix_value_type(sub_v) for sub_v in value]
    if type(value) is bytes:
        return value.decode('utf-8')
    return str(value)


def _fix_attributes_types(attributes: dict):
    """Return a new dict with every value passed through ``_fix_value_type``.

    Keys are copied unchanged and the input mapping is not modified.
    """
    return {attr_name: _fix_value_type(attr_value) for attr_name, attr_value in attributes.items()}


def _is_time_dependent(axis, time_axis_name):
    if axis.attributes.get('DEPEND_TIME', '') == time_axis_name:
        return not axis.is_nrv
    if axis.attributes.get('DEPEND_0', '') == time_axis_name:
        return not axis.is_nrv
    return False


def _display_type(variable: pyistp.loader.DataVariable) -> str:
    """Return the variable's display type attribute, or '' when it has none.

    The upper-case ``DISPLAY_TYPE`` spelling wins over ``display_type``.
    """
    attrs = variable.attributes
    for key in ('DISPLAY_TYPE', 'display_type'):
        if key in attrs:
            return attrs[key]
    return ''


def _make_axis(axis, time_axis_name):
    """Convert a pyistp support-data axis into a speasy ``VariableAxis``.

    The values are copied, the attributes are normalised with
    ``_fix_attributes_types``, and time dependence is decided by
    ``_is_time_dependent`` against *time_axis_name*.
    """
    copied_values = axis.values.copy()
    clean_meta = _fix_attributes_types(axis.attributes)
    depends_on_time = _is_time_dependent(axis, time_axis_name)
    return VariableAxis(values=copied_values, meta=clean_meta, name=axis.name,
                        is_time_dependent=depends_on_time)


def _build_labels(variable: pyistp.loader.DataVariable):
    """Build column labels for *variable*.

    - Values that are not 2-D: the loader's labels, normalised by ``_fix_value_type``.
    - 2-D values with N columns:
        - a list of exactly N labels is normalised and returned;
        - a single-element list ``[lbl]`` expands to ``lbl[0] ... lbl[N-1]``;
        - otherwise generic ``component_0 ... component_{N-1}`` names are used.
    """
    labels = variable.labels
    if len(variable.values.shape) != 2:
        return _fix_value_type(labels)
    n_columns = variable.values.shape[1]
    if type(labels) is list:
        if len(labels) == n_columns:
            return _fix_value_type(labels)
        if len(labels) == 1:
            return [f"{labels[0]}[{i}]" for i in range(n_columns)]
    return [f"component_{i}" for i in range(n_columns)]


def _filter_extra_axes(variable: pyistp.loader.DataVariable) -> List[SupportDataVariable]:
    """Return all axes of *variable* except the first one.

    The loader in this module treats the first axis as the time axis, so these
    are the remaining (non-time) support axes.
    """
    skip_first = slice(1, None)
    return variable.axes[skip_first]


def _valid_variable_or_none(variable: SpeasyVariable) -> Optional[SpeasyVariable]:
    """Return *variable*, or None when it looks like a lone epoch fill value.

    A variable holding exactly one sample whose timestamp is before 1900-01-01 is
    treated as a fill value in the epoch and discarded.
    """
    fill_epoch_limit = np.datetime64('1900-01-01')
    only_fill_sample = len(variable) == 1 and variable.time[0] < fill_epoch_limit
    return None if only_fill_sample else variable


def _load_variable(istp_loader: pyistp.loader.ISTPLoader, variable) -> Optional[SpeasyVariable]:
    """Load *variable* from *istp_loader* and wrap it as a ``SpeasyVariable``.

    The name is looked up in this order:

    1. as given;
    2. with '-' replaced by '_' (CSA/ISTP naming);
    3. with the characters rejected by CDAWeb replaced by '$'
       (https://cdaweb.gsfc.nasa.gov/WebServices/REST/#Get_Data_GET).

    The first axis becomes the time axis. The remaining axes are converted with
    ``_make_axis``.

    Returns None when:

    - no candidate name is a data variable;
    - the loader yields no variable, or a variable without axes;
    - the first dimension of the values differs from the time axis length;
    - ``_valid_variable_or_none`` rejects the result.
    """
    available = istp_loader.data_variables()  # queried once, reused for every candidate
    if variable in available:
        resolved_name = variable
    elif variable.replace('-', '_') in available:  # THX CSA/ISTP
        resolved_name = variable.replace('-', '_')
    else:  # CDA https://cdaweb.gsfc.nasa.gov/WebServices/REST/#Get_Data_GET
        resolved_name = re.sub(r"[\\/.%!@#^&*()\-+=`~|?<> ]", "$", variable)
        if resolved_name not in available:
            return None

    var = istp_loader.data_variable(resolved_name)
    if var is None or len(var.axes) == 0:
        # Without any axis there is no time axis to align the values with.
        return None
    time_axis = var.axes[0]
    if var.values.shape[0] != time_axis.values.shape[0]:
        log.debug("Skipping %s: %d records but time axis %s has %d",
                  resolved_name, var.values.shape[0], time_axis.name, time_axis.values.shape[0])
        return None

    return _valid_variable_or_none(SpeasyVariable(
        axes=[VariableTimeAxis(values=time_axis.values.copy(),
                               meta=_fix_attributes_types(time_axis.attributes))]
             + [_make_axis(axis, time_axis.name) for axis in _filter_extra_axes(var)],
        values=DataContainer(values=var.values.copy(), meta=_fix_attributes_types(var.attributes),
                             name=var.name,
                             is_time_dependent=True),
        columns=_build_labels(var)))


def _load_variables(variables, file=None, buffer=None, master_file=None, master_buffer=None
                    ) -> Optional[Mapping[str, Optional[SpeasyVariable]]]:
    """Open a CDF (plus an optional master CDF) with pyistp and load *variables*.

    The data CDF is given either as a path (*file*) or as in-memory content
    (*buffer*); the master CDF likewise via *master_file* / *master_buffer*.

    Returns a dict mapping every requested name to the result of
    ``_load_variable`` (a ``SpeasyVariable`` or None), or None when
    ``pyistp.load`` returns None.
    """
    istp_loader = pyistp.load(file=file, buffer=buffer, master_file=master_file, master_buffer=master_buffer)
    if istp_loader is None:
        return None
    return {variable: _load_variable(istp_loader, variable) for variable in variables}


def _resolve_url_type(url, prefix="", cache_remote_files=True):
    if url is None:
        return prefix + "file", None
    if type(url) is str:
        if is_local_file(url):
            return prefix + "file", urlparse(url=url).path
        return prefix + "buffer", any_loc_open(url, mode='rb', cache_remote_files=cache_remote_files).read()
    if type(url) in (memoryview, bytes):
        return prefix + "buffer", url
    if hasattr(url, 'read'):
        return prefix + "buffer", url.read()
    return prefix + "file", None


def _simplify_shape(values: np.ndarray) -> np.ndarray:
    if len(values.shape) == 2 and values.shape[1] == 1:
        return np.reshape(values, (-1))
    return values


def _convert_attributes_to_variables(variable_name: str, attrs: Mapping, cdf: pycdfpp.CDF):
    """Return a copy of *attrs* in which pointer attributes become CDF variables.

    For each attribute whose name matches ``_PTR_rx`` (e.g. ``LABL_PTR_1``), the
    attribute value is added to *cdf* as a variable named
    ``<variable_name>_<attr>_<variable_name>``. The returned mapping then holds that
    variable name in place of the value. All other attributes are copied unchanged.
    """
    converted = {}
    for attr_name, attr_value in attrs.items():
        if not _PTR_rx.match(attr_name):
            converted[attr_name] = attr_value
            continue
        pointed_name = f"{variable_name}_{attr_name}_{variable_name}"
        cdf.add_variable(
            name=pointed_name,
            values=attr_value
        )
        converted[attr_name] = pointed_name
    return converted


def _write_axis(ax: VariableAxis, cdf: pycdfpp.CDF, compress_variables=False) -> bool:
    """Add axis *ax* to *cdf* as a variable named after the axis; always returns True.

    - ``datetime64[ns]`` values are written as ``CDF_TIME_TT2000``.
    - Any other dtype is passed with ``data_type=None``, presumably letting pycdfpp
      infer the type (confirm against pycdfpp).
    - Single-column values are flattened by ``_simplify_shape``.
    - gzip compression is used when *compress_variables* is true.
    """
    is_ns_time = ax.values.dtype == np.dtype("datetime64[ns]")
    compression = (pycdfpp.CompressionType.gzip_compression if compress_variables
                   else pycdfpp.CompressionType.no_compression)
    cdf.add_variable(
        name=ax.name,
        values=_simplify_shape(ax.values),
        data_type=pycdfpp.DataType.CDF_TIME_TT2000 if is_ns_time else None,
        compression=compression,
    )
    return True


def _write_variable(v: SpeasyVariable, cdf: pycdfpp.CDF, already_saved_axes: List[VariableAxis],
                    compress_variables=False) -> bool:
    """Write *v*, and any of its axes not yet written, into *cdf*; returns True.

    Each axis of *v* is compared (``==``) with the entries of *already_saved_axes*:

    - on a match, the previously written axis is referenced by name;
    - otherwise the axis is written with ``_write_axis`` and appended to
      *already_saved_axes*. The list is mutated so a caller can share it across
      variables.

    ``DEPEND_<i>`` attributes naming the axes are merged into a copy of ``v.meta``
    (the variable's own metadata is left untouched). Pointer attributes are then
    turned into variables by ``_convert_attributes_to_variables``. gzip compression
    is used when *compress_variables* is true.
    """

    def _already_in_cdf(ax: VariableAxis) -> Optional[str]:
        """Return the name of an already written axis equal to *ax*, or None."""
        for _ax in already_saved_axes:
            if _ax == ax:
                return _ax.name
        return None

    depends = {}
    for index, ax in enumerate(v.axes):
        axis_name = _already_in_cdf(ax)
        if axis_name is None:
            _write_axis(ax, cdf, compress_variables)
            already_saved_axes.append(ax)
            axis_name = ax.name
        # _already_in_cdf returns the axis *name* (a str), so it is used directly.
        depends[f"DEPEND_{index}"] = axis_name
    # Copy so DEPEND_* entries are not injected into the caller's variable metadata.
    attributes = dict(v.meta)
    attributes.update(depends)
    compression = (pycdfpp.CompressionType.gzip_compression if compress_variables
                   else pycdfpp.CompressionType.no_compression)
    cdf.add_variable(
        name=v.name,
        values=_simplify_shape(v.values),
        attributes=_convert_attributes_to_variables(variable_name=v.name, attrs=attributes, cdf=cdf),
        compression=compression,
    )
    return True


@register_codec
class IstpCdf(CodecInterface):
    """Codec for ISTP CDF files.

    This codec is a wrapper around the PyISTP library. It supports some variations
    around the ISTP standard.
    """

    def load_variables(self,
                       variables: List[AnyStr],
                       file: Union[Buffer, str, io.IOBase],
                       cache_remote_files=True,
                       master_cdf_url: Optional[Union[Buffer, str, io.IOBase]] = None,
                       **kwargs
                       ) -> Optional[Mapping[AnyStr, SpeasyVariable]]:
        """Load several variables from an ISTP CDF.

        *file* and *master_cdf_url* may each be a local path/URL, a remote URL,
        an in-memory buffer, or a readable object; see ``_resolve_url_type``.
        *cache_remote_files* is forwarded to ``_resolve_url_type``.

        Extra keyword arguments are forwarded to ``_load_variables``, together with
        the resolved ``file``/``buffer`` and ``master_file``/``master_buffer``
        entries.

        Returns a mapping from each requested name to its variable (None for
        variables that could not be loaded), or None when ``_load_variables``
        returns None.
        """
        kwargs["variables"] = variables
        # Each _resolve_url_type call yields a (keyword, value) pair such as
        # ("file", path) or ("master_buffer", data); dict.update accepts an
        # iterable of such pairs.
        kwargs.update((
            _resolve_url_type(file, prefix="", cache_remote_files=cache_remote_files),
            _resolve_url_type(master_cdf_url, prefix="master_", cache_remote_files=cache_remote_files),
        ))
        return _load_variables(**kwargs)

    @CacheCall(cache_retention=timedelta(seconds=120), is_pure=True)
    def load_variable(self,
                      variable: AnyStr,
                      file: Union[Buffer, str, io.IOBase],
                      cache_remote_files=True,
                      master_cdf_url: Optional[Union[Buffer, str, io.IOBase]] = None,
                      **kwargs
                      ) -> Optional[SpeasyVariable]:
        """Load a single variable; arguments are as for ``load_variables``.

        Results go through ``CacheCall`` with a 120 s retention. Returns None when
        the file cannot be loaded or the variable is not found/valid.
        """
        r = self.load_variables(variables=[variable], file=file, master_cdf_url=master_cdf_url,
                                cache_remote_files=cache_remote_files, **kwargs)
        if r is not None:
            return r.get(variable)
        return None

    def save_variables(self,
                       variables: List[SpeasyVariable],
                       file: Optional[Union[str, os.PathLike, io.IOBase]] = None,
                       compress_variables=False,
                       **kwargs
                       ) -> Union[bool, Buffer]:
        """Serialise *variables* into a single CDF.

        Destination:

        - a ``str`` or path-like object: saved to that path; returns True;
        - an object with ``write()``: the serialised bytes are written; returns True;
        - ``None``: the serialised CDF is returned as a ``memoryview``;
        - anything else: returns False.

        Axes are tracked in one list shared by all variables (see
        ``_write_variable``).

        Raises ValueError if an element of *variables* is not a ``SpeasyVariable``.
        """
        cdf = pycdfpp.CDF()
        axes = []
        for variable in variables:
            if not isinstance(variable, SpeasyVariable):
                raise ValueError(f"Expected SpeasyVariable, got {type(variable)}")
            _write_variable(variable, cdf, axes, compress_variables)
        if isinstance(file, (str, os.PathLike)):
            # os.fspath lets pathlib.Path targets work instead of silently returning False.
            pycdfpp.save(cdf, os.fspath(file))
            return True
        elif hasattr(file, 'write'):
            file.write(pycdfpp.save(cdf))
            return True
        elif file is None:
            return memoryview(pycdfpp.save(cdf))
        return False

    @property
    def supported_extensions(self) -> List[str]:
        """File extensions handled by this codec."""
        return ["cdf"]

    @property
    def supported_mimetypes(self) -> List[str]:
        """MIME types handled by this codec."""
        return ["application/x-cdf"]

    @property
    def name(self) -> str:
        """Codec name (the class name)."""
        return self.__class__.__name__