Skip to content
Snippets Groups Projects
shift_inversion_numpy.py 10.4 KiB
Newer Older
"""
Time coordinate inversion
=========================

Functions for inverting a 1D (time)coordinate transform given as numpy array
Wolfgang Kastaun's avatar
Wolfgang Kastaun committed
The inversion function internally requires an interpolation operator implementing the
RegularInterpolator interface, and which is provided by the user. Use
make_shift_inverse_lagrange_numpy to create an inversion operator employing Lagrange



.. autoclass:: ShiftInverseNumpy
    :members:
    :private-members:
    :special-members:


.. autofunction:: make_shift_inverse_lagrange_numpy

.. autofunction:: fixed_point_iter


"""

from __future__ import annotations

from typing import Callable, Final
from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d
from lisainstrument.regular_interpolators import (
    RegularInterpolator,
    make_regular_interpolator_lagrange,
)


def fixed_point_iter(
    f: Callable[[NumpyArray1D], NumpyArray1D],
    ferr: Callable[[NumpyArray1D, NumpyArray1D], float],
    x: NumpyArray1D,
    tolerance: float,
    max_iter: int,
) -> NumpyArray1D:
    r"""Perform a fixed point iteration for functions operating on a 1D array.

    This uses fixed-point iteration to find a solution for

    .. math::
       x = f(x)

    where :math:`x` is a 1D array and :math:`f` returns an array of the same size.

    The convergence criterion is provided by the user
    via a function :math:`r(x)` that returns a scalar error measure. The iteration
    is performed until :math:`r(x) < \epsilon`. If convergence is not achieved
    after a given number of iterations, an exception is raised.

    Arguments:
        f: The function :math:`f(x)`
        ferr: The error measure :math:`r(x)`
        x: The initial data for the iteration.
        tolerance: The tolerance $\epsilon$
        max_iter: Maximum number of iterations

    Returns:
        Array with solution
    """
    for _ in range(max_iter):
        x_next = f(x)
        err = ferr(x, x_next)
        if err < tolerance:
            return x_next
        x = x_next
    msg = (
        f"ShiftInverseNumpy: iteration did not converge (error={err}, "
        f"tolerance={tolerance}), iterations={max_iter}"
    )
    raise RuntimeError(msg)


Wolfgang Kastaun's avatar
Wolfgang Kastaun committed
class ShiftInverseNumpy:  # pylint: disable=too-few-public-methods
    r"""Invert time coordinate transformation given as numpy array

    The purpose of this class is the inversion of a 1D coordinate transform
    :math:`v \rightarrow U(v)` between some coordinates :math:`u` and :math:`v`.
    The inverse is denoted :math:`u \rightarrow V(u)`, i.e. :math:`V(U(v)) = v`.

    The transform is expressed as a
    shift
    :math:`\delta U(v) = U(v) - v`.
    It will be provided as samples
    :math:`\delta U_k = \delta U(v_k)`
    at regularly spaced sample locations
    :math:`v_k = v_0 + k \Delta v`.
    :math:`\delta V(u) = u - V(u)`.
    It will be computed for locations
    :math:`u_l = u_0 + l \Delta u` regularly spaced with respect to the
    :math:`u`-coordinate, i.e. the output will be the sequence
    :math:`\delta V_l = u_l - V(u_l)`.
    Currently, we restrict to the special case where :math:`u_k = v_k`,
    i.e. identical offsets :math:`v_0 = u_0` and spacings :math:`\Delta u = \Delta v`.

    By convention, the coordinates refer to times, and coordinate shift,
    tolerance, and sample rate are all given in SI units.
    """
        max_abs_shift: float,
        interp: RegularInterpolator,
        max_iter: int,
        tolerance: float,
    ):
        r"""Set up the coordinate inversion operator.

        One needs to provide the sample rate :math:`f_s` used
        both for input and output sequences. All calls to the operator will
        assume regularly sampled sequences with :math:`\Delta u = \Delta v = 1 / f_s`.

        For technical reasons, one also needs to provide a limit for the maximum
        shift between the coordinates, i.e. :math:`|\delta U(v)| < S_\mathrm{max}`.

        Another parameter is an interpolation operator to be used in the internal
        fixed-point iteration algorithm. One also needs to provide the desired
        accuracy of the resulting coordinate shift, as well as a limit for the
        allowed number of iterations before aborting with an exception.
            fsample: Sample rate :math:`f_s > 0` [s]
            max_abs_shift: Upper limit :math:`S_\mathrm{max} \ge 0` [s] for coordinate shift
            interp: Interpolation operator
            max_iter: Maximum iterations before fail
            tolerance: Maximum absolute error [s] of result
Wolfgang Kastaun's avatar
Wolfgang Kastaun committed

        self._fsample: Final = float(fsample)
        self._max_abs_shift_idx: Final = int(np.ceil(max_abs_shift * self._fsample))
Wolfgang Kastaun's avatar
Wolfgang Kastaun committed

        self._interp_np: Final = interp
        self._max_iter = int(max_iter)
        self._tolerance_idx = float(tolerance * self._fsample)

        if self._fsample <= 0:
            msg = f"ShiftInverseNumpy: fsample must be strictly positive, got {fsample}"
            raise ValueError(msg)

        if max_abs_shift < 0:
            msg = f"ShiftInverseNumpy: max_abs_shift must be positive, got {max_abs_shift}"
            raise ValueError(msg)

        if self._max_iter <= 0:
            msg = f"ShiftInverseNumpy: max_iter must be strictly positive integer, got {max_iter}"
            raise ValueError(msg)
        """Left margin size.

        Specifies how many samples on the left have to be added by boundary conditions.
        """
        return self._interp_np.margin_left + self._max_abs_shift_idx
        """Right margin size.

        Specifies how many samples on the right have to be added by boundary conditions.
        """
        return self._interp_np.margin_right + self._max_abs_shift_idx

    def __call__(self, shift: np.ndarray) -> NumpyArray1D:
        r"""Compute the inverse coordinate transform

        The coordinate transform is given an array with the sequence
        :math:`\delta U_k = \delta U(v_k)`. The sample locations :math:`v_k` do not have to be provided
        but are assumed to be regularly spaced, i.e.
        :math:`v_k = v_0 + k \Delta v`, and the first array element corresponds to
        :math:`k=0`.
        The output is given as an array with the sequence
        :math:`\delta V_l = u_l - V(u_l)` providing the shift
        at sample locations regularly spaced in the transformed coordinate
        :math:`u`, that is, at points :math:`u_l = u_0 + l \Delta u`. The first
        element of the output corresponds to :math:`l=0`.
        The output sample locations are not returned, but are implicitly
        equal to the input ones, meaning :math:`v_l = u_l`.

        The algorithm works using fixed point iteration
        :math:`\delta V_l^{n+1} = f(l,\delta V_l^{n})`,
        :math:`f(l,d) = I[v_k, \delta U_k](v_l - d)`, and :math:`I[v_k, \delta U_k]` is a
        function obtained by interpolating the samples :math:`v_k, \delta U_k`,
        approximating  :math:`I[v_k, \delta U_k](v) \approx \delta U(v)`.
        On the technical level, the interpolation operator is implemented
        using shifts directly, with an interface of the form
        :math:`I[\delta U_k](d_l) \approx \delta U(v_l + d_l)`.
        Hence, :math:`f(l,d_l) = I[\delta U_k](-d_l) \approx \delta U(v_l - d_l)`.

        If the iteration converges, it converges to a solution
Wolfgang Kastaun's avatar
Wolfgang Kastaun committed
        :math:`\delta \bar{V}_l = f(l,\delta \bar{V}_l)
        \approx \delta U(v_l - \delta\bar{V}_l) =  \delta U(u_l - \delta\bar{V}_l)`,
        where we used the implicit convention that :math:`u_l = v_l`.
        This equation fulfilled for :math:`\delta\bar{V}_l` is indeed the one that
        needs to be fulfilled for the desired quantity :math:`\delta V_l`, which
        :math:`u_l = U(V(u_l)) = \delta U(V(u_l)) + V(u_l)` and hence
        :math:`\delta V_l = u_l - V(u_l) = \delta U(V(u_l)) = \delta U(u_l - \delta V_l)`.
        This shows that the iteration converges to the correct solution if
        it converges.

        The initial value is :math:`\delta V_l^0 = \delta U_l`.
        The iteration is repeated until
        :math:`\max_l |\delta V_l^{n+1} - \delta V_l^{n} | < \epsilon`,
        where :math:`\epsilon` is the tolerance specified when constructing
        During the interpolation, we use flat boundary conditions, meaning that
        any time shift sample required outside the provided range will be replaced
        by the value of the nearest avaialble sample. The number of points potentially
        affected by boundary conditions is given by the margin_left and margin_right
        properties.

            shift: 1D numpy array with shifts :math:`\delta U_k` [s]
            1D numpy array with shifts :math:`\delta V_l` [s]
        shift_idx = shift * self._fsample
        dx_pad = np.pad(
            dx,
            constant_values=(shift_idx[0], shift_idx[-1]),
        def f_iter(x: NumpyArray1D) -> NumpyArray1D:
            return self._interp_np.apply_shift(dx_pad, -x, self.margin_left)

        def f_err(x1: NumpyArray1D, x2: NumpyArray1D) -> float:
            return np.max(np.abs(x1 - x2))
            f_iter, f_err, dx, self._tolerance_idx, self._max_iter
        shift_inv = shift_idx_inv / self._fsample


def make_shift_inverse_lagrange_numpy(
    order: int,
    max_abs_shift: float,
    max_iter: int,
    tolerance: float,
) -> ShiftInverseNumpy:
    r"""Set up ShiftInverseNumpy instance with Lagrange interpolation method.

    Arguments:
        order: Order of the Lagrange polynomials
        fsample: Sample rate $f_s > 0$ [s]
        max_abs_shift: Upper limit $S_\mathrm{max} \ge 0 $ [s] for coordinate shift
        max_iter: Maximum iterations before fail
        tolerance: Maximum absolute error of result

    Returns:
        Inversion function of type ShiftInverseNumpy
    """
    interp = make_regular_interpolator_lagrange(order)
    return ShiftInverseNumpy(
        fsample=fsample,
        max_abs_shift=max_abs_shift,
        interp=interp,
        max_iter=max_iter,
        tolerance=tolerance,
    )