diff --git a/lisainstrument/fixed_shift_dsp.py b/lisainstrument/fixed_shift_dsp.py new file mode 100644 index 0000000000000000000000000000000000000000..3d341ccbbe018affbf8b2ea0c624d8d42e8e5ea2 --- /dev/null +++ b/lisainstrument/fixed_shift_dsp.py @@ -0,0 +1,141 @@ +"""Functions for applying fixed real-valued shifts to dask arrays using dsp.timeshift + +There are two implementations of fixed-shift Lagrange interpolation: + +1. fixed_shift_dsp.make_fixed_shift_dsp wraps legacy implementation dsp.timeshift + into new interface +2. fixed_shift_numpy.make_fixed_shift_lagrange provides a completely new implementation + of Lagrange interpolation + +Use make_fixed_shift_dsp_dask to create an interpolator based on dsp.timeshift for dask arrays. +""" + +from __future__ import annotations + +from typing import Final + +import numpy as np + +from lisainstrument import dsp +from lisainstrument.fixed_shift_dask import FixedShiftDask +from lisainstrument.dynamic_delay_numpy import DynShiftBC + +from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d + +from lisainstrument.fixed_shift_numpy import ( + make_numpy_array_1d_float, + FixedShiftCore, + FixedShiftFactory, +) + + +class FixedShiftWrapDSP(FixedShiftCore): + r"""Class implementing fixed shift of regularly spaced 1D data using dsp.timeshift(). + + This class is an adaptor making dsp.timeshift function available through a + FixedShiftCore interface. + + See FixedShiftCore for general properties not specific to the interpolation method. + """ + + def __init__(self, length: int, shift: float): + """Set up interpolation parameters. + + The length parameter specifies the number of interpolation polynomials, + which have order length - 1. The length is also the number of samples used + for each interpolation point. The order of the interpolating polynomials + is also the order of plynomials that are interpolated with zero error. + + The shift is given as a delay, i.e. positive values refer to locations + to the left of the output sample at hand. + + Arguments: + length: Size of the interpolation stencil + shift: The constant shift + """ + + if length <= 1: + msg = f"FixedShiftWrapDSP: stencil length must be 2 or more, got {length}" + raise ValueError(msg) + + if length % 2 != 0: + msg = "FixedShiftWrapDSP: even Lagrange polynomial order not supported by dsp.timeshift" + raise ValueError(msg) + + loc = -float(shift) + loc_int = int(np.floor(loc)) + # ~ loc_frac = loc - loc_int + + margin_lagr_left = (length - 1) // 2 + margin_lagr_right = length - 1 - margin_lagr_left + + self._margin_left: Final = max(0, margin_lagr_left - loc_int) + self._margin_right: Final = max(0, margin_lagr_right + loc_int) + self._shift: Final = shift + self._order: Final = length - 1 + + @property + def margin_left(self) -> int: + """Margin size (>= 0) needed on the left boundary""" + return self._margin_left + + @property + def margin_right(self) -> int: + """Margin size (>= 0) needed on the right boundary""" + return self._margin_right + + @property + def shift(self) -> float: + """The shift""" + return self._shift + + def apply(self, samples: np.ndarray) -> NumpyArray1D: + """Apply shift, see FixedShiftCore.apply() + + Arguments: + samples: 1D numpy array with sampled data + + Returns: + Shifted input excluding margins + """ + a = make_numpy_array_1d_float(samples) + res = dsp.timeshift(a, -self.shift, self._order) + + return make_numpy_array_1d(res) + + @staticmethod + def factory(length: int) -> FixedShiftFactory: + """Factory for making FixedShiftWrapDSP instances + + Arguments: + length: number of Lagrange polynomials to use + + Returns: + Factory function for making FixedShiftWrapDSP instances from shift + """ + + def factory(shift: float) -> FixedShiftCore: + """FixedShiftWrapDSP instances from shift using preselected order + Arguments: + shift: The fixed shift + """ + return FixedShiftWrapDSP(length, shift) + + return factory + + +def make_fixed_shift_dsp_dask( + left_bound: DynShiftBC, right_bound: DynShiftBC, length: int +) -> FixedShiftDask: + """Create a FixedShiftDask instance that uses dsp.timeshift interpolator + + Arguments: + left_bound: boundary treatment on the left + right_bound: boundary treatment on the right + length: number of Lagrange plolynomials (=order + 1) + + Returns: + Fixed shift interpolator + """ + fac = FixedShiftWrapDSP.factory(length) + return FixedShiftDask(left_bound, right_bound, fac)