Time coordinate inversion
Functions for inverting a 1D (time)coordinate transform given as numpy array
The inversion function internally requires an interpolation operator implementing the
RegularInterpolator interface, and which is provided by the user. Use
make_shift_inverse_lagrange_numpy to create an inversion operator employing Lagrange
.. autoclass:: ShiftInverseNumpy
.. autofunction:: make_shift_inverse_lagrange_numpy
.. autofunction:: fixed_point_iter
from __future__ import annotations
import numpy as np
from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d
from lisainstrument.regular_interpolators import (
def fixed_point_iter(
f: Callable[[NumpyArray1D], NumpyArray1D],
ferr: Callable[[NumpyArray1D, NumpyArray1D], float],
x: NumpyArray1D,
tolerance: float,
max_iter: int,
) -> NumpyArray1D:
r"""Perform a fixed point iteration for functions operating on a 1D array.
This uses fixed-point iteration to find a solution for
.. math::
x = f(x)
where :math:`x` is a 1D array and :math:`f` returns an array of the same size.
The convergence criterion is provided by the user
via a function :math:`r(x)` that returns a scalar error measure. The iteration
is performed until :math:`r(x) < \epsilon`. If convergence is not achieved
after a given number of iterations, an exception is raised.
f: The function :math:`f(x)`
ferr: The error measure :math:`r(x)`
x: The initial data for the iteration.
tolerance: The tolerance $\epsilon$
max_iter: Maximum number of iterations
Array with solution
for _ in range(max_iter):
x_next = f(x)
err = ferr(x, x_next)
if err < tolerance:
return x_next
x = x_next
msg = (
f"ShiftInverseNumpy: iteration did not converge (error={err}, "
f"tolerance={tolerance}), iterations={max_iter}"
raise RuntimeError(msg)
class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
r"""Invert time coordinate transformation given as numpy array
The purpose of this class is the inversion of a 1D coordinate transform
between some coordinates :math:`u` and :math:`v`.
The transform is expressed as a
:math:`S(v) = u(v) - v`.
It will be provided as samples
:math:`S_k = S(v_k)`
at regularly spaced sample locations
:math:`v_k = v_0 + k \Delta v`.
The inverse will be expressed as
:math:`\hat{S}(u) = u - v(u)`.
It will be computed for locations
:math:`u_l = u_0 + l \Delta u` regularly spaced with respect to the
:math:`u`-coordinate, i.e. the output will be the sequence
:math:`\hat{S}_l = u_l - v(u_l)`.
Currently, we restrict to the special case where :math:`u_k = v_k`,
i.e. :math:`v_0 = u_0` and :math:`\Delta u = \Delta v`.
By convention, the coordinates refer to times, and coordinate shift,
tolerance, and sample rate are all given in SI units.
def __init__(
fsample: float,
max_abs_shift: float,
interp: RegularInterpolator,
max_iter: int,
tolerance: float,
r"""Set up the coordinate inversion operator.
One needs to provide the sample rate :math:`f_s` used
both for input and output sequences. All calls to the operator will
assume regularly sampled sequences with :math:`\Delta u = \Delta v = 1 / f_s`.
For technical reasons, one also needs to provide a limit for the maximum
shift between the coordinates, i.e. :math:`|S(v)| < S_\mathrm{max}`.
Another parameter is an interpolation operator to be used in the internal
fixed-point iteration algorithm. One also needs to provide the desired
accuracy of the resulting coordinate shift, as well as a limit for the
allowed number of iterations before aborting with an exception.
fsample: Sample rate :math:`f_s > 0` [s]
max_abs_shift: Upper limit :math:`S_\mathrm{max} \ge 0` [s] for coordinate shift
interp: Interpolation operator
max_iter: Maximum iterations before fail
tolerance: Maximum absolute error [s] of result
self._fsample: Final = float(fsample)
self._max_abs_shift_idx: Final = int(np.ceil(max_abs_shift * self._fsample))
self._interp_np: Final = interp
self._max_iter = int(max_iter)
self._tolerance_idx = float(tolerance * self._fsample)
if self._fsample <= 0:
msg = f"ShiftInverseNumpy: fsample must be strictly positive, got {fsample}"
raise ValueError(msg)
if max_abs_shift < 0:
msg = f"ShiftInverseNumpy: max_abs_shift must be positive, got {max_abs_shift}"
raise ValueError(msg)
if self._max_iter <= 0:
msg = f"ShiftInverseNumpy: max_iter must be strictly positive integer, got {max_iter}"
raise ValueError(msg)
Wolfgang Kastaun
def margin_left(self) -> int:
"""Left margin size.
Specifies how many samples on the left have to be added by boundary conditions.
return self._interp_np.margin_left + self._max_abs_shift_idx
Wolfgang Kastaun
def margin_right(self) -> int:
"""Right margin size.
Specifies how many samples on the right have to be added by boundary conditions.
return self._interp_np.margin_right + self._max_abs_shift_idx
def __call__(self, shift: np.ndarray) -> NumpyArray1D:
r"""Compute the inverse coordinate transform
The coordinate transform is given an array with the sequence
:math:`S_k = S(v_k)`. The sample locations :math:`v_k` do not have to be provided
but are assumed to be regularly spaced, i.e.
:math:`v_k = v_0 + k \Delta v`, and the first array element corresponds to
The output is given as an array with the sequence
:math:`\hat{S}_l` (see constructor docstring) providing the shift
at sample locations regularly spaced in the transformed coordinate
:math:`u`, that is, at points :math:`u_l = u_0 + l \Delta u`. The first
element of the output corresponds to :math:`l=0`.
The output sample locations are not returned, but are implicitly
equal to the input ones, meaning :math:`v_l = u_l`.
The algorithm works using fixed point iteration
:math:`\hat{S}_l^{n+1} = f(l,\hat{S}_l^{n})`,
:math:`f(l,d) = I[v_k, S_k](v_l - d)`, and :math:`I[v_k, S_k]` is a
function obtained by interpolating the samples :math:`v_k, S_k`,
approximating :math:`I[v_k, S_k](v) \approx S(v)`.
On the technical level, the interpolation operator is implemented
using shifts directly, with an interface of the form
:math:`I[S_k](d_l) \approx S(v_l + d_l)`.
If the iteration converges, it converges to a solution
:math:`\bar{S}_l = f(l,\bar{S}_l) \approx S(v_l - \bar{S}_l) = S(u_l - \bar{S}_l)`,
where we used the implicit convention that :math:`u_l = v_l`.
This equation fulfilled for :math:`\bar{S}_l` is indeed the one that
needs to be fulfilled for the desired quantity :math:`\hat{S}_l`, which
can be shown as follows:
:math:`u_l = u(v(u_l)) = S(v(u_l)) + v(u_l)` and hence
:math:`\hat{S}_l = u_l - v(u_l) = S(v(u_l)) = S(u_l - \hat{S}_l)`.
This shows that the iteration converges to the correct solution if
it converges.
The initial value is :math:`\hat{S}_l^0 = S_l`.
The iteration is repeated until
:math:`\max_l |\hat{S}_l^{n+1} - \hat{S}_l^{n} | < \epsilon`,
where :math:`\epsilon` is the tolerance specified when constructing
the operator.
shift: 1D numpy array with shifts of the coordinate transform [s]
1D numpy array with shift [s] at transformed coordinate
Wolfgang Kastaun
shift_idx = shift * self._fsample
Wolfgang Kastaun
dx = make_numpy_array_1d(shift_idx)
Wolfgang Kastaun
(self.margin_left, self.margin_right),
Wolfgang Kastaun
constant_values=(shift_idx[0], shift_idx[-1]),
def f_iter(x: NumpyArray1D) -> NumpyArray1D:
Wolfgang Kastaun
return self._interp_np.apply_shift(dx_pad, -x, self.margin_left)
def f_err(x1: NumpyArray1D, x2: NumpyArray1D) -> float:
Wolfgang Kastaun
shift_idx_inv = fixed_point_iter(
f_iter, f_err, dx, self._tolerance_idx, self._max_iter
shift_inv = shift_idx_inv / self._fsample
Wolfgang Kastaun
return make_numpy_array_1d(shift_inv)
def make_shift_inverse_lagrange_numpy(
order: int,
fsample: float,
max_abs_shift: float,
max_iter: int,
tolerance: float,
) -> ShiftInverseNumpy:
r"""Set up ShiftInverseNumpy instance with Lagrange interpolation method.
order: Order of the Lagrange polynomials
fsample: Sample rate $f_s > 0$ [s]
max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result
Inversion function of type ShiftInverseNumpy
interp = make_regular_interpolator_lagrange(order)
return ShiftInverseNumpy(