Skip to content
Snippets Groups Projects
Commit ebcb4052 authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

draft for wrapper of dsp.timeshift fixed shift functionality

parent 3c9a4fc8
No related branches found
No related tags found
No related merge requests found
"""Functions for applying fixed real-valued shifts to dask arrays using dsp.timeshift
There are two implementations of fixed-shift Lagrange interpolation:
1. fixed_shift_dsp.make_fixed_shift_dsp wraps legacy implementation dsp.timeshift
into new interface
2. fixed_shift_numpy.make_fixed_shift_lagrange provides a completely new implementation
of Lagrange interpolation
Use make_fixed_shift_dsp_dask to create an interpolator based on dsp.timeshift for dask arrays.
"""
from __future__ import annotations
from typing import Final
import numpy as np
from lisainstrument import dsp
from lisainstrument.fixed_shift_dask import FixedShiftDask
from lisainstrument.dynamic_delay_numpy import DynShiftBC
from lisainstrument.fir_filters_numpy import NumpyArray1D, make_numpy_array_1d
from lisainstrument.fixed_shift_numpy import (
make_numpy_array_1d_float,
FixedShiftCore,
FixedShiftFactory,
)
class FixedShiftWrapDSP(FixedShiftCore):
r"""Class implementing fixed shift of regularly spaced 1D data using dsp.timeshift().
This class is an adaptor making dsp.timeshift function available through a
FixedShiftCore interface.
See FixedShiftCore for general properties not specific to the interpolation method.
"""
def __init__(self, length: int, shift: float):
"""Set up interpolation parameters.
The length parameter specifies the number of interpolation polynomials,
which have order length - 1. The length is also the number of samples used
for each interpolation point. The order of the interpolating polynomials
is also the order of plynomials that are interpolated with zero error.
The shift is given as a delay, i.e. positive values refer to locations
to the left of the output sample at hand.
Arguments:
length: Size of the interpolation stencil
shift: The constant shift
"""
if length <= 1:
msg = f"FixedShiftWrapDSP: stencil length must be 2 or more, got {length}"
raise ValueError(msg)
if length % 2 != 0:
msg = "FixedShiftWrapDSP: even Lagrange polynomial order not supported by dsp.timeshift"
raise ValueError(msg)
loc = -float(shift)
loc_int = int(np.floor(loc))
# ~ loc_frac = loc - loc_int
margin_lagr_left = (length - 1) // 2
margin_lagr_right = length - 1 - margin_lagr_left
self._margin_left: Final = max(0, margin_lagr_left - loc_int)
self._margin_right: Final = max(0, margin_lagr_right + loc_int)
self._shift: Final = shift
self._order: Final = length - 1
@property
def margin_left(self) -> int:
"""Margin size (>= 0) needed on the left boundary"""
return self._margin_left
@property
def margin_right(self) -> int:
"""Margin size (>= 0) needed on the right boundary"""
return self._margin_right
@property
def shift(self) -> float:
"""The shift"""
return self._shift
def apply(self, samples: np.ndarray) -> NumpyArray1D:
"""Apply shift, see FixedShiftCore.apply()
Arguments:
samples: 1D numpy array with sampled data
Returns:
Shifted input excluding margins
"""
a = make_numpy_array_1d_float(samples)
res = dsp.timeshift(a, -self.shift, self._order)
return make_numpy_array_1d(res)
@staticmethod
def factory(length: int) -> FixedShiftFactory:
"""Factory for making FixedShiftWrapDSP instances
Arguments:
length: number of Lagrange polynomials to use
Returns:
Factory function for making FixedShiftWrapDSP instances from shift
"""
def factory(shift: float) -> FixedShiftCore:
"""FixedShiftWrapDSP instances from shift using preselected order
Arguments:
shift: The fixed shift
"""
return FixedShiftWrapDSP(length, shift)
return factory
def make_fixed_shift_dsp_dask(
left_bound: DynShiftBC, right_bound: DynShiftBC, length: int
) -> FixedShiftDask:
"""Create a FixedShiftDask instance that uses dsp.timeshift interpolator
Arguments:
left_bound: boundary treatment on the left
right_bound: boundary treatment on the right
length: number of Lagrange plolynomials (=order + 1)
Returns:
Fixed shift interpolator
"""
fac = FixedShiftWrapDSP.factory(length)
return FixedShiftDask(left_bound, right_bound, fac)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment