diff --git a/lisainstrument/fixed_shift_dask.py b/lisainstrument/fixed_shift_dask.py new file mode 100644 index 0000000000000000000000000000000000000000..90edae7a95a4aae6e388f9a63c3485f28af9e1b5 --- /dev/null +++ b/lisainstrument/fixed_shift_dask.py @@ -0,0 +1,161 @@ +"""Functions for applying dynamic real-valued shifts to dask arrays using Lagrange interpolation + +Use make_dynamic_shift_lagrange_dask to create a Lagrange interpolator for dask arrays. +""" + +from __future__ import annotations + +from typing import Final + +import dask +import dask.array as da +import numpy as np +from typing_extensions import assert_never + +from lisainstrument.dynamic_delay_numpy import DynShiftBC +from lisainstrument.fir_filters_dask import DaskArray1D, make_dask_array_1d +from lisainstrument.fixed_shift_numpy import ( + FixedShiftFactory, + make_fixed_shift_factory_lagrange, +) + + +class FixedShiftDask: # pylint: disable=too-few-public-methods + """Interpolate Dask arrays to locations given by a const shift. + + This allows to interpolate samples in a Dask array to locations specified + by a fixed shift. The shift is specified in units of the array index, i.e. + there is no separate coordinate array. A positive shift refers to values + left of a given sample, negative shifts to values on the right. + + The boundary treatment can be specified for each boundary in terms of + DynShiftBC enums. + + The interpolation method is not fixed but provided via an interpolator + instance implementing the RegularInterpMethod protocol. + + """ + + def __init__( + self, + left_bound: DynShiftBC, + right_bound: DynShiftBC, + interp_fac: FixedShiftFactory, + ): + """Not intended for direct use, employ named constructors instead + + Arguments: + left_bound: boundary treatment on the left + right_bound: boundary treatment on the right + interp_fac: Function to create interpolator engine for given shift + """ + self._left_bound: Final = left_bound + self._right_bound: Final = right_bound + self._interp_fac: Final = interp_fac + + def _padding_left(self, samples: da.Array, margin_left: int) -> da.Array: + if margin_left > 0: + match self._left_bound: + case DynShiftBC.ZEROPAD: + samples = da.concatenate([da.zeros(margin_left), samples]) + case DynShiftBC.FLAT: + samples = da.concatenate( + [da.ones(margin_left) * samples[0], samples] + ) + case DynShiftBC.EXCEPTION: + msg = ( + f"FixedShiftDask: left edge handling {self._left_bound.name} not " + f"possible for given delay, need margin of {margin_left}." + ) + raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) + elif margin_left < 0: + samples = samples[-margin_left:] + return samples + + def _padding_right(self, samples: da.Array, margin_right: int) -> da.Array: + if margin_right > 0: + match self._right_bound: + case DynShiftBC.ZEROPAD: + samples = da.concatenate([samples, da.zeros(margin_right)]) + case DynShiftBC.FLAT: + samples = da.concatenate( + [samples, da.ones(margin_right) * samples[-1]] + ) + case DynShiftBC.EXCEPTION: + msg = ( + f"FixedShiftNumpy: right edge handling {self._right_bound.name} not " + f"possible for given delay, need margin of {margin_right}." + ) + raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) + elif margin_right < 0: + samples = samples[:margin_right] + + return samples + + def __call__(self, samples: da.Array, shift: float) -> DaskArray1D: + r"""Apply shift $s$ to samples in Dask array. + + Denoting the input data as $y_i$ with $i=0 \ldots N-1$, and the interpolated + input data as $y(t)$, such that $y(i)=y_i$, the output $z_k$ is given by + $z_k = y(k-s), k=0 \ ldots N - 1$. + + Required data outside the provided samples is created if the specified + boundary condition allows it, or an exception is raised if the BC disallows it. + The output has same length and chunking as the input. + + + Arguments: + samples: 1D Dask array with data samples + shift: The shift $s$ + + Returns: + Dask array with interpolated samples + """ + + loc = -shift + loc_int = int(np.floor(loc)) + loc_frac = loc - loc_int + + interp = self._interp_fac(-loc_frac) + + margin_left = interp.margin_left - loc_int + margin_right = interp.margin_right + loc_int + + samples_checked = make_dask_array_1d(samples) + samples_padleft = self._padding_left(samples_checked, margin_left) + samples_padded = self._padding_right(samples_padleft, margin_right) + + delayed_op = dask.delayed(interp.apply) + results = [] + pos = 0 + for chunk_shape in samples.chunks[0]: + n_size = interp.margin_left + chunk_shape + interp.margin_right + samples_shifted = delayed_op(samples_padded[pos : pos + n_size]) + delayed_chunk = da.from_delayed( + samples_shifted, (chunk_shape,), samples.dtype + ) + results.append(delayed_chunk) + pos += chunk_shape + + return make_dask_array_1d(da.concatenate(results, axis=0)) + + +def make_fixed_shift_lagrange_dask( + left_bound: DynShiftBC, right_bound: DynShiftBC, length: int +) -> FixedShiftDask: + """Create a FixedShiftDask instance that uses Lagrange interpolator + + Arguments: + left_bound: boundary treatment on the left + right_bound: boundary treatment on the right + length: number of Lagrange plolynomials (=order + 1) + + Returns: + Fixed shift interpolator + """ + fac = make_fixed_shift_factory_lagrange(length) + return FixedShiftDask(left_bound, right_bound, fac) diff --git a/lisainstrument/fixed_shift_numpy.py b/lisainstrument/fixed_shift_numpy.py new file mode 100644 index 0000000000000000000000000000000000000000..538497b91e7bc696160d8b0a2d6814278a8a0d28 --- /dev/null +++ b/lisainstrument/fixed_shift_numpy.py @@ -0,0 +1,367 @@ +"""Functions for interpolating numpy arrays with 1D regularly spaced data + +This provides a generic interface RegularInterpolator as well as two interpolation +methods, linear and Lagrange. The latter is written from scratch, see module +regular_interpolator_dsp for another one based on the dsp.timeshift Lagrange interpolator. +""" + +from __future__ import annotations + +from typing import Callable, Final, Protocol, TypeAlias + +import numpy as np +from typing_extensions import assert_never + +from lisainstrument.dynamic_delay_numpy import DynShiftBC +from lisainstrument.fir_filters_numpy import ( + DefFilterFIR, + EdgeHandling, + NumpyArray1D, + make_filter_fir_numpy, + make_numpy_array_1d, +) + + +def make_numpy_array_1d_float(a: np.ndarray) -> NumpyArray1D: + """Ensure numpy array is 1D and floating point type""" + a1 = make_numpy_array_1d(a) + if not np.issubdtype(a.dtype, np.floating): + msg = f"Expected numpy array with floating type, got {a.dtype}" + raise TypeError(msg) + return a1 + + +class FixedShiftCore(Protocol): + """Protocol for applying fixed shift to regularly spaced samples + + This defines an interface for applying a fixed shift to regularly spaced + samples in 1D, provided as numpy arrays. + + Boundary treatment is not part of this protocol. Implementations only compute + locations that can be interpolated to without using any form of boundary + conditions. The corresponding margin sizes required by the interpolation + method are exposed as properties. + + Arbitrary shifts are valid, but the main use case are shifts of magnitude around 1. + For large shifts, the total margin size required will increase, in which case + it might be more efficient to handle the integer shift before and then apply + the remaining fractional shift. + """ + + @property + def margin_left(self) -> int: + """Margin size (>= 0) needed on the left boundary""" + + @property + def margin_right(self) -> int: + """Margin size (>= 0) needed on the right boundary""" + + @property + def shift(self) -> float: + """The shift""" + + def apply(self, samples: np.ndarray) -> NumpyArray1D: + r"""Apply the fractional shift to a 1D numpy array. + + The output is the shifted input with left and right margins removed. + The shift $s$ should be bounded $-1 < s < 1$ + Denoting the input data as $y_i$ with $i=0 \ldots N-1$, and the interpolated + input data as $y(t)$, such that $y(i)=y_i$, the output $z_k$ is given by + $z_k = y(k+M_L-s), k=0 \ ldots N - 1 - M_L - M_R$, where $M_L$ and $M_R$ + are the left and right margin sizes. + + Arguments: + samples: 1D numpy array with sampled data + start: integer part of + size: number of points to return + + Returns: + Interpolated samples + """ + + +def make_fir_lagrange_fixed(length: int, frac_shift: float) -> DefFilterFIR: + r"""Create FIR filter corresponding to non-integer shift using Lagrange interpolation + + This creates a FIR filter with coefficients given by an Lagrangian + interpolation polynomial evaluated at a fixed location $s$. + + We work completely in index space of the input sequence, i.e. the + coordinate of a given index is the index. The location $s$ also refers + to index space. + + Thus, we specialize Lagrange interpolation to the case where + the sample locations are + $t_j = j + D$, whith $j=0 \ldots L-1$, and $L$ being the number of + points to use. The integer offset $D$ determines which points to use, + and should be chosen such that the center $D + L/2$ lies near $s$. + In other words, the input sequence indices $D \ldots D+L-1$ are used + to interpolate to (fractional) index $s$. + + So far we described interpolation to one location. The FIR filter + is defined by simple translation, such that the element with index 0 + in the output sequence corresponds to the input sequence interpolated + to fractional index $s$, and any element $a$ to $s+a$. + + $$ + \begin{align*} + y_a &= \sum_{j=0}^{L-1} K_j x_{a+j+D} \\ + K_j &= \prod_\substack{m=0\\m \neq j}^{L-1} \frac{s-D-m}{j-m} + \end{align*} + $$ + + The offset is chosen to center the stencil around a shift + $s=0$ for odd length and s=$1/2$ for even length. The shift should + not exceed the bounds $-1/2 < s < 1/2$ for odd length or $0 < s < 1$ + for even length significantly, to avoid large overshoots that inherently + occur off-center for high order lagrange interpolation. + + Arguments: + length: Number of FIR coefficients (=Lagrange order + 1) + frac_shift: The shift $s$ + + Returns: + FIR filter definition + """ + + if length <= 1: + msg = f"make_fir_lagrange_fixed: stencil length must be 2 or more, got {length}" + raise ValueError(msg) + + offset = -((length - 1) // 2) + r = np.arange(length) + m2, j2 = np.meshgrid(r, r) + msk = m2 != j2 + s = np.array(frac_shift) + x = s - offset + p = np.ones_like(m2, dtype=np.float64) + p[msk] = (x - m2[msk]) / (j2 - m2)[msk] + kj = np.prod(p, axis=1) + kj /= np.sum(kj) # only to reduce rounding errors + + return DefFilterFIR(filter_coeffs=kj, offset=offset) + + +class FixedShiftLagrange(FixedShiftCore): + r"""Class implementing fixed shift of regularly spaced 1D data using Lagrange interpolation. + + The algorithm uses Lagrange interpolation, using a FIR filter based on lagrange polynomials + evaluated at fixed fractional shift. This uses a stencil with center as close to the + location as possible. For odd length, the center point is obtained by rounding the + location, and that the remaining fractional shift is within $[-1/2,1/2]$. For even + locations, the center points is the floor of the location, with remaining fractional + shift within $[0,1)$ + + See FixedShiftCore for general properties not specific to the interpolation method. + """ + + def __init__(self, length: int, shift: float): + """Set up interpolation parameters. + + The length parameter specifies the number of interpolation polynomials, + which have order length - 1. The length is also the number of samples used + for each interpolation point. The order of the interpolating polynomials + is also the order of plynomials that are interpolated with zero error. + + The shift is given as a delay, i.e. positive values refer to locations + to the left of the output sample at hand. + + Arguments: + length: Size of the interpolation stencil + shift: The constant shift + """ + + loc = -float(shift) + if length % 2 == 0: + loc_int = int(np.floor(loc)) + else: + loc_int = int(np.round(loc)) + loc_frac = loc - loc_int + + firdef = make_fir_lagrange_fixed(length, loc_frac) + + self._margin_left = max(0, -firdef.domain_of_dependence[0] - loc_int) + self._margin_right = max(0, firdef.domain_of_dependence[1] + loc_int) + self._filt = make_filter_fir_numpy( + firdef, EdgeHandling.VALID, EdgeHandling.VALID + ) + self._shift = shift + + @property + def margin_left(self) -> int: + """Margin size (>= 0) needed on the left boundary""" + return self._margin_left + + @property + def margin_right(self) -> int: + """Margin size (>= 0) needed on the right boundary""" + return self._margin_right + + @property + def shift(self) -> float: + """The shift""" + return self._shift + + def apply(self, samples: np.ndarray) -> NumpyArray1D: + """Apply shift, see FixedShiftCore.apply() + + Arguments: + samples: 1D numpy array with sampled data + + Returns: + Shifted input excluding margins + """ + a = make_numpy_array_1d_float(samples) + return make_numpy_array_1d(self._filt(a)) + + +FixedShiftFactory: TypeAlias = Callable[[float], FixedShiftCore] + + +def make_fixed_shift_factory_lagrange(length: int) -> FixedShiftFactory: + """Factory for making FixedShiftLagrange instances + + Arguments: + length: number of Lagrange polynomials to use + + Returns: + Factory function for making FixedShiftLagrange instances from shift + """ + + def factory(shift: float) -> FixedShiftCore: + """FixedShiftLagrange instances from shift using preselected order + Arguments: + shift: The fixed shift + """ + return FixedShiftLagrange(length, shift) + + return factory + + +class FixedShiftNumpy: # pylint: disable=too-few-public-methods + """Interpolate to locations given by a const shift. + + This allows to interpolate samples in a numpy array to locations specified + by a fixed shift. The shift is specified in units of the array index, i.e. + there is no separate coordinate array. A positive shift refers to values + left of a given sample, negative shifts to values on the right. + + The boundary treatment can be specified for each boundary in terms of + DynShiftBC enums. + + The interpolation method is not fixed but provided via an interpolator + instance implementing the FixedShiftCore protocol. + + """ + + def __init__( + self, + left_bound: DynShiftBC, + right_bound: DynShiftBC, + interp_fac: FixedShiftFactory, + ): + """Not intended for direct use, employ named constructors instead + + Arguments: + left_bound: boundary treatment on the left + right_bound: boundary treatment on the right + interp_fac: Function to create interpolator engine for given shift + """ + self._left_bound: Final = left_bound + self._right_bound: Final = right_bound + self._interp_fac: Final = interp_fac + + def _padding_left(self, samples: np.ndarray, margin_left: int) -> np.ndarray: + if margin_left > 0: + match self._left_bound: + case DynShiftBC.ZEROPAD: + samples = np.concatenate([np.zeros(margin_left), samples]) + case DynShiftBC.FLAT: + samples = np.concatenate( + [np.ones(margin_left) * samples[0], samples] + ) + case DynShiftBC.EXCEPTION: + msg = ( + f"FixedShiftNumpy: left edge handling {self._left_bound.name} not " + f"possible for given delay, need margin of {margin_left}." + ) + raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) + elif margin_left < 0: + samples = samples[-margin_left:] + return samples + + def _padding_right(self, samples: np.ndarray, margin_right: int) -> np.ndarray: + if margin_right > 0: + match self._right_bound: + case DynShiftBC.ZEROPAD: + samples = np.concatenate([samples, np.zeros(margin_right)]) + case DynShiftBC.FLAT: + samples = np.concatenate( + [samples, np.ones(margin_right) * samples[-1]] + ) + case DynShiftBC.EXCEPTION: + msg = ( + f"FixedShiftNumpy: right edge handling {self._right_bound.name} not " + f"possible for given delay, need margin of {margin_right}." + ) + raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) + elif margin_right < 0: + samples = samples[:margin_right] + + return samples + + def __call__(self, samples: np.ndarray, shift: float) -> NumpyArray1D: + r"""Apply shift $s$ to samples in numpy array. + + Denoting the input data as $y_i$ with $i=0 \ldots N-1$, and the interpolated + input data as $y(t)$, such that $y(i)=y_i$, the output $z_k$ is given by + $z_k = y(k-s), k=0 \ ldots N - 1$. + + Required data outside the provided samples is created if the specified + boundary condition allows it, or an exception is raised if the BC disallows it. + The output has same length as the input. + + + Arguments: + samples: 1D numpy array with data samples + shift: The shift $s$ + + Returns: + Numpy array with interpolated samples + """ + + loc = -shift + loc_int = int(np.floor(loc)) + loc_frac = loc - loc_int + + interp = self._interp_fac(-loc_frac) + + margin_left = interp.margin_left - loc_int + margin_right = interp.margin_right + loc_int + + samples_checked = make_numpy_array_1d(samples) + samples_padleft = self._padding_left(samples_checked, margin_left) + samples_padded = self._padding_right(samples_padleft, margin_right) + + return interp.apply(samples_padded) + + +def make_fixed_shift_lagrange_numpy( + left_bound: DynShiftBC, right_bound: DynShiftBC, length: int +) -> FixedShiftNumpy: + """Create a FixedShiftNumpy instance that uses Lagrange interpolator + + Arguments: + left_bound: boundary treatment on the left + right_bound: boundary treatment on the right + length: number of Lagrange plolynomials (=order + 1) + + Returns: + Fixed shift interpolator + """ + fac = make_fixed_shift_factory_lagrange(length) + return FixedShiftNumpy(left_bound, right_bound, fac)