From 258157079926e7e5cdb9007ea76ea81499c76fe8 Mon Sep 17 00:00:00 2001 From: Wolfgang Kastaun <wolfgang.kastaun@aei.mpg.de> Date: Sat, 14 Dec 2024 12:51:14 +0100 Subject: [PATCH] split dynamic_delay_dask module, move numpy stuff into own module --- lisainstrument/dynamic_delay_dask.py | 447 +------------------------ lisainstrument/dynamic_delay_numpy.py | 455 ++++++++++++++++++++++++++ tests/test_dynamic_delay_dask.py | 47 +-- 3 files changed, 487 insertions(+), 462 deletions(-) create mode 100644 lisainstrument/dynamic_delay_numpy.py diff --git a/lisainstrument/dynamic_delay_dask.py b/lisainstrument/dynamic_delay_dask.py index 727b011..dd460e3 100644 --- a/lisainstrument/dynamic_delay_dask.py +++ b/lisainstrument/dynamic_delay_dask.py @@ -5,309 +5,26 @@ Use make_dynamic_shift_lagrange_dask to create a Lagrange interpolator for dask from __future__ import annotations -import functools -import operator -from enum import Enum -from typing import Callable, Protocol +from typing import Callable import dask import dask.array as da import numpy as np -from numpy.polynomial import Polynomial +from lisainstrument.dynamic_delay_numpy import ( + DynShiftBC, + RegularInterpLagrange, + RegularInterpLinear, + RegularInterpMethod, +) from lisainstrument.fir_filters_dask import DaskArray1D, make_dask_array_1d from lisainstrument.fir_filters_numpy import ( - DefFilterFIR, EdgeHandling, - FilterFirNumpyType, NumpyArray1D, - make_filter_fir_numpy, make_numpy_array_1d, ) -class RegularInterpMethod(Protocol): - """ - Class instances implementing this protocol act as a function accepting a 1D array - of sample values and locations to interpolate the samples to. The x-coordinate coincides - with the sample index at integer locations. Locations are passed as a 1D array of floating - point values and an integer array with additional integer offsets. The reason is - to avoid loss of accuracy by adding largert integers to smaller offsets. - The floating point part of the location is not restricted, however (except by the - range of provided data). The integer part of the locations is split off and combined - with the integer offsets, the residual is denoted here as fractional locations. - - Boundary treatment is not part of this protocol but is the responability of higher level - interfaces that are nont specific to the interpolation method. Here, sample locations have - to be restricted to the range that can be computed without boundary conditions. - The corresponding margins sizes are provided as properties. - """ - - @property - def margin_left(self) -> int: - """Margin size (>= 0) on the left boundary - - The interpolator cannot be called with locations within this margin from the leftmost sample. - """ - - @property - def margin_right(self) -> int: - """Margin size (>= 0) on the right boundary - - The interpolator cannot be called with locations within this margin from the rightmost sample. - """ - - def __call__( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 - ) -> NumpyArray1D: - """Interpolate regularly spaced data to location in index-space - - The locations to interpolate to are the sum of the locations and int_offsets arguments. - The location argument is an 1D array with arbitrary floating point locations, and the - int_offset argument is an integer or integer array with additional integer offsets. The locations - argument is not restricted, i.e. it is not necessarily the residual fractional locations but - can have arbitrary values. - - The locations refer to the index of the array with the sampled data, starting at 0. - The length of samples array does not have to match the size of the location arrays. - If int_offsets is an array, it needs to have same size arrays location and. All - arrays need to be onedimensional. - - - Arguments: - samples: 1D numpy array with sampled data - locations: real-valued 1D numpy array with locations to interpolate to - int_offsets: integer or integer 1D array with additional offsets to the locations. - - Returns: - Interpolated samples. - """ - - def check_arguments( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int - ) -> None: - """Ensure arguments have correct types""" - if isinstance(int_offsets, np.ndarray): - if int_offsets.shape != locations.shape: - msg = ( - f"RegularInterpLinear: inconsistent arrays sizes of " - f"locations ({locations.shape}) and offsets ({int_offsets.shape})" - ) - raise ValueError(msg) - if not np.issubdtype(int_offsets.dtype, np.integer): - msg = ( - "RegularInterpLinear: non-integer dtype for int_offsets not allowed" - ) - raise TypeError(msg) - elif not isinstance(int_offsets, int): - msg = "RegularInterpLinear: int_offsets must be int or int array" - raise TypeError(msg) - - if not np.issubdtype(locations.dtype, np.floating): - msg = "RegularInterpLinear: non-float dtype for locations not allowed" - raise TypeError(msg) - - -def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]: - r"""Construct lagrange interpolating polynomials - - This constructs Lagrange interpolation polynomials with given order, - specialized to regularly spaced coordinates with spacing of one, with a - center specified in terms of an integer offset. - - This produces $N$ polynomials $p_j(x)$ of order $N-1$, which satisfy - $p_j(k) = 1$ if $k = j+D$ and $p_j(k) = 0$ for integers $k=0 \ldots N-1, k \ne j$ - - Arguments: - length: The number $N$ of polynomials of order $N-1$ - offset: The offset $D$ - - Returns: - List of polynomials $p_j$ given as numpy Polynomial objects - """ - - def k_j(j: int) -> Polynomial: - x = Polynomial([0.0, 1.0]) - ms = [i for i in range(length) if i != j] - pm = [(x - offset - m) / (j - m) for m in ms] - return functools.reduce(operator.mul, pm) - - return [k_j(j) for j in range(length)] - - -class RegularInterpLagrange(RegularInterpMethod): - r"""Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials. - - The algorithm uses Lagrange interpolation is specialized to regularly spaced data. - The coefficients of the Lagrange polynomials are computed in advance, and converted - to a set of FIR filters. The FIR filters will be applied to the samples and the result - at the integer locations multiplied with the corresponding power of the fractional - locations. - - For each interpolation location, this uses a stencil with center as close to the - location as possible. For odd length, the center point is obtained by rounding the - location, and that the remaining fractional shift is within $[-1/2,1/2]$. For even - locations, the center points is the floor of the location, with remaining fractional - shift within $[0,1)$ - - See RegularInterpMethod for general properties not specific to the interpolation method. - """ - - @staticmethod - def _make_firs(length: int, offset: int) -> list[FilterFirNumpyType]: - """Set up lagrange polynomials and convert coefficients to FIR filters""" - plag = make_lagrange_polynomials(length, offset) - coeffs = np.array([p.convert().coef for p in plag]).T - filts: list[FilterFirNumpyType] = [] - for c in coeffs: - fdef = DefFilterFIR(filter_coeffs=c, offset=offset) - filt = make_filter_fir_numpy(fdef, EdgeHandling.VALID, EdgeHandling.VALID) - filts.append(filt) - return filts - - def __init__(self, length: int): - """Set up interpolation parameters. - - The order of the interpolation is chosen in terms of the stencil - size (number of samples used for each interpolation point). The size - is one plus the order of the interpolator, meaning the order of a polynomial - that is still interpolated with zero errror. - - Arguments: - length: Size of the lagrange stencil - """ - - if length <= 1: - msg = ( - f"RegularInterpLagrange: stencil length must be 2 or more, got {length}" - ) - raise ValueError(msg) - - offset = -((length - 1) // 2) - self._length: int = length - self._offset: int = offset - self._fir_filt = self._make_firs(length, offset) - - @property - def margin_left(self) -> int: - """Margin size (>= 0) on the left boundary - - The interpolator cannot be called with locations within this margin from the leftmost sample. - """ - return -self._offset - - @property - def margin_right(self) -> int: - """Margin size (>= 0) on the right boundary - - The interpolator cannot be called with locations within this margin from the rightmost sample. - """ - return self._length - 1 + self._offset - - def __call__( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 - ) -> NumpyArray1D: - """Interpolate regularly spaced data to location in index-space - - See RegularInterpMethod.__call__ - - Arguments: - samples: 1D numpy array with sampled data - locations: real-valued 1D numpy array with locations to interpolate to - int_offsets: integer or integer 1D array with additional offsets to the locations. - - Returns: - Interpolated samples. - """ - self.check_arguments(samples, locations, int_offsets) - - if self._length % 2 == 0: - loc_int = np.floor(locations).astype(int) - else: - loc_int = np.round(locations).astype(int) - loc_frac = locations - loc_int - k = loc_int + int_offsets - self.margin_left - - if np.any(k < 0): - msg = "RegularInterpLagrange: interpolation requires samples below provided range" - raise RuntimeError(msg) - if np.any(k >= samples.shape[0] - self._length + 1): - msg = "RegularInterpLagrange: interpolation requires samples above provided range" - raise RuntimeError(msg) - - result = np.zeros(locations.shape[0], dtype=samples.dtype) - xpow = np.ones_like(loc_frac) - for fir in self._fir_filt: - result[:] += fir(samples)[k] * xpow - xpow[:] *= loc_frac - - return make_numpy_array_1d(result) - - -class RegularInterpLinear(RegularInterpMethod): - """Class implementing interpolation of regularly spaced 1D data using linear interpolation. - - See RegularInterpMethod for general properties not specific to the interpolation method. - """ - - @property - def margin_left(self) -> int: - """Margin size (= 0) on the left boundary - - The linear interpolator can be called for all locations within the sample range. - """ - return 0 - - @property - def margin_right(self) -> int: - """Margin size (= 0) on the right boundary - - The linear interpolator can be called for all locations within the sample range. - """ - return 0 - - def __call__( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 - ) -> NumpyArray1D: - """Interpolate regularly spaced data to location in index-space - - See RegularInterpMethod.__call__ - - Arguments: - samples: 1D numpy array with sampled data - locations: real-valued 1D numpy array with locations to interpolate to - int_offsets: integer or integer 1D array with additional offsets to the locations. - - Returns: - Interpolated samples. - """ - self.check_arguments(samples, locations, int_offsets) - - loc_floor = np.floor(locations) - loc_frac = locations - loc_floor - - k = loc_floor.astype(int) + int_offsets - - if np.any(k < 0) or np.any(k + 1 >= samples.shape[0]): - msg = "RegularInterpLinear: interpolation requires samples out of provided range" - raise RuntimeError(msg) - - return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac - - -class DynShiftBC(Enum): - """Enum for various methods of handling boundaries in dynamic shift - - ZEROPAD: Assume all data outside given range is zero. - FLAT: Assume all data outside given range is equal to value on nearest boundary - EXCEPTION: raise exception if data outside range is needed - """ - - EXCEPTION = 1 - ZEROPAD = 2 - FLAT = 3 - - class DynamicShiftDask: """Interpolate to locations given by a non-const shift. @@ -518,153 +235,3 @@ def numpyfy_dask_bivariate( return make_numpy_array_1d(r) return func - - -class DynamicShiftNumpy: - """Interpolate to locations given by a non-const shift. - - This allows to interpolate samples in a numpy array to locations specified - by a shift given by another numpy array of same size. The shift is specified in - units of the array index, i.e. there is no separate coordinate array. - A negative shift refers to values left of a given sample, positive shifts - to values on the right. - - The boundary treatment can be specified for each boundary in terms of - DynShiftBC enums. - - The interpolation method is not fixed but provided via an interpolator - instance implementing the RegularInterpMethod protocol. - - For technical reasons, the shift values have to be within some bound that - has to be provided. - """ - - def __init__( - self, - min_delay: float, - max_delay: float, - left_bound: EdgeHandling, - right_bound: EdgeHandling, - interp: RegularInterpMethod, - ): - - if min_delay > max_delay: - msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}" - raise ValueError(msg) - - self._min_delay = int(np.floor(min_delay)) - self._max_delay = int(np.ceil(max_delay)) - - self._left_bound = left_bound - self._right_bound = right_bound - self._interp_np = interp - - def _op_interp( - self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray - ) -> np.ndarray: - """helper method ensuring correct array dimensionalities""" - return self._interp_np( - make_numpy_array_1d(samp), - make_numpy_array_1d(loc), - make_numpy_array_1d(ofs), - ) - - @property - def margin_left(self) -> int: - """Left margin size. - - If positive, specifies how many samples on the left have to be added by boundary conditions. - If negative, specifies how many samples on the left are unused. - """ - return self._interp_np.margin_left + self._max_delay - - @property - def margin_right(self) -> int: - """Right margin size. - - If positive, specifies how many samples on the right have to be added by boundary conditions. - If negative, specifies how many samples on the right are unused. - """ - return self._interp_np.margin_right - self._min_delay - - def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D: - """Apply shift given by numpy array to samples in another numpy array. - - The shift and sample arrays need to have the same size, and each shift provides - the interpolation location relative to the sample with the same index. - Shifts are floating point values.A shift of +1 refers to the sample on the right, - -1 the sample on the left, etc. All arrays have to be 1D. - - Arguments: - samples: 1D numpy array with data samples - shift: 1D numpy array with shifts - - Returns: - Numpy array with interpolated samples - """ - npad_left = 0 - if self.margin_left > 0: - match self._left_bound: - case DynShiftBC.ZEROPAD: - npad_left = self.margin_left - samples = np.concatenate([np.zeros(npad_left), samples]) - case DynShiftBC.FLAT: - npad_left = self.margin_left - samples = np.concatenate([np.ones(npad_left) * samples[0], samples]) - case _: - msg = ( - f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not " - f"possible for given max delay {self._max_delay}." - ) - raise RuntimeError(msg) - - if self.margin_right > 0: - match self._right_bound: - case DynShiftBC.ZEROPAD: - samples = np.concatenate([samples, np.zeros(self.margin_right)]) - case DynShiftBC.FLAT: - samples = np.concatenate( - [samples, np.ones(self.margin_right) * samples[-1]] - ) - case _: - msg = ( - f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not " - f"possible for given min delay {self._min_delay=}." - ) - raise RuntimeError(msg) - - pos = 0 - shift_shape = len(shift) - n_size = npad_left + shift_shape + self.margin_right - n_first = pos + npad_left - self.margin_left - samples_needed = samples[n_first : n_first + n_size] - - loc = -shift - offsets = self.margin_left + np.arange(shift_shape) - - samples_shifted = self._op_interp(samples_needed, loc, offsets) - - return make_numpy_array_1d(samples_shifted) - - -def make_dynamic_shift_lagrange_numpy( - length: int, - min_delay: float, - max_delay: float, - left_bound: EdgeHandling, - right_bound: EdgeHandling, -) -> DynamicShiftNumpy: - """Set up DynamicShiftNumpy instance with Lagrange interpolation method. - - Arguments: - length: Number of lagrange polynomials (of order length - 1) - min_delay: Assume that any shift < -min_delay - max_delay: Assume that any shift > -max_delay - left_bound: Treatment of left boundary - right_bound: Treatment of right boundary - - Returns: - Interpolation function of type DynamicShiftNumpy - """ - interp = RegularInterpLagrange(length) - return DynamicShiftNumpy(min_delay, max_delay, left_bound, right_bound, interp) diff --git a/lisainstrument/dynamic_delay_numpy.py b/lisainstrument/dynamic_delay_numpy.py new file mode 100644 index 0000000..365ef5f --- /dev/null +++ b/lisainstrument/dynamic_delay_numpy.py @@ -0,0 +1,455 @@ +"""Functions for applying dynamic real-valued shifts to numpy arrays using Lagrange interpolation + +Use make_dynamic_shift_lagrange_numpy to create a Lagrange interpolator for numpy arrays. +""" + +from __future__ import annotations + +import functools +import operator +from enum import Enum +from typing import Callable, Protocol + +import numpy as np +from numpy.polynomial import Polynomial + +from lisainstrument.fir_filters_numpy import ( + DefFilterFIR, + EdgeHandling, + FilterFirNumpyType, + NumpyArray1D, + make_filter_fir_numpy, + make_numpy_array_1d, +) + + +class RegularInterpMethod(Protocol): + """ + Class instances implementing this protocol act as a function accepting a 1D array + of sample values and locations to interpolate the samples to. The x-coordinate coincides + with the sample index at integer locations. Locations are passed as a 1D array of floating + point values and an integer array with additional integer offsets. The reason is + to avoid loss of accuracy by adding largert integers to smaller offsets. + The floating point part of the location is not restricted, however (except by the + range of provided data). The integer part of the locations is split off and combined + with the integer offsets, the residual is denoted here as fractional locations. + + Boundary treatment is not part of this protocol but is the responability of higher level + interfaces that are nont specific to the interpolation method. Here, sample locations have + to be restricted to the range that can be computed without boundary conditions. + The corresponding margins sizes are provided as properties. + """ + + @property + def margin_left(self) -> int: + """Margin size (>= 0) on the left boundary + + The interpolator cannot be called with locations within this margin from the leftmost sample. + """ + + @property + def margin_right(self) -> int: + """Margin size (>= 0) on the right boundary + + The interpolator cannot be called with locations within this margin from the rightmost sample. + """ + + def __call__( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + The locations to interpolate to are the sum of the locations and int_offsets arguments. + The location argument is an 1D array with arbitrary floating point locations, and the + int_offset argument is an integer or integer array with additional integer offsets. The locations + argument is not restricted, i.e. it is not necessarily the residual fractional locations but + can have arbitrary values. + + The locations refer to the index of the array with the sampled data, starting at 0. + The length of samples array does not have to match the size of the location arrays. + If int_offsets is an array, it needs to have same size arrays location and. All + arrays need to be onedimensional. + + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + + def check_arguments( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int + ) -> None: + """Ensure arguments have correct types""" + if isinstance(int_offsets, np.ndarray): + if int_offsets.shape != locations.shape: + msg = ( + f"RegularInterpLinear: inconsistent arrays sizes of " + f"locations ({locations.shape}) and offsets ({int_offsets.shape})" + ) + raise ValueError(msg) + if not np.issubdtype(int_offsets.dtype, np.integer): + msg = ( + "RegularInterpLinear: non-integer dtype for int_offsets not allowed" + ) + raise TypeError(msg) + elif not isinstance(int_offsets, int): + msg = "RegularInterpLinear: int_offsets must be int or int array" + raise TypeError(msg) + + if not np.issubdtype(locations.dtype, np.floating): + msg = "RegularInterpLinear: non-float dtype for locations not allowed" + raise TypeError(msg) + + +def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]: + r"""Construct lagrange interpolating polynomials + + This constructs Lagrange interpolation polynomials with given order, + specialized to regularly spaced coordinates with spacing of one, with a + center specified in terms of an integer offset. + + This produces $N$ polynomials $p_j(x)$ of order $N-1$, which satisfy + $p_j(k) = 1$ if $k = j+D$ and $p_j(k) = 0$ for integers $k=0 \ldots N-1, k \ne j$ + + Arguments: + length: The number $N$ of polynomials of order $N-1$ + offset: The offset $D$ + + Returns: + List of polynomials $p_j$ given as numpy Polynomial objects + """ + + def k_j(j: int) -> Polynomial: + x = Polynomial([0.0, 1.0]) + ms = [i for i in range(length) if i != j] + pm = [(x - offset - m) / (j - m) for m in ms] + return functools.reduce(operator.mul, pm) + + return [k_j(j) for j in range(length)] + + +class RegularInterpLagrange(RegularInterpMethod): + r"""Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials. + + The algorithm uses Lagrange interpolation is specialized to regularly spaced data. + The coefficients of the Lagrange polynomials are computed in advance, and converted + to a set of FIR filters. The FIR filters will be applied to the samples and the result + at the integer locations multiplied with the corresponding power of the fractional + locations. + + For each interpolation location, this uses a stencil with center as close to the + location as possible. For odd length, the center point is obtained by rounding the + location, and that the remaining fractional shift is within $[-1/2,1/2]$. For even + locations, the center points is the floor of the location, with remaining fractional + shift within $[0,1)$ + + See RegularInterpMethod for general properties not specific to the interpolation method. + """ + + @staticmethod + def _make_firs(length: int, offset: int) -> list[FilterFirNumpyType]: + """Set up lagrange polynomials and convert coefficients to FIR filters""" + plag = make_lagrange_polynomials(length, offset) + coeffs = np.array([p.convert().coef for p in plag]).T + filts: list[FilterFirNumpyType] = [] + for c in coeffs: + fdef = DefFilterFIR(filter_coeffs=c, offset=offset) + filt = make_filter_fir_numpy(fdef, EdgeHandling.VALID, EdgeHandling.VALID) + filts.append(filt) + return filts + + def __init__(self, length: int): + """Set up interpolation parameters. + + The order of the interpolation is chosen in terms of the stencil + size (number of samples used for each interpolation point). The size + is one plus the order of the interpolator, meaning the order of a polynomial + that is still interpolated with zero errror. + + Arguments: + length: Size of the lagrange stencil + """ + + if length <= 1: + msg = ( + f"RegularInterpLagrange: stencil length must be 2 or more, got {length}" + ) + raise ValueError(msg) + + offset = -((length - 1) // 2) + self._length: int = length + self._offset: int = offset + self._fir_filt = self._make_firs(length, offset) + + @property + def margin_left(self) -> int: + """Margin size (>= 0) on the left boundary + + The interpolator cannot be called with locations within this margin from the leftmost sample. + """ + return -self._offset + + @property + def margin_right(self) -> int: + """Margin size (>= 0) on the right boundary + + The interpolator cannot be called with locations within this margin from the rightmost sample. + """ + return self._length - 1 + self._offset + + def __call__( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + See RegularInterpMethod.__call__ + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + self.check_arguments(samples, locations, int_offsets) + + if self._length % 2 == 0: + loc_int = np.floor(locations).astype(int) + else: + loc_int = np.round(locations).astype(int) + loc_frac = locations - loc_int + k = loc_int + int_offsets - self.margin_left + + if np.any(k < 0): + msg = "RegularInterpLagrange: interpolation requires samples below provided range" + raise RuntimeError(msg) + if np.any(k >= samples.shape[0] - self._length + 1): + msg = "RegularInterpLagrange: interpolation requires samples above provided range" + raise RuntimeError(msg) + + result = np.zeros(locations.shape[0], dtype=samples.dtype) + xpow = np.ones_like(loc_frac) + for fir in self._fir_filt: + result[:] += fir(samples)[k] * xpow + xpow[:] *= loc_frac + + return make_numpy_array_1d(result) + + +class RegularInterpLinear(RegularInterpMethod): + """Class implementing interpolation of regularly spaced 1D data using linear interpolation. + + See RegularInterpMethod for general properties not specific to the interpolation method. + """ + + @property + def margin_left(self) -> int: + """Margin size (= 0) on the left boundary + + The linear interpolator can be called for all locations within the sample range. + """ + return 0 + + @property + def margin_right(self) -> int: + """Margin size (= 0) on the right boundary + + The linear interpolator can be called for all locations within the sample range. + """ + return 0 + + def __call__( + self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + See RegularInterpMethod.__call__ + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + self.check_arguments(samples, locations, int_offsets) + + loc_floor = np.floor(locations) + loc_frac = locations - loc_floor + + k = loc_floor.astype(int) + int_offsets + + if np.any(k < 0) or np.any(k + 1 >= samples.shape[0]): + msg = "RegularInterpLinear: interpolation requires samples out of provided range" + raise RuntimeError(msg) + + return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac + + +class DynShiftBC(Enum): + """Enum for various methods of handling boundaries in dynamic shift + + ZEROPAD: Assume all data outside given range is zero. + FLAT: Assume all data outside given range is equal to value on nearest boundary + EXCEPTION: raise exception if data outside range is needed + """ + + EXCEPTION = 1 + ZEROPAD = 2 + FLAT = 3 + + +class DynamicShiftNumpy: + """Interpolate to locations given by a non-const shift. + + This allows to interpolate samples in a numpy array to locations specified + by a shift given by another numpy array of same size. The shift is specified in + units of the array index, i.e. there is no separate coordinate array. + A negative shift refers to values left of a given sample, positive shifts + to values on the right. + + The boundary treatment can be specified for each boundary in terms of + DynShiftBC enums. + + The interpolation method is not fixed but provided via an interpolator + instance implementing the RegularInterpMethod protocol. + + For technical reasons, the shift values have to be within some bound that + has to be provided. + """ + + def __init__( + self, + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, + interp: RegularInterpMethod, + ): + + if min_delay > max_delay: + msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}" + raise ValueError(msg) + + self._min_delay = int(np.floor(min_delay)) + self._max_delay = int(np.ceil(max_delay)) + + self._left_bound = left_bound + self._right_bound = right_bound + self._interp_np = interp + + def _op_interp( + self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray + ) -> np.ndarray: + """helper method ensuring correct array dimensionalities""" + return self._interp_np( + make_numpy_array_1d(samp), + make_numpy_array_1d(loc), + make_numpy_array_1d(ofs), + ) + + @property + def margin_left(self) -> int: + """Left margin size. + + If positive, specifies how many samples on the left have to be added by boundary conditions. + If negative, specifies how many samples on the left are unused. + """ + return self._interp_np.margin_left + self._max_delay + + @property + def margin_right(self) -> int: + """Right margin size. + + If positive, specifies how many samples on the right have to be added by boundary conditions. + If negative, specifies how many samples on the right are unused. + """ + return self._interp_np.margin_right - self._min_delay + + def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D: + """Apply shift given by numpy array to samples in another numpy array. + + The shift and sample arrays need to have the same size, and each shift provides + the interpolation location relative to the sample with the same index. + Shifts are floating point values.A shift of +1 refers to the sample on the right, + -1 the sample on the left, etc. All arrays have to be 1D. + + Arguments: + samples: 1D numpy array with data samples + shift: 1D numpy array with shifts + + Returns: + Numpy array with interpolated samples + """ + npad_left = 0 + if self.margin_left > 0: + match self._left_bound: + case DynShiftBC.ZEROPAD: + npad_left = self.margin_left + samples = np.concatenate([np.zeros(npad_left), samples]) + case DynShiftBC.FLAT: + npad_left = self.margin_left + samples = np.concatenate([np.ones(npad_left) * samples[0], samples]) + case _: + msg = ( + f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not " + f"possible for given max delay {self._max_delay}." + ) + raise RuntimeError(msg) + + if self.margin_right > 0: + match self._right_bound: + case DynShiftBC.ZEROPAD: + samples = np.concatenate([samples, np.zeros(self.margin_right)]) + case DynShiftBC.FLAT: + samples = np.concatenate( + [samples, np.ones(self.margin_right) * samples[-1]] + ) + case _: + msg = ( + f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not " + f"possible for given min delay {self._min_delay=}." + ) + raise RuntimeError(msg) + + pos = 0 + shift_shape = len(shift) + n_size = npad_left + shift_shape + self.margin_right + n_first = pos + npad_left - self.margin_left + samples_needed = samples[n_first : n_first + n_size] + + loc = -shift + offsets = self.margin_left + np.arange(shift_shape) + + samples_shifted = self._op_interp(samples_needed, loc, offsets) + + return make_numpy_array_1d(samples_shifted) + + +def make_dynamic_shift_lagrange_numpy( + length: int, + min_delay: float, + max_delay: float, + left_bound: EdgeHandling, + right_bound: EdgeHandling, +) -> DynamicShiftNumpy: + """Set up DynamicShiftNumpy instance with Lagrange interpolation method. + + Arguments: + length: Number of lagrange polynomials (of order length - 1) + min_delay: Assume that any shift < -min_delay + max_delay: Assume that any shift > -max_delay + left_bound: Treatment of left boundary + right_bound: Treatment of right boundary + + Returns: + Interpolation function of type DynamicShiftNumpy + """ + interp = RegularInterpLagrange(length) + return DynamicShiftNumpy(min_delay, max_delay, left_bound, right_bound, interp) diff --git a/tests/test_dynamic_delay_dask.py b/tests/test_dynamic_delay_dask.py index 64eacf5..41d1315 100644 --- a/tests/test_dynamic_delay_dask.py +++ b/tests/test_dynamic_delay_dask.py @@ -1,11 +1,14 @@ import numpy as np import pytest + from lisainstrument.dynamic_delay_dask import ( - make_dynamic_shift_linear_dask, make_dynamic_shift_lagrange_dask, - make_dynamic_shift_lagrange_numpy, + make_dynamic_shift_linear_dask, numpyfy_dask_bivariate, +) +from lisainstrument.dynamic_delay_numpy import ( DynShiftBC, + make_dynamic_shift_lagrange_numpy, make_lagrange_polynomials, ) @@ -32,14 +35,15 @@ def test_dynamic_shift_linear_dask(): assert s_ex == pytest.approx(s_da, abs=1e-15, rel=1e-14) + def test_dynamic_shift_lagrange_dask(): order = 4 length = order + 1 - + t, dt = np.linspace(-5.345, 10.345, 1003, retstep=True) def g(x): - n = x / 10. + n = x / 10.0 return 4.32546 + 3.34324 * x + 4.342 * x**2 + 0.46 * x**3 + 1.43598 * x**4 y = g(t) @@ -47,36 +51,35 @@ def test_dynamic_shift_lagrange_dask(): d = (0.93456456 + 0.0235345 * np.cos(4.3354 * t)) / dt # ~ print(y[0], t[0], d[0]) - - op_da = make_dynamic_shift_lagrange_dask(length, - d.min(), d.max(), DynShiftBC.FLAT, DynShiftBC.EXCEPTION + + op_da = make_dynamic_shift_lagrange_dask( + length, d.min(), d.max(), DynShiftBC.FLAT, DynShiftBC.EXCEPTION ) op_na = numpyfy_dask_bivariate(op_da, chunks=19) s_da = op_na(y, d) s_ex = g(np.maximum(t[0], t - d * dt)) - assert s_ex[op_da.margin_left:] == pytest.approx(s_da[op_da.margin_left:], abs=1e-15, rel=1e-14) - - - op_np = make_dynamic_shift_lagrange_numpy(length, - d.min(), d.max(), DynShiftBC.FLAT, DynShiftBC.EXCEPTION + assert s_ex[op_da.margin_left :] == pytest.approx( + s_da[op_da.margin_left :], abs=1e-15, rel=1e-14 ) - - s_np = op_np(y,d) - + + op_np = make_dynamic_shift_lagrange_numpy( + length, d.min(), d.max(), DynShiftBC.FLAT, DynShiftBC.EXCEPTION + ) + + s_np = op_np(y, d) + assert np.all(s_np == s_da) - def test_lagrange_polynomials(): length = 11 - d = -(length//2) - lagps = make_lagrange_polynomials(length,d) - for j,p in enumerate(lagps): - for x in range(d,d+length): - if x-d == j: + d = -(length // 2) + lagps = make_lagrange_polynomials(length, d) + for j, p in enumerate(lagps): + for x in range(d, d + length): + if x - d == j: assert p(x) == pytest.approx(1, abs=0, rel=1e-12) else: assert p(x) == pytest.approx(0, abs=1e-11, rel=0) - -- GitLab