Skip to content
Snippets Groups Projects
Commit 7d1014df authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

Minor restructuring of dynamic delay modules to remove duplicate code

parent 2a53f2a8
No related branches found
No related tags found
No related merge requests found
......@@ -14,9 +14,10 @@ from typing_extensions import assert_never
from lisainstrument.dynamic_delay_numpy import (
DynShiftBC,
RegularInterpLagrange,
RegularInterpLinear,
RegularInterpMethod,
DynShiftCfg,
RegularInterpolator,
make_regular_interpolator_lagrange,
make_regular_interpolator_linear,
)
from lisainstrument.fir_filters_dask import DaskArray1D, make_dask_array_1d
from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d
......@@ -43,43 +44,19 @@ class DynamicShiftDask:
def __init__(
self,
min_delay: float,
max_delay: float,
left_bound: DynShiftBC,
right_bound: DynShiftBC,
interp: RegularInterpMethod,
cfg: DynShiftCfg,
interp: RegularInterpolator,
):
"""Set up interpolation parameters
Arguments:
min_delay: Assume that any shift < -min_delay
max_delay: Assume that any shift > -max_delay
left_bound: Treatment of left boundary
right_bound: Treatment of right boundary
cfg: limits of delay and boundary treatment
interp: implementation of an interpolation method
"""
if min_delay > max_delay:
msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}"
raise ValueError(msg)
self._min_delay: Final[int] = int(np.floor(min_delay))
self._max_delay: Final[int] = int(np.ceil(max_delay))
self._left_bound: Final = left_bound
self._right_bound: Final = right_bound
self._cfg: Final = cfg
self._interp_np: Final = interp
def _op_interp(
self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray
) -> np.ndarray:
"""helper method ensuring correct array dimensionalities"""
return self._interp_np(
make_numpy_array_1d(samp),
make_numpy_array_1d(loc),
make_numpy_array_1d(ofs),
)
@property
def margin_left(self) -> int:
"""Left margin size.
......@@ -87,7 +64,7 @@ class DynamicShiftDask:
If positive, specifies how many samples on the left have to be added by boundary conditions.
If negative, specifies how many samples on the left are unused.
"""
return self._interp_np.margin_left + self._max_delay
return self._interp_np.margin_left + self._cfg.max_delay_int
@property
def margin_right(self) -> int:
......@@ -96,7 +73,7 @@ class DynamicShiftDask:
If positive, specifies how many samples on the right have to be added by boundary conditions.
If negative, specifies how many samples on the right are unused.
"""
return self._interp_np.margin_right - self._min_delay
return self._interp_np.margin_right - self._cfg.min_delay_int
def __call__(self, samples: da.Array, shift: da.Array) -> DaskArray1D:
"""Apply shift given by dask array to samples in another dask array.
......@@ -115,7 +92,7 @@ class DynamicShiftDask:
"""
npad_left = 0
if self.margin_left > 0:
match self._left_bound:
match self._cfg.left_bound:
case DynShiftBC.ZEROPAD:
npad_left = self.margin_left
samples = da.concatenate([da.zeros(npad_left), samples])
......@@ -124,15 +101,15 @@ class DynamicShiftDask:
samples = da.concatenate([da.ones(npad_left) * samples[0], samples])
case DynShiftBC.EXCEPTION:
msg = (
f"DynamicShiftDask: left edge handling {self._left_bound.name} not "
f"possible for given max delay {self._max_delay}."
f"DynamicShiftDask: left edge handling {self._cfg.left_bound.name} not "
f"possible for given max delay {self._cfg.max_delay}."
)
raise RuntimeError(msg)
case _ as unreachable:
assert_never(unreachable)
if self.margin_right > 0:
match self._right_bound:
match self._cfg.right_bound:
case DynShiftBC.ZEROPAD:
samples = da.concatenate([samples, da.zeros(self.margin_right)])
case DynShiftBC.FLAT:
......@@ -141,15 +118,15 @@ class DynamicShiftDask:
)
case DynShiftBC.EXCEPTION:
msg = (
f"DynamicShiftDask: right edge handling {self._right_bound.name} not "
f"possible for given min delay {self._min_delay=}."
f"DynamicShiftDask: right edge handling {self._cfg.right_bound.name} not "
f"possible for given min delay {self._cfg.min_delay}."
)
raise RuntimeError(msg)
case _ as unreachable:
assert_never(unreachable)
chunks = shift.to_delayed()
delayed_op = dask.delayed(self._op_interp)
delayed_op = dask.delayed(self._interp_np)
results = []
pos = 0
for chunk, chunk_shape in zip(chunks, shift.chunks[0], strict=True):
......@@ -186,8 +163,9 @@ def make_dynamic_shift_linear_dask(
Returns:
Interpolation function of type DynamicShiftDask
"""
interp = RegularInterpLinear()
return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp)
interp = make_regular_interpolator_linear()
cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound)
return DynamicShiftDask(cfg, interp)
def make_dynamic_shift_lagrange_dask(
......@@ -210,8 +188,9 @@ def make_dynamic_shift_lagrange_dask(
Interpolation function of type DynamicShiftDask
"""
interp = RegularInterpLagrange(length)
return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp)
interp = make_regular_interpolator_lagrange(length)
cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound)
return DynamicShiftDask(cfg, interp)
def numpyfy_dask_bivariate(
......
......@@ -7,6 +7,7 @@ from __future__ import annotations
import functools
import operator
from dataclasses import dataclass
from enum import Enum
from typing import Final, Protocol
......@@ -24,21 +25,17 @@ from lisainstrument.fir_filters_numpy import (
)
class RegularInterpMethod(Protocol):
"""
Class instances implementing this protocol act as a function accepting a 1D array
of sample values and locations to interpolate the samples to. The x-coordinate coincides
with the sample index at integer locations. Locations are passed as a 1D array of floating
point values and an integer array with additional integer offsets. The reason is
to avoid loss of accuracy by adding largert integers to smaller offsets.
The floating point part of the location is not restricted, however (except by the
range of provided data). The integer part of the locations is split off and combined
with the integer offsets, the residual is denoted here as fractional locations.
Boundary treatment is not part of this protocol but is the responability of higher level
interfaces that are nont specific to the interpolation method. Here, sample locations have
to be restricted to the range that can be computed without boundary conditions.
The corresponding margins sizes are provided as properties.
class RegularInterpCore(Protocol):
"""Protocol for interpolator engine interface for regularly spaced samples
This defines an interface for core functionality of interpolating regularly
spaced samples in 1D, using numpy arrays. It is not intended for direct use
but for use in the RegularInterpolator class.
Boundary treatment is not part of this protocol. Implementations should only
accept locations that can be interpolated to without using any form of boundary
conditions, and raise an exception otherwise. The margin sizes required by the
interpolation method are exposed as properties.
"""
@property
......@@ -55,9 +52,12 @@ class RegularInterpMethod(Protocol):
The interpolator cannot be called with locations within this margin from the rightmost sample.
"""
def __call__(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0
) -> NumpyArray1D:
def apply(
self,
samples: np.ndarray,
locations: np.ndarray,
int_offsets: np.ndarray | int = 0,
) -> np.ndarray:
"""Interpolate regularly spaced data to location in index-space
The locations to interpolate to are the sum of the locations and int_offsets arguments.
......@@ -71,6 +71,8 @@ class RegularInterpMethod(Protocol):
If int_offsets is an array, it needs to have same size arrays location and. All
arrays need to be onedimensional.
Implementations do not need to check the array dimensionality, size consistency, and types.
This is done in RegularInterpolator.
Arguments:
samples: 1D numpy array with sampled data
......@@ -81,30 +83,6 @@ class RegularInterpMethod(Protocol):
Interpolated samples.
"""
def check_arguments(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int
) -> None:
"""Ensure arguments have correct types"""
if isinstance(int_offsets, np.ndarray):
if int_offsets.shape != locations.shape:
msg = (
f"RegularInterpLinear: inconsistent arrays sizes of "
f"locations ({locations.shape}) and offsets ({int_offsets.shape})"
)
raise ValueError(msg)
if not np.issubdtype(int_offsets.dtype, np.integer):
msg = (
"RegularInterpLinear: non-integer dtype for int_offsets not allowed"
)
raise TypeError(msg)
elif not isinstance(int_offsets, int):
msg = "RegularInterpLinear: int_offsets must be int or int array"
raise TypeError(msg)
if not np.issubdtype(locations.dtype, np.floating):
msg = "RegularInterpLinear: non-float dtype for locations not allowed"
raise TypeError(msg)
def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]:
r"""Construct lagrange interpolating polynomials
......@@ -133,7 +111,7 @@ def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]:
return [k_j(j) for j in range(length)]
class RegularInterpLagrange(RegularInterpMethod):
class RegularInterpLagrange(RegularInterpCore):
r"""Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials.
The algorithm uses Lagrange interpolation is specialized to regularly spaced data.
......@@ -148,7 +126,7 @@ class RegularInterpLagrange(RegularInterpMethod):
locations, the center points is the floor of the location, with remaining fractional
shift within $[0,1)$
See RegularInterpMethod for general properties not specific to the interpolation method.
See RegularInterpCore for general properties not specific to the interpolation method.
"""
@staticmethod
......@@ -166,13 +144,13 @@ class RegularInterpLagrange(RegularInterpMethod):
def __init__(self, length: int):
"""Set up interpolation parameters.
The order of the interpolation is chosen in terms of the stencil
size (number of samples used for each interpolation point). The size
is one plus the order of the interpolator, meaning the order of a polynomial
that is still interpolated with zero errror.
The length parameter specifies the number of interpolation polynomials,
which have order length - 1. The length is also the number of samples used
for each interpolation point. The order of the interpolating polynomials
is also the order of plynomials that are interpolated with zero error.
Arguments:
length: Size of the lagrange stencil
length: Size of the interpolation stencil
"""
if length <= 1:
......@@ -204,22 +182,24 @@ class RegularInterpLagrange(RegularInterpMethod):
"""
return self._length - 1 + self._offset
def __call__(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0
) -> NumpyArray1D:
def apply(
self,
samples: np.ndarray,
locations: np.ndarray,
int_offsets: np.ndarray | int = 0,
) -> np.ndarray:
"""Interpolate regularly spaced data to location in index-space
See RegularInterpMethod.__call__
See RegularInterpCore.apply()
Arguments:
samples: 1D numpy array with sampled data
samples: real-valued 1D numpy array with sampled data
locations: real-valued 1D numpy array with locations to interpolate to
int_offsets: integer or integer 1D array with additional offsets to the locations.
Returns:
Interpolated samples.
"""
self.check_arguments(samples, locations, int_offsets)
if self._length % 2 == 0:
loc_int = np.floor(locations).astype(int)
......@@ -241,13 +221,13 @@ class RegularInterpLagrange(RegularInterpMethod):
result[:] += fir(samples)[k] * xpow
xpow[:] *= loc_frac
return make_numpy_array_1d(result)
return result
class RegularInterpLinear(RegularInterpMethod):
class RegularInterpLinear(RegularInterpCore):
"""Class implementing interpolation of regularly spaced 1D data using linear interpolation.
See RegularInterpMethod for general properties not specific to the interpolation method.
See RegularInterpCore for general properties not specific to the interpolation method.
"""
@property
......@@ -266,12 +246,15 @@ class RegularInterpLinear(RegularInterpMethod):
"""
return 0
def __call__(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0
) -> NumpyArray1D:
def apply(
self,
samples: np.ndarray,
locations: np.ndarray,
int_offsets: np.ndarray | int = 0,
) -> np.ndarray:
"""Interpolate regularly spaced data to location in index-space
See RegularInterpMethod.__call__
See RegularInterpCore.apply()
Arguments:
samples: 1D numpy array with sampled data
......@@ -281,7 +264,6 @@ class RegularInterpLinear(RegularInterpMethod):
Returns:
Interpolated samples.
"""
self.check_arguments(samples, locations, int_offsets)
loc_floor = np.floor(locations)
loc_frac = locations - loc_floor
......@@ -295,6 +277,124 @@ class RegularInterpLinear(RegularInterpMethod):
return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac
class RegularInterpolator:
"""User-facing class for interpolation of regularly spaced data
The interpolation method is not fixed but given by an interpolation engine.
"""
def __init__(self, core: RegularInterpCore):
"""Constructor not intended for direct use.
Use named constructors make_regular_interpolator_lagrange() or
make_regular_interpolator_linear() to get interpolators employing specific methods.
"""
self._core: Final = core
@property
def margin_left(self) -> int:
"""Margin size (>= 0) on the left boundary
The interpolator cannot be called with locations within this margin from the leftmost sample.
"""
return self._core.margin_left
@property
def margin_right(self) -> int:
"""Margin size (>= 0) on the right boundary
The interpolator cannot be called with locations within this margin from the rightmost sample.
"""
return self._core.margin_right
def __call__(
self,
samples_: np.ndarray,
locations_: np.ndarray,
int_offsets_: np.ndarray | int,
) -> NumpyArray1D:
"""Interpolate regularly spaced data to location in index-space
The locations to interpolate to are the sum of the locations and int_offsets arguments.
The location argument is an 1D array with arbitrary floating point locations, and the
int_offset argument is an integer or integer array with additional integer offsets. The locations
argument is not restricted, i.e. it is not necessarily the residual fractional locations but
can have arbitrary values.
The locations refer to the index of the array with the sampled data, starting at 0.
The length of samples array does not have to match the size of the location arrays.
If int_offsets is an array, it needs to have same size arrays location and. All
arrays need to be onedimensional.
The locations must be within the margins given by the margin_left and margin_right
properties.
Arguments:
samples: 1D numpy array with sampled data
locations: real-valued 1D numpy array with locations to interpolate to
int_offsets: integer or integer 1D array with additional offsets to the locations.
Returns:
Interpolated samples.
"""
samples = make_numpy_array_1d(samples_)
if not np.issubdtype(samples_.dtype, np.floating):
msg = "RegularInterpolator: non-float dtype for samples not allowed"
raise TypeError(msg)
locations = make_numpy_array_1d(locations_)
if not np.issubdtype(locations_.dtype, np.floating):
msg = "RegularInterpolator: non-float dtype for locations not allowed"
raise TypeError(msg)
int_offsets: NumpyArray1D | int
if isinstance(int_offsets_, np.ndarray):
int_offsets = make_numpy_array_1d(int_offsets_)
if int_offsets_.shape != locations_.shape:
msg = (
f"RegularInterpolator: inconsistent arrays sizes of "
f"locations ({locations_.shape}) and offsets ({int_offsets_.shape})"
)
raise ValueError(msg)
if not np.issubdtype(int_offsets_.dtype, np.integer):
msg = (
"RegularInterpolator: non-integer dtype for int_offsets not allowed"
)
raise TypeError(msg)
elif isinstance(int_offsets_, int):
int_offsets = int_offsets_
else:
msg = "RegularInterpolator: int_offset must be integer or integer array"
raise TypeError(msg)
res = self._core.apply(samples, locations, int_offsets)
return make_numpy_array_1d(res)
def make_regular_interpolator_lagrange(length: int) -> RegularInterpolator:
"""Create an interpolator using Lagrange interpolation
See RegularInterpLagrange for details of the method.
Arguments:
length: size of interpolation stencil
Returns:
Interpolation function
"""
return RegularInterpolator(RegularInterpLagrange(length))
def make_regular_interpolator_linear() -> RegularInterpolator:
"""Create an interpolator using linear interpolation
Returns:
Interpolation function
"""
return RegularInterpolator(RegularInterpLinear())
class DynShiftBC(Enum):
"""Enum for various methods of handling boundaries in dynamic shift
......@@ -308,6 +408,38 @@ class DynShiftBC(Enum):
FLAT = 3
@dataclass(frozen=True)
class DynShiftCfg:
"""Config class for dynamic shift interpolation
Attributes:
min_delay: Assume that any shift < -min_delay
max_delay: Assume that any shift > -max_delay
left_bound: Treatment of left boundary
right_bound: Treatment of right boundary
"""
min_delay: float
max_delay: float
left_bound: DynShiftBC
right_bound: DynShiftBC
@property
def min_delay_int(self) -> int:
"""Minimum delay in integer index space"""
return int(np.floor(self.min_delay))
@property
def max_delay_int(self) -> int:
"""Maximum delay in integer index space"""
return int(np.ceil(self.max_delay))
def __post__init__(self):
if self.min_delay > self.max_delay:
msg = f"Invalid delay range for dynamic shift ({self.min_delay}, {self.max_delay})"
raise ValueError(msg)
class DynamicShiftNumpy:
"""Interpolate to locations given by a non-const shift.
......@@ -329,34 +461,18 @@ class DynamicShiftNumpy:
def __init__(
self,
min_delay: float,
max_delay: float,
left_bound: DynShiftBC,
right_bound: DynShiftBC,
interp: RegularInterpMethod,
cfg: DynShiftCfg,
interp: RegularInterpolator,
):
"""Set up interpolator.
if min_delay > max_delay:
msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}"
raise ValueError(msg)
self._min_delay: Final[int] = int(np.floor(min_delay))
self._max_delay: Final[int] = int(np.ceil(max_delay))
self._left_bound: Final = left_bound
self._right_bound: Final = right_bound
Arguments:
cfg: limits of delay and boundary treatment
interp: interpolation method
"""
self._cfg: Final = cfg
self._interp_np: Final = interp
def _op_interp(
self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray
) -> np.ndarray:
"""helper method ensuring correct array dimensionalities"""
return self._interp_np(
make_numpy_array_1d(samp),
make_numpy_array_1d(loc),
make_numpy_array_1d(ofs),
)
@property
def margin_left(self) -> int:
"""Left margin size.
......@@ -364,7 +480,7 @@ class DynamicShiftNumpy:
If positive, specifies how many samples on the left have to be added by boundary conditions.
If negative, specifies how many samples on the left are unused.
"""
return self._interp_np.margin_left + self._max_delay
return self._interp_np.margin_left + self._cfg.max_delay_int
@property
def margin_right(self) -> int:
......@@ -373,7 +489,7 @@ class DynamicShiftNumpy:
If positive, specifies how many samples on the right have to be added by boundary conditions.
If negative, specifies how many samples on the right are unused.
"""
return self._interp_np.margin_right - self._min_delay
return self._interp_np.margin_right - self._cfg.min_delay_int
def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D:
"""Apply shift given by numpy array to samples in another numpy array.
......@@ -392,7 +508,7 @@ class DynamicShiftNumpy:
"""
npad_left = 0
if self.margin_left > 0:
match self._left_bound:
match self._cfg.left_bound:
case DynShiftBC.ZEROPAD:
npad_left = self.margin_left
samples = np.concatenate([np.zeros(npad_left), samples])
......@@ -401,15 +517,15 @@ class DynamicShiftNumpy:
samples = np.concatenate([np.ones(npad_left) * samples[0], samples])
case DynShiftBC.EXCEPTION:
msg = (
f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not "
f"possible for given max delay {self._max_delay}."
f"DynamicShiftNumpy: left edge handling {self._cfg.left_bound.name} not "
f"possible for given max delay {self._cfg.max_delay}."
)
raise RuntimeError(msg)
case _ as unreachable:
assert_never(unreachable)
if self.margin_right > 0:
match self._right_bound:
match self._cfg.right_bound:
case DynShiftBC.ZEROPAD:
samples = np.concatenate([samples, np.zeros(self.margin_right)])
case DynShiftBC.FLAT:
......@@ -418,8 +534,8 @@ class DynamicShiftNumpy:
)
case DynShiftBC.EXCEPTION:
msg = (
f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not "
f"possible for given min delay {self._min_delay=}."
f"DynamicShiftNumpy: right edge handling {self._cfg.right_bound.name} not "
f"possible for given min delay {self._cfg.min_delay=}."
)
raise RuntimeError(msg)
case _ as unreachable:
......@@ -434,9 +550,7 @@ class DynamicShiftNumpy:
loc = -shift
offsets = self.margin_left + np.arange(shift_shape)
samples_shifted = self._op_interp(samples_needed, loc, offsets)
return make_numpy_array_1d(samples_shifted)
return self._interp_np(samples_needed, loc, offsets)
def make_dynamic_shift_lagrange_numpy(
......@@ -458,5 +572,6 @@ def make_dynamic_shift_lagrange_numpy(
Returns:
Interpolation function of type DynamicShiftNumpy
"""
interp = RegularInterpLagrange(length)
return DynamicShiftNumpy(min_delay, max_delay, left_bound, right_bound, interp)
interp = make_regular_interpolator_lagrange(length)
cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound)
return DynamicShiftNumpy(cfg, interp)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment