Skip to content
Snippets Groups Projects

Draft: Resolve "Daskify LISA Instrument"

Open Olaf Hartwig requested to merge 158-daskify-lisa-instrument into master
All threads resolved!
1 file
+ 219
0
Compare changes
  • Side-by-side
  • Inline
+ 219
0
"""Functions for applying FIR filters to numpy arrays
To create a fir filter operating on numpy arrays, use the `make_filter_fir_numpy` function.
"""
from __future__ import annotations
from enum import Enum
from typing import Callable, Protocol, TypeAlias
from attrs import field, frozen
import numpy as np
from scipy.signal import convolve
@frozen
class DefFilterFIR:
r"""This dataclass defines a FIR filter
The finite impulse response filter is given by
$$
y_a = \sum_{i=0}^{L-1} K_i x_{a + i + D}
= \sum_{k=a+D}^{a+D+L-1} x_{k} K_{k-a-D}
$$
Note that there are different conventions for the order of coefficients.
In standard convolution notation, the convolution kernel is given by the
coefficients above in reversed order.
Attributes:
filter_coeffs: Filter coefficients $K_i$
offset: Offset $D$
"""
filter_coeffs: list[float] = field(converter=lambda lst: [float(e) for e in lst])
offset: int = field()
@filter_coeffs.validator
def check(self, _, value):
"""Validate filter coefficients"""
if len(value) < 1:
msg = "FIR filter coefficients array needs at least one entry"
raise ValueError(msg)
@property
def gain(self) -> float:
r"""The gain factor for a constant signal
The gain factor is defined by
$$
\sum_{i=0}^{L-1} K_i
$$
"""
return sum(self.filter_coeffs)
@property
def length(self) -> int:
"""The length of the domain of dependence of a given output sample
on the input samples. This does not take into account zeros anywhere
in the coefficients. Thus the result is simply the number of
coefficients.
"""
return len(self.filter_coeffs)
@property
def domain_of_dependence(self) -> tuple[int, int]:
r"""The domain of dependence
A point with index $a$ in the output sequence depends on
indices $a+D \ldots a+D+L-1$. This property provides the
domain of dependence for $a=0$.
"""
return (self.offset, self.length - 1 + self.offset)
def __str__(self) -> str:
return (
f"{self.__class__.__name__} \n"
f"Length = {self.length} \n"
f"Offset = {self.offset} \n"
f"Gain = {self.gain} \n"
f"Coefficients = {self.filter_coeffs} \n"
)
class SemiLocalMapType(Protocol):
"""Protocol for semi-local maps of numpy arrays
This is used to describe array operations which require boundary points
"""
@property
def margin_left(self) -> int:
"""How many points at the left boundary are missing in the output"""
@property
def margin_right(self) -> int:
"""How many points at the right boundary are missing in the output"""
def __call__(self, data_in: np.ndarray) -> np.ndarray:
"""Apply the array operation"""
class FIRCoreOp(SemiLocalMapType):
"""Function class applying FIR to numpy array
This does not include boundary tratment and only returns valid
points. It does provide the margin sizes corresponding to invalid
boundary points on each side.
"""
def __init__(self, fdef: DefFilterFIR):
self._margin_left = -fdef.domain_of_dependence[0]
self._margin_right = fdef.domain_of_dependence[1]
self._convolution_kernel = np.array(fdef.filter_coeffs[::-1], dtype=np.float64)
if self._margin_left < 0 or self._margin_right < 0:
msg = f"FilterFirNumpy instantiated with unsupported domain of dependence {fdef.domain_of_dependence}"
raise ValueError(msg)
@property
def margin_left(self) -> int:
"""How many points at the left boundary are missing in the output"""
return self._margin_left
@property
def margin_right(self) -> int:
"""How many points at the right boundary are missing in the output"""
return self._margin_right
def __call__(self, data_in: np.ndarray) -> np.ndarray:
"""Apply FIR filter using convolution
Only valid points are returned, i.e. points for which the filter stencil fully
overlaps with the data. No zero padding or similar is applied.
Arguments:
data_in: 1D array to be filtered
Returns:
Filtered array. Its size is smaller than the input by `m̀argin_left+margin_right`.
"""
if (dims := len(data_in.shape)) != 1:
msg = f"FIRCoreOP: only 1D arrays are allowed, got {dims} dims"
raise ValueError(msg)
return convolve(data_in, self._convolution_kernel, mode="valid")
class EdgeHandling(Enum):
"""Enum for various methods of handling boundaries in filters.
VALID: Use only valid points that can be computed from the given data,
without zero padding or similar
ZEROPAD: Compute output for every input point, pad input with suitable
number of zeros before applying filter.
"""
VALID = 1
ZEROPAD = 2
def semilocal_map_numpy(
op: SemiLocalMapType,
bound_left: EdgeHandling,
bound_right: EdgeHandling,
data: np.ndarray,
) -> np.ndarray:
"""Apply a semi local map to numpy array and employ boundary conditions.
Arguments:
op: the semi local mapping
bound_left: Boundary treatment on left side
bound_right: Boundary treatment on right side
data: the 1D array to be mapped
Returns:
The mapped data. The size is the same as the input if both boundary
conditions are ZEROPAD. A boundary condition VALID reduces the output
size by the corresponding margin size of the semilocal map.
"""
pad_left = op.margin_left if bound_left == EdgeHandling.ZEROPAD else 0
pad_right = op.margin_right if bound_right == EdgeHandling.ZEROPAD else 0
if pad_left != 0 or pad_right != 0:
data = np.pad(
data, pad_width=(pad_left, pad_right), mode="constant", constant_values=0
)
return op(data)
FilterFirNumpyType: TypeAlias = Callable[[np.ndarray], np.ndarray]
def make_filter_fir_numpy(
fdef: DefFilterFIR, bound_left: EdgeHandling, bound_right: EdgeHandling
) -> FilterFirNumpyType:
"""Create a function that applies a given FIR filter to numpy arrays,
employing the specified boundary treatment.
Arguments:
fdef: The definition of the FIR filter
bound_left: Boundary treatment on left side
bound_right: Boundary treatment on right side
Returns:
Function which accepts a single 1D numpy array as input and returns
the filtered array.
"""
fmap = FIRCoreOp(fdef)
def op(data):
return semilocal_map_numpy(fmap, bound_left, bound_right, data)
return op
Loading