Skip to content
Snippets Groups Projects
Commit e444cbf9 authored by Wolfgang Kastaun's avatar Wolfgang Kastaun
Browse files

moved fixed point iteration out of ShiftInverseNumpy into free function

parent e78deea5
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,47 @@ from lisainstrument.regular_interpolators import ( ...@@ -19,6 +19,47 @@ from lisainstrument.regular_interpolators import (
) )
def fixed_point_iter(
f: Callable[[NumpyArray1D], NumpyArray1D],
ferr: Callable[[NumpyArray1D, NumpyArray1D], float],
x: NumpyArray1D,
tolerance: float,
max_iter: int,
) -> NumpyArray1D:
r"""Perform a fixed point iteration for functions operating on a 1D array.
This uses fixed-point iteration to find a solution for
$$ x = f(x) $$
where $x$ is a 1D array and $f$ returns an array of the same size.
The convergence criterion is provided by the user
via a function $r(x)$ that returns a scalar error measure. The iteration
is performed until $r(x) < \epsilon$. If convergence is not achieved
after a given number of iterations, an exception is raised.
Arguments:
f: The function $f(x)$
ferr: The error measure $r(x)$
x: The initial data for the iteration.
tolerance: The tolerance $\epsilon$
max_iter: Maximum number of iterations
Returns:
Array with solution
"""
for _ in range(max_iter):
x_next = f(x)
err = ferr(x, x_next)
if err < tolerance:
return x_next
x = x_next
msg = (
f"ShiftInverseNumpy: iteration did not converge (error={err}, "
f"tolerance={tolerance}), iterations={max_iter}"
)
raise RuntimeError(msg)
class ShiftInverseNumpy: # pylint: disable=too-few-public-methods class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
"""Invert coordinate transformation given as shift""" """Invert coordinate transformation given as shift"""
...@@ -58,24 +99,6 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods ...@@ -58,24 +99,6 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
""" """
return self._interp_np.margin_right + self._max_abs_shift return self._interp_np.margin_right + self._max_abs_shift
def _fixed_point_iter(
self,
f: Callable[[NumpyArray1D], NumpyArray1D],
ferr: Callable[[NumpyArray1D, NumpyArray1D], float],
x: NumpyArray1D,
) -> NumpyArray1D:
for _ in range(self._max_iter):
x_next = f(x)
err = ferr(x, x_next)
if err < self._tolerance:
return x_next
x = x_next
msg = (
f"ShiftInverseNumpy: iteration did not converge (error={err}, "
f"tolerance={self._tolerance}), iterations={self._max_iter}"
)
raise RuntimeError(msg)
def __call__(self, shift: np.ndarray, fsample: float) -> NumpyArray1D: def __call__(self, shift: np.ndarray, fsample: float) -> NumpyArray1D:
"""Find the shift for the inverse transformation given by a shift. """Find the shift for the inverse transformation given by a shift.
...@@ -103,7 +126,9 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods ...@@ -103,7 +126,9 @@ class ShiftInverseNumpy: # pylint: disable=too-few-public-methods
def f_err(x1: NumpyArray1D, x2: NumpyArray1D) -> float: def f_err(x1: NumpyArray1D, x2: NumpyArray1D) -> float:
return np.max(np.abs(x1 - x2)[self.margin_left : -self.margin_right]) return np.max(np.abs(x1 - x2)[self.margin_left : -self.margin_right])
shift_idx_inv = self._fixed_point_iter(f_iter, f_err, dx) shift_idx_inv = fixed_point_iter(
f_iter, f_err, dx, self._tolerance, self._max_iter
)
shift_inv = shift_idx_inv / fsample shift_inv = shift_idx_inv / fsample
return make_numpy_array_1d(shift_inv) return make_numpy_array_1d(shift_inv)
......
...@@ -8,6 +8,8 @@ from tempfile import TemporaryDirectory ...@@ -8,6 +8,8 @@ from tempfile import TemporaryDirectory
import h5py import h5py
import numpy as np import numpy as np
import pytest import pytest
from lisaconstants import SPEED_OF_LIGHT
from scipy.signal import welch
from lisainstrument import Instrument from lisainstrument import Instrument
...@@ -168,3 +170,249 @@ def test_initial_telemetry_size(): ...@@ -168,3 +170,249 @@ def test_initial_telemetry_size():
) )
instru.simulate() instru.simulate()
instru.write(path, mode="w") instru.write(path, mode="w")
def test_output_order_of_magnitude():
"""Test that output time series have the correct order of magnitude.
Mostly follows the reasoning outlined in 10.1103/PhysRevD.107.083019, section IX A.
We use a short simulation of 1000s.
Given the short simulation, we expect large variances for each individual frequency bin
We compute the mean of the ratio PSD/model as summary statistic, which should
stay close to 1. To account for the simplistic model, we allow deviations
by up to one order of magnitude.
"""
# Run short simulation with equal arms, everything else at default parameters
# We fix the noise seed for reproducibility
size = 1e3
ltt = 8.3
instr = Instrument(
size=size, orbits={mosa: ltt for mosa in Instrument.MOSAS}, seed=1
)
instr.simulate()
def estimate_psd(data, detrend=None):
"""Estimate the PSD of a time series. We drop the first and last 5 points to avoid edge effects."""
f, psd = welch(
data, fs=instr.fs, nperseg=data.size, window=("kaiser", 15), detrend=detrend
)
return f[5:-5], psd[5:-5]
# define the locking and non-locking ISI and RFI for the default locking configuration N1-12
locking_isi = ["21", "31"]
non_locking_isi = ["12", "23", "32", "13"]
locking_rfi = ["13", "23", "32"]
non_locking_rfi = ["12", "21", "31"]
# We need to skip some samples in the beginning to account for delays and filter warm up times
skip = 100
# Estimate PSDs for different outputs
locking_isi_psds = np.array(
[
estimate_psd(instr.isi_carrier_fluctuations[mosa][skip:])
for mosa in locking_isi
]
)
locking_rfi_psds = np.array(
[
estimate_psd(instr.rfi_carrier_fluctuations[mosa][skip:])
for mosa in locking_rfi
]
)
# Define frequency axis
f = locking_isi_psds[0][0]
# We expect the locking IFOs to be close to the numerical limit.
# We don't expect to reach the theoretical machine epsilon of double
# precision (around 15 decimal digits, so 30 orders of magnitude in PSD),
# due to accumulation of errors.
# Instead, we check that the locking beatnotes have PSDs at least 25 orders of
# magnitude below the level of the primary noise source.
for i in range(2):
assert np.mean((locking_isi_psds[i][1] / instr.laser_asds["12"] ** 2)) <= 1e-25
for i in range(3):
assert np.mean((locking_rfi_psds[i][1] / instr.laser_asds["12"] ** 2)) <= 1e-25
# Estimate non-locking ISI PSDs
non_locking_isi_psds = np.array(
[
estimate_psd(instr.isi_carrier_fluctuations[mosa][skip:])
for mosa in non_locking_isi
]
)
# For non-locking ISI on S/C 1, the laser noise appears modulated by a round-trip delay
# We add a small offset to avoid exact zeros in the transfer function
non_locking_isi_model_psd_1 = instr.laser_asds["12"] ** 2 * (
np.abs(1 - np.exp(-1j * 2 * np.pi * f * 2 * ltt)) ** 2 + 0.01
)
# For non-locking ISI on S/C 2 and 3, the laser noise appears modulated by a single delay
# We add a small offset to avoid exact zeros in the transfer function
non_locking_isi_model_psd_2 = instr.laser_asds["12"] ** 2 * (
np.abs(1 - np.exp(-1j * 2 * np.pi * f * ltt)) ** 2 + 0.01
)
# For the ISIs, handle 12, 13 separately from 23, 32
for i in [0, 3]:
assert (
0.1
<= np.mean((non_locking_isi_psds[i][1] / non_locking_isi_model_psd_1))
<= 10
)
for i in [1, 2]:
assert (
0.1
<= np.mean((non_locking_isi_psds[i][1] / non_locking_isi_model_psd_2))
<= 10
)
# Estimate non-locking RFI PSDs
non_locking_rfi_psds = np.array(
[
estimate_psd(instr.rfi_carrier_fluctuations[mosa][skip:])
for mosa in non_locking_rfi
]
)
# For the RFI: the model is given by the secondary noises uncorrelated with the lockign RFI on the same S/C
# There is a factor 2 since the noise of the locking RFI is imprinted on the laser and propagated to the non-locking one.
rfi_secondary_psd = (
instr.oms_rfi_carrier_asds["12"] ** 2 + instr.backlink_asds["12"] ** 2
)
m_2_hz = (2 * np.pi * f / SPEED_OF_LIGHT * instr.central_freq) ** 2
non_locking_rfi_model_psd = 2 * rfi_secondary_psd * m_2_hz
# We expect the same PSD for all non-locking RFIs
for i in range(3):
assert (
0.1
<= np.mean((non_locking_rfi_psds[i][1] / non_locking_rfi_model_psd))
<= 10
)
# Estimate TMI PSDs
tmi_psds = np.array(
[
estimate_psd(instr.tmi_carrier_fluctuations[mosa][skip:])
for mosa in Instrument.MOSAS
]
)
# For the TMI: the dominant effect is the jitter of the S/C
tmi_model = instr.mosa_jitter_x_asds["12"] ** 2
tmi_model_psd = 4 * tmi_model * m_2_hz
# We expect the same PSD for all TMIs
for i in range(6):
assert 0.1 <= np.mean(tmi_psds[i][1] / tmi_model_psd) <= 10
# Test the MPRs roughly measure the nominal LTT
# We don't expect this to be exact, due to clock effects and ranging errors
for mosa in instr.MOSAS:
np.testing.assert_allclose(8.3, instr.mprs[mosa][50:], atol=0.1)
# Test the PSD of the MPRs
# We expect them to be consistent with ranging noise
mpr_psds = np.array(
[
estimate_psd(instr.mprs[mosa][skip:], detrend="linear")
for mosa in Instrument.MOSAS
]
)
# Ranging noise is a white noise, so we directly compare against the nominal PSD
for i in range(6):
assert 0.1 <= np.mean(mpr_psds[i][1] / instr.ranging_asds["12"] ** 2) <= 10
# Test the PSD of the ISI sideband measurements
# We take the difference of the carrier and sideband to cancel out
# the dominant laser noise
isi_sb_psds = np.array(
[
estimate_psd(
(
instr.isi_carrier_fluctuations[mosa]
- instr.isi_usb_fluctuations[mosa]
)[skip:]
)
for mosa in instr.MOSAS
]
)
# The main noise sources affecting the ISI sidebands are
# the modulation noise, clock noise and OMS noise
# Clock and modulation noises enter scaled by the modulation frequencies
left_mod_model = (
instr.modulation_freqs["12"] ** 2
* instr.modulation_asds["12"] ** 2
* f ** (2 / 3)
)
right_mod_model = (
instr.modulation_freqs["21"] ** 2
* instr.modulation_asds["21"] ** 2
* f ** (2 / 3)
)
clock_model = instr.modulation_freqs["12"] ** 2 * instr.clock_asds["1"] ** 2 * f**-1
sb_readout_model = instr.oms_isi_usb_asds["12"] ** 2 * m_2_hz
isi_usb_model = (
2 * clock_model + left_mod_model + right_mod_model + sb_readout_model
)
for i in range(6):
assert 0.1 <= np.mean(isi_sb_psds[i][1] / isi_usb_model) <= 10
# Test the PSD of the RFI sideband measurements
rfi_sb_psds = np.array(
[
estimate_psd(
(
instr.rfi_carrier_fluctuations[mosa]
- instr.rfi_usb_fluctuations[mosa]
)[skip:]
)
for mosa in instr.MOSAS
]
)
# The main noise sources affecting the RFI sidebands are
# the modulation noise and OMS noise. Clock noise is common mode
# in both interfering beams and cancels out.
# Modulation noises enter scaled by the modulation frequencies.
rfi_sbreadoutmodel = instr.oms_rfi_usb_asds["12"] ** 2 * m_2_hz
rfi_usb_model = left_mod_model + right_mod_model + rfi_sbreadoutmodel
for i in range(6):
assert 0.1 <= np.mean(rfi_sb_psds[i][1] / rfi_usb_model) <= 10
# Estimate the PSD of DWS measurements
dws_eta_psds = np.array(
[estimate_psd((instr.isi_dws_etas[mosa])[skip:]) for mosa in instr.MOSAS]
)
dws_phi_psds = np.array(
[estimate_psd((instr.isi_dws_phis[mosa])[skip:]) for mosa in instr.MOSAS]
)
# DWS measures the combined SC + MOSA jitter of the receiving S/C + DWS noise
dws_eta_model = (
instr.dws_asds["12"] ** 2
+ instr.mosa_jitter_eta_asds["12"] ** 2
+ instr.sc_jitter_eta_asds["1"] ** 2
) * (2 * np.pi * f) ** 2
dws_phi_model = (
instr.dws_asds["12"] ** 2
+ instr.mosa_jitter_phi_asds["12"] ** 2
+ instr.sc_jitter_phi_asds["1"] ** 2
) * (2 * np.pi * f) ** 2
for i in range(6):
assert 0.1 <= np.mean(dws_eta_psds[i][1] / dws_eta_model) <= 10
assert 0.1 <= np.mean(dws_phi_psds[i][1] / dws_phi_model) <= 10
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment