Source code for speasy.signal.resampling

import numpy
from typing import Callable, Optional, Union, Collection
from speasy.products import SpeasyVariable
import numpy as np
from speasy.core import AnyDateTimeType, make_utc_datetime64, datetime64_to_epoch


def _dt_to_ns(dt: float) -> np.timedelta64:
    return np.timedelta64(int(dt * 1e9), 'ns')


[docs] def generate_time_vector(start: AnyDateTimeType, stop: AnyDateTimeType, dt: Union[float, numpy.timedelta64]) -> np.ndarray: """Generate a time vector given a start, stop and time step. The time vector will be generated in UTC time zone. The time step is in seconds. Parameters ---------- start: float or datetime or np.datetime64 The start time stop: float or datetime or np.datetime64 The stop time dt: float or np.timedelta64 The time step in seconds or as a numpy timedelta64 Returns ------- np.ndarray The time vector as a numpy array of datetime64[ns] """ if type(dt) in (float, int): dt = _dt_to_ns(dt) return np.arange(make_utc_datetime64(start), make_utc_datetime64(stop), dt, dtype='datetime64[ns]')
class _NumpyInterpolator: def __init__(self, x, y): self.x = x self.y = y def __call__(self, new_x): return np.interp(new_x, self.x, self.y) def _interpolate(ref_time: np.ndarray, var: SpeasyVariable, interpolate_callback: Optional[Callable] = None, *args, **kwargs) -> SpeasyVariable: res = SpeasyVariable.reserve_like(var, length=len(ref_time)) res.time[:] = ref_time var_epoch = datetime64_to_epoch(var.time) res_epoch = datetime64_to_epoch(res.time) for col in range(var.values.shape[1]): if interpolate_callback is None: interpolator = _NumpyInterpolator(var_epoch, var.values[:, col]) else: interpolator = interpolate_callback(var_epoch, var.values[:, col], *args, **kwargs) res.values[:, col] = interpolator(res_epoch) return res
[docs] def resample(var: Union[SpeasyVariable, Collection[SpeasyVariable]], new_dt: Union[float, np.timedelta64], interpolate_callback: Optional[Callable] = None, *args, **kwargs) -> Union[SpeasyVariable, Collection[SpeasyVariable]]: """Resample a variable(s) to a new time step. The time vector will be generated from the start and stop times of the input variable. Uses :func:`numpy.interp` to do the resampling by default. Parameters ---------- var: SpeasyVariable or Collection[SpeasyVariable] The variable(s) to resample new_dt: float or np.timedelta64 The new time step in seconds or as a numpy timedelta64 interpolate_callback: Callable or None The interpolation function to use, defaults to :func:`numpy.interp` Returns ------- SpeasyVariable or Collection[SpeasyVariable] The resampled variable(s) with all metadata preserved except for the new time axis Notes ----- It only supports 1D variables. """ if type(var) in (list, tuple): return [resample(v, new_dt, interpolate_callback, *args, **kwargs) for v in var] else: time = generate_time_vector(var.time[0], var.time[-1] + np.timedelta64(1, 'ns'), new_dt) return _interpolate(time, var, interpolate_callback, *args, **kwargs)
[docs] def interpolate(ref: Union[np.ndarray, SpeasyVariable], var: Union[SpeasyVariable, Collection[SpeasyVariable]], interpolate_callback: Optional[Callable] = None, *args, **kwargs) -> Union[SpeasyVariable, Collection[SpeasyVariable]]: """Interpolate a variable(s) to a new time vector. The time vector will be taken from the reference variable. Uses :func:`numpy.interp` to do the resampling by default. Parameters ---------- ref: np.ndarray or SpeasyVariable The reference time vector var: SpeasyVariable or Collection[SpeasyVariable] The variable(s) to interpolate interpolate_callback: Callable or None The interpolation function to use, defaults to :func:`numpy.interp` (Optional) Returns ------- SpeasyVariable or Collection[SpeasyVariable] The interpolated variable(s) with all metadata preserved except for the new time axis Notes ----- It only supports 1D variables. """ if isinstance(ref, SpeasyVariable): ref_time = ref.time elif isinstance(ref, np.ndarray) and ref.dtype == np.dtype('datetime64[ns]'): ref_time = ref else: raise ValueError("Invalid reference time vector, must be a numpy array of datetime64[ns] or a SpeasyVariable.") if type(var) in (list, tuple): return [_interpolate(ref_time, v, interpolate_callback, *args, **kwargs) for v in var] return _interpolate(ref_time, var, interpolate_callback, *args, **kwargs)