Skip to content
Snippets Groups Projects
Commit a9ecb979 authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

created unit tests for FIR filters with dask and numpy

parent 6ae23bbd
No related branches found
No related tags found
1 merge request!189Draft: Resolve "Daskify LISA Instrument"
Pipeline #371649 failed
"""Unit tests for the module fir_filters_dask"""
import numpy as np
import dask.array as da
import pytest
from lisainstrument.fir_filters_numpy import (
DefFilterFIR,
EdgeHandling,
make_filter_fir_numpy,
)
from lisainstrument.fir_filters_dask import make_filter_fir_dask
def test_filter_dask_fir():
"""Ensure dask-based FIR filter results agree with numpy based ones"""
sig_np = np.array([1.0, 1.0, 1.0, 0.01, 0, 1, 0.2, 0.1])
coeffs = [0.1, 0.7, 0.1, 0.1]
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-1)
op_np = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
op_da = make_filter_fir_dask(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
out_np = op_np(sig_np)
# ~ print("testing FIR filtering of chunked dask array")
for chunksize in (2, 3, 4, 5, 6, 7, 8):
sig_da = da.from_array(sig_np, chunks=chunksize)
# ~ print(sig_da)
out_da = op_da(sig_da).compute()
assert len(out_np) == len(out_da)
assert out_np == pytest.approx(out_da, abs=1e-14, rel=0)
"""Unit tests for the module fir_filters_numpy"""
import numpy as np
import pytest
from lisainstrument.fir_filters_numpy import (
DefFilterFIR,
EdgeHandling,
make_filter_fir_numpy,
)
def test_filter_numpy_fir_valid():
"""Test basic FIR filter with boundary handling using valid points on both sides."""
sig_in = np.array([1.0, 1.0, 1.0, 0, 0, 1, 0, 0])
size = len(sig_in)
coeffs = [0.1, 0.7, 0.1, 0.1]
width = len(coeffs)
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-1)
op_vv = make_filter_fir_numpy(fir, EdgeHandling.VALID, EdgeHandling.VALID)
out_vv = op_vv(sig_in)
size_vv = size - width + 1
exp_vv = sum(coeffs[i] * sig_in[i : size_vv + i] for i in range(width))
assert len(out_vv) == size_vv
assert out_vv == pytest.approx(exp_vv, abs=1e-14, rel=0)
def test_filter_numpy_fir_zeropad():
"""Test basic FIR filter with boundary handling using zero padding on both sides."""
sig_in = np.array([1.0, 1.0, 1.0, 0, 0, 1, 0, 0])
size = len(sig_in)
cases = [
([0.1, 0.7, 0.1, 0.1], 1),
([0.1, 0.7, 0.1, 0.1], 2),
([0.1, 0.7, 0.2], 0),
([0.1, 0.7, 0.2], 1),
([0.1, 0.7, 0.2], 2),
]
for coeffs, marg_left in cases:
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-marg_left)
# ~ print(fir)
op_zz = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
out_zz = op_zz(sig_in)
width = len(coeffs)
marg_right = width - 1 - marg_left
sig_zz = np.concatenate([[0] * marg_left, sig_in, [0] * marg_right])
exp_zz = sum(coeffs[i] * sig_zz[i : size + i] for i in range(width))
assert len(out_zz) == size
assert out_zz == pytest.approx(exp_zz, abs=1e-14, rel=0)
def test_filter_numpy_fir_mixed():
"""Test basic FIR filter with different boundary handling on both sides."""
sig_in = np.array([1.0, 1.0, 1.0, 0, 0, 1, 0, 0])
size = len(sig_in)
coeffs = [0.1, 0.7, 0.1, 0.1]
width = len(coeffs)
marg_left = 1
marg_right = width - 1 - marg_left
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-marg_left)
op_zv = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.VALID)
out_zv = op_zv(sig_in)
size_zv = size - marg_right
sig_zv = np.concatenate([[0] * marg_left, sig_in])
exp_zv = sum(coeffs[i] * sig_zv[i : size_zv + i] for i in range(width))
assert len(out_zv) == size_zv
assert out_zv == pytest.approx(exp_zv, abs=1e-14, rel=0)
op_vz = make_filter_fir_numpy(fir, EdgeHandling.VALID, EdgeHandling.ZEROPAD)
out_vz = op_vz(sig_in)
size_vz = size - marg_left
sig_vz = np.concatenate([sig_in, [0] * marg_right])
exp_vz = sum(coeffs[i] * sig_vz[i : size_vz + i] for i in range(width))
assert len(out_vz) == size_vz
assert out_vz == pytest.approx(exp_vz, abs=1e-14, rel=0)
def test_filter_numpy_fir_degenerate():
"""Test basic FIR filter for the degenerate case of size-one array"""
sig_in = np.array([1.0])
coeffs = [0.1, 0.7, 0.1, 0.1]
fir = DefFilterFIR(filter_coeffs=coeffs, offset=-1)
# ~ print("Testing filter")
# ~ print(fir)
op_zz = make_filter_fir_numpy(fir, EdgeHandling.ZEROPAD, EdgeHandling.ZEROPAD)
out_zz = op_zz(sig_in)
exp_zz = np.array([coeffs[1]])
assert len(out_zz) == 1
assert out_zz == pytest.approx(exp_zz, abs=1e-14, rel=0)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment