Skip to content
Snippets Groups Projects
test_instrument.py 14.6 KiB
Newer Older
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test the Instrument class."""

import os
from tempfile import TemporaryDirectory
import numpy as np
import pytest
from lisaconstants import SPEED_OF_LIGHT
from scipy.signal import welch
from lisainstrument import Instrument


def test_run():
    """Test that simulations can run."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100)
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")
def test_seeded_run():
    """Test that simulations can run with a seed."""

    instru_1 = Instrument(size=100, seed=42)
    instru_1.simulate()
    instru_2 = Instrument(size=100, seed=42)
    instru_2.simulate()
    instru_3 = Instrument(size=100, seed=43)
    instru_3.simulate()

    with TemporaryDirectory() as temp_dir:
        path = os.path.join(temp_dir, "measurements.h5")
        instru_1.write(path, mode="w")
        with h5py.File(path, "r") as f:
            assert f.attrs["seed"] == 42

    for mosa in Instrument.MOSAS:
        np.testing.assert_array_equal(
            instru_1.testmass_noises[mosa], instru_2.testmass_noises[mosa]
        )
        np.testing.assert_array_equal(
            instru_1.isi_carrier_fluctuations[mosa],
            instru_2.isi_carrier_fluctuations[mosa],
        )
        np.testing.assert_array_equal(instru_1.tmi_usbs[mosa], instru_2.tmi_usbs[mosa])

        assert np.all(instru_1.testmass_noises[mosa] != instru_3.testmass_noises[mosa])
        assert np.all(
            instru_1.isi_carrier_fluctuations[mosa]
            != instru_3.isi_carrier_fluctuations[mosa]
        )
        assert np.all(instru_1.tmi_usbs[mosa] != instru_3.tmi_usbs[mosa])


def test_run_no_aafilter():
    """Test that simulations can run with no filter."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100, aafilter=None)
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")

def test_run_no_upsampling():
    """Test that simulations can run with no filter."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100, physics_upsampling=1, aafilter=None)
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")
def test_no_orbit_file():
    """Test that simulations fail with an invalid orbit file."""
    with pytest.raises(FileNotFoundError):
        Instrument(size=100, orbits="tests/data/nonexistent-orbits.h5")
    with pytest.raises(FileNotFoundError):
        Instrument(size=100, t0=0, orbits="tests/data/nonexistent-orbits.h5")
def test_keplerian_orbits_1_0_2():
    """Test that simulations can run with Keplerian orbit files v1.0.2."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100, orbits="tests/data/keplerian-orbits-1-0-2.h5")
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")
def test_esa_orbits_1_0_2():
    """Test that simulations can run with ESA orbit files v1.0.2."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100, orbits="tests/data/esa-orbits-1-0-2.h5")
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")
def test_keplerian_orbits_2_0():
    """Test that simulations can run with Keplerian orbit files v2.0."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100, orbits="tests/data/keplerian-orbits-2-0.h5")
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")
def test_esa_trailing_orbits_2_0():
    """Test that simulations can run with ESA trailing orbit files v2.0."""
    with TemporaryDirectory() as temp_dir:
        instru = Instrument(size=100, orbits="tests/data/esa-trailing-orbits-2-0.h5")
        instru.simulate()
        path = os.path.join(temp_dir, "measurements.h5")
        instru.write(path, mode="w")
def test_locking():
    """Test that simulations can run with various lock configurations."""
    with TemporaryDirectory() as temp_dir:
        path = os.path.join(temp_dir, "measurements.h5")
        # Test six free-running lasers
        instru = Instrument(size=100, lock="six")
        instru.simulate()
        instru.write(path, mode="w")
        # Test three free-running lasers
        instru = Instrument(size=100, lock="three")
        instru.simulate()
        instru.write(path, mode="w")
        # Test non-swap configurations
        for i in range(1, 6):
            instru = Instrument(size=100, lock=f"N{i}-12")
            instru.simulate()
            instru.write(path, mode="w")
        # Test that any other raises an error
        with pytest.raises(ValueError):
            Instrument(size=100, lock="whatever")
        with pytest.raises(ValueError):
            Instrument(size=100, lock="N7-12")
        with pytest.raises(ValueError):
            Instrument(size=100, lock="N1-67")

def test_initial_telemetry_size():
    """Test that simulations can run with a nonzero initial telemetry size."""

    orbit_paths = [
        "tests/data/keplerian-orbits-1-0-2.h5",
        "tests/data/esa-orbits-1-0-2.h5",
        "tests/data/keplerian-orbits-2-0.h5",
        "tests/data/esa-trailing-orbits-2-0.h5",
    with TemporaryDirectory() as temp_dir:
        path = os.path.join(temp_dir, "measurements.h5")
        for orbit_path in orbit_paths:
            # Read orbit file t0 and leave room for initial telemetry
            with h5py.File(orbit_path, "r") as orbitf:
                t0 = orbitf.attrs["t0"] + 1e6
            instru = Instrument(
                initial_telemetry_size=10,
                telemetry_downsampling=100,
                orbits=orbit_path,
                size=100,
                t0=t0,
            )
            instru.simulate()
            instru.write(path, mode="w")


def test_output_order_of_magnitude():
    """Test that output time series have the correct order of magnitude.

    Mostly follows the reasoning outlined in 10.1103/PhysRevD.107.083019, section IX A.

    We use a short simulation of 1000s.

    Given the short simulation, we expect large variances for each individual frequency bin
    We compute the mean of the ratio PSD/model as summary statistic, which should
    stay close to 1. To account for the simplistic model, we allow deviations
    by up to one order of magnitude.
    """

    # Run short simulation with equal arms, everything else at default parameters
    # We fix the noise seed for reproducibility
    size = 1e3
    ltt = 8.3
    instr = Instrument(
        size=size, orbits={mosa: ltt for mosa in Instrument.MOSAS}, seed=1
    )
    instr.simulate()

    def estimate_psd(data, detrend=None):
        """Estimate the PSD of a time series. We drop the first and last 5 points to avoid edge effects."""
        f, psd = welch(
            data, fs=instr.fs, nperseg=data.size, window=("kaiser", 15), detrend=detrend
        )
        return f[5:-5], psd[5:-5]

    # define the locking and non-locking ISI and RFI for the default locking configuration N1-12
    locking_isi = ["21", "31"]
    non_locking_isi = ["12", "23", "32", "13"]

    locking_rfi = ["13", "23", "32"]
    non_locking_rfi = ["12", "21", "31"]

    # We need to skip some samples in the beginning to account for delays and filter warm up times
    skip = 100

    # Estimate PSDs for different outputs
    locking_isi_psds = np.array(
        [
            estimate_psd(instr.isi_carrier_fluctuations[mosa][skip:])
            for mosa in locking_isi
        ]
    )
    locking_rfi_psds = np.array(
        [
            estimate_psd(instr.rfi_carrier_fluctuations[mosa][skip:])
            for mosa in locking_rfi
        ]
    )

    # Define frequency axis
    f = locking_isi_psds[0][0]

    # We expect the locking IFOs to be close to the numerical limit.
    # We don't expect to reach the theoretical machine epsilon of double
    # precision (around 15 decimal digits, so 30 orders of magnitude in PSD),
    # due to accumulation of errors.
    # Instead, we check that the locking beatnotes have PSDs at least 25 orders of
    # magnitude below the level of the primary noise source.
    for i in range(2):
        assert np.mean((locking_isi_psds[i][1] / instr.laser_asds["12"] ** 2)) <= 1e-25
    for i in range(3):
        assert np.mean((locking_rfi_psds[i][1] / instr.laser_asds["12"] ** 2)) <= 1e-25

    # Estimate non-locking ISI PSDs
    non_locking_isi_psds = np.array(
        [
            estimate_psd(instr.isi_carrier_fluctuations[mosa][skip:])
            for mosa in non_locking_isi
        ]
    )

    # For non-locking ISI on S/C 1, the laser noise appears modulated by a round-trip delay
    # We add a small offset to avoid exact zeros in the transfer function
    non_locking_isi_model_psd_1 = instr.laser_asds["12"] ** 2 * (
        np.abs(1 - np.exp(-1j * 2 * np.pi * f * 2 * ltt)) ** 2 + 0.01
    )

    # For non-locking ISI on S/C 2 and 3, the laser noise appears modulated by a single delay
    # We add a small offset to avoid exact zeros in the transfer function
    non_locking_isi_model_psd_2 = instr.laser_asds["12"] ** 2 * (
        np.abs(1 - np.exp(-1j * 2 * np.pi * f * ltt)) ** 2 + 0.01
    )

    # For the ISIs, handle 12, 13 separately from 23, 32
    for i in [0, 3]:
        assert (
            0.1
            <= np.mean((non_locking_isi_psds[i][1] / non_locking_isi_model_psd_1))
            <= 10
        )
    for i in [1, 2]:
        assert (
            0.1
            <= np.mean((non_locking_isi_psds[i][1] / non_locking_isi_model_psd_2))
            <= 10
        )

    # Estimate non-locking RFI PSDs
    non_locking_rfi_psds = np.array(
        [
            estimate_psd(instr.rfi_carrier_fluctuations[mosa][skip:])
            for mosa in non_locking_rfi
        ]
    )

    # For the RFI: the model is given by the secondary noises uncorrelated with the lockign RFI on the same S/C
    # There is a factor 2 since the noise of the locking RFI is imprinted on the laser and propagated to the non-locking one.
    rfi_secondary_psd = (
        instr.oms_rfi_carrier_asds["12"] ** 2 + instr.backlink_asds["12"] ** 2
    )
    m_2_hz = (2 * np.pi * f / SPEED_OF_LIGHT * instr.central_freq) ** 2
    non_locking_rfi_model_psd = 2 * rfi_secondary_psd * m_2_hz

    # We expect the same PSD for all non-locking RFIs
    for i in range(3):
        assert (
            0.1
            <= np.mean((non_locking_rfi_psds[i][1] / non_locking_rfi_model_psd))
            <= 10
        )

    # Estimate TMI PSDs
    tmi_psds = np.array(
        [
            estimate_psd(instr.tmi_carrier_fluctuations[mosa][skip:])
            for mosa in Instrument.MOSAS
        ]
    )

    # For the TMI: the dominant effect is the jitter of the S/C
    tmi_model = instr.mosa_jitter_x_asds["12"] ** 2
    tmi_model_psd = 4 * tmi_model * m_2_hz

    # We expect the same PSD for all TMIs
    for i in range(6):
        assert 0.1 <= np.mean(tmi_psds[i][1] / tmi_model_psd) <= 10

    # Test the MPRs roughly measure the nominal LTT
    # We don't expect this to be exact, due to clock effects and ranging errors
    for mosa in instr.MOSAS:
        np.testing.assert_allclose(8.3, instr.mprs[mosa][50:], atol=0.1)

    # Test the PSD of the MPRs
    # We expect them to be consistent with ranging noise
    mpr_psds = np.array(
        [
            estimate_psd(instr.mprs[mosa][skip:], detrend="linear")
            for mosa in Instrument.MOSAS
        ]
    )

    # Ranging noise is a white noise, so we directly compare against the nominal PSD
    for i in range(6):
        assert 0.1 <= np.mean(mpr_psds[i][1] / instr.ranging_asds["12"] ** 2) <= 10

    # Test the PSD of the ISI sideband measurements
    # We take the difference of the carrier and sideband to cancel out
    # the dominant laser noise
    isi_sb_psds = np.array(
        [
            estimate_psd(
                (
                    instr.isi_carrier_fluctuations[mosa]
                    - instr.isi_usb_fluctuations[mosa]
                )[skip:]
            )
            for mosa in instr.MOSAS
        ]
    )

    # The main noise sources affecting the ISI sidebands are
    # the modulation noise, clock noise and OMS noise
    # Clock and modulation noises enter scaled by the modulation frequencies
    left_mod_model = (
        instr.modulation_freqs["12"] ** 2
        * instr.modulation_asds["12"] ** 2
        * f ** (2 / 3)
    )
    right_mod_model = (
        instr.modulation_freqs["21"] ** 2
        * instr.modulation_asds["21"] ** 2
        * f ** (2 / 3)
    )
    clock_model = instr.modulation_freqs["12"] ** 2 * instr.clock_asds["1"] ** 2 * f**-1
    sb_readout_model = instr.oms_isi_usb_asds["12"] ** 2 * m_2_hz
    isi_usb_model = (
        2 * clock_model + left_mod_model + right_mod_model + sb_readout_model
    )

    for i in range(6):
        assert 0.1 <= np.mean(isi_sb_psds[i][1] / isi_usb_model) <= 10

    # Test the PSD of the RFI sideband measurements
    rfi_sb_psds = np.array(
        [
            estimate_psd(
                (
                    instr.rfi_carrier_fluctuations[mosa]
                    - instr.rfi_usb_fluctuations[mosa]
                )[skip:]
            )
            for mosa in instr.MOSAS
        ]
    )

    # The main noise sources affecting the RFI sidebands are
    # the modulation noise and OMS noise. Clock noise is common mode
    # in both interfering beams and cancels out.
    # Modulation noises enter scaled by the modulation frequencies.
    rfi_sbreadoutmodel = instr.oms_rfi_usb_asds["12"] ** 2 * m_2_hz
    rfi_usb_model = left_mod_model + right_mod_model + rfi_sbreadoutmodel

    for i in range(6):
        assert 0.1 <= np.mean(rfi_sb_psds[i][1] / rfi_usb_model) <= 10

    # Estimate the PSD of DWS measurements
    dws_eta_psds = np.array(
        [estimate_psd((instr.isi_dws_etas[mosa])[skip:]) for mosa in instr.MOSAS]
    )

    dws_phi_psds = np.array(
        [estimate_psd((instr.isi_dws_phis[mosa])[skip:]) for mosa in instr.MOSAS]
    )

    # DWS measures the combined SC + MOSA jitter of the receiving S/C + DWS noise
    dws_eta_model = (
        instr.dws_asds["12"] ** 2
        + instr.mosa_jitter_eta_asds["12"] ** 2
        + instr.sc_jitter_eta_asds["1"] ** 2
    ) * (2 * np.pi * f) ** 2

    dws_phi_model = (
        instr.dws_asds["12"] ** 2
        + instr.mosa_jitter_phi_asds["12"] ** 2
        + instr.sc_jitter_phi_asds["1"] ** 2
    ) * (2 * np.pi * f) ** 2

    for i in range(6):
        assert 0.1 <= np.mean(dws_eta_psds[i][1] / dws_eta_model) <= 10
        assert 0.1 <= np.mean(dws_phi_psds[i][1] / dws_phi_model) <= 10