Skip to content
Snippets Groups Projects
Commit 23c6811e authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

Changed shift inversion interface and expanded documentation.

-Sample rate now specified when constructing operator
-Tolerance now given in same units as shifts, instead of index space.
parent 95fd8471
No related branches found
No related tags found
No related merge requests found
...@@ -22,34 +22,51 @@ from lisainstrument.shift_inversion_numpy import fixed_point_iter ...@@ -22,34 +22,51 @@ from lisainstrument.shift_inversion_numpy import fixed_point_iter
class ShiftInverseDask: class ShiftInverseDask:
"""Invert coordinate transformation given as shift""" """Invert time coordinate transformation given as dask array
See shift_inversion_numpy.ShiftInverseNumpy for details.
"""
def __init__( def __init__(
self, self,
fsample: float,
max_abs_shift: float, max_abs_shift: float,
interp: RegularInterpolator, interp: RegularInterpolator,
max_iter: int, max_iter: int,
tolerance: float, tolerance: float,
): ):
"""Set up interpolator. r"""Set up the coordinate inversion operator.
See shift_inversion_numpy.ShiftInverseNumpy for details
Arguments: Arguments:
max_abs_shift: Upper limit for absolute difference between coordinate frames w.r.t index space fsample: Sample rate $f_s > 0$ [s]
interp: Interpolation method max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
interp: Interpolation operator
max_iter: Maximum iterations before fail max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result tolerance: Maximum absolute error [s] of result
""" """
self._max_abs_shift: Final = int(np.ceil(max_abs_shift)) self._fsample: Final = float(fsample)
if self._max_abs_shift < 0: self._max_abs_shift_idx: Final = int(np.ceil(max_abs_shift * self._fsample))
self._interp_np: Final = interp
self._max_iter = int(max_iter)
self._tolerance_idx = float(tolerance * self._fsample)
if self._fsample <= 0:
msg = f"ShiftInverseDask: fsample must be strictly positive, got {fsample}"
raise ValueError(msg)
if max_abs_shift < 0:
msg = ( msg = (
f"ShiftInverseDask: max_abs_shift must be positive, got {max_abs_shift}" f"ShiftInverseDask: max_abs_shift must be positive, got {max_abs_shift}"
) )
raise ValueError(msg) raise ValueError(msg)
self._interp_np: Final = interp if self._max_iter <= 0:
self._max_iter = int(max_iter) msg = f"ShiftInverseDask: max_iter must be strictly positive integer, got {max_iter}"
self._tolerance = float(tolerance) raise ValueError(msg)
@property @property
def margin_left(self) -> int: def margin_left(self) -> int:
...@@ -57,7 +74,7 @@ class ShiftInverseDask: ...@@ -57,7 +74,7 @@ class ShiftInverseDask:
Specifies how many samples on the left have to be added by boundary conditions. Specifies how many samples on the left have to be added by boundary conditions.
""" """
return self._interp_np.margin_left + self._max_abs_shift return self._interp_np.margin_left + self._max_abs_shift_idx
@property @property
def margin_right(self) -> int: def margin_right(self) -> int:
...@@ -65,11 +82,24 @@ class ShiftInverseDask: ...@@ -65,11 +82,24 @@ class ShiftInverseDask:
Specifies how many samples on the right have to be added by boundary conditions. Specifies how many samples on the right have to be added by boundary conditions.
""" """
return self._interp_np.margin_right + self._max_abs_shift return self._interp_np.margin_right + self._max_abs_shift_idx
def _fixed_point_iter(self, dx_pad: np.ndarray, dx: np.ndarray) -> NumpyArray1D:
"""Call the fixed point iteration on a single chunk
The input shift includes margin points in addition to the corresponding
output points. One needs to specify initial data for the iteration, with
same shape as the result. Both are shorter than the input shift by the
left plus right margin size.
Arguments:
dx_pad: numpy array with shift
dx: initial value for iteration
Returns:
Converged iteration result
"""
def _fixed_point_iter(
self, dx_pad: np.ndarray, dx: np.ndarray, tol: float
) -> NumpyArray1D:
def f_iter(x: NumpyArray1D) -> NumpyArray1D: def f_iter(x: NumpyArray1D) -> NumpyArray1D:
return self._interp_np.apply_shift( return self._interp_np.apply_shift(
make_numpy_array_1d(dx_pad), -x, self.margin_left make_numpy_array_1d(dx_pad), -x, self.margin_left
...@@ -79,25 +109,24 @@ class ShiftInverseDask: ...@@ -79,25 +109,24 @@ class ShiftInverseDask:
return np.max(np.abs(x1 - x2)) return np.max(np.abs(x1 - x2))
return fixed_point_iter( return fixed_point_iter(
f_iter, f_err, make_numpy_array_1d(dx), tol, self._max_iter f_iter, f_err, make_numpy_array_1d(dx), self._tolerance_idx, self._max_iter
) )
def __call__(self, shift: da.Array, fsample: float) -> DaskArray1D: def __call__(self, shift: da.Array) -> DaskArray1D:
"""Find the shift for the inverse transformation given by a shift. """Compute the inverse coordinate transform
See shift_inversion_numpy.ShiftInverseNumpy for details
Arguments: Arguments:
shift: 1D dask array with shifts of the coordinate transform shift: 1D dask array with shifts of the coordinate transform [s]
fsample: sample rate of shift array
Returns: Returns:
1D dask array with shift at transformed coordinate 1D dask array with shift [s] at transformed coordinate
""" """
shift_idx = shift * fsample shift_idx = shift * self._fsample
make_dask_array_1d(shift_idx) make_dask_array_1d(shift_idx)
tol_idx = self._tolerance * fsample
dx_pad = da.pad( dx_pad = da.pad(
shift_idx, shift_idx,
(self.margin_left, self.margin_right), (self.margin_left, self.margin_right),
...@@ -114,7 +143,7 @@ class ShiftInverseDask: ...@@ -114,7 +143,7 @@ class ShiftInverseDask:
n_size = self.margin_left + chunk_shape + self.margin_right n_size = self.margin_left + chunk_shape + self.margin_right
n_first = pos n_first = pos
samples_needed = dx_pad[n_first : n_first + n_size] samples_needed = dx_pad[n_first : n_first + n_size]
samples_shifted = delayed_op(samples_needed, chunk, tol_idx) samples_shifted = delayed_op(samples_needed, chunk)
delayed_chunk = da.from_delayed( delayed_chunk = da.from_delayed(
samples_shifted, (chunk_shape,), shift_idx.dtype samples_shifted, (chunk_shape,), shift_idx.dtype
) )
...@@ -122,48 +151,63 @@ class ShiftInverseDask: ...@@ -122,48 +151,63 @@ class ShiftInverseDask:
pos += chunk_shape pos += chunk_shape
shift_idx_inv = da.concatenate(results, axis=0) shift_idx_inv = da.concatenate(results, axis=0)
shift_inv = shift_idx_inv / fsample shift_inv = shift_idx_inv / self._fsample
return make_dask_array_1d(shift_inv) return make_dask_array_1d(shift_inv)
def make_shift_inverse_lagrange_dask( def make_shift_inverse_lagrange_dask(
order: int, order: int,
fsample: float,
max_abs_shift: float, max_abs_shift: float,
max_iter: int, max_iter: int,
tolerance: float, tolerance: float,
) -> ShiftInverseDask: ) -> ShiftInverseDask:
"""Set up ShiftInverseDask instance with Lagrange interpolation method. r"""Set up ShiftInverseDask instance with new Lagrange interpolation method.
Arguments: Arguments:
order: Order of the Lagrange polynomials order: Order of the Lagrange polynomials
max_abs_shift: Upper limit for absolute difference between coordinate frames w.r.t index space fsample: Sample rate $f_s > 0$ [s]
max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
max_iter: Maximum iterations before fail max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result tolerance: Maximum absolute error of result
Returns: Returns:
Inversion function of type ShiftInverseNumpy Inversion function of type ShiftInverseDask
""" """
interp = make_regular_interpolator_lagrange(order) interp = make_regular_interpolator_lagrange(order)
return ShiftInverseDask(max_abs_shift, interp, max_iter, tolerance) return ShiftInverseDask(
fsample=fsample,
max_abs_shift=max_abs_shift,
interp=interp,
max_iter=max_iter,
tolerance=tolerance,
)
def make_shift_inverse_dsp_dask( def make_shift_inverse_dsp_dask(
order: int, order: int,
fsample: float,
max_abs_shift: float, max_abs_shift: float,
max_iter: int, max_iter: int,
tolerance: float, tolerance: float,
) -> ShiftInverseDask: ) -> ShiftInverseDask:
"""Set up ShiftInverseDask instance using dsp.timeshift as interpolation method. r"""Set up ShiftInverseDask instance using dsp.timeshift as interpolation method.
Arguments: Arguments:
order: Order of the Lagrange polynomials order: Order of the Lagrange polynomials
max_abs_shift: Upper limit for absolute difference between coordinate frames w.r.t index space fsample: Sample rate $f_s > 0$ [s]
max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
max_iter: Maximum iterations before fail max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result tolerance: Maximum absolute error of result
Returns: Returns:
Inversion function of type ShiftInverseNumpy Inversion function of type ShiftInverseDask
""" """
interp = make_regular_interpolator_dsp(order) interp = make_regular_interpolator_dsp(order)
return ShiftInverseDask(max_abs_shift, interp, max_iter, tolerance) return ShiftInverseDask(
fsample=fsample,
max_abs_shift=max_abs_shift,
interp=interp,
max_iter=max_iter,
tolerance=tolerance,
)
...@@ -61,32 +61,81 @@ def fixed_point_iter( ...@@ -61,32 +61,81 @@ def fixed_point_iter(
class ShiftInverseNumpy: # pylint: disable=too-few-public-methods class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
"""Invert coordinate transformation given as shift""" r"""Invert time coordinate transformation given as numpy array
The purpose of this class is the inversion of a 1D coordinate transform
between some coordinates $u$ and $v$.
The transform is expressed as a
shift
$ S(v) = u(v) - v $.
It will be provided as samples
$ S_k = S(v_k) $
at regularly spaced sample locations
$ v_k = v_0 + k \Delta v $.
The inverse will be expressed as
$\hat{S}(u) = u - v(u)$.
It will be computed for locations
$ u_l = u_0 + l \Delta u $ regularly spaced with respect to the
$u$-coordinate, i.e. the output will be the sequence
$ \hat{S}_l = u_l - \hat{S}(u_l) $.
Currently, we restrict to the special case where $u_k = v_k$,
i.e. $v_0 = u_0$ and $\Delta u = \Delta v$.
By convention, the coordinates refer to times, and coordinate shift,
tolerance, and sample rate are all given in SI units.
"""
def __init__( def __init__(
self, self,
fsample: float,
max_abs_shift: float, max_abs_shift: float,
interp: RegularInterpolator, interp: RegularInterpolator,
max_iter: int, max_iter: int,
tolerance: float, tolerance: float,
): ):
"""Set up interpolator. r"""Set up the coordinate inversion operator.
One needs to provide the sample rate $f_s$ used
both for input and output sequences. All calls to the operator will
assume regularly sampled sequences with $\Delta u = \Delta v = 1 / f_s$.
For technical reasons, one also needs to provide a limit for the maximum
shift between the coordinates, i.e. $|S(v)| < S_\mathrm{max}$.
Another parameter is an interpolation operator to be used in the internal
fixed-point iteration algorithm. One also needs to provide the desired
accuracy of the resulting coordinate shift, as well as a limit for the
allowed number of iterations before aborting with an exception.
Arguments: Arguments:
max_abs_shift: Upper limit for absolute difference between coordinate frames w.r.t index space fsample: Sample rate $f_s > 0$ [s]
interp: Interpolation method max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
interp: Interpolation operator
max_iter: Maximum iterations before fail max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result tolerance: Maximum absolute error [s] of result
""" """
self._max_abs_shift: Final = int(np.ceil(max_abs_shift))
if self._max_abs_shift < 0: self._fsample: Final = float(fsample)
msg = f"ShiftInverseNumpy: max_abs_shift must be positive, got {max_abs_shift}" self._max_abs_shift_idx: Final = int(np.ceil(max_abs_shift * self._fsample))
raise ValueError(msg)
self._interp_np: Final = interp self._interp_np: Final = interp
self._max_iter = int(max_iter) self._max_iter = int(max_iter)
self._tolerance = float(tolerance) self._tolerance_idx = float(tolerance * self._fsample)
if self._fsample <= 0:
msg = f"ShiftInverseNumpy: fsample must be strictly positive, got {fsample}"
raise ValueError(msg)
if max_abs_shift < 0:
msg = f"ShiftInverseNumpy: max_abs_shift must be positive, got {max_abs_shift}"
raise ValueError(msg)
if self._max_iter <= 0:
msg = f"ShiftInverseNumpy: max_iter must be strictly positive integer, got {max_iter}"
raise ValueError(msg)
@property @property
def margin_left(self) -> int: def margin_left(self) -> int:
...@@ -94,7 +143,7 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods ...@@ -94,7 +143,7 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
Specifies how many samples on the left have to be added by boundary conditions. Specifies how many samples on the left have to be added by boundary conditions.
""" """
return self._interp_np.margin_left + self._max_abs_shift return self._interp_np.margin_left + self._max_abs_shift_idx
@property @property
def margin_right(self) -> int: def margin_right(self) -> int:
...@@ -102,20 +151,32 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods ...@@ -102,20 +151,32 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
Specifies how many samples on the right have to be added by boundary conditions. Specifies how many samples on the right have to be added by boundary conditions.
""" """
return self._interp_np.margin_right + self._max_abs_shift return self._interp_np.margin_right + self._max_abs_shift_idx
def __call__(self, shift: np.ndarray, fsample: float) -> NumpyArray1D: def __call__(self, shift: np.ndarray) -> NumpyArray1D:
"""Find the shift for the inverse transformation given by a shift. r"""Compute the inverse coordinate transform
The coordinate transform is given an array with the sequence
$ S_k = S(v_k) $. The sample locations $v_k$ do not have to be provided
but are assumed to be regularly spaced, i.e.
$ v_k = v_0 + k \Delta v $, and the first array element corresponds to
$k=0$.
The output is given as an array with the sequence
$ \hat{S}_l $ (see constructor docstring) providing the shift
at sample locations regularly spaced in the transformed coordinate
$u$, that is, at points $ u_l = u_0 + l \Delta u $. The first
element of the output corresponds to $l=0$.
The output sample locations are not returned, but are implicitly
equal to the input ones, meaning $ v_l = u_l $.
Arguments: Arguments:
shift: 1D numpy array with shifts of the coordinate transform shift: 1D numpy array with shifts of the coordinate transform [s]
fsample: sample rate of shift array
Returns: Returns:
1D numpy array with shift at transformed coordinate 1D numpy array with shift [s] at transformed coordinate
""" """
shift_idx = shift * fsample shift_idx = shift * self._fsample
dx = make_numpy_array_1d(shift_idx) dx = make_numpy_array_1d(shift_idx)
dx_pad = np.pad( dx_pad = np.pad(
...@@ -132,24 +193,26 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods ...@@ -132,24 +193,26 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
return np.max(np.abs(x1 - x2)) return np.max(np.abs(x1 - x2))
shift_idx_inv = fixed_point_iter( shift_idx_inv = fixed_point_iter(
f_iter, f_err, dx, self._tolerance * fsample, self._max_iter f_iter, f_err, dx, self._tolerance_idx, self._max_iter
) )
shift_inv = shift_idx_inv / fsample shift_inv = shift_idx_inv / self._fsample
return make_numpy_array_1d(shift_inv) return make_numpy_array_1d(shift_inv)
def make_shift_inverse_lagrange_numpy( def make_shift_inverse_lagrange_numpy(
order: int, order: int,
fsample: float,
max_abs_shift: float, max_abs_shift: float,
max_iter: int, max_iter: int,
tolerance: float, tolerance: float,
) -> ShiftInverseNumpy: ) -> ShiftInverseNumpy:
"""Set up ShiftInverseNumpy instance with Lagrange interpolation method. r"""Set up ShiftInverseNumpy instance with Lagrange interpolation method.
Arguments: Arguments:
order: Order of the Lagrange polynomials order: Order of the Lagrange polynomials
max_abs_shift: Upper limit for absolute difference between coordinate frames w.r.t index space fsample: Sample rate $f_s > 0$ [s]
max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
max_iter: Maximum iterations before fail max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result tolerance: Maximum absolute error of result
...@@ -157,4 +220,10 @@ def make_shift_inverse_lagrange_numpy( ...@@ -157,4 +220,10 @@ def make_shift_inverse_lagrange_numpy(
Inversion function of type ShiftInverseNumpy Inversion function of type ShiftInverseNumpy
""" """
interp = make_regular_interpolator_lagrange(order) interp = make_regular_interpolator_lagrange(order)
return ShiftInverseNumpy(max_abs_shift, interp, max_iter, tolerance) return ShiftInverseNumpy(
fsample=fsample,
max_abs_shift=max_abs_shift,
interp=interp,
max_iter=max_iter,
tolerance=tolerance,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment