diff --git a/lisainstrument/dynamic_delay_dask.py b/lisainstrument/dynamic_delay_dask.py index cfd87ca9a991bfaedd7b1f21dcace7f2b050400a..cec8cf89d68a9ff7ca6b4d083bb9a463d0e37695 100644 --- a/lisainstrument/dynamic_delay_dask.py +++ b/lisainstrument/dynamic_delay_dask.py @@ -126,18 +126,14 @@ class DynamicShiftDask: assert_never(unreachable) chunks = shift.to_delayed() - delayed_op = dask.delayed(self._interp_np) + delayed_op = dask.delayed(self._interp_np.apply_shift) results = [] pos = 0 for chunk, chunk_shape in zip(chunks, shift.chunks[0], strict=True): n_size = npad_left + chunk_shape + self.margin_right n_first = pos + npad_left - self.margin_left samples_needed = samples[n_first : n_first + n_size] - - loc = -chunk - offsets = self.margin_left + da.arange(chunk_shape, chunks=chunk_shape) - - samples_shifted = delayed_op(samples_needed, loc, offsets) + samples_shifted = delayed_op(samples_needed, chunk, self.margin_left) delayed_chunk = da.from_delayed( samples_shifted, (chunk_shape,), samples.dtype ) diff --git a/lisainstrument/dynamic_delay_numpy.py b/lisainstrument/dynamic_delay_numpy.py index 57fba71d7bdd24900fa3f64b0804d66a5f4da47e..75dc127a2e75443bbf64ed043817efe88a6f4811 100644 --- a/lisainstrument/dynamic_delay_numpy.py +++ b/lisainstrument/dynamic_delay_numpy.py @@ -54,9 +54,9 @@ class RegularInterpCore(Protocol): def apply( self, - samples: np.ndarray, - locations: np.ndarray, - int_offsets: np.ndarray | int = 0, + samples: NumpyArray1D, + locations: NumpyArray1D, + int_offsets: NumpyArray1D | int = 0, ) -> np.ndarray: """Interpolate regularly spaced data to location in index-space @@ -77,10 +77,34 @@ class RegularInterpCore(Protocol): Arguments: samples: 1D numpy array with sampled data locations: real-valued 1D numpy array with locations to interpolate to - int_offsets: integer or integer 1D array with additional offsets to the locations. + int_offsets: integer or integer 1D array with additional offsets to the locations Returns: - Interpolated samples. + Interpolated samples + """ + + def apply_shift( + self, + samples: NumpyArray1D, + shift: NumpyArray1D, + shift_offset: int, + ) -> np.ndarray: + """Iterpolate to location specified in terms of shifts instead absolute locations + + The locations are specified via an array s of real-valued shifts. For the element s[i] of + the shift array with array index i, the absolute location within the index space of the + input samples is given by i - s[i] + ofs, where ofs is a constant integer offset. A zero + shift means the output sample with index i is the input sample with index i+ofs. + The offset can be positive or negative. Shift values that would require samples not + in the input are not allowed. + + Arguments: + samples: 1D numpy array with sampled data + shifts: 1D float numpy array with shifts + shift_offset: constant integer offset + + Returns: + Interpolated samples """ @@ -184,9 +208,9 @@ class RegularInterpLagrange(RegularInterpCore): def apply( self, - samples: np.ndarray, - locations: np.ndarray, - int_offsets: np.ndarray | int = 0, + samples: NumpyArray1D, + locations: NumpyArray1D, + int_offsets: NumpyArray1D | int = 0, ) -> np.ndarray: """Interpolate regularly spaced data to location in index-space @@ -223,6 +247,28 @@ class RegularInterpLagrange(RegularInterpCore): return result + def apply_shift( + self, + samples: NumpyArray1D, + shift: NumpyArray1D, + shift_offset: int, + ) -> np.ndarray: + """Iterpolate to location specified in terms of shifts instead absolute locations + + See RegularInterpCore.apply_shift(). + + Arguments: + samples: 1D numpy array with sampled data + shifts: 1D float numpy array with shifts + shift_offset: constant integer offset + + Returns: + Interpolated samples + """ + loc = -shift + offsets = shift_offset + np.arange(shift.shape[0]) + return self.apply(samples, make_numpy_array_1d(loc), make_numpy_array_1d(offsets)) + class RegularInterpLinear(RegularInterpCore): """Class implementing interpolation of regularly spaced 1D data using linear interpolation. @@ -248,9 +294,9 @@ class RegularInterpLinear(RegularInterpCore): def apply( self, - samples: np.ndarray, - locations: np.ndarray, - int_offsets: np.ndarray | int = 0, + samples: NumpyArray1D, + locations: NumpyArray1D, + int_offsets: NumpyArray1D | int = 0, ) -> np.ndarray: """Interpolate regularly spaced data to location in index-space @@ -276,6 +322,28 @@ class RegularInterpLinear(RegularInterpCore): return samples[k] * (1.0 - loc_frac) + samples[k + 1] * loc_frac + def apply_shift( + self, + samples: NumpyArray1D, + shift: NumpyArray1D, + shift_offset: int, + ) -> np.ndarray: + """Iterpolate to location specified in terms of shifts instead absolute locations + + See RegularInterpCore.apply_shift(). + + Arguments: + samples: 1D numpy array with sampled data + shifts: 1D float numpy array with shifts + shift_offset: constant integer offset + + Returns: + Interpolated samples + """ + loc = -shift + offsets = shift_offset + np.arange(shift.shape[0]) + return self.apply(samples, make_numpy_array_1d(loc), make_numpy_array_1d(offsets)) + class RegularInterpolator: """User-facing class for interpolation of regularly spaced data @@ -374,6 +442,38 @@ class RegularInterpolator: res = self._core.apply(samples, locations, int_offsets) return make_numpy_array_1d(res) + def apply_shift( + self, + samples_: np.ndarray, + shift_: np.ndarray, + shift_offset: int, + ) -> NumpyArray1D: + """Iterpolate to location specified in terms of shifts instead absolute locations + + See RegularInterpCore.apply_shift(). + + Arguments: + samples: 1D numpy array with sampled data + shifts: 1D float numpy array with shifts + shift_offset: constant integer offset + + Returns: + Interpolated samples + """ + + samples = make_numpy_array_1d(samples_) + if not np.issubdtype(samples_.dtype, np.floating): + msg = "RegularInterpolator: non-float dtype for samples not allowed" + raise TypeError(msg) + + shift = make_numpy_array_1d(shift_) + if not np.issubdtype(shift_.dtype, np.floating): + msg = "RegularInterpolator: non-float dtype for shifts not allowed" + raise TypeError(msg) + + res = self._core.apply_shift(samples, shift, shift_offset) + return make_numpy_array_1d(res) + def make_regular_interpolator_lagrange(length: int) -> RegularInterpolator: """Create an interpolator using Lagrange interpolation @@ -549,10 +649,7 @@ class DynamicShiftNumpy: n_first = pos + npad_left - self.margin_left samples_needed = samples[n_first : n_first + n_size] - loc = -shift - offsets = self.margin_left + np.arange(shift_shape) - - return self._interp_np(samples_needed, loc, offsets) + return self._interp_np.apply_shift(samples_needed, shift, self.margin_left) def make_dynamic_shift_lagrange_numpy(