diff --git a/lisainstrument/dynamic_delay_dask.py b/lisainstrument/dynamic_delay_dask.py index fb058bdfb1fa52590db86fad495c9a318bfa918e..d40bd2e10dfd377e673528080cf96b0bf4a1aac5 100644 --- a/lisainstrument/dynamic_delay_dask.py +++ b/lisainstrument/dynamic_delay_dask.py @@ -14,9 +14,10 @@ from typing_extensions import assert_never from lisainstrument.dynamic_delay_numpy import ( DynShiftBC, - RegularInterpLagrange, - RegularInterpLinear, - RegularInterpMethod, + DynShiftCfg, + RegularInterpolator, + make_regular_interpolator_lagrange, + make_regular_interpolator_linear, ) from lisainstrument.fir_filters_dask import DaskArray1D, make_dask_array_1d from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d @@ -43,43 +44,19 @@ class DynamicShiftDask: def __init__( self, - min_delay: float, - max_delay: float, - left_bound: DynShiftBC, - right_bound: DynShiftBC, - interp: RegularInterpMethod, + cfg: DynShiftCfg, + interp: RegularInterpolator, ): """Set up interpolation parameters Arguments: - min_delay: Assume that any shift < -min_delay - max_delay: Assume that any shift > -max_delay - left_bound: Treatment of left boundary - right_bound: Treatment of right boundary + cfg: limits of delay and boundary treatment interp: implementation of an interpolation method """ - if min_delay > max_delay: - msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}" - raise ValueError(msg) - - self._min_delay: Final[int] = int(np.floor(min_delay)) - self._max_delay: Final[int] = int(np.ceil(max_delay)) - - self._left_bound: Final = left_bound - self._right_bound: Final = right_bound + self._cfg: Final = cfg self._interp_np: Final = interp - def _op_interp( - self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray - ) -> np.ndarray: - """helper method ensuring correct array dimensionalities""" - return self._interp_np( - make_numpy_array_1d(samp), - make_numpy_array_1d(loc), - make_numpy_array_1d(ofs), - ) - @property def margin_left(self) -> int: """Left margin size. @@ -87,7 +64,7 @@ class DynamicShiftDask: If positive, specifies how many samples on the left have to be added by boundary conditions. If negative, specifies how many samples on the left are unused. """ - return self._interp_np.margin_left + self._max_delay + return self._interp_np.margin_left + self._cfg.max_delay_int @property def margin_right(self) -> int: @@ -96,7 +73,7 @@ class DynamicShiftDask: If positive, specifies how many samples on the right have to be added by boundary conditions. If negative, specifies how many samples on the right are unused. """ - return self._interp_np.margin_right - self._min_delay + return self._interp_np.margin_right - self._cfg.min_delay_int def __call__(self, samples: da.Array, shift: da.Array) -> DaskArray1D: """Apply shift given by dask array to samples in another dask array. @@ -115,7 +92,7 @@ class DynamicShiftDask: """ npad_left = 0 if self.margin_left > 0: - match self._left_bound: + match self._cfg.left_bound: case DynShiftBC.ZEROPAD: npad_left = self.margin_left samples = da.concatenate([da.zeros(npad_left), samples]) @@ -124,15 +101,15 @@ class DynamicShiftDask: samples = da.concatenate([da.ones(npad_left) * samples[0], samples]) case DynShiftBC.EXCEPTION: msg = ( - f"DynamicShiftDask: left edge handling {self._left_bound.name} not " - f"possible for given max delay {self._max_delay}." + f"DynamicShiftDask: left edge handling {self._cfg.left_bound.name} not " + f"possible for given max delay {self._cfg.max_delay}." ) raise RuntimeError(msg) case _ as unreachable: assert_never(unreachable) if self.margin_right > 0: - match self._right_bound: + match self._cfg.right_bound: case DynShiftBC.ZEROPAD: samples = da.concatenate([samples, da.zeros(self.margin_right)]) case DynShiftBC.FLAT: @@ -141,15 +118,15 @@ class DynamicShiftDask: ) case DynShiftBC.EXCEPTION: msg = ( - f"DynamicShiftDask: right edge handling {self._right_bound.name} not " - f"possible for given min delay {self._min_delay=}." + f"DynamicShiftDask: right edge handling {self._cfg.right_bound.name} not " + f"possible for given min delay {self._cfg.min_delay}." ) raise RuntimeError(msg) case _ as unreachable: assert_never(unreachable) chunks = shift.to_delayed() - delayed_op = dask.delayed(self._op_interp) + delayed_op = dask.delayed(self._interp_np) results = [] pos = 0 for chunk, chunk_shape in zip(chunks, shift.chunks[0], strict=True): @@ -186,8 +163,9 @@ def make_dynamic_shift_linear_dask( Returns: Interpolation function of type DynamicShiftDask """ - interp = RegularInterpLinear() - return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp) + interp = make_regular_interpolator_linear() + cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound) + return DynamicShiftDask(cfg, interp) def make_dynamic_shift_lagrange_dask( @@ -210,8 +188,9 @@ def make_dynamic_shift_lagrange_dask( Interpolation function of type DynamicShiftDask """ - interp = RegularInterpLagrange(length) - return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp) + interp = make_regular_interpolator_lagrange(length) + cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound) + return DynamicShiftDask(cfg, interp) def numpyfy_dask_bivariate( diff --git a/lisainstrument/dynamic_delay_numpy.py b/lisainstrument/dynamic_delay_numpy.py index c75e29c8e2af8a48847f697b87dc2a62a6eb14ca..5449e0cd635f4a24a80ad2f34cce3d043ef0504a 100644 --- a/lisainstrument/dynamic_delay_numpy.py +++ b/lisainstrument/dynamic_delay_numpy.py @@ -7,6 +7,7 @@ from __future__ import annotations import functools import operator +from dataclasses import dataclass from enum import Enum from typing import Final, Protocol @@ -24,21 +25,17 @@ from lisainstrument.fir_filters_numpy import ( ) -class RegularInterpMethod(Protocol): - """ - Class instances implementing this protocol act as a function accepting a 1D array - of sample values and locations to interpolate the samples to. The x-coordinate coincides - with the sample index at integer locations. Locations are passed as a 1D array of floating - point values and an integer array with additional integer offsets. The reason is - to avoid loss of accuracy by adding largert integers to smaller offsets. - The floating point part of the location is not restricted, however (except by the - range of provided data). The integer part of the locations is split off and combined - with the integer offsets, the residual is denoted here as fractional locations. - - Boundary treatment is not part of this protocol but is the responability of higher level - interfaces that are nont specific to the interpolation method. Here, sample locations have - to be restricted to the range that can be computed without boundary conditions. - The corresponding margins sizes are provided as properties. +class RegularInterpCore(Protocol): + """Protocol for interpolator engine interface for regularly spaced samples + + This defines an interface for core functionality of interpolating regularly + spaced samples in 1D, using numpy arrays. It is not intended for direct use + but for use in the RegularInterpolator class. + + Boundary treatment is not part of this protocol. Implementations should only + accept locations that can be interpolated to without using any form of boundary + conditions, and raise an exception otherwise. The margin sizes required by the + interpolation method are exposed as properties. """ @property @@ -55,9 +52,12 @@ class RegularInterpMethod(Protocol): The interpolator cannot be called with locations within this margin from the rightmost sample. """ - def __call__( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 - ) -> NumpyArray1D: + def apply( + self, + samples: np.ndarray, + locations: np.ndarray, + int_offsets: np.ndarray | int = 0, + ) -> np.ndarray: """Interpolate regularly spaced data to location in index-space The locations to interpolate to are the sum of the locations and int_offsets arguments. @@ -71,6 +71,8 @@ class RegularInterpMethod(Protocol): If int_offsets is an array, it needs to have same size arrays location and. All arrays need to be onedimensional. + Implementations do not need to check the array dimensionality, size consistency, and types. + This is done in RegularInterpolator. Arguments: samples: 1D numpy array with sampled data @@ -81,30 +83,6 @@ class RegularInterpMethod(Protocol): Interpolated samples. """ - def check_arguments( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int - ) -> None: - """Ensure arguments have correct types""" - if isinstance(int_offsets, np.ndarray): - if int_offsets.shape != locations.shape: - msg = ( - f"RegularInterpLinear: inconsistent arrays sizes of " - f"locations ({locations.shape}) and offsets ({int_offsets.shape})" - ) - raise ValueError(msg) - if not np.issubdtype(int_offsets.dtype, np.integer): - msg = ( - "RegularInterpLinear: non-integer dtype for int_offsets not allowed" - ) - raise TypeError(msg) - elif not isinstance(int_offsets, int): - msg = "RegularInterpLinear: int_offsets must be int or int array" - raise TypeError(msg) - - if not np.issubdtype(locations.dtype, np.floating): - msg = "RegularInterpLinear: non-float dtype for locations not allowed" - raise TypeError(msg) - def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]: r"""Construct lagrange interpolating polynomials @@ -133,7 +111,7 @@ def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]: return [k_j(j) for j in range(length)] -class RegularInterpLagrange(RegularInterpMethod): +class RegularInterpLagrange(RegularInterpCore): r"""Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials. The algorithm uses Lagrange interpolation is specialized to regularly spaced data. @@ -148,7 +126,7 @@ class RegularInterpLagrange(RegularInterpMethod): locations, the center points is the floor of the location, with remaining fractional shift within $[0,1)$ - See RegularInterpMethod for general properties not specific to the interpolation method. + See RegularInterpCore for general properties not specific to the interpolation method. """ @staticmethod @@ -166,13 +144,13 @@ class RegularInterpLagrange(RegularInterpMethod): def __init__(self, length: int): """Set up interpolation parameters. - The order of the interpolation is chosen in terms of the stencil - size (number of samples used for each interpolation point). The size - is one plus the order of the interpolator, meaning the order of a polynomial - that is still interpolated with zero errror. + The length parameter specifies the number of interpolation polynomials, + which have order length - 1. The length is also the number of samples used + for each interpolation point. The order of the interpolating polynomials + is also the order of plynomials that are interpolated with zero error. Arguments: - length: Size of the lagrange stencil + length: Size of the interpolation stencil """ if length <= 1: @@ -204,22 +182,24 @@ class RegularInterpLagrange(RegularInterpMethod): """ return self._length - 1 + self._offset - def __call__( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 - ) -> NumpyArray1D: + def apply( + self, + samples: np.ndarray, + locations: np.ndarray, + int_offsets: np.ndarray | int = 0, + ) -> np.ndarray: """Interpolate regularly spaced data to location in index-space - See RegularInterpMethod.__call__ + See RegularInterpCore.apply() Arguments: - samples: 1D numpy array with sampled data + samples: real-valued 1D numpy array with sampled data locations: real-valued 1D numpy array with locations to interpolate to int_offsets: integer or integer 1D array with additional offsets to the locations. Returns: Interpolated samples. """ - self.check_arguments(samples, locations, int_offsets) if self._length % 2 == 0: loc_int = np.floor(locations).astype(int) @@ -241,13 +221,13 @@ class RegularInterpLagrange(RegularInterpMethod): result[:] += fir(samples)[k] * xpow xpow[:] *= loc_frac - return make_numpy_array_1d(result) + return result -class RegularInterpLinear(RegularInterpMethod): +class RegularInterpLinear(RegularInterpCore): """Class implementing interpolation of regularly spaced 1D data using linear interpolation. - See RegularInterpMethod for general properties not specific to the interpolation method. + See RegularInterpCore for general properties not specific to the interpolation method. """ @property @@ -266,12 +246,15 @@ class RegularInterpLinear(RegularInterpMethod): """ return 0 - def __call__( - self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0 - ) -> NumpyArray1D: + def apply( + self, + samples: np.ndarray, + locations: np.ndarray, + int_offsets: np.ndarray | int = 0, + ) -> np.ndarray: """Interpolate regularly spaced data to location in index-space - See RegularInterpMethod.__call__ + See RegularInterpCore.apply() Arguments: samples: 1D numpy array with sampled data @@ -281,7 +264,6 @@ class RegularInterpLinear(RegularInterpMethod): Returns: Interpolated samples. """ - self.check_arguments(samples, locations, int_offsets) loc_floor = np.floor(locations) loc_frac = locations - loc_floor @@ -295,6 +277,124 @@ class RegularInterpLinear(RegularInterpMethod): return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac +class RegularInterpolator: + """User-facing class for interpolation of regularly spaced data + + The interpolation method is not fixed but given by an interpolation engine. + """ + + def __init__(self, core: RegularInterpCore): + """Constructor not intended for direct use. + + Use named constructors make_regular_interpolator_lagrange() or + make_regular_interpolator_linear() to get interpolators employing specific methods. + """ + self._core: Final = core + + @property + def margin_left(self) -> int: + """Margin size (>= 0) on the left boundary + + The interpolator cannot be called with locations within this margin from the leftmost sample. + """ + return self._core.margin_left + + @property + def margin_right(self) -> int: + """Margin size (>= 0) on the right boundary + + The interpolator cannot be called with locations within this margin from the rightmost sample. + """ + return self._core.margin_right + + def __call__( + self, + samples_: np.ndarray, + locations_: np.ndarray, + int_offsets_: np.ndarray | int, + ) -> NumpyArray1D: + """Interpolate regularly spaced data to location in index-space + + The locations to interpolate to are the sum of the locations and int_offsets arguments. + The location argument is an 1D array with arbitrary floating point locations, and the + int_offset argument is an integer or integer array with additional integer offsets. The locations + argument is not restricted, i.e. it is not necessarily the residual fractional locations but + can have arbitrary values. + + The locations refer to the index of the array with the sampled data, starting at 0. + The length of samples array does not have to match the size of the location arrays. + If int_offsets is an array, it needs to have same size arrays location and. All + arrays need to be onedimensional. + + The locations must be within the margins given by the margin_left and margin_right + properties. + + Arguments: + samples: 1D numpy array with sampled data + locations: real-valued 1D numpy array with locations to interpolate to + int_offsets: integer or integer 1D array with additional offsets to the locations. + + Returns: + Interpolated samples. + """ + + samples = make_numpy_array_1d(samples_) + if not np.issubdtype(samples_.dtype, np.floating): + msg = "RegularInterpolator: non-float dtype for samples not allowed" + raise TypeError(msg) + + locations = make_numpy_array_1d(locations_) + if not np.issubdtype(locations_.dtype, np.floating): + msg = "RegularInterpolator: non-float dtype for locations not allowed" + raise TypeError(msg) + + int_offsets: NumpyArray1D | int + + if isinstance(int_offsets_, np.ndarray): + int_offsets = make_numpy_array_1d(int_offsets_) + if int_offsets_.shape != locations_.shape: + msg = ( + f"RegularInterpolator: inconsistent arrays sizes of " + f"locations ({locations_.shape}) and offsets ({int_offsets_.shape})" + ) + raise ValueError(msg) + if not np.issubdtype(int_offsets_.dtype, np.integer): + msg = ( + "RegularInterpolator: non-integer dtype for int_offsets not allowed" + ) + raise TypeError(msg) + elif isinstance(int_offsets_, int): + int_offsets = int_offsets_ + else: + msg = "RegularInterpolator: int_offset must be integer or integer array" + raise TypeError(msg) + + res = self._core.apply(samples, locations, int_offsets) + return make_numpy_array_1d(res) + + +def make_regular_interpolator_lagrange(length: int) -> RegularInterpolator: + """Create an interpolator using Lagrange interpolation + + See RegularInterpLagrange for details of the method. + + Arguments: + length: size of interpolation stencil + Returns: + Interpolation function + """ + return RegularInterpolator(RegularInterpLagrange(length)) + + +def make_regular_interpolator_linear() -> RegularInterpolator: + """Create an interpolator using linear interpolation + + Returns: + Interpolation function + """ + return RegularInterpolator(RegularInterpLinear()) + + class DynShiftBC(Enum): """Enum for various methods of handling boundaries in dynamic shift @@ -308,6 +408,38 @@ class DynShiftBC(Enum): FLAT = 3 +@dataclass(frozen=True) +class DynShiftCfg: + """Config class for dynamic shift interpolation + + Attributes: + min_delay: Assume that any shift < -min_delay + max_delay: Assume that any shift > -max_delay + left_bound: Treatment of left boundary + right_bound: Treatment of right boundary + """ + + min_delay: float + max_delay: float + left_bound: DynShiftBC + right_bound: DynShiftBC + + @property + def min_delay_int(self) -> int: + """Minimum delay in integer index space""" + return int(np.floor(self.min_delay)) + + @property + def max_delay_int(self) -> int: + """Maximum delay in integer index space""" + return int(np.ceil(self.max_delay)) + + def __post__init__(self): + if self.min_delay > self.max_delay: + msg = f"Invalid delay range for dynamic shift ({self.min_delay}, {self.max_delay})" + raise ValueError(msg) + + class DynamicShiftNumpy: """Interpolate to locations given by a non-const shift. @@ -329,34 +461,18 @@ class DynamicShiftNumpy: def __init__( self, - min_delay: float, - max_delay: float, - left_bound: DynShiftBC, - right_bound: DynShiftBC, - interp: RegularInterpMethod, + cfg: DynShiftCfg, + interp: RegularInterpolator, ): + """Set up interpolator. - if min_delay > max_delay: - msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}" - raise ValueError(msg) - - self._min_delay: Final[int] = int(np.floor(min_delay)) - self._max_delay: Final[int] = int(np.ceil(max_delay)) - - self._left_bound: Final = left_bound - self._right_bound: Final = right_bound + Arguments: + cfg: limits of delay and boundary treatment + interp: interpolation method + """ + self._cfg: Final = cfg self._interp_np: Final = interp - def _op_interp( - self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray - ) -> np.ndarray: - """helper method ensuring correct array dimensionalities""" - return self._interp_np( - make_numpy_array_1d(samp), - make_numpy_array_1d(loc), - make_numpy_array_1d(ofs), - ) - @property def margin_left(self) -> int: """Left margin size. @@ -364,7 +480,7 @@ class DynamicShiftNumpy: If positive, specifies how many samples on the left have to be added by boundary conditions. If negative, specifies how many samples on the left are unused. """ - return self._interp_np.margin_left + self._max_delay + return self._interp_np.margin_left + self._cfg.max_delay_int @property def margin_right(self) -> int: @@ -373,7 +489,7 @@ class DynamicShiftNumpy: If positive, specifies how many samples on the right have to be added by boundary conditions. If negative, specifies how many samples on the right are unused. """ - return self._interp_np.margin_right - self._min_delay + return self._interp_np.margin_right - self._cfg.min_delay_int def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D: """Apply shift given by numpy array to samples in another numpy array. @@ -392,7 +508,7 @@ class DynamicShiftNumpy: """ npad_left = 0 if self.margin_left > 0: - match self._left_bound: + match self._cfg.left_bound: case DynShiftBC.ZEROPAD: npad_left = self.margin_left samples = np.concatenate([np.zeros(npad_left), samples]) @@ -401,15 +517,15 @@ class DynamicShiftNumpy: samples = np.concatenate([np.ones(npad_left) * samples[0], samples]) case DynShiftBC.EXCEPTION: msg = ( - f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not " - f"possible for given max delay {self._max_delay}." + f"DynamicShiftNumpy: left edge handling {self._cfg.left_bound.name} not " + f"possible for given max delay {self._cfg.max_delay}." ) raise RuntimeError(msg) case _ as unreachable: assert_never(unreachable) if self.margin_right > 0: - match self._right_bound: + match self._cfg.right_bound: case DynShiftBC.ZEROPAD: samples = np.concatenate([samples, np.zeros(self.margin_right)]) case DynShiftBC.FLAT: @@ -418,8 +534,8 @@ class DynamicShiftNumpy: ) case DynShiftBC.EXCEPTION: msg = ( - f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not " - f"possible for given min delay {self._min_delay=}." + f"DynamicShiftNumpy: right edge handling {self._cfg.right_bound.name} not " + f"possible for given min delay {self._cfg.min_delay=}." ) raise RuntimeError(msg) case _ as unreachable: @@ -434,9 +550,7 @@ class DynamicShiftNumpy: loc = -shift offsets = self.margin_left + np.arange(shift_shape) - samples_shifted = self._op_interp(samples_needed, loc, offsets) - - return make_numpy_array_1d(samples_shifted) + return self._interp_np(samples_needed, loc, offsets) def make_dynamic_shift_lagrange_numpy( @@ -458,5 +572,6 @@ def make_dynamic_shift_lagrange_numpy( Returns: Interpolation function of type DynamicShiftNumpy """ - interp = RegularInterpLagrange(length) - return DynamicShiftNumpy(min_delay, max_delay, left_bound, right_bound, interp) + interp = make_regular_interpolator_lagrange(length) + cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound) + return DynamicShiftNumpy(cfg, interp)