Skip to content
Snippets Groups Projects
Commit a01f280d authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

reverted commits that should have gone into a MR instead

parent d91ce505
No related branches found
No related tags found
1 merge request!189Draft: Resolve "Daskify LISA Instrument"
Pipeline #373648 failed
This commit is part of merge request !189. Comments created here will be created in the context of that merge request.
"""Functions for applying FIR filters to dask arrays.
To create a fir filter operating on dask arrays, use the `make_filter_fir_dask` function.
"""
from __future__ import annotations
from typing import Callable, TypeAlias
import dask
import dask.array as da
import numpy as np
from lisainstrument.fir_filters_numpy import (
DefFilterFIR,
EdgeHandling,
FIRCoreOp,
SemiLocalMapType,
)
def semilocal_map_dask(
op: SemiLocalMapType,
bound_left: EdgeHandling,
bound_right: EdgeHandling,
data: dask.array.Array,
) -> dask.array.Array:
"""Apply a semi local map to dask array and employ boundary conditions.
Arguments:
op: the semi local mapping
bound_left: Boundary treatment on left side
bound_right: Boundary treatment on right side
data: the 1D array to be mapped
Returns:
The mapped data. The size is the same as the input if both boundary
conditions are ZEROPAD. A boundary condition VALID reduces the output
size by the corresponding margin size of the semilocal map.
"""
margin = max(op.margin_left, op.margin_right)
def ext_op(d: np.ndarray) -> np.ndarray:
r = op(d)
if op.margin_left < margin:
r = r[margin - op.margin_left :]
if op.margin_right < margin:
r = r[: -(margin - op.margin_right)]
return r
ext = da.map_overlap(ext_op, data, depth={0: margin}, boundary=0, trim=False)
if bound_left == EdgeHandling.VALID:
ext = ext[op.margin_left :]
if bound_right == EdgeHandling.VALID:
ext = ext[: -op.margin_right]
return ext
FilterFirDaskType: TypeAlias = Callable[[dask.array.Array], dask.array.Array]
def make_filter_fir_dask(
fdef: DefFilterFIR, bound_left: EdgeHandling, bound_right: EdgeHandling
) -> FilterFirDaskType:
"""Create a function that applies a given FIR filter to dask arrays,
employing the specified boundary treatment.
Arguments:
fdef: The definition of the FIR filter
bound_left: Boundary treatment on left side
bound_right: Boundary treatment on right side
Returns:
Function which accepts a single 1D dask array as input and returns
the filtered dask array.
"""
fmap = FIRCoreOp(fdef)
def op(data):
return semilocal_map_dask(fmap, bound_left, bound_right, data)
return op
"""Functions for applying FIR filters to numpy arrays
To create a fir filter operating on numpy arrays, use the `make_filter_fir_numpy` function.
"""
from __future__ import annotations
from enum import Enum
from typing import Callable, Protocol, TypeAlias
import numpy as np
from attrs import field, frozen
from scipy.signal import convolve
@frozen
class DefFilterFIR:
r"""This dataclass defines a FIR filter
The finite impulse response filter is given by
$$
y_a = \sum_{i=0}^{L-1} K_i x_{a + i + D}
= \sum_{k=a+D}^{a+D+L-1} x_{k} K_{k-a-D}
$$
Note that there are different conventions for the order of coefficients.
In standard convolution notation, the convolution kernel is given by the
coefficients above in reversed order.
Attributes:
filter_coeffs: Filter coefficients $K_i$
offset: Offset $D$
"""
filter_coeffs: list[float] = field(converter=lambda lst: [float(e) for e in lst])
offset: int = field()
@filter_coeffs.validator
def check(self, _, value):
"""Validate filter coefficients"""
if len(value) < 1:
msg = "FIR filter coefficients array needs at least one entry"
raise ValueError(msg)
@property
def gain(self) -> float:
r"""The gain factor for a constant signal
The gain factor is defined by
$$
\sum_{i=0}^{L-1} K_i
$$
"""
return sum(self.filter_coeffs)
@property
def length(self) -> int:
"""The length of the domain of dependence of a given output sample
on the input samples. This does not take into account zeros anywhere
in the coefficients. Thus the result is simply the number of
coefficients.
"""
return len(self.filter_coeffs)
@property
def domain_of_dependence(self) -> tuple[int, int]:
r"""The domain of dependence
A point with index $a$ in the output sequence depends on
indices $a+D \ldots a+D+L-1$. This property provides the
domain of dependence for $a=0$.
"""
return (self.offset, self.length - 1 + self.offset)
def __str__(self) -> str:
return (
f"{self.__class__.__name__} \n"
f"Length = {self.length} \n"
f"Offset = {self.offset} \n"
f"Gain = {self.gain} \n"
f"Coefficients = {self.filter_coeffs} \n"
)
class SemiLocalMapType(Protocol):
"""Protocol for semi-local maps of numpy arrays
This is used to describe array operations which require boundary points
"""
@property
def margin_left(self) -> int:
"""How many points at the left boundary are missing in the output"""
@property
def margin_right(self) -> int:
"""How many points at the right boundary are missing in the output"""
def __call__(self, data_in: np.ndarray) -> np.ndarray:
"""Apply the array operation"""
class FIRCoreOp(SemiLocalMapType):
"""Function class applying FIR to numpy array
This does not include boundary tratment and only returns valid
points. It does provide the margin sizes corresponding to invalid
boundary points on each side.
"""
def __init__(self, fdef: DefFilterFIR):
self._margin_left = -fdef.domain_of_dependence[0]
self._margin_right = fdef.domain_of_dependence[1]
self._convolution_kernel = np.array(fdef.filter_coeffs[::-1], dtype=np.float64)
if self._margin_left < 0 or self._margin_right < 0:
msg = f"FilterFirNumpy instantiated with unsupported domain of dependence {fdef.domain_of_dependence}"
raise ValueError(msg)
@property
def margin_left(self) -> int:
"""How many points at the left boundary are missing in the output"""
return self._margin_left
@property
def margin_right(self) -> int:
"""How many points at the right boundary are missing in the output"""
return self._margin_right
def __call__(self, data_in: np.ndarray) -> np.ndarray:
"""Apply FIR filter using convolution
Only valid points are returned, i.e. points for which the filter stencil fully
overlaps with the data. No zero padding or similar is applied.
Arguments:
data_in: 1D array to be filtered
Returns:
Filtered array. Its size is smaller than the input by `m̀argin_left+margin_right`.
"""
if (dims := len(data_in.shape)) != 1:
msg = f"FIRCoreOP: only 1D arrays are allowed, got {dims} dims"
raise ValueError(msg)
return convolve(data_in, self._convolution_kernel, mode="valid")
class EdgeHandling(Enum):
"""Enum for various methods of handling boundaries in filters.
VALID: Use only valid points that can be computed from the given data,
without zero padding or similar
ZEROPAD: Compute output for every input point, pad input with suitable
number of zeros before applying filter.
"""
VALID = 1
ZEROPAD = 2
def semilocal_map_numpy(
op: SemiLocalMapType,
bound_left: EdgeHandling,
bound_right: EdgeHandling,
data: np.ndarray,
) -> np.ndarray:
"""Apply a semi local map to numpy array and employ boundary conditions.
Arguments:
op: the semi local mapping
bound_left: Boundary treatment on left side
bound_right: Boundary treatment on right side
data: the 1D array to be mapped
Returns:
The mapped data. The size is the same as the input if both boundary
conditions are ZEROPAD. A boundary condition VALID reduces the output
size by the corresponding margin size of the semilocal map.
"""
pad_left = op.margin_left if bound_left == EdgeHandling.ZEROPAD else 0
pad_right = op.margin_right if bound_right == EdgeHandling.ZEROPAD else 0
if pad_left != 0 or pad_right != 0:
data = np.pad(
data, pad_width=(pad_left, pad_right), mode="constant", constant_values=0
)
return op(data)
FilterFirNumpyType: TypeAlias = Callable[[np.ndarray], np.ndarray]
def make_filter_fir_numpy(
fdef: DefFilterFIR, bound_left: EdgeHandling, bound_right: EdgeHandling
) -> FilterFirNumpyType:
"""Create a function that applies a given FIR filter to numpy arrays,
employing the specified boundary treatment.
Arguments:
fdef: The definition of the FIR filter
bound_left: Boundary treatment on left side
bound_right: Boundary treatment on right side
Returns:
Function which accepts a single 1D numpy array as input and returns
the filtered array.
"""
fmap = FIRCoreOp(fdef)
def op(data):
return semilocal_map_numpy(fmap, bound_left, bound_right, data)
return op
"""Unit tests for the module fir_filters_dask"""
import dask.array as da
import numpy as np
import pytest
from lisainstrument.fir_filters_dask import make_filter_fir_dask
from lisainstrument.fir_filters_numpy import (
DefFilterFIR,
EdgeHandling,
make_filter_fir_numpy,
)
def test_filter_dask_fir():
"""Ensure dask-based FIR filter results agree with numpy based ones"""
sig_np = np.array([1.0, 1.0, 1.0, 0.01, 0, 1, 0.2, 0.1])
coeffs = [0.1, 0.7, 0.1, 0.1]
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-1)
op_np = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
op_da = make_filter_fir_dask(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
out_np = op_np(sig_np)
# ~ print("testing FIR filtering of chunked dask array")
for chunksize in (2, 3, 4, 5, 6, 7, 8):
sig_da = da.from_array(sig_np, chunks=chunksize)
# ~ print(sig_da)
out_da = op_da(sig_da).compute()
assert len(out_np) == len(out_da)
assert out_np == pytest.approx(out_da, abs=1e-14, rel=0)
"""Unit tests for the module fir_filters_numpy"""
import numpy as np
import pytest
from lisainstrument.fir_filters_numpy import (
DefFilterFIR,
EdgeHandling,
make_filter_fir_numpy,
)
def test_filter_numpy_fir_valid():
"""Test basic FIR filter with boundary handling using valid points on both sides."""
sig_in = np.array([1.0, 1.0, 1.0, 0, 0, 1, 0, 0])
size = len(sig_in)
coeffs = [0.1, 0.7, 0.1, 0.1]
width = len(coeffs)
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-1)
op_vv = make_filter_fir_numpy(fir, EdgeHandling.VALID, EdgeHandling.VALID)
out_vv = op_vv(sig_in)
size_vv = size - width + 1
exp_vv = sum(coeffs[i] * sig_in[i : size_vv + i] for i in range(width))
assert len(out_vv) == size_vv
assert out_vv == pytest.approx(exp_vv, abs=1e-14, rel=0)
def test_filter_numpy_fir_zeropad():
"""Test basic FIR filter with boundary handling using zero padding on both sides."""
sig_in = np.array([1.0, 1.0, 1.0, 0, 0, 1, 0, 0])
size = len(sig_in)
cases = [
([0.1, 0.7, 0.1, 0.1], 1),
([0.1, 0.7, 0.1, 0.1], 2),
([0.1, 0.7, 0.2], 0),
([0.1, 0.7, 0.2], 1),
([0.1, 0.7, 0.2], 2),
]
for coeffs, marg_left in cases:
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-marg_left)
# ~ print(fir)
op_zz = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
out_zz = op_zz(sig_in)
width = len(coeffs)
marg_right = width - 1 - marg_left
sig_zz = np.concatenate([[0] * marg_left, sig_in, [0] * marg_right])
exp_zz = sum(coeffs[i] * sig_zz[i : size + i] for i in range(width))
assert len(out_zz) == size
assert out_zz == pytest.approx(exp_zz, abs=1e-14, rel=0)
def test_filter_numpy_fir_mixed():
"""Test basic FIR filter with different boundary handling on both sides."""
sig_in = np.array([1.0, 1.0, 1.0, 0, 0, 1, 0, 0])
size = len(sig_in)
coeffs = [0.1, 0.7, 0.1, 0.1]
width = len(coeffs)
marg_left = 1
marg_right = width - 1 - marg_left
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-marg_left)
op_zv = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.VALID)
out_zv = op_zv(sig_in)
size_zv = size - marg_right
sig_zv = np.concatenate([[0] * marg_left, sig_in])
exp_zv = sum(coeffs[i] * sig_zv[i : size_zv + i] for i in range(width))
assert len(out_zv) == size_zv
assert out_zv == pytest.approx(exp_zv, abs=1e-14, rel=0)
op_vz = make_filter_fir_numpy(fir, EdgeHandling.VALID, EdgeHandling.ZEROPAD)
out_vz = op_vz(sig_in)
size_vz = size - marg_left
sig_vz = np.concatenate([sig_in, [0] * marg_right])
exp_vz = sum(coeffs[i] * sig_vz[i : size_vz + i] for i in range(width))
assert len(out_vz) == size_vz
assert out_vz == pytest.approx(exp_vz, abs=1e-14, rel=0)
def test_filter_numpy_fir_degenerate():
"""Test basic FIR filter for the degenerate case of size-one array"""
sig_in = np.array([1.0])
coeffs = [0.1, 0.7, 0.1, 0.1]
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-1)
# ~ print("Testing filter")
# ~ print(fir)
op_zz = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
out_zz = op_zz(sig_in)
exp_zz = np.array([coeffs[1]])
assert len(out_zz) == 1
assert out_zz == pytest.approx(exp_zz, abs=1e-14, rel=0)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment