"""
"""
import json
import logging
import warnings
import os
from datetime import datetime
from typing import Dict, Optional
from ...config import amda as amda_cfg
from ...core import AllowedKwargs, make_utc_datetime, EnsureUTCDateTime
from ...core.http import is_server_up
from ...core.cache import CACHE_ALLOWED_KWARGS, Cacheable, CacheCall
from ...core.dataprovider import (GET_DATA_ALLOWED_KWARGS, ParameterRangeCheck)
from ...core.datetime_range import DateTimeRange
from ...core.inventory.indexes import (CatalogIndex, ParameterIndex, TemplatedParameterIndex,
SpeasyIndex, TimetableIndex, ArgumentIndex)
from ...core.proxy import PROXY_ALLOWED_KWARGS, GetProduct, Proxyfiable, Version
from ...inventories import flat_inventories
from ...products.catalog import Catalog
from ...products.timetable import TimeTable
from ...products.variable import SpeasyVariable
from ...core.impex import ImpexProvider, ImpexEndpoint, to_xmlid
from ...core.impex.exceptions import BadTemplateArgDefinition
log = logging.getLogger(__name__)
amda_provider_name = 'amda'
amda_capabilities = [ImpexEndpoint.AUTH, ImpexEndpoint.OBSTREE, ImpexEndpoint.GETPARAM, ImpexEndpoint.LISTTT,
ImpexEndpoint.GETTT, ImpexEndpoint.LISTCAT, ImpexEndpoint.GETCAT, ImpexEndpoint.LISTPARAM,
ImpexEndpoint.GETSTATUS]
amda_name_mapping = {
"dataset": "xmlid",
"parameter": "xmlid",
"folder": "name",
"component": "xmlid",
"arguments": "name",
"argument": "key",
"item": "key"
}
AMDA_MIN_PROXY_VERSION = Version("0.12.0")
def _amda_arguments_to_dict(index):
if isinstance(index, SpeasyIndex):
res = {}
for key, value in index.__dict__.items():
if isinstance(value, SpeasyIndex) or isinstance(value, str) and key in ['name', 'default', 'type']:
if index.spz_name() == 'items_list':
key = value.key
res[key] = _amda_arguments_to_dict(value)
return res
elif type(index) is not str:
return str(index)
return index
def _argument_fits_allowed_values(value:str, argument_desc:ArgumentIndex):
if argument_desc.type == 'list':
return value in list(zip(*argument_desc.choices))[1]
return True
def _stack_level_outside_of_speasy():
import inspect
import speasy as spz
level = 0
for frame in inspect.stack():
if os.path.dirname(spz.__file__) not in frame.filename:
return level
level += 1
return level
def _amda_replace_arguments_in_template(product: TemplatedParameterIndex, product_inputs: Dict[str,str]):
product_id = product.template
for arg in product.spz_arguments():
k = arg.key
v = product_inputs.get(k)
if v is None:
v = arg.default
import speasy as spz
warnings.warn(f"""Argument {arg.key} is not provided, using default value {v}
You can set Derived Parameters inputs using:
spz.get_data("amda/{product.spz_uid()}, start_time, stop_time, product_inputs={{'{k}': '{arg.default}' }})
""", category=RuntimeWarning, stacklevel=_stack_level_outside_of_speasy())
if arg.type == 'list' and not _argument_fits_allowed_values(v, arg):
raise BadTemplateArgDefinition(f"""Argument {arg.key} has value {v} which is not in the allowed values {arg.choices}""")
product_id = product_id.replace(f'##{k}##', str(v), 1)
return product_id
def _amda_get_real_product_id(product_id: str or SpeasyIndex, **kwargs):
product_id = to_xmlid(product_id)
product = flat_inventories.__dict__[amda_provider_name].parameters[product_id]
if isinstance(product, TemplatedParameterIndex) and not hasattr(product, 'predefined'):
product_inputs = kwargs.get('product_inputs', {})
real_product_id = _amda_replace_arguments_in_template(product, product_inputs)
else:
real_product_id = product_id
return real_product_id
def _amda_cache_entry_name(prefix: str, product: str, start_time: str, **kwargs):
output_format: str = kwargs.get('output_format', 'cdf_istp')
real_product_id = _amda_get_real_product_id(product, **kwargs)
if output_format.lower() == 'cdf_istp':
return f"{prefix}/{real_product_id}-cdf_istp/{start_time}"
else:
return f"{prefix}/{real_product_id}/{start_time}"
def _amda_get_proxy_parameter_args(start_time: datetime, stop_time: datetime, product: str, **kwargs) -> Dict:
proxy_args = {'path': f"{amda_provider_name}/{product}", 'start_time': f'{start_time.isoformat()}',
'stop_time': f'{stop_time.isoformat()}',
'output_format': kwargs.get('output_format', amda_cfg.output_format.get())}
if kwargs.get('product_inputs') and isinstance(kwargs.get('product_inputs'), Dict):
proxy_args['product_inputs'] = json.dumps(kwargs.get('product_inputs'))
return proxy_args
[docs]
class AmdaWebservice(ImpexProvider):
def __init__(self):
ImpexProvider.__init__(self, provider_name=amda_provider_name, server_url=amda_cfg.entry_point() + "/php/rest",
max_chunk_size_days=amda_cfg.max_chunk_size_days(),
capabilities=amda_capabilities, name_mapping=amda_name_mapping,
username=amda_cfg.username(), password=amda_cfg.password(),
output_format=amda_cfg.output_format(), min_proxy_version=AMDA_MIN_PROXY_VERSION)
[docs]
@staticmethod
def is_server_up():
"""Check if AMDA Webservice is up by sending a dummy request to the AMDA Webservice URL with a short timeout.
Returns
-------
bool
True if AMDA Webservice is up, False otherwise.
"""
try:
return is_server_up(url=amda_cfg.entry_point())
except (Exception,):
pass
return False
[docs]
def has_time_restriction(self, product_id: str or SpeasyIndex, start_time: str or datetime,
stop_time: str or datetime):
"""Check if product is restricted for a given time range.
Parameters
----------
product_id: str or SpeasyIndex
product id
start_time: str or datetime
desired data start time
stop_time: str or datetime
desired data stop time
Returns
-------
bool
True if product is restricted for the given time range, False otherwise.
"""
dataset = self.find_parent_dataset(product_id)
if dataset:
dataset = self.flat_inventory.datasets[dataset]
if hasattr(dataset, 'timeRestriction'):
lower = make_utc_datetime(dataset.timeRestriction)
upper = make_utc_datetime(dataset.stop_date)
if lower < upper:
return DateTimeRange(lower, upper).intersect(
DateTimeRange(start_time, stop_time))
return False
[docs]
def product_version(self, parameter_id: str or ParameterIndex):
"""Get date of last modification of dataset or parameter.
Parameters
----------
parameter_id: str or ParameterIndex
parameter id
Returns
-------
str
product version
"""
dataset = self.find_parent_dataset(parameter_id)
if hasattr(self.flat_inventory.datasets[dataset], 'lastModificationDate'):
return self.flat_inventory.datasets[dataset].lastModificationDate
return self.flat_inventory.datasets[dataset].lastUpdate
[docs]
def get_real_product_id(self, product_id: str or SpeasyIndex, **kwargs):
return _amda_get_real_product_id(product_id, **kwargs)
[docs]
@CacheCall(cache_retention=amda_cfg.user_cache_retention(), is_pure=True)
def get_catalog(self, catalog_id: str or CatalogIndex, **kwargs) -> Optional[Catalog]:
"""Get catalog data by ID.
Parameters
----------
catalog_id: str or CatalogIndex
catalog id
Returns
-------
Optional[Catalog]
catalog data
Examples
--------
>>> import speasy as spz
>>> spz.amda.get_catalog("sharedcatalog_22")
<Catalog: model_regions_plasmas_mms_2019>
"""
return super().get_catalog(catalog_id, **kwargs)
[docs]
@CacheCall(cache_retention=amda_cfg.user_cache_retention())
def get_user_catalog(self, catalog_id: str or CatalogIndex, **kwargs) -> Optional[Catalog]:
"""Get user catalog. Raises an exception if user is not authenticated.
Parameters
----------
catalog_id: str or CatalogIndex
catalog id
Returns
-------
Optional[Catalog]
user catalog
Examples
--------
>>> import speasy as spz
>>> spz.amda.get_user_catalog("tt_0") # doctest: +SKIP
<Catalog: test_alexis>
Warnings
--------
Calling :meth:`~speasy.amda.amda.AMDA_Webservice.get_user_catalog` without having defined AMDA_Webservice
login credentials will result in a :class:`~speasy.core.impex.exceptions.MissingCredentials`
exception being raised.
"""
return super().get_user_catalog(catalog_id)
@AllowedKwargs(
PROXY_ALLOWED_KWARGS + CACHE_ALLOWED_KWARGS + GET_DATA_ALLOWED_KWARGS +
['output_format', 'restricted_period', 'product_inputs'])
@EnsureUTCDateTime()
@ParameterRangeCheck()
@Cacheable(prefix=amda_provider_name, version=product_version, fragment_hours=lambda x: 12,
entry_name=_amda_cache_entry_name)
@Proxyfiable(GetProduct, _amda_get_proxy_parameter_args, min_version=AMDA_MIN_PROXY_VERSION)
def _get_parameter(self, product, start_time, stop_time,
extra_http_headers: Dict or None = None, output_format: str or None = None,
restricted_period=False, **kwargs) -> \
Optional[
SpeasyVariable]:
"""Get parameter data.
Parameters
----------
product: str or ParameterIndex
parameter id
start_time:
desired data start time
stop_time:
desired data stop time
extra_http_headers: dict
reserved for internal use
output_format: str
request output format in case of success, only CDF_ISTP is supported for now
Returns
-------
Optional[SpeasyVariable]
product data if available
Examples
--------
>>> import speasy as spz
>>> import datetime
>>> imf_data = spz.amda.get_parameter("imf", "2018-01-01", "2018-01-01T01")
>>> print(imf_data.columns)
['imf[0]', 'imf[1]', 'imf[2]']
>>> print(imf_data.values.shape)
(225, 3)
"""
return super()._get_parameter(product, start_time, stop_time, extra_http_headers=extra_http_headers,
output_format=output_format, restricted_period=restricted_period, **kwargs)
@CacheCall(cache_retention=24 * 60 * 60, is_pure=True)
def _get_obs_data_tree(self) -> str or None:
return super()._get_obs_data_tree(add_template_info=True)
@CacheCall(cache_retention=amda_cfg.user_cache_retention(), is_pure=True)
def _get_timetables_tree(self) -> str or None:
return super()._get_timetables_tree()
@CacheCall(cache_retention=amda_cfg.user_cache_retention(), is_pure=True)
def _get_user_timetables_tree(self) -> str or None:
return super()._get_user_timetables_tree()
@CacheCall(cache_retention=amda_cfg.user_cache_retention(), is_pure=True)
def _get_catalogs_tree(self) -> str or None:
return super()._get_catalogs_tree()
@CacheCall(cache_retention=amda_cfg.user_cache_retention(), is_pure=True)
def _get_user_catalogs_tree(self) -> str or None:
return super()._get_user_catalogs_tree()
@CacheCall(cache_retention=amda_cfg.user_cache_retention(), is_pure=True)
def _get_derived_parameter_tree(self) -> str or None:
return super()._get_derived_parameter_tree()