diff --git a/lisainstrument/dynamic_delay_dask.py b/lisainstrument/dynamic_delay_dask.py index f2d59c3a8ff76404592062703ad75d433db096cb..fb058bdfb1fa52590db86fad495c9a318bfa918e 100644 --- a/lisainstrument/dynamic_delay_dask.py +++ b/lisainstrument/dynamic_delay_dask.py @@ -10,6 +10,7 @@ from typing import Callable, Final import dask import dask.array as da import numpy as np +from typing_extensions import assert_never from lisainstrument.dynamic_delay_numpy import ( DynShiftBC, @@ -121,12 +122,14 @@ class DynamicShiftDask: case DynShiftBC.FLAT: npad_left = self.margin_left samples = da.concatenate([da.ones(npad_left) * samples[0], samples]) - case _: + case DynShiftBC.EXCEPTION: msg = ( f"DynamicShiftDask: left edge handling {self._left_bound.name} not " f"possible for given max delay {self._max_delay}." ) raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) if self.margin_right > 0: match self._right_bound: @@ -136,12 +139,14 @@ class DynamicShiftDask: samples = da.concatenate( [samples, da.ones(self.margin_right) * samples[-1]] ) - case _: + case DynShiftBC.EXCEPTION: msg = ( f"DynamicShiftDask: right edge handling {self._right_bound.name} not " f"possible for given min delay {self._min_delay=}." ) raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) chunks = shift.to_delayed() delayed_op = dask.delayed(self._op_interp) diff --git a/lisainstrument/dynamic_delay_numpy.py b/lisainstrument/dynamic_delay_numpy.py index 2a30cd92decb84c71e086501e8a12ba2a28a423e..c75e29c8e2af8a48847f697b87dc2a62a6eb14ca 100644 --- a/lisainstrument/dynamic_delay_numpy.py +++ b/lisainstrument/dynamic_delay_numpy.py @@ -12,6 +12,7 @@ from typing import Final, Protocol import numpy as np from numpy.polynomial import Polynomial +from typing_extensions import assert_never from lisainstrument.fir_filters_numpy import ( DefFilterFIR, @@ -398,12 +399,14 @@ class DynamicShiftNumpy: case DynShiftBC.FLAT: npad_left = self.margin_left samples = np.concatenate([np.ones(npad_left) * samples[0], samples]) - case _: + case DynShiftBC.EXCEPTION: msg = ( f"DynamicShiftNumpy: left edge handling {self._left_bound.name} not " f"possible for given max delay {self._max_delay}." ) raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) if self.margin_right > 0: match self._right_bound: @@ -413,12 +416,14 @@ class DynamicShiftNumpy: samples = np.concatenate( [samples, np.ones(self.margin_right) * samples[-1]] ) - case _: + case DynShiftBC.EXCEPTION: msg = ( f"DynamicShiftNumpy: right edge handling {self._right_bound.name} not " f"possible for given min delay {self._min_delay=}." ) raise RuntimeError(msg) + case _ as unreachable: + assert_never(unreachable) pos = 0 shift_shape = len(shift)