Skip to content
Snippets Groups Projects
Commit 33c7e03c authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

Added dask version of the FixedShiftNumpy wrapper

parent 68cabacb
No related branches found
No related tags found
No related merge requests found
...@@ -5,7 +5,7 @@ Use make_fixed_shift_lagrange_dask to create a Lagrange interpolator for dask ar ...@@ -5,7 +5,7 @@ Use make_fixed_shift_lagrange_dask to create a Lagrange interpolator for dask ar
from __future__ import annotations from __future__ import annotations
from typing import Final from typing import Callable, Final
import dask import dask
import dask.array as da import dask.array as da
...@@ -166,3 +166,92 @@ def make_fixed_shift_lagrange_dask( ...@@ -166,3 +166,92 @@ def make_fixed_shift_lagrange_dask(
""" """
fac = FixedShiftLagrange.factory(order) fac = FixedShiftLagrange.factory(order)
return FixedShiftDask(left_bound, right_bound, fac) return FixedShiftDask(left_bound, right_bound, fac)
class AdaptiveShiftDask: # pylint: disable=too-few-public-methods
"""Time shifting that accepts fixed or dynamic shifts as well as constant data
Instances act as interpolator function, which delegates to interpolation
methods for fixed or dynamic time shifting. The specialised interpolation
functions are provided during construction.
In addition, instances store a fixed sample rate that is assumed for any
interpolated data, allowing time shifts to be given in time units.
"""
def __init__(
self,
delay_const: Callable[[da.Array, float], da.Array],
delay_dynamic: Callable[[da.Array, da.Array], da.Array],
fsample: float,
):
"""Construct from fixed and dynamic interpolator functions and sample rate
Arguments:
delay_const: Interpolator function with same interface as FixedShiftDask
delay_dynamic: Interpolator function with same interface as DynamicShiftDask
fsample: Sample rate [Hz]
"""
self._delay_const = delay_const
self._delay_dynamic = delay_dynamic
self._fsample = float(fsample)
def fixed(self, x: da.Array | float, shift_time: float) -> da.Array | float:
"""Apply fixed timeshift
Arguments:
x: the data to be shifted, as scalar or 1D array
shift_time: scalar time shift [s]
"""
if isinstance(x, da.Array):
shift_samps = float(shift_time * self._fsample)
if shift_samps == 0:
return x
return self._delay_const(x, shift_samps)
return float(x)
def dynamic(self, x: da.Array | float, shift_time: da.Array) -> da.Array | float:
"""Apply dynamic time shift
The shift is given in time units, and the data is assumed to be sampled
with rate fsample given in the constructor.
Both data and time shift can be scalar or 1D dask arrays. Scalars
are interpreted as constant arrays. In case of scalar data, the same
scalar is returned. In case of scalar shift, a more efficient algorithm
is used, which should yield identical results as for a const shift array.
Args:
x: the data to be shifted, as scalar or 1D array
shift_time: time shift [s], as 1D array
Returns:
The shifted data
"""
if isinstance(x, da.Array):
return self._delay_dynamic(x, shift_time * self._fsample)
return float(x)
def __call__(
self, x: da.Array | float, shift_time: da.Array | float
) -> da.Array | float:
"""Apply adaptive time shift to sequence, accepting scalars to represent constant arrays.
The shift is given in time units, and the data is assumed to be sampled
with rate fsample given in the constructor.
Both data and time shift can be scalar or 1D dask arrays. Scalars
are interpreted as constant arrays. In case of scalar data, the same
scalar is returned. In case of scalar shift, a more efficient algorithm
is used, which should yield identical results as for a const shift array.
Args:
x: the data to be shifted, as scalar or 1D array
shift_time: time shift [s], as scalar or 1D array
Returns:
The shifted data
"""
if isinstance(shift_time, da.Array):
return self.dynamic(x, shift_time)
return self.fixed(x, shift_time)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment