diff --git a/lisainstrument/dynamic_delay_dask.py b/lisainstrument/dynamic_delay_dask.py new file mode 100644 index 0000000000000000000000000000000000000000..727b011009fe73fa45394c3fa09191a026b3d45b --- /dev/null +++ b/lisainstrument/dynamic_delay_dask.py @@ -0,0 +1,670 @@ +"""Functions for applying dynamic real-valued shifts to dask arrays using Lagrange interpolation + +Use make_dynamic_shift_lagrange_dask to create a Lagrange interpolator for dask arrays. +""" + +from __future__ import annotations + +import functools +import operator +from enum import Enum +from typing import Callable, Protocol + +import dask +import dask.array as da +import numpy as np +from numpy.polynomial import Polynomial + +from lisainstrument.fir_filters_dask import DaskArray1D, make_dask_array_1d +from lisainstrument.fir_filters_numpy import ( + DefFilterFIR, + EdgeHandling, + FilterFirNumpyType, + NumpyArray1D, + make_filter_fir_numpy, + make_numpy_array_1d, +) + + +class RegularInterpMethod(Protocol): + """ + Class instances implementing this protocol act as a function accepting a 1D array + of sample values and locations to interpolate the samples to. The x-coordinate coincides + with the sample index at integer locations. Locations are passed as a 1D array of floating + point values and an integer array with additional integer offsets. The reason is + to avoid loss of accuracy by adding largert integers to smaller offsets. + The floating point part of the location is not restricted, however (except by the + range of provided data). The integer part of the locations is split off and combined + with the integer offsets, the residual is denoted here as fractional locations. + + Boundary treatment is not part of this protocol but is the responability of higher level + interfaces that are nont specific to the interpolation method. Here, sample locations have + to be restricted to the range that can be computed without boundary conditions. + The corresponding margins sizes are provided as properties. + """ + + @property + def margin_left(self) -> int: + """Margin size (>= 0) on the left boundary + + The interpolator cannot be called with locations within this margin from the leftmost sample. + """ + + @property + def margin_right(self) -> int: + """Margin size (>= 0) on the right boundary + + The interpolator cannot be called with locations within this margin from the rightmost sample. + """ + + def __call__( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + The locations to interpolate to are the sum of the locations and int_offsets arguments. + The location argument is an 1D array with arbitrary floating point locations, and the + int_offset argument is an integer or integer array with additional integer offsets. The locations + argument is not restricted, i.e. it is not necessarily the residual fractional locations but + can have arbitrary values. + + The locations refer to the index of the array with the sampled data, starting at 0. + The length of samples array does not have to match the size of the location arrays. + If int_offsets is an array, it needs to have same size arrays location and. All + arrays need to be onedimensional. + + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + + def check_arguments( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int + ) -> None: + """Ensure arguments have correct types""" + if isinstance(int_offsets, np.ndarray): + if int_offsets.shape != locations.shape: + msg = ( + f"RegularInterpLinear: inconsistent arrays sizes of " + f"locations ({locations.shape}) and offsets ({int_offsets.shape})" + ) + raise ValueError(msg) + if not np.issubdtype(int_offsets.dtype, np.integer): + msg = ( + "RegularInterpLinear: non-integer dtype for int_offsets not allowed" + ) + raise TypeError(msg) + elif not isinstance(int_offsets, int): + msg = "RegularInterpLinear: int_offsets must be int or int array" + raise TypeError(msg) + + if not np.issubdtype(locations.dtype, np.floating): + msg = "RegularInterpLinear: non-float dtype for locations not allowed" + raise TypeError(msg) + + +def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]: + r"""Construct lagrange interpolating polynomials + + This constructs Lagrange interpolation polynomials with given order, + specialized to regularly spaced coordinates with spacing of one, with a + center specified in terms of an integer offset. + + This produces $N$ polynomials $p_j(x)$ of order $N-1$, which satisfy + $p_j(k) = 1$ if $k = j+D$ and $p_j(k) = 0$ for integers $k=0 \ldots N-1, k \ne j$ + + Arguments: + length: The number $N$ of polynomials of order $N-1$ + offset: The offset $D$ + + Returns: + List of polynomials $p_j$ given as numpy Polynomial objects + """ + + def k_j(j: int) -> Polynomial: + x = Polynomial([0.0, 1.0]) + ms = [i for i in range(length) if i != j] + pm = [(x - offset - m) / (j - m) for m in ms] + return functools.reduce(operator.mul, pm) + + return [k_j(j) for j in range(length)] + + +class RegularInterpLagrange(RegularInterpMethod): + r"""Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials. + + The algorithm uses Lagrange interpolation is specialized to regularly spaced data. + The coefficients of the Lagrange polynomials are computed in advance, and converted + to a set of FIR filters. The FIR filters will be applied to the samples and the result + at the integer locations multiplied with the corresponding power of the fractional + locations. + + For each interpolation location, this uses a stencil with center as close to the + location as possible. For odd length, the center point is obtained by rounding the + location, and that the remaining fractional shift is within $[-1/2,1/2]$. For even + locations, the center points is the floor of the location, with remaining fractional + shift within $[0,1)$ + + See RegularInterpMethod for general properties not specific to the interpolation method. + """ + + @staticmethod + def _make_firs(length: int, offset: int) -> list[FilterFirNumpyType]: + """Set up lagrange polynomials and convert coefficients to FIR filters""" + plag = make_lagrange_polynomials(length, offset) + coeffs = np.array([p.convert().coef for p in plag]).T + filts: list[FilterFirNumpyType] = [] + for c in coeffs: + fdef = DefFilterFIR(filter_coeffs=c, offset=offset) + filt = make_filter_fir_numpy(fdef, EdgeHandling.VALID, EdgeHandling.VALID) + filts.append(filt) + return filts + + def __init__(self, length: int): + """Set up interpolation parameters. + + The order of the interpolation is chosen in terms of the stencil + size (number of samples used for each interpolation point). The size + is one plus the order of the interpolator, meaning the order of a polynomial + that is still interpolated with zero errror. + + Arguments: + length: Size of the lagrange stencil + """ + + if length <= 1: + msg = ( + f"RegularInterpLagrange: stencil length must be 2 or more, got {length}" + ) + raise ValueError(msg) + + offset = -((length - 1) // 2) + self._length: int = length + self._offset: int = offset + self._fir_filt = self._make_firs(length, offset) + + @property + def margin_left(self) -> int: + """Margin size (>= 0) on the left boundary + + The interpolator cannot be called with locations within this margin from the leftmost sample. + """ + return -self._offset + + @property + def margin_right(self) -> int: + """Margin size (>= 0) on the right boundary + + The interpolator cannot be called with locations within this margin from the rightmost sample. + """ + return self._length - 1 + self._offset + + def __call__( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + See RegularInterpMethod.__call__ + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + self.check_arguments(samples, locations, int_offsets) + + if self._length % 2 == 0: + loc_int = np.floor(locations).astype(int) + else: + loc_int = np.round(locations).astype(int) + loc_frac = locations - loc_int + k = loc_int + int_offsets - self.margin_left + + if np.any(k < 0): + msg = "RegularInterpLagrange: interpolation requires samples below provided range" + raise RuntimeError(msg) + if np.any(k >= samples.shape[0] - self._length + 1): + msg = "RegularInterpLagrange: interpolation requires samples above provided range" + raise RuntimeError(msg) + + result = np.zeros(locations.shape[0], dtype=samples.dtype) + xpow = np.ones_like(loc_frac) + for fir in self._fir_filt: + result[:] += fir(samples)[k] * xpow + xpow[:] *= loc_frac + + return make_numpy_array_1d(result) + + +class RegularInterpLinear(RegularInterpMethod): + """Class implementing interpolation of regularly spaced 1D data using linear interpolation. + + See RegularInterpMethod for general properties not specific to the interpolation method. + """ + + @property + def margin_left(self) -> int: + """Margin size (= 0) on the left boundary + + The linear interpolator can be called for all locations within the sample range. + """ + return 0 + + @property + def margin_right(self) -> int: + """Margin size (= 0) on the right boundary + + The linear interpolator can be called for all locations within the sample range. + """ + return 0 + + def __call__( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + See RegularInterpMethod.__call__ + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + self.check_arguments(samples, locations, int_offsets) + + loc_floor = np.floor(locations) + loc_frac = locations - loc_floor + + k = loc_floor.astype(int) + int_offsets + + if np.any(k < 0) or np.any(k + 1 >= samples.shape[0]): + msg = "RegularInterpLinear: interpolation requires samples out of provided range" + raise RuntimeError(msg) + + return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac + + +class DynShiftBC(Enum): + """Enum for various methods of handling boundaries in dynamic shift + + ZEROPAD: Assume all data outside given range is zero. + FLAT: Assume all data outside given range is equal to value on nearest boundary + EXCEPTION: raise exception if data outside range is needed + """ + + EXCEPTION = 1 + ZEROPAD = 2 + FLAT = 3 + + +class DynamicShiftDask: + """Interpolate to locations given by a non-const shift. + + This allows to interpolate samples in a dask array to locations specified + by a shift given by another dask array of same size. The shift is specified in + units of the array index, i.e. there is no separate coordinate array. + A negative shift refers to values left of a given sample, positive shifts + to values on the right. + + The boundary treatment can be specified for each boundary in terms of + DynShiftBC enums. + + The interpolation method is not fixed but provided via an interpolator + instance implementing the RegularInterpMethod protocol. + + For technical reasons, the shift values have to be within some bound that + has to be provided. + """ + + def __init__( + self, + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, + interp: RegularInterpMethod, + ): + """Set up interpolation parameters + + Arguments: + min_delay: Assume that any shift < -min_delay + max_delay: Assume that any shift > -max_delay + left_bound: Treatment of left boundary + right_bound: Treatment of right boundary + interp: implementation of an interpolation method + """ + + if min_delay > max_delay: + msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}" + raise ValueError(msg) + + self._min_delay = int(np.floor(min_delay)) + self._max_delay = int(np.ceil(max_delay)) + + self._left_bound = left_bound + self._right_bound = right_bound + self._interp_np = interp + + def _op_interp( + self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray + ) -> np.ndarray: + """helper method ensuring correct array dimensionalities""" + return self._interp_np( + make_numpy_array_1d(samp), + make_numpy_array_1d(loc), + make_numpy_array_1d(ofs), + ) + + @property + def margin_left(self) -> int: + """Left margin size. + + If positive, specifies how many samples on the left have to be added by boundary conditions. + If negative, specifies how many samples on the left are unused. + """ + return self._interp_np.margin_left + self._max_delay + + @property + def margin_right(self) -> int: + """Right margin size. + + If positive, specifies how many samples on the right have to be added by boundary conditions. + If negative, specifies how many samples on the right are unused. + """ + return self._interp_np.margin_right - self._min_delay + + def __call__(self, samples: da.Array, shift: da.Array) -> DaskArray1D: + """Apply shift given by dask array to samples in another dask array. + + The shift and sample arrays need to have the same size, and each shift provides + the interpolation location relative to the sample with the same index. + Shifts are floating point values.A shift of +1 refers to the sample on the right, + -1 the sample on the left, etc. All arrays have to be 1D. + + Arguments: + samples: 1D dask array with data samples + shift: 1D dask array with shifts + + Returns: + Dask array with interpolated samples + """ + npad_left = 0 + if self.margin_left > 0: + match self._left_bound: + case DynShiftBC.ZEROPAD: + npad_left = self.margin_left + samples = da.concatenate([da.zeros(npad_left), samples]) + case DynShiftBC.FLAT: + npad_left = self.margin_left + samples = da.concatenate([da.ones(npad_left) * samples[0], samples]) + case _: + msg = ( + f"DynamicShiftDask: left edge handling {self._left_bound.name} not " + f"possible for given max delay {self._max_delay}." + ) + raise RuntimeError(msg) + + if self.margin_right > 0: + match self._right_bound: + case DynShiftBC.ZEROPAD: + samples = da.concatenate([samples, da.zeros(self.margin_right)]) + case DynShiftBC.FLAT: + samples = da.concatenate( + [samples, da.ones(self.margin_right) * samples[-1]] + ) + case _: + msg = ( + f"DynamicShiftDask: right edge handling {self._right_bound.name} not " + f"possible for given min delay {self._min_delay=}." + ) + raise RuntimeError(msg) + + chunks = shift.to_delayed() + delayed_op = dask.delayed(self._op_interp) + results = [] + pos = 0 + for chunk, chunk_shape in zip(chunks, shift.chunks[0], strict=True): + n_size = npad_left + chunk_shape + self.margin_right + n_first = pos + npad_left - self.margin_left + samples_needed = samples[n_first : n_first + n_size] + + loc = -chunk + offsets = self.margin_left + da.arange(chunk_shape, chunks=chunk_shape) + + samples_shifted = delayed_op(samples_needed, loc, offsets) + delayed_chunk = da.from_delayed( + samples_shifted, (chunk_shape,), samples.dtype + ) + results.append(delayed_chunk) + pos += chunk_shape + return make_dask_array_1d(da.concatenate(results, axis=0)) + + +def make_dynamic_shift_linear_dask( + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, +) -> DynamicShiftDask: + """Set up DynamicShiftDask instance with linear interpolation method. + + Arguments: + min_delay: Assume that any shift < -min_delay + max_delay: Assume that any shift > -max_delay + left_bound: Treatment of left boundary + right_bound: Treatment of right boundary + + Returns: + Interpolation function of type DynamicShiftDask + """ + interp = RegularInterpLinear() + return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp) + + +def make_dynamic_shift_lagrange_dask( + length: int, + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, +) -> DynamicShiftDask: + """Set up DynamicShiftDask instance with Lagrange interpolation method. + + Arguments: + length: Number of lagrange polynomials (of order length - 1) + min_delay: Assume that any shift < -min_delay + max_delay: Assume that any shift > -max_delay + left_bound: Treatment of left boundary + right_bound: Treatment of right boundary + + Returns: + Interpolation function of type DynamicShiftDask + """ + + interp = RegularInterpLagrange(length) + return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp) + + +def numpyfy_dask_bivariate( + dafunc: Callable[[da.Array, da.Array], DaskArray1D], chunks: int +) -> Callable[[np.ndarray, np.ndarray], NumpyArray1D]: + """Convert function operating on dask arrays to work with numpy arrays. + + The dask function will be called with two chunked dask arrays created from + the numpy arrays, and then evaluate the result. + + Arguments: + dafunc: function operating on two dask arrays returning single 1D dask array + chunks: chunk size to be used internally + Returns: + function operating on numpy arrays + """ + + def func(x, y): + dx = da.from_array(x, chunks=chunks) + dy = da.from_array(y, chunks=chunks) + r = dafunc(make_dask_array_1d(dx), make_dask_array_1d(dy)).compute() + return make_numpy_array_1d(r) + + return func + + +class DynamicShiftNumpy: + """Interpolate to locations given by a non-const shift. + + This allows to interpolate samples in a numpy array to locations specified + by a shift given by another numpy array of same size. The shift is specified in + units of the array index, i.e. there is no separate coordinate array. + A negative shift refers to values left of a given sample, positive shifts + to values on the right. + + The boundary treatment can be specified for each boundary in terms of + DynShiftBC enums. + + The interpolation method is not fixed but provided via an interpolator + instance implementing the RegularInterpMethod protocol. + + For technical reasons, the shift values have to be within some bound that + has to be provided. + """ + + def __init__( + self, + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, + interp: RegularInterpMethod, + ): + + if min_delay > max_delay: + msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}" + raise ValueError(msg) + + self._min_delay = int(np.floor(min_delay)) + self._max_delay = int(np.ceil(max_delay)) + + self._left_bound = left_bound + self._right_bound = right_bound + self._interp_np = interp + + def _op_interp( + self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray + ) -> np.ndarray: + """helper method ensuring correct array dimensionalities""" + return self._interp_np( + make_numpy_array_1d(samp), + make_numpy_array_1d(loc), + make_numpy_array_1d(ofs), + ) + + @property + def margin_left(self) -> int: + """Left margin size. + + If positive, specifies how many samples on the left have to be added by boundary conditions. + If negative, specifies how many samples on the left are unused. + """ + return self._interp_np.margin_left + self._max_delay + + @property + def margin_right(self) -> int: + """Right margin size. + + If positive, specifies how many samples on the right have to be added by boundary conditions. + If negative, specifies how many samples on the right are unused. + """ + return self._interp_np.margin_right - self._min_delay + + def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D: + """Apply shift given by numpy array to samples in another numpy array. + + The shift and sample arrays need to have the same size, and each shift provides + the interpolation location relative to the sample with the same index. + Shifts are floating point values.A shift of +1 refers to the sample on the right, + -1 the sample on the left, etc. All arrays have to be 1D. + + Arguments: + samples: 1D numpy array with data samples + shift: 1D numpy array with shifts + + Returns: + Numpy array with interpolated samples + """ + npad_left = 0 + if self.margin_left > 0: + match self._left_bound: + case DynShiftBC.ZEROPAD: + npad_left = self.margin_left + samples = np.concatenate([np.zeros(npad_left), samples]) + case DynShiftBC.FLAT: + npad_left = self.margin_left + samples = np.concatenate([np.ones(npad_left) * samples[0], samples]) + case _: + msg = ( + f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not " + f"possible for given max delay {self._max_delay}." + ) + raise RuntimeError(msg) + + if self.margin_right > 0: + match self._right_bound: + case DynShiftBC.ZEROPAD: + samples = np.concatenate([samples, np.zeros(self.margin_right)]) + case DynShiftBC.FLAT: + samples = np.concatenate( + [samples, np.ones(self.margin_right) * samples[-1]] + ) + case _: + msg = ( + f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not " + f"possible for given min delay {self._min_delay=}." + ) + raise RuntimeError(msg) + + pos = 0 + shift_shape = len(shift) + n_size = npad_left + shift_shape + self.margin_right + n_first = pos + npad_left - self.margin_left + samples_needed = samples[n_first : n_first + n_size] + + loc = -shift + offsets = self.margin_left + np.arange(shift_shape) + + samples_shifted = self._op_interp(samples_needed, loc, offsets) + + return make_numpy_array_1d(samples_shifted) + + +def make_dynamic_shift_lagrange_numpy( + length: int, + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, +) -> DynamicShiftNumpy: + """Set up DynamicShiftNumpy instance with Lagrange interpolation method. + + Arguments: + length: Number of lagrange polynomials (of order length - 1) + min_delay: Assume that any shift < -min_delay + max_delay: Assume that any shift > -max_delay + left_bound: Treatment of left boundary + right_bound: Treatment of right boundary + + Returns: + Interpolation function of type DynamicShiftNumpy + """ + interp = RegularInterpLagrange(length) + return DynamicShiftNumpy(min_delay, max_delay, left_bound, right_bound, interp)