Skip to content
Snippets Groups Projects
Commit d11887d0 authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

Added module for dynamic shift interpolation of dask arrays.

This includes a rewrite of the Lagrange interpolator using a new algorithm based on FIR filters.
parent 9478ea8e
No related branches found
No related tags found
No related merge requests found
"""Functions for applying dynamic real-valued shifts to dask arrays using Lagrange interpolation
Use make_dynamic_shift_lagrange_dask to create a Lagrange interpolator for dask arrays.
"""
from __future__ import annotations
import functools
import operator
from enum import Enum
from typing import Callable, Protocol
import dask
import dask.array as da
import numpy as np
from numpy.polynomial import Polynomial
from lisainstrument.fir_filters_dask import DaskArray1D, make_dask_array_1d
from lisainstrument.fir_filters_numpy import (
DefFilterFIR,
EdgeHandling,
FilterFirNumpyType,
NumpyArray1D,
make_filter_fir_numpy,
make_numpy_array_1d,
)
class RegularInterpMethod(Protocol):
"""
Class instances implementing this protocol act as a function accepting a 1D array
of sample values and locations to interpolate the samples to. The x-coordinate coincides
with the sample index at integer locations. Locations are passed as a 1D array of floating
point values and an integer array with additional integer offsets. The reason is
to avoid loss of accuracy by adding largert integers to smaller offsets.
The floating point part of the location is not restricted, however (except by the
range of provided data). The integer part of the locations is split off and combined
with the integer offsets, the residual is denoted here as fractional locations.
Boundary treatment is not part of this protocol but is the responability of higher level
interfaces that are nont specific to the interpolation method. Here, sample locations have
to be restricted to the range that can be computed without boundary conditions.
The corresponding margins sizes are provided as properties.
"""
@property
def margin_left(self) -> int:
"""Margin size (>= 0) on the left boundary
The interpolator cannot be called with locations within this margin from the leftmost sample.
"""
@property
def margin_right(self) -> int:
"""Margin size (>= 0) on the right boundary
The interpolator cannot be called with locations within this margin from the rightmost sample.
"""
def __call__(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0
) -> NumpyArray1D:
"""Interpolate regularly spaced data to location in index-space
The locations to interpolate to are the sum of the locations and int_offsets arguments.
The location argument is an 1D array with arbitrary floating point locations, and the
int_offset argument is an integer or integer array with additional integer offsets. The locations
argument is not restricted, i.e. it is not necessarily the residual fractional locations but
can have arbitrary values.
The locations refer to the index of the array with the sampled data, starting at 0.
The length of samples array does not have to match the size of the location arrays.
If int_offsets is an array, it needs to have same size arrays location and. All
arrays need to be onedimensional.
Arguments:
samples: 1D numpy array with sampled data
locations: real-valued 1D numpy array with locations to interpolate to
int_offsets: integer or integer 1D array with additional offsets to the locations.
Returns:
Interpolated samples.
"""
def check_arguments(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int
) -> None:
"""Ensure arguments have correct types"""
if isinstance(int_offsets, np.ndarray):
if int_offsets.shape != locations.shape:
msg = (
f"RegularInterpLinear: inconsistent arrays sizes of "
f"locations ({locations.shape}) and offsets ({int_offsets.shape})"
)
raise ValueError(msg)
if not np.issubdtype(int_offsets.dtype, np.integer):
msg = (
"RegularInterpLinear: non-integer dtype for int_offsets not allowed"
)
raise TypeError(msg)
elif not isinstance(int_offsets, int):
msg = "RegularInterpLinear: int_offsets must be int or int array"
raise TypeError(msg)
if not np.issubdtype(locations.dtype, np.floating):
msg = "RegularInterpLinear: non-float dtype for locations not allowed"
raise TypeError(msg)
def make_lagrange_polynomials(length: int, offset: int) -> list[Polynomial]:
r"""Construct lagrange interpolating polynomials
This constructs Lagrange interpolation polynomials with given order,
specialized to regularly spaced coordinates with spacing of one, with a
center specified in terms of an integer offset.
This produces $N$ polynomials $p_j(x)$ of order $N-1$, which satisfy
$p_j(k) = 1$ if $k = j+D$ and $p_j(k) = 0$ for integers $k=0 \ldots N-1, k \ne j$
Arguments:
length: The number $N$ of polynomials of order $N-1$
offset: The offset $D$
Returns:
List of polynomials $p_j$ given as numpy Polynomial objects
"""
def k_j(j: int) -> Polynomial:
x = Polynomial([0.0, 1.0])
ms = [i for i in range(length) if i != j]
pm = [(x - offset - m) / (j - m) for m in ms]
return functools.reduce(operator.mul, pm)
return [k_j(j) for j in range(length)]
class RegularInterpLagrange(RegularInterpMethod):
r"""Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials.
The algorithm uses Lagrange interpolation is specialized to regularly spaced data.
The coefficients of the Lagrange polynomials are computed in advance, and converted
to a set of FIR filters. The FIR filters will be applied to the samples and the result
at the integer locations multiplied with the corresponding power of the fractional
locations.
For each interpolation location, this uses a stencil with center as close to the
location as possible. For odd length, the center point is obtained by rounding the
location, and that the remaining fractional shift is within $[-1/2,1/2]$. For even
locations, the center points is the floor of the location, with remaining fractional
shift within $[0,1)$
See RegularInterpMethod for general properties not specific to the interpolation method.
"""
@staticmethod
def _make_firs(length: int, offset: int) -> list[FilterFirNumpyType]:
"""Set up lagrange polynomials and convert coefficients to FIR filters"""
plag = make_lagrange_polynomials(length, offset)
coeffs = np.array([p.convert().coef for p in plag]).T
filts: list[FilterFirNumpyType] = []
for c in coeffs:
fdef = DefFilterFIR(filter_coeffs=c, offset=offset)
filt = make_filter_fir_numpy(fdef, EdgeHandling.VALID, EdgeHandling.VALID)
filts.append(filt)
return filts
def __init__(self, length: int):
"""Set up interpolation parameters.
The order of the interpolation is chosen in terms of the stencil
size (number of samples used for each interpolation point). The size
is one plus the order of the interpolator, meaning the order of a polynomial
that is still interpolated with zero errror.
Arguments:
length: Size of the lagrange stencil
"""
if length <= 1:
msg = (
f"RegularInterpLagrange: stencil length must be 2 or more, got {length}"
)
raise ValueError(msg)
offset = -((length - 1) // 2)
self._length: int = length
self._offset: int = offset
self._fir_filt = self._make_firs(length, offset)
@property
def margin_left(self) -> int:
"""Margin size (>= 0) on the left boundary
The interpolator cannot be called with locations within this margin from the leftmost sample.
"""
return -self._offset
@property
def margin_right(self) -> int:
"""Margin size (>= 0) on the right boundary
The interpolator cannot be called with locations within this margin from the rightmost sample.
"""
return self._length - 1 + self._offset
def __call__(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0
) -> NumpyArray1D:
"""Interpolate regularly spaced data to location in index-space
See RegularInterpMethod.__call__
Arguments:
samples: 1D numpy array with sampled data
locations: real-valued 1D numpy array with locations to interpolate to
int_offsets: integer or integer 1D array with additional offsets to the locations.
Returns:
Interpolated samples.
"""
self.check_arguments(samples, locations, int_offsets)
if self._length % 2 == 0:
loc_int = np.floor(locations).astype(int)
else:
loc_int = np.round(locations).astype(int)
loc_frac = locations - loc_int
k = loc_int + int_offsets - self.margin_left
if np.any(k < 0):
msg = "RegularInterpLagrange: interpolation requires samples below provided range"
raise RuntimeError(msg)
if np.any(k >= samples.shape[0] - self._length + 1):
msg = "RegularInterpLagrange: interpolation requires samples above provided range"
raise RuntimeError(msg)
result = np.zeros(locations.shape[0], dtype=samples.dtype)
xpow = np.ones_like(loc_frac)
for fir in self._fir_filt:
result[:] += fir(samples)[k] * xpow
xpow[:] *= loc_frac
return make_numpy_array_1d(result)
class RegularInterpLinear(RegularInterpMethod):
"""Class implementing interpolation of regularly spaced 1D data using linear interpolation.
See RegularInterpMethod for general properties not specific to the interpolation method.
"""
@property
def margin_left(self) -> int:
"""Margin size (= 0) on the left boundary
The linear interpolator can be called for all locations within the sample range.
"""
return 0
@property
def margin_right(self) -> int:
"""Margin size (= 0) on the right boundary
The linear interpolator can be called for all locations within the sample range.
"""
return 0
def __call__(
self, samples, locations: NumpyArray1D, int_offsets: NumpyArray1D | int = 0
) -> NumpyArray1D:
"""Interpolate regularly spaced data to location in index-space
See RegularInterpMethod.__call__
Arguments:
samples: 1D numpy array with sampled data
locations: real-valued 1D numpy array with locations to interpolate to
int_offsets: integer or integer 1D array with additional offsets to the locations.
Returns:
Interpolated samples.
"""
self.check_arguments(samples, locations, int_offsets)
loc_floor = np.floor(locations)
loc_frac = locations - loc_floor
k = loc_floor.astype(int) + int_offsets
if np.any(k < 0) or np.any(k + 1 >= samples.shape[0]):
msg = "RegularInterpLinear: interpolation requires samples out of provided range"
raise RuntimeError(msg)
return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac
class DynShiftBC(Enum):
"""Enum for various methods of handling boundaries in dynamic shift
ZEROPAD: Assume all data outside given range is zero.
FLAT: Assume all data outside given range is equal to value on nearest boundary
EXCEPTION: raise exception if data outside range is needed
"""
EXCEPTION = 1
ZEROPAD = 2
FLAT = 3
class DynamicShiftDask:
"""Interpolate to locations given by a non-const shift.
This allows to interpolate samples in a dask array to locations specified
by a shift given by another dask array of same size. The shift is specified in
units of the array index, i.e. there is no separate coordinate array.
A negative shift refers to values left of a given sample, positive shifts
to values on the right.
The boundary treatment can be specified for each boundary in terms of
DynShiftBC enums.
The interpolation method is not fixed but provided via an interpolator
instance implementing the RegularInterpMethod protocol.
For technical reasons, the shift values have to be within some bound that
has to be provided.
"""
def __init__(
self,
min_delay: float,
max_delay: float,
left_bound: EdgeHandling,
right_bound: EdgeHandling,
interp: RegularInterpMethod,
):
"""Set up interpolation parameters
Arguments:
min_delay: Assume that any shift < -min_delay
max_delay: Assume that any shift > -max_delay
left_bound: Treatment of left boundary
right_bound: Treatment of right boundary
interp: implementation of an interpolation method
"""
if min_delay > max_delay:
msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}"
raise ValueError(msg)
self._min_delay = int(np.floor(min_delay))
self._max_delay = int(np.ceil(max_delay))
self._left_bound = left_bound
self._right_bound = right_bound
self._interp_np = interp
def _op_interp(
self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray
) -> np.ndarray:
"""helper method ensuring correct array dimensionalities"""
return self._interp_np(
make_numpy_array_1d(samp),
make_numpy_array_1d(loc),
make_numpy_array_1d(ofs),
)
@property
def margin_left(self) -> int:
"""Left margin size.
If positive, specifies how many samples on the left have to be added by boundary conditions.
If negative, specifies how many samples on the left are unused.
"""
return self._interp_np.margin_left + self._max_delay
@property
def margin_right(self) -> int:
"""Right margin size.
If positive, specifies how many samples on the right have to be added by boundary conditions.
If negative, specifies how many samples on the right are unused.
"""
return self._interp_np.margin_right - self._min_delay
def __call__(self, samples: da.Array, shift: da.Array) -> DaskArray1D:
"""Apply shift given by dask array to samples in another dask array.
The shift and sample arrays need to have the same size, and each shift provides
the interpolation location relative to the sample with the same index.
Shifts are floating point values.A shift of +1 refers to the sample on the right,
-1 the sample on the left, etc. All arrays have to be 1D.
Arguments:
samples: 1D dask array with data samples
shift: 1D dask array with shifts
Returns:
Dask array with interpolated samples
"""
npad_left = 0
if self.margin_left > 0:
match self._left_bound:
case DynShiftBC.ZEROPAD:
npad_left = self.margin_left
samples = da.concatenate([da.zeros(npad_left), samples])
case DynShiftBC.FLAT:
npad_left = self.margin_left
samples = da.concatenate([da.ones(npad_left) * samples[0], samples])
case _:
msg = (
f"DynamicShiftDask: left edge handling {self._left_bound.name} not "
f"possible for given max delay {self._max_delay}."
)
raise RuntimeError(msg)
if self.margin_right > 0:
match self._right_bound:
case DynShiftBC.ZEROPAD:
samples = da.concatenate([samples, da.zeros(self.margin_right)])
case DynShiftBC.FLAT:
samples = da.concatenate(
[samples, da.ones(self.margin_right) * samples[-1]]
)
case _:
msg = (
f"DynamicShiftDask: right edge handling {self._right_bound.name} not "
f"possible for given min delay {self._min_delay=}."
)
raise RuntimeError(msg)
chunks = shift.to_delayed()
delayed_op = dask.delayed(self._op_interp)
results = []
pos = 0
for chunk, chunk_shape in zip(chunks, shift.chunks[0], strict=True):
n_size = npad_left + chunk_shape + self.margin_right
n_first = pos + npad_left - self.margin_left
samples_needed = samples[n_first : n_first + n_size]
loc = -chunk
offsets = self.margin_left + da.arange(chunk_shape, chunks=chunk_shape)
samples_shifted = delayed_op(samples_needed, loc, offsets)
delayed_chunk = da.from_delayed(
samples_shifted, (chunk_shape,), samples.dtype
)
results.append(delayed_chunk)
pos += chunk_shape
return make_dask_array_1d(da.concatenate(results, axis=0))
def make_dynamic_shift_linear_dask(
min_delay: float,
max_delay: float,
left_bound: EdgeHandling,
right_bound: EdgeHandling,
) -> DynamicShiftDask:
"""Set up DynamicShiftDask instance with linear interpolation method.
Arguments:
min_delay: Assume that any shift < -min_delay
max_delay: Assume that any shift > -max_delay
left_bound: Treatment of left boundary
right_bound: Treatment of right boundary
Returns:
Interpolation function of type DynamicShiftDask
"""
interp = RegularInterpLinear()
return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp)
def make_dynamic_shift_lagrange_dask(
length: int,
min_delay: float,
max_delay: float,
left_bound: EdgeHandling,
right_bound: EdgeHandling,
) -> DynamicShiftDask:
"""Set up DynamicShiftDask instance with Lagrange interpolation method.
Arguments:
length: Number of lagrange polynomials (of order length - 1)
min_delay: Assume that any shift < -min_delay
max_delay: Assume that any shift > -max_delay
left_bound: Treatment of left boundary
right_bound: Treatment of right boundary
Returns:
Interpolation function of type DynamicShiftDask
"""
interp = RegularInterpLagrange(length)
return DynamicShiftDask(min_delay, max_delay, left_bound, right_bound, interp)
def numpyfy_dask_bivariate(
dafunc: Callable[[da.Array, da.Array], DaskArray1D], chunks: int
) -> Callable[[np.ndarray, np.ndarray], NumpyArray1D]:
"""Convert function operating on dask arrays to work with numpy arrays.
The dask function will be called with two chunked dask arrays created from
the numpy arrays, and then evaluate the result.
Arguments:
dafunc: function operating on two dask arrays returning single 1D dask array
chunks: chunk size to be used internally
Returns:
function operating on numpy arrays
"""
def func(x, y):
dx = da.from_array(x, chunks=chunks)
dy = da.from_array(y, chunks=chunks)
r = dafunc(make_dask_array_1d(dx), make_dask_array_1d(dy)).compute()
return make_numpy_array_1d(r)
return func
class DynamicShiftNumpy:
"""Interpolate to locations given by a non-const shift.
This allows to interpolate samples in a numpy array to locations specified
by a shift given by another numpy array of same size. The shift is specified in
units of the array index, i.e. there is no separate coordinate array.
A negative shift refers to values left of a given sample, positive shifts
to values on the right.
The boundary treatment can be specified for each boundary in terms of
DynShiftBC enums.
The interpolation method is not fixed but provided via an interpolator
instance implementing the RegularInterpMethod protocol.
For technical reasons, the shift values have to be within some bound that
has to be provided.
"""
def __init__(
self,
min_delay: float,
max_delay: float,
left_bound: EdgeHandling,
right_bound: EdgeHandling,
interp: RegularInterpMethod,
):
if min_delay > max_delay:
msg = f"dynamic_shift_dask: invalid delay range {min_delay=}, {max_delay=}"
raise ValueError(msg)
self._min_delay = int(np.floor(min_delay))
self._max_delay = int(np.ceil(max_delay))
self._left_bound = left_bound
self._right_bound = right_bound
self._interp_np = interp
def _op_interp(
self, samp: np.ndarray, loc: np.ndarray, ofs: np.ndarray
) -> np.ndarray:
"""helper method ensuring correct array dimensionalities"""
return self._interp_np(
make_numpy_array_1d(samp),
make_numpy_array_1d(loc),
make_numpy_array_1d(ofs),
)
@property
def margin_left(self) -> int:
"""Left margin size.
If positive, specifies how many samples on the left have to be added by boundary conditions.
If negative, specifies how many samples on the left are unused.
"""
return self._interp_np.margin_left + self._max_delay
@property
def margin_right(self) -> int:
"""Right margin size.
If positive, specifies how many samples on the right have to be added by boundary conditions.
If negative, specifies how many samples on the right are unused.
"""
return self._interp_np.margin_right - self._min_delay
def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D:
"""Apply shift given by numpy array to samples in another numpy array.
The shift and sample arrays need to have the same size, and each shift provides
the interpolation location relative to the sample with the same index.
Shifts are floating point values.A shift of +1 refers to the sample on the right,
-1 the sample on the left, etc. All arrays have to be 1D.
Arguments:
samples: 1D numpy array with data samples
shift: 1D numpy array with shifts
Returns:
Numpy array with interpolated samples
"""
npad_left = 0
if self.margin_left > 0:
match self._left_bound:
case DynShiftBC.ZEROPAD:
npad_left = self.margin_left
samples = np.concatenate([np.zeros(npad_left), samples])
case DynShiftBC.FLAT:
npad_left = self.margin_left
samples = np.concatenate([np.ones(npad_left) * samples[0], samples])
case _:
msg = (
f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not "
f"possible for given max delay {self._max_delay}."
)
raise RuntimeError(msg)
if self.margin_right > 0:
match self._right_bound:
case DynShiftBC.ZEROPAD:
samples = np.concatenate([samples, np.zeros(self.margin_right)])
case DynShiftBC.FLAT:
samples = np.concatenate(
[samples, np.ones(self.margin_right) * samples[-1]]
)
case _:
msg = (
f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not "
f"possible for given min delay {self._min_delay=}."
)
raise RuntimeError(msg)
pos = 0
shift_shape = len(shift)
n_size = npad_left + shift_shape + self.margin_right
n_first = pos + npad_left - self.margin_left
samples_needed = samples[n_first : n_first + n_size]
loc = -shift
offsets = self.margin_left + np.arange(shift_shape)
samples_shifted = self._op_interp(samples_needed, loc, offsets)
return make_numpy_array_1d(samples_shifted)
def make_dynamic_shift_lagrange_numpy(
length: int,
min_delay: float,
max_delay: float,
left_bound: EdgeHandling,
right_bound: EdgeHandling,
) -> DynamicShiftNumpy:
"""Set up DynamicShiftNumpy instance with Lagrange interpolation method.
Arguments:
length: Number of lagrange polynomials (of order length - 1)
min_delay: Assume that any shift < -min_delay
max_delay: Assume that any shift > -max_delay
left_bound: Treatment of left boundary
right_bound: Treatment of right boundary
Returns:
Interpolation function of type DynamicShiftNumpy
"""
interp = RegularInterpLagrange(length)
return DynamicShiftNumpy(min_delay, max_delay, left_bound, right_bound, interp)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment