import json
import logging
import os
import platform
import re
import time
from functools import partial, cache
from typing import Optional, Dict
import urllib3.response
from urllib3 import PoolManager, ProxyManager
from urllib3.util.retry import Retry
import certifi
import netrc
from speasy import __version__
from speasy.config import core as core_config
from .url_utils import host_and_port, ApplyRewriteRules
from .platform import is_running_on_wasm
log = logging.getLogger(__name__)
USER_AGENT = core_config.http_user_agent.get() or f'Speasy/{__version__} {platform.uname()} (SciQLop project)'
DEFAULT_TIMEOUT = 60 # seconds
DEFAULT_DELAY = 5 # seconds
DEFAULT_RETRY_COUNT = 5
STATUS_FORCE_LIST = [500, 502, 504, 413, 429, 503]
RETRY_AFTER_LIST = [429, 503] # Note: Specific treatment for 429 & 503 error codes (see below)
_HREF_REGEX = re.compile(' href="([A-Za-z0-9.-_]+)">')
def _connection_manager_builder():
kwargs = {
'num_pools': core_config.urlib_num_pools.get(),
'maxsize': core_config.urlib_pool_size.get(),
'cert_reqs': 'CERT_REQUIRED',
'ca_certs': certifi.where()
}
if os.environ.get("HTTP_PROXY", None) is not None:
proxy_url = os.environ["HTTP_PROXY"]
log.info(f"Using HTTP proxy: {proxy_url}")
return ProxyManager(proxy_url, **kwargs)
else:
return PoolManager(**kwargs)
pool = _connection_manager_builder()
[docs]
class Response:
def __init__(self, response: urllib3.response.BaseHTTPResponse):
self._response = response
@property
def status_code(self):
return self._response.status
@property
def text(self):
return self._response.data.decode()
@property
def headers(self):
return self._response.headers
[docs]
def json(self):
return json.loads(self._response.data)
@property
def bytes(self):
return self._response.data
@property
def url(self):
return self._response.geturl()
@property
def ok(self):
return self.status_code in (200, 304)
def __getattr__(self, item):
return getattr(self._response, item)
def __enter__(self):
return self
def __exit__(self, *exc):
return False
@cache
def _auth(hostname: str) -> Dict[str, str]:
"""
Authenticates a user for a specified hostname by retrieving credentials from
the user's `.netrc` file if it exists. Utilizes caching for performance.
Parameters:
hostname: The hostname for which credentials need to be fetched.
Returns:
Dict[str, str]: A dictionary containing headers for basic authentication
if the credentials are available in the `.netrc` file. If no credentials
are found or the file is not present, an empty dictionary is returned.
"""
try:
netrc_file = netrc.netrc()
auth = netrc_file.authenticators(hostname)
if auth:
username, _, password = auth
return urllib3.make_headers(basic_auth=f'{username}:{password}')
except FileNotFoundError:
pass
return {}
def _build_headers(url: str, headers: Dict = None) -> Dict[str, str]:
"""
Construct HTTP headers for a given URL, including a default User-Agent and
authorization headers.
Parameters:
url : str
The URL for which the headers are being constructed.
headers : Dict, optional
Existing headers to include in the request. Defaults to an empty dictionary
if not provided.
Returns:
Dict[str, str]
A dictionary containing the constructed HTTP headers.
"""
headers = headers or {}
headers['User-Agent'] = USER_AGENT
headers.update(auth_header(url))
return headers
class _HttpVerb:
def __init__(self, verb):
# cf. https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/
retry_strategy = Retry(
total=DEFAULT_RETRY_COUNT,
backoff_factor=1,
status_forcelist=STATUS_FORCE_LIST,
allowed_methods=[verb],
respect_retry_after_header=True
)
# self._adapter = TimeoutHTTPAdapter(max_retries=retry_strategy, timeout=DEFAULT_TIMEOUT)
# self._http = requests.Session()
# self._http.mount("https://", self._adapter)
# self._http.mount("http://", self._adapter)
self._verb = partial(pool.request, method=verb, retries=retry_strategy)
@ApplyRewriteRules(is_method=True)
def __call__(self, url, headers: dict = None, params: dict = None, timeout: int = DEFAULT_TIMEOUT,
**kwargs) -> Response:
# self._adapter.timeout = timeout
return Response(
self._verb(url=url, headers=_build_headers(url=url, headers=headers), fields=params, timeout=timeout,
**kwargs))
get = _HttpVerb("GET")
post = _HttpVerb("POST")
head = _HttpVerb("HEAD")
[docs]
@ApplyRewriteRules()
def urlopen(url, timeout: int = DEFAULT_TIMEOUT, headers: dict = None) -> Response:
return Response(
pool.urlopen(method="GET", url=url, headers=_build_headers(url=url, headers=headers), timeout=timeout))
def _wasm_is_server_up(*args, **kwargs) -> bool:
log.debug(
"server availability check implementation can't be performed in WASM environment, assuming server is up")
return True
[docs]
@ApplyRewriteRules()
def is_server_up(url: Optional[str] = None, host: Optional[str] = None, port: Optional[int] = None, timeout: int = 5,
retries=5) -> bool:
"""Checks if a server is up and running. If url is provided, host and port are ignored.
Parameters
----------
url : Optional[str]
url to check (scheme://host[:port]), if provided host and port are ignored
host : Optional[str]
host to check, if provided port must be provided as well
port : Optional[int]
port to check, if provided host must be provided as well
timeout : int
timeout in seconds
retries : int
number of retries
Returns
-------
bool
True if server is up and running, False otherwise
Raises
------
ValueError
If neither url nor host and port are provided
"""
if url is not None:
host, port = host_and_port(url)
if is_running_on_wasm():
return _wasm_is_server_up(f"{host}:{port}", timeout=timeout, retries=retries)
elif host is None or port is None:
raise ValueError("Either url or host and port must be provided")
import socks
for _ in range(retries):
try:
sock = socks.socksocket()
if os.environ.get("HTTP_PROXY", None) is not None:
proxy_host, proxy_port = host_and_port(os.environ["HTTP_PROXY"])
sock.set_proxy(socks.HTTP, proxy_host, proxy_port)
sock.settimeout(timeout)
sock.connect((host, int(port)))
sock.close()
return True
except (socks.ProxyError, OSError):
log.debug(f"Server {host}:{port} not up yet, retrying in 1 second")
time.sleep(1.)
return False