Source code for speasy.core.requests_scheduling.request_dispatch

import os
from datetime import datetime
from typing import Iterable, List, Optional, Tuple, Union, overload
import traceback

import numpy as np
import logging

from .. import is_collection, progress_bar
from ..datetime_range import DateTimeRange
from ..inventory.indexes import (CatalogIndex, ComponentIndex,
                                 DatasetIndex, ParameterIndex,
                                 SpeasyIndex, TimetableIndex)
from ...config import core as core_cfg
from ...products import *
from ...data_providers import (AmdaWebservice, CdaWebservice, CsaWebservice,
                               SscWebservice, GenericArchive, UiowaEphTool,
                               Cdpp3dViewWebservice)
from ..http import is_server_up

log = logging.getLogger(__name__)

TimeT = Union[str, datetime, float, np.datetime64]
TimeRangeT = Union[DateTimeRange, Tuple[TimeT, TimeT]]
TimeSerieIndexT = Union[ParameterIndex, ComponentIndex]
TimeRangeCollectionT = Union[TimetableIndex, CatalogIndex, Iterable[Iterable[Union[TimeT]]]]

PROVIDERS = {}
amda = None
csa = None
cda = None
ssc = None
archive = None
uiowaephtool = None
cdpp3dview = None


def _is_server_up(ws_class):
    """Check if the webservice server is up. Will first look for an 'is_server_up' method in the class,
    then for a 'BASE_URL' attribute to use the generic 'is_server_up' function finally returns True if none of these are found.

    Parameters
    ----------
    ws_class : class
        The webservice class to check.
    Returns
    -------
    bool
        True if the server is up, False otherwise.
    """
    if hasattr(ws_class, 'is_server_up'):
        return ws_class.is_server_up()
    elif hasattr(ws_class, 'BASE_URL'):
        return is_server_up(ws_class.BASE_URL)
    return True


def _safe_init_provider(ws_class, names, ignore_disabled_status=False):
    """Initialize a data provider safely, catching exceptions and disabling the provider if initialization fails.

    Parameters
    ----------
    ws_class : class
        The webservice class to initialize.
    names : list of str
        The names under which to register the provider, the first name is considered the main name.
    ignore_disabled_status : bool, optional
        If True, ignore the disabled status from configuration and attempt to initialize the provider anyway.

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        If the server is not running.
    """
    try:
        if ignore_disabled_status or not core_cfg.disabled_providers().intersection(set(names)):
            main_name = names[0]
            if _is_server_up(ws_class):
                globals()[main_name] = ws_class()
                for name in names:
                    PROVIDERS[name] = globals()[main_name]
            else:
                raise RuntimeError(f'{main_name} is not running')
    except Exception:  # pylint: disable=broad-except
        log.warning(f"Provider {names} initialization failed, disabling provider")
        log.warning(f"Exception: {traceback.format_exc()}")


[docs] def init_amda(ignore_disabled_status=False): _safe_init_provider(AmdaWebservice, ['amda'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_csa(ignore_disabled_status=False): if os.environ.get("HTTP_PROXY", None) is not None: log.warning("CSA webservice does not support proxy servers, disabling CSA provider") log.warning("See https://github.com/astropy/astroquery/issues/3228") else: _safe_init_provider(CsaWebservice, ['csa'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_cdaweb(ignore_disabled_status=False): _safe_init_provider(CdaWebservice, ['cda', 'cdaweb'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_sscweb(ignore_disabled_status=False): _safe_init_provider(SscWebservice, ['ssc', 'sscweb'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_archive(ignore_disabled_status=False): _safe_init_provider(GenericArchive, ['archive', 'generic_archive'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_uiowaephtool(ignore_disabled_status=False): _safe_init_provider(UiowaEphTool, ['uiowaephtool', 'UiowaEphTool'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_cdpp3dview(ignore_disabled_status=False): _safe_init_provider(Cdpp3dViewWebservice, ['cdpp3dview', '3DView'], ignore_disabled_status=ignore_disabled_status)
[docs] def init_providers(ignore_disabled_status=False): init_amda(ignore_disabled_status=ignore_disabled_status) init_csa(ignore_disabled_status=ignore_disabled_status) init_cdaweb(ignore_disabled_status=ignore_disabled_status) init_sscweb(ignore_disabled_status=ignore_disabled_status) init_archive(ignore_disabled_status=ignore_disabled_status) init_uiowaephtool(ignore_disabled_status=ignore_disabled_status) init_cdpp3dview(ignore_disabled_status=ignore_disabled_status)
if 'SPEASY_SKIP_INIT_PROVIDERS' not in os.environ: init_providers()
[docs] def list_providers() -> List[str]: return list(PROVIDERS.keys())
@overload def get_data(product: CatalogIndex, **kwargs) -> Catalog: ... @overload def get_data(product: TimetableIndex, **kwargs) -> TimeTable: ... @overload def get_data(product: str, **kwargs) -> TimeTable or Catalog: ... @overload def get_data(product: DatasetIndex, start_time: TimeT, stop_time: TimeT, **kwargs) -> Dataset or None: ... @overload def get_data(product: DatasetIndex, time_range: TimeRangeT, **kwargs) -> Dataset or None: ... @overload def get_data(product: DatasetIndex, time_range: Iterable[TimeRangeT], **kwargs) -> List[Optional[Dataset]] or None: ... @overload def get_data(product: TimeSerieIndexT, start_time: TimeT, stop_time: TimeT, **kwargs) -> SpeasyVariable or None: ... @overload def get_data(product: TimeSerieIndexT, time_range: TimeRangeT, **kwargs) -> SpeasyVariable or None: ... @overload def get_data(product: TimeSerieIndexT, time_range: Iterable[TimeRangeT], **kwargs) -> List[Optional[ SpeasyVariable]] or None: ... @overload def get_data(product: TimeSerieIndexT, time_range: TimeRangeCollectionT, **kwargs) -> List[Optional[ SpeasyVariable]] or None: ... @overload def get_data(product: Iterable[TimeSerieIndexT], start_time: TimeT, stop_time: TimeT, **kwargs) -> List[Optional[SpeasyVariable]] or None: ... @overload def get_data(product: Iterable[TimeSerieIndexT], time_range: TimeRangeCollectionT, **kwargs) -> List[List[ Optional[SpeasyVariable]]] or None: ... def _could_be_datetime(value): return type(value) in (str, datetime, np.datetime64, float)
[docs] def provider_and_product(path_or_product: str or SpeasyIndex) -> (str, str): """Breaks given product in two parts: provider and product UID Parameters ---------- path_or_product : str or SpeasyIndex The product as SpeasyIndex or path as string you want to split Returns ------- (str, str) the provider UID and the product UID """ if isinstance(path_or_product, SpeasyIndex): return path_or_product.spz_provider().lower(), path_or_product.spz_uid() elif type(path_or_product) is str: if '/' in path_or_product: provider_uid, product_uid = path_or_product.split('/', 1) return provider_uid, product_uid else: raise ValueError(f"Given string does not look like a path {path_or_product}") raise TypeError( f"Wrong type for {path_or_product}, expecting a string or a SpeasyIndex, got {type(path_or_product)}")
def _scalar_get_data(index, *args, **kwargs): provider_uid, product_uid = provider_and_product(index) if provider_uid in PROVIDERS: return PROVIDERS[provider_uid].get_data(product_uid, *args, **kwargs) raise ValueError(f"Can't find a provider for {index}") def _get_catalog_or_timetable(index, **kwargs): return _scalar_get_data(index, **kwargs) def _get_timeserie1(index, dtrange, **kwargs): return _scalar_get_data(index, dtrange[0], dtrange[1], **kwargs) def _get_timeserie2(index, start, stop, **kwargs): return _scalar_get_data(index, start, stop, **kwargs) def _compile_args(*args, **kwargs): if len(args) == 0: if 'product' in kwargs: args = [kwargs.pop('product')] if len(args) == 1: if 'start_time' in kwargs and 'stop_time' in kwargs: args += [kwargs.pop('start_time'), kwargs.pop('stop_time')] elif 'time_range' in kwargs: args += [kwargs.pop('time_range')] return args, kwargs def _is_dtrange(value): return type(value) is DateTimeRange or ( hasattr(value, '__len__') and len(value) == 2 and _could_be_datetime(value[0]) and _could_be_datetime(value[1]))
[docs] def get_data(*args, **kwargs) -> MaybeAnyProduct: """Retrieve requested product(s). Speasy gives access to two kind of products, time-dependent products such as physical measurements or trajectories and products such as timetables or event catalogs. So depending on which product you want to retrieve :func:`speasy.get_data` accepts different sets of arguments: - For time-independent products: .. highlight:: python .. code-block:: python get_data(product_or_products, **kwargs) - For time-dependent products: .. highlight:: python .. code-block:: python get_data(product_or_products, start_time, stop_time, **kwargs) get_data(product_or_products, datetime_range_or_datetime_range, **kwargs) Since get_data accepts both at the same time a list of products and a list of ranges, it will always iterate first on products then on datetime ranges. In other words, all products will be retrieved for all given datetime ranges. Parameters ---------- args : Either a time independent or a list of time independent indexes or any combination of time dependent or list of time dependent indexes plus a datetime range or a list of datetime ranges. See examples below for more details. kwargs : For webservice specific keyword arguments check :doc:`/user/data_providers`. - disable_proxy: bool ignore proxy configuration and always bypass proxy server when True (default: False). - disable_cache: bool ignore cache content when True (default: False). - progress: bool show progress bar when True (default: False). Returns ------- requested product(s) according to given parameters, either a single product or a collection of products. Examples -------- - A simple parameter request on a single tile range: >>> import speasy as spz >>> spz.get_data("amda/imf_gsm", "2016-10-10", "2016-10-11") <speasy.products.variable.SpeasyVariable object at ...> - Same with a catalog using Speasy dynamic inventory: >>> import speasy as spz >>> amda_catalogs = spz.inventories.tree.amda.Catalogs >>> spz.get_data(amda_catalogs.SharedCatalogs.EARTH.model_regions_plasmas_cluster_2005) <Catalog: model_regions_plasmas_cluster_2005> - You can also request a collection of catalogs: >>> import speasy as spz >>> amda_catalogs = spz.inventories.tree.amda.Catalogs >>> spz.get_data([amda_catalogs.SharedCatalogs.EARTH.model_regions_plasmas_cluster_2005, ... amda_catalogs.SharedCatalogs.EARTH.model_regions_plasmas_mms_2019]) [<Catalog: model_regions_plasmas_cluster_2005>, <Catalog: model_regions_plasmas_mms_2019>] - You can also request a parameter for several intervals: >>> import speasy as spz >>> spz.get_data("amda/imf_gsm", [["2016-10-10", "2016-10-11"], ... ["2017-10-10", "2017-10-11"]]) [<speasy.products.variable.SpeasyVariable object at ...>, <speasy.products.variable.SpeasyVariable object at ...>] - Several products on a single interval: >>> import speasy as spz >>> spz.get_data(["amda/imf_gsm", spz.inventories.tree.ssc.Trajectories.wind], ... "2016-10-10", "2016-10-11") [<speasy.products.variable.SpeasyVariable object at ...>, <speasy.products.variable.SpeasyVariable object at ...>] - Several products for several time ranges: >>> import speasy as spz >>> data= spz.get_data(["amda/imf_gsm", ... spz.inventories.tree.ssc.Trajectories.wind], ... [["2016-10-10", "2016-10-11"], ... ["2017-10-10", "2017-10-11"]]) >>> len(data), len(data[0]) (2, 2) - A catalog or a timetable can also be used as time ranges collection to download a product: >>> import speasy as spz >>> amda_shared_tt = spz.inventories.tree.amda.TimeTables.SharedTimeTables >>> mex_inventory = spz.inventories.tree.amda.Parameters.MEX >>> mgs_inventory = spz.inventories.tree.amda.Parameters.MGS >>> conj_mex_mgs = spz.get_data(amda_shared_tt.MARS.conjonctions_mex_mgs_2004_0) >>> data = spz.get_data( ... [mex_inventory.ELS.mex_els_all.mex_els_spec, ... mgs_inventory.MAG.mgs_mag_mso.b_mgs_mso], ... conj_mex_mgs) >>> len(data), len(data[0]) (2, 28) - Can even pass a CatalogIndex or a TimeTableIndex directly: >>> import speasy as spz >>> amda_shared_tt = spz.inventories.tree.amda.TimeTables.SharedTimeTables >>> mex_inventory = spz.inventories.tree.amda.Parameters.MEX >>> mgs_inventory = spz.inventories.tree.amda.Parameters.MGS >>> data = spz.get_data( ... [mex_inventory.ELS.mex_els_all.mex_els_spec, ... mgs_inventory.MAG.mgs_mag_mso.b_mgs_mso], ... amda_shared_tt.MARS.conjonctions_mex_mgs_2004_0) >>> len(data), len(data[0]) (2, 28) """ args, kwargs = _compile_args(*args, **kwargs) if len(args) == 0: raise ValueError("You must at least provide a product to retrieve") product = args[0] if is_collection(product) and not isinstance(product, SpeasyIndex): return list(map(lambda p: get_data(p, *args[1:], **kwargs), progress_bar(leave=True, **kwargs)(product))) if len(args) == 1: return _get_catalog_or_timetable(*args, **kwargs) if len(args) == 2: t_range = args[1] if _is_dtrange(t_range): return _get_timeserie1(*args, **kwargs) if is_collection(t_range): return list( map(lambda r: get_data(product, r, *args[2:], **kwargs), progress_bar(leave=False, **kwargs)(t_range))) return get_data(product, get_data(t_range), *args[2:], **kwargs) if len(args) == 3: return _get_timeserie2(*args, **kwargs)