"README.md" did not exist on "b49c6d6d81023bbddaee563e7cd474a377d50f9c"
Newer
Older
"""
Time coordinate inversion
=========================
Functions for inverting a 1D (time)coordinate transform given as numpy array
The inversion function internally requires an interpolation operator implementing the
RegularInterpolator interface, and which is provided by the user. Use
make_shift_inverse_lagrange_numpy to create an inversion operator employing Lagrange
.. autoclass:: ShiftInverseNumpy
:members:
:private-members:
:special-members:
.. autofunction:: make_shift_inverse_lagrange_numpy
.. autofunction:: fixed_point_iter
"""
from __future__ import annotations
import numpy as np
from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d
from lisainstrument.regular_interpolators import (
RegularInterpolator,
make_regular_interpolator_lagrange,
)
def fixed_point_iter(
f: Callable[[NumpyArray1D], NumpyArray1D],
ferr: Callable[[NumpyArray1D, NumpyArray1D], float],
x: NumpyArray1D,
tolerance: float,
max_iter: int,
) -> NumpyArray1D:
r"""Perform a fixed point iteration for functions operating on a 1D array.
This uses fixed-point iteration to find a solution for
.. math::
x = f(x)
where :math:`x` is a 1D array and :math:`f` returns an array of the same size.
The convergence criterion is provided by the user
via a function :math:`r(x)` that returns a scalar error measure. The iteration
is performed until :math:`r(x) < \epsilon`. If convergence is not achieved
after a given number of iterations, an exception is raised.
Arguments:
f: The function :math:`f(x)`
ferr: The error measure :math:`r(x)`
x: The initial data for the iteration.
tolerance: The tolerance $\epsilon$
max_iter: Maximum number of iterations
Returns:
Array with solution
"""
for _ in range(max_iter):
x_next = f(x)
err = ferr(x, x_next)
if err < tolerance:
return x_next
x = x_next
msg = (
f"ShiftInverseNumpy: iteration did not converge (error={err}, "
f"tolerance={tolerance}), iterations={max_iter}"
)
raise RuntimeError(msg)
class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
r"""Invert time coordinate transformation given as numpy array
The purpose of this class is the inversion of a 1D coordinate transform
:math:`v \rightarrow U(v)` between some coordinates :math:`u` and :math:`v`.
The inverse is denoted :math:`u \rightarrow V(u)`, i.e. :math:`V(U(v)) = v`.
The transform is expressed as a
shift
:math:`\delta U(v) = U(v) - v`.
It will be provided as samples
:math:`\delta U_k = \delta U(v_k)`
at regularly spaced sample locations
:math:`v_k = v_0 + k \Delta v`.
The inverse will be expressed as
:math:`\delta V(u) = u - V(u)`.
It will be computed for locations
:math:`u_l = u_0 + l \Delta u` regularly spaced with respect to the
:math:`u`-coordinate, i.e. the output will be the sequence
:math:`\delta V_l = u_l - V(u_l)`.
Currently, we restrict to the special case where :math:`u_k = v_k`,
i.e. identical offsets :math:`v_0 = u_0` and spacings :math:`\Delta u = \Delta v`.
By convention, the coordinates refer to times, and coordinate shift,
tolerance, and sample rate are all given in SI units.
"""
def __init__(
self,
fsample: float,
max_abs_shift: float,
interp: RegularInterpolator,
max_iter: int,
tolerance: float,
):
r"""Set up the coordinate inversion operator.
One needs to provide the sample rate :math:`f_s` used
both for input and output sequences. All calls to the operator will
assume regularly sampled sequences with :math:`\Delta u = \Delta v = 1 / f_s`.
For technical reasons, one also needs to provide a limit for the maximum
shift between the coordinates, i.e. :math:`|\delta U(v)| < S_\mathrm{max}`.
Another parameter is an interpolation operator to be used in the internal
fixed-point iteration algorithm. One also needs to provide the desired
accuracy of the resulting coordinate shift, as well as a limit for the
allowed number of iterations before aborting with an exception.
fsample: Sample rate :math:`f_s > 0` [s]
max_abs_shift: Upper limit :math:`S_\mathrm{max} \ge 0` [s] for coordinate shift
interp: Interpolation operator
max_iter: Maximum iterations before fail
tolerance: Maximum absolute error [s] of result
self._fsample: Final = float(fsample)
self._max_abs_shift_idx: Final = int(np.ceil(max_abs_shift * self._fsample))
self._interp_np: Final = interp
self._max_iter = int(max_iter)
self._tolerance_idx = float(tolerance * self._fsample)
if self._fsample <= 0:
msg = f"ShiftInverseNumpy: fsample must be strictly positive, got {fsample}"
raise ValueError(msg)
if max_abs_shift < 0:
msg = f"ShiftInverseNumpy: max_abs_shift must be positive, got {max_abs_shift}"
raise ValueError(msg)
if self._max_iter <= 0:
msg = f"ShiftInverseNumpy: max_iter must be strictly positive integer, got {max_iter}"
raise ValueError(msg)
Wolfgang Kastaun
committed
def margin_left(self) -> int:
"""Left margin size.
Specifies how many samples on the left have to be added by boundary conditions.
"""
return self._interp_np.margin_left + self._max_abs_shift_idx
Wolfgang Kastaun
committed
def margin_right(self) -> int:
"""Right margin size.
Specifies how many samples on the right have to be added by boundary conditions.
"""
return self._interp_np.margin_right + self._max_abs_shift_idx
def __call__(self, shift: np.ndarray) -> NumpyArray1D:
r"""Compute the inverse coordinate transform
The coordinate transform is given an array with the sequence
:math:`\delta U_k = \delta U(v_k)`. The sample locations :math:`v_k` do not have to be provided
but are assumed to be regularly spaced, i.e.
:math:`v_k = v_0 + k \Delta v`, and the first array element corresponds to
:math:`k=0`.
The output is given as an array with the sequence
:math:`\delta V_l = u_l - V(u_l)` providing the shift
at sample locations regularly spaced in the transformed coordinate
:math:`u`, that is, at points :math:`u_l = u_0 + l \Delta u`. The first
element of the output corresponds to :math:`l=0`.
The output sample locations are not returned, but are implicitly
equal to the input ones, meaning :math:`v_l = u_l`.
The algorithm works using fixed point iteration
:math:`\delta V_l^{n+1} = f(l,\delta V_l^{n})`,
:math:`f(l,d) = I[v_k, \delta U_k](v_l - d)`, and :math:`I[v_k, \delta U_k]` is a
function obtained by interpolating the samples :math:`v_k, \delta U_k`,
approximating :math:`I[v_k, \delta U_k](v) \approx \delta U(v)`.
On the technical level, the interpolation operator is implemented
using shifts directly, with an interface of the form
:math:`I[\delta U_k](d_l) \approx \delta U(v_l + d_l)`.
Hence, :math:`f(l,d_l) = I[\delta U_k](-d_l) \approx \delta U(v_l - d_l)`.
If the iteration converges, it converges to a solution
:math:`\delta \bar{V}_l = f(l,\delta \bar{V}_l)
\approx \delta U(v_l - \delta\bar{V}_l) = \delta U(u_l - \delta\bar{V}_l)`,
where we used the implicit convention that :math:`u_l = v_l`.
This equation fulfilled for :math:`\delta\bar{V}_l` is indeed the one that
needs to be fulfilled for the desired quantity :math:`\delta V_l`, which
can be shown as follows:
:math:`u_l = U(V(u_l)) = \delta U(V(u_l)) + V(u_l)` and hence
:math:`\delta V_l = u_l - V(u_l) = \delta U(V(u_l)) = \delta U(u_l - \delta V_l)`.
This shows that the iteration converges to the correct solution if
it converges.
The initial value is :math:`\delta V_l^0 = \delta U_l`.
The iteration is repeated until
:math:`\max_l |\delta V_l^{n+1} - \delta V_l^{n} | < \epsilon`,
where :math:`\epsilon` is the tolerance specified when constructing
the operator.
During the interpolation, we use flat boundary conditions, meaning that
any time shift sample required outside the provided range will be replaced
by the value of the nearest avaialble sample. The number of points potentially
affected by boundary conditions is given by the margin_left and margin_right
properties.
shift: 1D numpy array with shifts :math:`\delta U_k` [s]
1D numpy array with shifts :math:`\delta V_l` [s]
Wolfgang Kastaun
committed
shift_idx = shift * self._fsample
Wolfgang Kastaun
committed
dx = make_numpy_array_1d(shift_idx)
Wolfgang Kastaun
committed
(self.margin_left, self.margin_right),
mode="constant",
Wolfgang Kastaun
committed
constant_values=(shift_idx[0], shift_idx[-1]),
def f_iter(x: NumpyArray1D) -> NumpyArray1D:
Wolfgang Kastaun
committed
return self._interp_np.apply_shift(dx_pad, -x, self.margin_left)
def f_err(x1: NumpyArray1D, x2: NumpyArray1D) -> float:
Wolfgang Kastaun
committed
shift_idx_inv = fixed_point_iter(
f_iter, f_err, dx, self._tolerance_idx, self._max_iter
)
shift_inv = shift_idx_inv / self._fsample
Wolfgang Kastaun
committed
return make_numpy_array_1d(shift_inv)
def make_shift_inverse_lagrange_numpy(
order: int,
fsample: float,
max_abs_shift: float,
max_iter: int,
tolerance: float,
) -> ShiftInverseNumpy:
r"""Set up ShiftInverseNumpy instance with Lagrange interpolation method.
Arguments:
order: Order of the Lagrange polynomials
fsample: Sample rate $f_s > 0$ [s]
max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
max_iter: Maximum iterations before fail
tolerance: Maximum absolute error of result
Returns:
Inversion function of type ShiftInverseNumpy
"""
interp = make_regular_interpolator_lagrange(order)
return ShiftInverseNumpy(
fsample=fsample,
max_abs_shift=max_abs_shift,
interp=interp,
max_iter=max_iter,
tolerance=tolerance,
)