Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
LISA Instrument
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
LISA Simulation
LISA Instrument
Commits
d2cb05f1
Commit
d2cb05f1
authored
3 months ago
by
Wolfgang Kastaun
Browse files
Options
Downloads
Patches
Plain Diff
Added fixed-shift interpolator for numpy and dask
parent
a0237c3a
No related branches found
No related tags found
No related merge requests found
Pipeline
#376873
failed
3 months ago
Stage: test
Stage: build
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
lisainstrument/fixed_shift_dask.py
+161
-0
161 additions, 0 deletions
lisainstrument/fixed_shift_dask.py
lisainstrument/fixed_shift_numpy.py
+367
-0
367 additions, 0 deletions
lisainstrument/fixed_shift_numpy.py
with
528 additions
and
0 deletions
lisainstrument/fixed_shift_dask.py
0 → 100644
+
161
−
0
View file @
d2cb05f1
"""
Functions for applying dynamic real-valued shifts to dask arrays using Lagrange interpolation
Use make_dynamic_shift_lagrange_dask to create a Lagrange interpolator for dask arrays.
"""
from
__future__
import
annotations
from
typing
import
Final
import
dask
import
dask.array
as
da
import
numpy
as
np
from
typing_extensions
import
assert_never
from
lisainstrument.dynamic_delay_numpy
import
DynShiftBC
from
lisainstrument.fir_filters_dask
import
DaskArray1D
,
make_dask_array_1d
from
lisainstrument.fixed_shift_numpy
import
(
FixedShiftFactory
,
make_fixed_shift_factory_lagrange
,
)
class
FixedShiftDask
:
# pylint: disable=too-few-public-methods
"""
Interpolate Dask arrays to locations given by a const shift.
This allows to interpolate samples in a Dask array to locations specified
by a fixed shift. The shift is specified in units of the array index, i.e.
there is no separate coordinate array. A positive shift refers to values
left of a given sample, negative shifts to values on the right.
The boundary treatment can be specified for each boundary in terms of
DynShiftBC enums.
The interpolation method is not fixed but provided via an interpolator
instance implementing the RegularInterpMethod protocol.
"""
def
__init__
(
self
,
left_bound
:
DynShiftBC
,
right_bound
:
DynShiftBC
,
interp_fac
:
FixedShiftFactory
,
):
"""
Not intended for direct use, employ named constructors instead
Arguments:
left_bound: boundary treatment on the left
right_bound: boundary treatment on the right
interp_fac: Function to create interpolator engine for given shift
"""
self
.
_left_bound
:
Final
=
left_bound
self
.
_right_bound
:
Final
=
right_bound
self
.
_interp_fac
:
Final
=
interp_fac
def
_padding_left
(
self
,
samples
:
da
.
Array
,
margin_left
:
int
)
->
da
.
Array
:
if
margin_left
>
0
:
match
self
.
_left_bound
:
case
DynShiftBC
.
ZEROPAD
:
samples
=
da
.
concatenate
([
da
.
zeros
(
margin_left
),
samples
])
case
DynShiftBC
.
FLAT
:
samples
=
da
.
concatenate
(
[
da
.
ones
(
margin_left
)
*
samples
[
0
],
samples
]
)
case
DynShiftBC
.
EXCEPTION
:
msg
=
(
f
"
FixedShiftDask: left edge handling
{
self
.
_left_bound
.
name
}
not
"
f
"
possible for given delay, need margin of
{
margin_left
}
.
"
)
raise
RuntimeError
(
msg
)
case
_
as
unreachable
:
assert_never
(
unreachable
)
elif
margin_left
<
0
:
samples
=
samples
[
-
margin_left
:]
return
samples
def
_padding_right
(
self
,
samples
:
da
.
Array
,
margin_right
:
int
)
->
da
.
Array
:
if
margin_right
>
0
:
match
self
.
_right_bound
:
case
DynShiftBC
.
ZEROPAD
:
samples
=
da
.
concatenate
([
samples
,
da
.
zeros
(
margin_right
)])
case
DynShiftBC
.
FLAT
:
samples
=
da
.
concatenate
(
[
samples
,
da
.
ones
(
margin_right
)
*
samples
[
-
1
]]
)
case
DynShiftBC
.
EXCEPTION
:
msg
=
(
f
"
FixedShiftNumpy: right edge handling
{
self
.
_right_bound
.
name
}
not
"
f
"
possible for given delay, need margin of
{
margin_right
}
.
"
)
raise
RuntimeError
(
msg
)
case
_
as
unreachable
:
assert_never
(
unreachable
)
elif
margin_right
<
0
:
samples
=
samples
[:
margin_right
]
return
samples
def
__call__
(
self
,
samples
:
da
.
Array
,
shift
:
float
)
->
DaskArray1D
:
r
"""
Apply shift $s$ to samples in Dask array.
Denoting the input data as $y_i$ with $i=0 \ldots N-1$, and the interpolated
input data as $y(t)$, such that $y(i)=y_i$, the output $z_k$ is given by
$z_k = y(k-s), k=0 \ ldots N - 1$.
Required data outside the provided samples is created if the specified
boundary condition allows it, or an exception is raised if the BC disallows it.
The output has same length and chunking as the input.
Arguments:
samples: 1D Dask array with data samples
shift: The shift $s$
Returns:
Dask array with interpolated samples
"""
loc
=
-
shift
loc_int
=
int
(
np
.
floor
(
loc
))
loc_frac
=
loc
-
loc_int
interp
=
self
.
_interp_fac
(
-
loc_frac
)
margin_left
=
interp
.
margin_left
-
loc_int
margin_right
=
interp
.
margin_right
+
loc_int
samples_checked
=
make_dask_array_1d
(
samples
)
samples_padleft
=
self
.
_padding_left
(
samples_checked
,
margin_left
)
samples_padded
=
self
.
_padding_right
(
samples_padleft
,
margin_right
)
delayed_op
=
dask
.
delayed
(
interp
.
apply
)
results
=
[]
pos
=
0
for
chunk_shape
in
samples
.
chunks
[
0
]:
n_size
=
interp
.
margin_left
+
chunk_shape
+
interp
.
margin_right
samples_shifted
=
delayed_op
(
samples_padded
[
pos
:
pos
+
n_size
])
delayed_chunk
=
da
.
from_delayed
(
samples_shifted
,
(
chunk_shape
,),
samples
.
dtype
)
results
.
append
(
delayed_chunk
)
pos
+=
chunk_shape
return
make_dask_array_1d
(
da
.
concatenate
(
results
,
axis
=
0
))
def
make_fixed_shift_lagrange_dask
(
left_bound
:
DynShiftBC
,
right_bound
:
DynShiftBC
,
length
:
int
)
->
FixedShiftDask
:
"""
Create a FixedShiftDask instance that uses Lagrange interpolator
Arguments:
left_bound: boundary treatment on the left
right_bound: boundary treatment on the right
length: number of Lagrange plolynomials (=order + 1)
Returns:
Fixed shift interpolator
"""
fac
=
make_fixed_shift_factory_lagrange
(
length
)
return
FixedShiftDask
(
left_bound
,
right_bound
,
fac
)
This diff is collapsed.
Click to expand it.
lisainstrument/fixed_shift_numpy.py
0 → 100644
+
367
−
0
View file @
d2cb05f1
"""
Functions for interpolating numpy arrays with 1D regularly spaced data
This provides a generic interface RegularInterpolator as well as two interpolation
methods, linear and Lagrange. The latter is written from scratch, see module
regular_interpolator_dsp for another one based on the dsp.timeshift Lagrange interpolator.
"""
from
__future__
import
annotations
from
typing
import
Callable
,
Final
,
Protocol
,
TypeAlias
import
numpy
as
np
from
typing_extensions
import
assert_never
from
lisainstrument.dynamic_delay_numpy
import
DynShiftBC
from
lisainstrument.fir_filters_numpy
import
(
DefFilterFIR
,
EdgeHandling
,
NumpyArray1D
,
make_filter_fir_numpy
,
make_numpy_array_1d
,
)
def
make_numpy_array_1d_float
(
a
:
np
.
ndarray
)
->
NumpyArray1D
:
"""
Ensure numpy array is 1D and floating point type
"""
a1
=
make_numpy_array_1d
(
a
)
if
not
np
.
issubdtype
(
a
.
dtype
,
np
.
floating
):
msg
=
f
"
Expected numpy array with floating type, got
{
a
.
dtype
}
"
raise
TypeError
(
msg
)
return
a1
class
FixedShiftCore
(
Protocol
):
"""
Protocol for applying fixed shift to regularly spaced samples
This defines an interface for applying a fixed shift to regularly spaced
samples in 1D, provided as numpy arrays.
Boundary treatment is not part of this protocol. Implementations only compute
locations that can be interpolated to without using any form of boundary
conditions. The corresponding margin sizes required by the interpolation
method are exposed as properties.
Arbitrary shifts are valid, but the main use case are shifts of magnitude around 1.
For large shifts, the total margin size required will increase, in which case
it might be more efficient to handle the integer shift before and then apply
the remaining fractional shift.
"""
@property
def
margin_left
(
self
)
->
int
:
"""
Margin size (>= 0) needed on the left boundary
"""
@property
def
margin_right
(
self
)
->
int
:
"""
Margin size (>= 0) needed on the right boundary
"""
@property
def
shift
(
self
)
->
float
:
"""
The shift
"""
def
apply
(
self
,
samples
:
np
.
ndarray
)
->
NumpyArray1D
:
r
"""
Apply the fractional shift to a 1D numpy array.
The output is the shifted input with left and right margins removed.
The shift $s$ should be bounded $-1 < s < 1$
Denoting the input data as $y_i$ with $i=0 \ldots N-1$, and the interpolated
input data as $y(t)$, such that $y(i)=y_i$, the output $z_k$ is given by
$z_k = y(k+M_L-s), k=0 \ ldots N - 1 - M_L - M_R$, where $M_L$ and $M_R$
are the left and right margin sizes.
Arguments:
samples: 1D numpy array with sampled data
start: integer part of
size: number of points to return
Returns:
Interpolated samples
"""
def
make_fir_lagrange_fixed
(
length
:
int
,
frac_shift
:
float
)
->
DefFilterFIR
:
r
"""
Create FIR filter corresponding to non-integer shift using Lagrange interpolation
This creates a FIR filter with coefficients given by an Lagrangian
interpolation polynomial evaluated at a fixed location $s$.
We work completely in index space of the input sequence, i.e. the
coordinate of a given index is the index. The location $s$ also refers
to index space.
Thus, we specialize Lagrange interpolation to the case where
the sample locations are
$t_j = j + D$, whith $j=0 \ldots L-1$, and $L$ being the number of
points to use. The integer offset $D$ determines which points to use,
and should be chosen such that the center $D + L/2$ lies near $s$.
In other words, the input sequence indices $D \ldots D+L-1$ are used
to interpolate to (fractional) index $s$.
So far we described interpolation to one location. The FIR filter
is defined by simple translation, such that the element with index 0
in the output sequence corresponds to the input sequence interpolated
to fractional index $s$, and any element $a$ to $s+a$.
$$
\begin{align*}
y_a &= \sum_{j=0}^{L-1} K_j x_{a+j+D} \\
K_j &= \prod_\substack{m=0\\m \neq j}^{L-1} \frac{s-D-m}{j-m}
\end{align*}
$$
The offset is chosen to center the stencil around a shift
$s=0$ for odd length and s=$1/2$ for even length. The shift should
not exceed the bounds $-1/2 < s < 1/2$ for odd length or $0 < s < 1$
for even length significantly, to avoid large overshoots that inherently
occur off-center for high order lagrange interpolation.
Arguments:
length: Number of FIR coefficients (=Lagrange order + 1)
frac_shift: The shift $s$
Returns:
FIR filter definition
"""
if
length
<=
1
:
msg
=
f
"
make_fir_lagrange_fixed: stencil length must be 2 or more, got
{
length
}
"
raise
ValueError
(
msg
)
offset
=
-
((
length
-
1
)
//
2
)
r
=
np
.
arange
(
length
)
m2
,
j2
=
np
.
meshgrid
(
r
,
r
)
msk
=
m2
!=
j2
s
=
np
.
array
(
frac_shift
)
x
=
s
-
offset
p
=
np
.
ones_like
(
m2
,
dtype
=
np
.
float64
)
p
[
msk
]
=
(
x
-
m2
[
msk
])
/
(
j2
-
m2
)[
msk
]
kj
=
np
.
prod
(
p
,
axis
=
1
)
kj
/=
np
.
sum
(
kj
)
# only to reduce rounding errors
return
DefFilterFIR
(
filter_coeffs
=
kj
,
offset
=
offset
)
class
FixedShiftLagrange
(
FixedShiftCore
):
r
"""
Class implementing fixed shift of regularly spaced 1D data using Lagrange interpolation.
The algorithm uses Lagrange interpolation, using a FIR filter based on lagrange polynomials
evaluated at fixed fractional shift. This uses a stencil with center as close to the
location as possible. For odd length, the center point is obtained by rounding the
location, and that the remaining fractional shift is within $[-1/2,1/2]$. For even
locations, the center points is the floor of the location, with remaining fractional
shift within $[0,1)$
See FixedShiftCore for general properties not specific to the interpolation method.
"""
def
__init__
(
self
,
length
:
int
,
shift
:
float
):
"""
Set up interpolation parameters.
The length parameter specifies the number of interpolation polynomials,
which have order length - 1. The length is also the number of samples used
for each interpolation point. The order of the interpolating polynomials
is also the order of plynomials that are interpolated with zero error.
The shift is given as a delay, i.e. positive values refer to locations
to the left of the output sample at hand.
Arguments:
length: Size of the interpolation stencil
shift: The constant shift
"""
loc
=
-
float
(
shift
)
if
length
%
2
==
0
:
loc_int
=
int
(
np
.
floor
(
loc
))
else
:
loc_int
=
int
(
np
.
round
(
loc
))
loc_frac
=
loc
-
loc_int
firdef
=
make_fir_lagrange_fixed
(
length
,
loc_frac
)
self
.
_margin_left
=
max
(
0
,
-
firdef
.
domain_of_dependence
[
0
]
-
loc_int
)
self
.
_margin_right
=
max
(
0
,
firdef
.
domain_of_dependence
[
1
]
+
loc_int
)
self
.
_filt
=
make_filter_fir_numpy
(
firdef
,
EdgeHandling
.
VALID
,
EdgeHandling
.
VALID
)
self
.
_shift
=
shift
@property
def
margin_left
(
self
)
->
int
:
"""
Margin size (>= 0) needed on the left boundary
"""
return
self
.
_margin_left
@property
def
margin_right
(
self
)
->
int
:
"""
Margin size (>= 0) needed on the right boundary
"""
return
self
.
_margin_right
@property
def
shift
(
self
)
->
float
:
"""
The shift
"""
return
self
.
_shift
def
apply
(
self
,
samples
:
np
.
ndarray
)
->
NumpyArray1D
:
"""
Apply shift, see FixedShiftCore.apply()
Arguments:
samples: 1D numpy array with sampled data
Returns:
Shifted input excluding margins
"""
a
=
make_numpy_array_1d_float
(
samples
)
return
make_numpy_array_1d
(
self
.
_filt
(
a
))
FixedShiftFactory
:
TypeAlias
=
Callable
[[
float
],
FixedShiftCore
]
def
make_fixed_shift_factory_lagrange
(
length
:
int
)
->
FixedShiftFactory
:
"""
Factory for making FixedShiftLagrange instances
Arguments:
length: number of Lagrange polynomials to use
Returns:
Factory function for making FixedShiftLagrange instances from shift
"""
def
factory
(
shift
:
float
)
->
FixedShiftCore
:
"""
FixedShiftLagrange instances from shift using preselected order
Arguments:
shift: The fixed shift
"""
return
FixedShiftLagrange
(
length
,
shift
)
return
factory
class
FixedShiftNumpy
:
# pylint: disable=too-few-public-methods
"""
Interpolate to locations given by a const shift.
This allows to interpolate samples in a numpy array to locations specified
by a fixed shift. The shift is specified in units of the array index, i.e.
there is no separate coordinate array. A positive shift refers to values
left of a given sample, negative shifts to values on the right.
The boundary treatment can be specified for each boundary in terms of
DynShiftBC enums.
The interpolation method is not fixed but provided via an interpolator
instance implementing the FixedShiftCore protocol.
"""
def
__init__
(
self
,
left_bound
:
DynShiftBC
,
right_bound
:
DynShiftBC
,
interp_fac
:
FixedShiftFactory
,
):
"""
Not intended for direct use, employ named constructors instead
Arguments:
left_bound: boundary treatment on the left
right_bound: boundary treatment on the right
interp_fac: Function to create interpolator engine for given shift
"""
self
.
_left_bound
:
Final
=
left_bound
self
.
_right_bound
:
Final
=
right_bound
self
.
_interp_fac
:
Final
=
interp_fac
def
_padding_left
(
self
,
samples
:
np
.
ndarray
,
margin_left
:
int
)
->
np
.
ndarray
:
if
margin_left
>
0
:
match
self
.
_left_bound
:
case
DynShiftBC
.
ZEROPAD
:
samples
=
np
.
concatenate
([
np
.
zeros
(
margin_left
),
samples
])
case
DynShiftBC
.
FLAT
:
samples
=
np
.
concatenate
(
[
np
.
ones
(
margin_left
)
*
samples
[
0
],
samples
]
)
case
DynShiftBC
.
EXCEPTION
:
msg
=
(
f
"
FixedShiftNumpy: left edge handling
{
self
.
_left_bound
.
name
}
not
"
f
"
possible for given delay, need margin of
{
margin_left
}
.
"
)
raise
RuntimeError
(
msg
)
case
_
as
unreachable
:
assert_never
(
unreachable
)
elif
margin_left
<
0
:
samples
=
samples
[
-
margin_left
:]
return
samples
def
_padding_right
(
self
,
samples
:
np
.
ndarray
,
margin_right
:
int
)
->
np
.
ndarray
:
if
margin_right
>
0
:
match
self
.
_right_bound
:
case
DynShiftBC
.
ZEROPAD
:
samples
=
np
.
concatenate
([
samples
,
np
.
zeros
(
margin_right
)])
case
DynShiftBC
.
FLAT
:
samples
=
np
.
concatenate
(
[
samples
,
np
.
ones
(
margin_right
)
*
samples
[
-
1
]]
)
case
DynShiftBC
.
EXCEPTION
:
msg
=
(
f
"
FixedShiftNumpy: right edge handling
{
self
.
_right_bound
.
name
}
not
"
f
"
possible for given delay, need margin of
{
margin_right
}
.
"
)
raise
RuntimeError
(
msg
)
case
_
as
unreachable
:
assert_never
(
unreachable
)
elif
margin_right
<
0
:
samples
=
samples
[:
margin_right
]
return
samples
def
__call__
(
self
,
samples
:
np
.
ndarray
,
shift
:
float
)
->
NumpyArray1D
:
r
"""
Apply shift $s$ to samples in numpy array.
Denoting the input data as $y_i$ with $i=0 \ldots N-1$, and the interpolated
input data as $y(t)$, such that $y(i)=y_i$, the output $z_k$ is given by
$z_k = y(k-s), k=0 \ ldots N - 1$.
Required data outside the provided samples is created if the specified
boundary condition allows it, or an exception is raised if the BC disallows it.
The output has same length as the input.
Arguments:
samples: 1D numpy array with data samples
shift: The shift $s$
Returns:
Numpy array with interpolated samples
"""
loc
=
-
shift
loc_int
=
int
(
np
.
floor
(
loc
))
loc_frac
=
loc
-
loc_int
interp
=
self
.
_interp_fac
(
-
loc_frac
)
margin_left
=
interp
.
margin_left
-
loc_int
margin_right
=
interp
.
margin_right
+
loc_int
samples_checked
=
make_numpy_array_1d
(
samples
)
samples_padleft
=
self
.
_padding_left
(
samples_checked
,
margin_left
)
samples_padded
=
self
.
_padding_right
(
samples_padleft
,
margin_right
)
return
interp
.
apply
(
samples_padded
)
def
make_fixed_shift_lagrange_numpy
(
left_bound
:
DynShiftBC
,
right_bound
:
DynShiftBC
,
length
:
int
)
->
FixedShiftNumpy
:
"""
Create a FixedShiftNumpy instance that uses Lagrange interpolator
Arguments:
left_bound: boundary treatment on the left
right_bound: boundary treatment on the right
length: number of Lagrange plolynomials (=order + 1)
Returns:
Fixed shift interpolator
"""
fac
=
make_fixed_shift_factory_lagrange
(
length
)
return
FixedShiftNumpy
(
left_bound
,
right_bound
,
fac
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment