# Source code for speasy.data_providers.csa

import io
import logging
import tarfile
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Dict

import numpy as np
from astroquery.utils.tap.core import TapPlus

from speasy.core import any_files, AllowedKwargs, fix_name, EnsureUTCDateTime
from speasy.core.codecs import get_codec, CodecInterface
from speasy.core.cache import Cacheable, CACHE_ALLOWED_KWARGS  # _cache is used for tests (hack...)
from speasy.core.dataprovider import DataProvider, ParameterRangeCheck, GET_DATA_ALLOWED_KWARGS
from speasy.core.datetime_range import DateTimeRange
from speasy.core.inventory.indexes import ParameterIndex, DatasetIndex, SpeasyIndex, make_inventory_node
from speasy.core.proxy import Proxyfiable, GetProduct, PROXY_ALLOWED_KWARGS
from speasy.core.requests_scheduling import SplitLargeRequests
from speasy.core.url_utils import build_url
from speasy.products.variable import SpeasyVariable

log = logging.getLogger(__name__)


def _only_primitive_types(d: dict) -> dict:
    """Sanitize *d* in place so every value is a plain, serializable Python type.

    Values that are already primitive (int, bool, str, None, list, tuple, set)
    are kept as-is.  Numpy scalars are converted to their Python equivalents,
    masked constants become ``None``, and any remaining non-primitive value is
    dropped so the dict can safely be splatted into inventory-node kwargs.

    Parameters
    ----------
    d: dict
        dictionary to sanitize (mutated in place)

    Returns
    -------
    dict
        the same dictionary, returned for convenience
    """
    keys_to_remove = []
    for k, v in d.items():
        if not isinstance(v, (int, bool, str, type(None), list, tuple, set)):
            if isinstance(v, np.integer):
                d[k] = int(v)
            elif isinstance(v, np.floating):
                d[k] = float(v)
            elif isinstance(v, (np.bool_, bool)):
                d[k] = bool(v)
            elif isinstance(v, np.str_):
                d[k] = str(v)
            elif isinstance(v, np.ma.core.MaskedConstant):
                d[k] = None
            else:
                # The log message always claimed removal but the entry was kept,
                # leaking non-serializable values downstream; actually remove it.
                # Deletion is deferred: mutating the dict while iterating raises.
                log.debug(f"Warning: removing non primitive type {type(v)} for key {k}")
                keys_to_remove.append(k)
    for k in keys_to_remove:
        del d[k]
    return d


[docs] def to_dataset_and_variable(index_or_str: ParameterIndex or str) -> Tuple[str, str]: if type(index_or_str) is str: parts = index_or_str.split('/') elif isinstance(index_or_str, ParameterIndex): parts = index_or_str.product.split('/') else: raise TypeError(f"given parameter {index_or_str} of type {type(index_or_str)} is not a compatible index") assert len(parts) == 2 return parts[0], parts[1]
def register_dataset(instruments, datasets, dataset):
    """Attach a dataset inventory node under its parent instrument and index it by id."""
    metadata = {column: dataset[column] for column in dataset.colnames}
    # inventory nodes use 'stop_date' while the CSA TAP table calls it 'end_date'
    metadata['stop_date'] = metadata.pop('end_date')
    dataset_id = metadata['dataset_id']
    parent = instruments[dataset['instruments']]
    datasets[dataset_id] = make_inventory_node(parent, DatasetIndex, name=fix_name(dataset_id),
                                               provider="csa", uid=dataset_id,
                                               **_only_primitive_types(metadata))
def register_observatory(missions, observatories, observatory):
    """Attach an observatory inventory node under its mission and index it by name."""
    metadata = {column: observatory[column] for column in observatory.colnames}
    obs_name = metadata.pop('name')
    parent = missions[observatory['mission_name']]
    observatories[obs_name] = make_inventory_node(parent, SpeasyIndex, name=fix_name(obs_name),
                                                  provider="csa", uid=obs_name,
                                                  **_only_primitive_types(metadata))
def register_mission(inventory_tree, missions, mission):
    """Attach a mission inventory node at the inventory root and index it by name."""
    metadata = {column: mission[column] for column in mission.colnames}
    mission_name = metadata.pop('name')
    missions[mission_name] = make_inventory_node(inventory_tree, SpeasyIndex,
                                                 name=fix_name(mission_name),
                                                 provider="csa", uid=mission_name,
                                                 **_only_primitive_types(metadata))
def register_instrument(observatories, instruments, instrument):
    """Attach an instrument node under its observatory (or 'MULTIPLE') and index it by name."""
    metadata = {column: instrument[column] for column in instrument.colnames}
    instrument_name = metadata.pop('name')
    # instruments shared by several spacecraft fall back to the 'MULTIPLE' node
    parent = observatories.get(instrument['observatories'], observatories['MULTIPLE'])
    instruments[instrument_name] = make_inventory_node(parent, SpeasyIndex,
                                                       name=fix_name(instrument_name),
                                                       provider="csa", uid=instrument_name,
                                                       **_only_primitive_types(metadata))
def register_param(datasets, parameter):
    """Attach a parameter node under its dataset; parameters of unknown datasets are skipped."""
    parent_dataset = datasets.get(parameter["dataset_id"], None)
    if parent_dataset is None:
        return
    metadata = {column: parameter[column] for column in parameter.colnames}
    metadata['dataset'] = parameter["dataset_id"]
    # parameters inherit their time range from the enclosing dataset
    metadata['start_date'] = parent_dataset.start_date
    metadata['stop_date'] = parent_dataset.stop_date
    make_inventory_node(parent_dataset, ParameterIndex,
                        name=fix_name(metadata['parameter_id']),
                        provider="csa",
                        uid=f"{parameter['dataset_id']}/{parameter['parameter_id']}",
                        **_only_primitive_types(metadata))
def build_inventory(root: SpeasyIndex, tapurl="https://csa.esac.esa.int/csa-sl-tap/tap/"):
    """Populate *root* with the CSA inventory fetched through the archive's TAP service.

    Missions, observatories, instruments, datasets and parameters are queried
    asynchronously up-front, then registered in dependency order so each node
    finds its parent already in place.
    """
    csa_tap = TapPlus(url=tapurl)
    missions_job = csa_tap.launch_job_async("SELECT * FROM csa.v_mission")
    observatories_job = csa_tap.launch_job_async("SELECT * FROM csa.v_observatory")
    instruments_job = csa_tap.launch_job_async("SELECT * FROM csa.v_instrument")
    datasets_job = csa_tap.launch_job_async(
        "SELECT * FROM csa.v_dataset WHERE dataset_id like '%GRMB' OR (is_cef='true' AND is_istp='true')")
    parameters_job = csa_tap.launch_job_async(
        "SELECT * FROM csa.v_parameter WHERE data_type='Data' AND value_type<>'CHAR'")
    missions = {}
    observatories = {}
    instruments = {}
    datasets = {}
    for mission in missions_job.get_results():
        register_mission(root, missions, mission)
    for observatory in observatories_job.get_results():
        register_observatory(missions, observatories, observatory)
    for instrument in instruments_job.get_results():
        register_instrument(observatories, instruments, instrument)
    for dataset in datasets_job.get_results():
        register_dataset(instruments, datasets, dataset)
    for parameter in parameters_job.get_results():
        register_param(datasets, parameter)
    return root
def _load_variable(archive: io.BytesIO, variable: str, cdf_codec: "CodecInterface") -> "Optional[SpeasyVariable]":
    """Extract the first file of a tar archive returned by CSA and load *variable* from it.

    Parameters
    ----------
    archive: io.BytesIO
        in-memory tar archive as delivered by the CSA data endpoint
    variable: str
        name of the variable to load from the extracted CDF file
    cdf_codec:
        codec used to decode the extracted CDF file

    Returns
    -------
    Optional[SpeasyVariable]
        the loaded variable, or None when the archive contains no member
    """
    with tarfile.open(fileobj=archive) as tar:
        members = tar.getnames()
        if not members:
            return None
        with TemporaryDirectory() as tmp_dir:
            try:
                # The archive comes from a remote service: the 'data' extraction
                # filter rejects path traversal, absolute paths, links, etc.
                tar.extractall(tmp_dir, filter="data")
            except TypeError:
                # Python version without the backported *filter* argument
                tar.extractall(tmp_dir)
            # NOTE(review): only the first member is loaded, as in the original;
            # CSA deliveries are expected to contain a single CDF file.
            return cdf_codec.load_variable(file=f"{tmp_dir}/{members[0]}", variable=variable)
def get_parameter_args(start_time: datetime, stop_time: datetime, product: str, **kwargs):
    """Build the proxy request arguments for a CSA product over a time interval."""
    return {
        'path': f"csa/{product}",
        'start_time': start_time.isoformat(),
        'stop_time': stop_time.isoformat(),
    }
class CsaWebservice(DataProvider):
    """Speasy data provider for ESA's Cluster Science Archive (CSA)."""
    BASE_URL = "https://csa.esac.esa.int"

    def __init__(self):
        DataProvider.__init__(self, provider_name='csa')
        self.__url = f"{self.BASE_URL}/csa-sl-tap/data"
        self._cdf_codec = get_codec("application/x-cdf")

    def _dataset_range(self, dataset: str or DatasetIndex) -> DateTimeRange:
        # accept either a dataset id or an already-resolved index
        if type(dataset) is str:
            dataset = self.flat_inventory.datasets[dataset]
        return DateTimeRange(dataset.start_date, dataset.stop_date)

    def _dl_variable(self, dataset: str, variable: str, start_time: datetime, stop_time: datetime,
                     extra_http_headers: Dict[str, str] or None = None) -> Optional[SpeasyVariable]:
        # Example request:
        # https://csa.esac.esa.int/csa-sl-tap/data?RETRIEVAL_TYPE=product&&DATASET_ID=C3_CP_PEA_LERL_DEFlux&START_DATE=2001-06-10T22:12:14Z&END_DATE=2001-06-11T06:12:14Z&DELIVERY_FORMAT=CDF_ISTP&DELIVERY_INTERVAL=all
        ds_range = self._dataset_range(dataset)
        if not ds_range.intersect(DateTimeRange(start_time, stop_time)):
            log.warning(f"You are requesting {dataset}/{variable} outside of its definition range {ds_range}")
            return None
        headers = dict(extra_http_headers) if extra_http_headers is not None else {}
        url = build_url(base=self.__url,
                        parameters={"RETRIEVAL_TYPE": "product",
                                    "DATASET_ID": dataset,
                                    "START_DATE": start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                                    "END_DATE": stop_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                                    "DELIVERY_FORMAT": "CDF_ISTP",
                                    "DELIVERY_INTERVAL": "all"})
        return _load_variable(any_files.any_loc_open(url, headers=headers), variable,
                              cdf_codec=self._cdf_codec)

    @staticmethod
    def build_inventory(root: SpeasyIndex):
        """Build the CSA inventory tree below *root* (delegates to module-level build_inventory)."""
        return build_inventory(root)

    def parameter_range(self, parameter_id: str or ParameterIndex) -> Optional[DateTimeRange]:
        """Get product time range.

        Parameters
        ----------
        parameter_id: str or ParameterIndex
            parameter id

        Returns
        -------
        Optional[DateTimeRange]
            Data time range

        Examples
        --------

        >>> import speasy as spz
        >>> spz.csa.parameter_range("C3_CP_WBD_WAVEFORM_BM2/B__C3_CP_WBD_WAVEFORM_BM2")
        <DateTimeRange: ... -> ...>

        """
        return self._parameter_range(parameter_id)

    def dataset_range(self, dataset_id: str or DatasetIndex) -> Optional[DateTimeRange]:
        """Get product time range.

        Parameters
        ----------
        dataset_id: str or DatasetIndex
            parameter id

        Returns
        -------
        Optional[DateTimeRange]
            Data time range

        Examples
        --------

        >>> import speasy as spz
        >>> spz.csa.dataset_range("D2_CP_FGM_SPIN")
        <DateTimeRange: 2004-07-27T00:00:00+00:00 -> ...>

        """
        return self._dataset_range(dataset_id)

    def product_last_update(self, product: str or ParameterIndex):
        """Get date of last modification of dataset or parameter.

        Parameters
        ----------
        product: str or ParameterIndex
            product

        Returns
        -------
        str
            product last update date
        """
        dataset, _ = to_dataset_and_variable(product)
        return self.flat_inventory.datasets[dataset].date_last_update

    @AllowedKwargs(PROXY_ALLOWED_KWARGS + CACHE_ALLOWED_KWARGS + GET_DATA_ALLOWED_KWARGS)
    @EnsureUTCDateTime()
    @ParameterRangeCheck()
    @Cacheable(prefix="csa", fragment_hours=lambda x: 12, version=product_last_update)
    @SplitLargeRequests(threshold=lambda x: timedelta(days=7))
    @Proxyfiable(GetProduct, get_parameter_args)
    def get_data(self, product, start_time: datetime, stop_time: datetime,
                 extra_http_headers: Dict[str, str] or None = None):
        """Download a product over a time interval (cached, proxied, split into weekly chunks)."""
        dataset, variable = to_dataset_and_variable(product)
        return self._dl_variable(start_time=start_time, stop_time=stop_time,
                                 dataset=dataset, variable=variable,
                                 extra_http_headers=extra_http_headers)

    def get_variable(self, dataset: str, variable: str, start_time: datetime or str,
                     stop_time: datetime or str, **kwargs) -> Optional[SpeasyVariable]:
        """Convenience wrapper: fetch *variable* of *dataset* via get_data."""
        return self.get_data(f"{dataset}/{variable}", start_time, stop_time, **kwargs)