Source code for speasy.core.impex.client

from enum import Enum
from typing import Dict, Union
from json.decoder import JSONDecodeError
import logging
import time

from ...core import http
from ...core.http import is_server_up

from .exceptions import MissingCredentials, UnavailableEndpoint

log = logging.getLogger(__name__)


[docs] class ImpexEndpoint(Enum): """Impex API endpoints. """ AUTH = "auth.php" OBSTREE = "getObsDataTree.php" LISTTT = "getTimeTablesList.php" LISTCAT = "getCatalogsList.php" LISTPARAM = "getParameterList.php" GETTT = "getTimeTable.php" GETCAT = "getCatalog.php" GETPARAM = "getParameter.php" GETSTATUS = "getStatus.php" ISALIVE = "isAlive.php"
[docs] class ImpexClient: def __init__(self, server_url="", capabilities=None, username="", password="", output_format="CDF_ISTP", time_format='UNIXTIME'): self.server_url = server_url if capabilities is None: capabilities = [ImpexEndpoint.OBSTREE, ImpexEndpoint.GETPARAM] self.capabilities = capabilities self.username = username self.password = password self.output_format = output_format self.time_format = time_format self.use_token = self.is_capable(ImpexEndpoint.AUTH)
[docs] def is_capable(self, api: ImpexEndpoint): return api in self.capabilities
[docs] def credential_are_valid(self): return self.username != "" and self.password != ""
[docs] def get_credentials(self): if self.credential_are_valid(): return self.username, self.password else: raise MissingCredentials()
[docs] def reset_credentials(self, username: str = "", password: str = ""): self.username = username self.password = password
[docs] def reachable(self): try: return is_server_up(url=f"{self.server_url}/") except (Exception,): pass return False
[docs] def is_alive(self): if not self.is_capable(ImpexEndpoint.ISALIVE): return True return self._send_request(ImpexEndpoint.ISALIVE)
[docs] def auth(self): return self._send_request(ImpexEndpoint.AUTH)
[docs] @staticmethod def in_progress(result): return result == "in progress"
[docs] def get_obs_data_tree(self, use_credentials=False, **kwargs): params = { } if kwargs.get('add_template_info', False): params['templateInfo'] = True if use_credentials: params['userID'], params['password'] = self.get_credentials() return self._send_indirect_request(ImpexEndpoint.OBSTREE, params=params)
[docs] def get_time_table_list(self, use_credentials=False): params = {} if use_credentials: params['userID'], params['password'] = self.get_credentials() return self._send_indirect_request(ImpexEndpoint.LISTTT, params=params)
[docs] def get_catalog_list(self, use_credentials=False): params = {} if use_credentials: params['userID'], params['password'] = self.get_credentials() return self._send_indirect_request(ImpexEndpoint.LISTCAT, params=params)
[docs] def get_derived_parameter_list(self): params = {} params['userID'], params['password'] = self.get_credentials() return self._send_indirect_request(ImpexEndpoint.LISTPARAM, params=params)
[docs] def get_status(self, process_id, extra_http_headers=None): params = { 'id': process_id, } return self._send_request(ImpexEndpoint.GETSTATUS, params=params, extra_http_headers=extra_http_headers)
[docs] def get_parameter(self, start_time, stop_time, parameter_id, extra_http_headers=None, use_credentials=False, **kwargs): params = { 'startTime': start_time, 'stopTime': stop_time, 'parameterID': kwargs.get('real_product_id', parameter_id), 'outputFormat': kwargs.get('output_format', self.output_format) } if kwargs.get('time_format'): params['timeFormat'] = kwargs.get('time_format') if use_credentials: params['userID'], params['password'] = self.get_credentials() if self.use_token: params['token'] = self.auth() return self._send_request(ImpexEndpoint.GETPARAM, params=params, extra_http_headers=extra_http_headers, timeout=5 * 60)
[docs] def get_timetable(self, tt_id, use_credentials=False, **kwargs): params = { 'ttID': tt_id } if use_credentials: params['userID'], params['password'] = self.get_credentials() return self._send_request(ImpexEndpoint.GETTT, params=params, **kwargs)
[docs] def get_catalog(self, catalog_id, use_credentials=False, **kwargs): params = { 'catID': catalog_id } if use_credentials: params['userID'], params['password'] = self.get_credentials() return self._send_request(ImpexEndpoint.GETCAT, params=params, **kwargs)
def _request_url(self, endpoint: ImpexEndpoint) -> str: if isinstance(endpoint, ImpexEndpoint): return f"{self.server_url}/{endpoint.value}" else: raise TypeError(f"You must provide an {ImpexEndpoint} instead of {type(endpoint)}") def _send_indirect_request(self, endpoint: ImpexEndpoint, params: dict = None, timeout: int = http.DEFAULT_TIMEOUT) -> str or None: next_url = self._send_request(endpoint=endpoint, params=params, timeout=timeout) if '<' in next_url and '>' in next_url: next_url = next_url.split(">")[1].split("<")[0] r = http.get(next_url, timeout=timeout) if r.status_code == 200: return r.text.strip() return None def _send_request(self, endpoint: ImpexEndpoint, params: Dict = None, timeout: int = http.DEFAULT_TIMEOUT, extra_http_headers: Union[Dict, None] = None) -> Union[str, bool, None]: if not self.is_capable(endpoint): raise UnavailableEndpoint() url = self._request_url(endpoint) params = params or {} http_headers = extra_http_headers or {} r = http.get(url, params=params, headers=http_headers, timeout=timeout) if r.status_code != 200: log.debug(f"Failed: {r.status_code}") return None try: js = r.json() if 'success' in js and (js['success'] is True) and ('dataFileURLs' in js): log.debug(f"success: {js['dataFileURLs']}") return js['dataFileURLs'] elif "success" in js and (js["success"] is True) and ("status" in js): if ImpexClient.in_progress(js['status']): if "id" in js: log.warning("This request duration is too long, consider reducing time range") process_id = js["id"] elif "id" in params: process_id = params["id"] else: log.debug("Cannot retrieve process id") return None default_sleep_time = 10. time.sleep(default_sleep_time) return self.get_status(process_id) elif "alive" in js and (js["alive"] is True): return True else: log.debug(f"Failed: {r.text}") except JSONDecodeError: return r.text.strip() return None