diff --git a/.pylintrc b/.pylintrc index ab877e36741a4d32fcdd3ada8294435f02653546..8a1b76f74082030df0add1d4ab99205360b68ec0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -15,7 +15,7 @@ max-line-length=120 [DESIGN] # Maximum number of locals for function / method body -max-locals=20 +max-locals=30 # Maximum number of arguments for function / method max-args=10 diff --git a/lisainstrument/__init__.py b/lisainstrument/__init__.py index 25ceeb83f474c6f522e0944ab88e42b799e1038e..1781ab0e11e17b9ffccef1516e9ee6a88c3792cb 100644 --- a/lisainstrument/__init__.py +++ b/lisainstrument/__init__.py @@ -6,4 +6,4 @@ from .meta import __version__ from .meta import __author__ from .meta import __email__ -from .lisa import Instrument +from .instrument import Instrument diff --git a/lisainstrument/containers.py b/lisainstrument/containers.py index 2d50f23b9c877f75d1c9d224dacc2dfa92d2c6c9..bd08ee24481252c82d8f812e5c4c5f897b184670 100644 --- a/lisainstrument/containers.py +++ b/lisainstrument/containers.py @@ -7,108 +7,163 @@ Authors: Jean-Baptiste Bayle <j2b.bayle@gmail.com> """ -# TODO: -# * modulated beam, 4 args (carrier_offsets, carrier_fluctuations, usb_offsets, usb_fluctuations) -# * args can be scalars (repeated in dict), dict (check and convert to float), callable (call with mosa as arg) +import abc +import logging +import numpy +import h5py -class Signal: - """Represent a signal expressed as frequency offsets and fluctuations.""" +class ForEachObject(abc.ABC): + """Abstract class which represents a dictionary holding a value for each object.""" - def __init__(self, offsets=0, fluctuations=0): - """Initialize a signal from frequency offsets and fluctuations. + def __init__(self, values): + """Initialize an object with a value or a function of the mosa index. Args: - offsets: frequency offsets [Hz] - fluctuations: frequency fluctuations [Hz] + values: a value, a dictionary of values, or a function (mosa -> value) """ - if callable(offsets): - self.offsets = {mosa: offsets(mosa) for mosa in Instrument.MOSAS} + if isinstance(values, dict): + self.dict = {mosa: values[mosa] for mosa in self.indices()} + elif callable(values): + self.dict = {mosa: values(mosa) for mosa in self.indices()} + elif isinstance(values, h5py.Dataset): + self.dict = {mosa: values[mosa] for mosa in self.indices()} else: - self.offsets = Instrument.mosa_dict(offsets) + self.dict = {mosa: values for mosa in self.indices()} - if callable(fluctuations): - self.fluctuations = {mosa: fluctuations(mosa) for mosa in Instrument.MOSAS} - else: - self.fluctuations = Instrument.mosa_dict(fluctuations) + @classmethod + @abc.abstractmethod + def indices(cls): + """Return list of object indices.""" + raise NotImplementedError - def transformed(self, offsets=lambda x, mosa: x, fluctuations=lambda x, mosa: x): - """Return a new two-variable signal transforming offsets and fluctuations. + def transform(self, transformation): + """Transform dictionary on-the-spot. Args: - offsets: function of (offsets, mosa) returning new offsets [Hz] - fluctuations: function of (fluctuations, mosa) returning new fluctuations [Hz] - - Returns: - A new `Signal` isntance where offsets and fluctuations have been transformed. + transformation: function (mosa, value -> new_value) """ - return self.__class__( - offsets={ - mosa: offsets(self.offsets[mosa], mosa) - for mosa in Instrument.MOSAS - }, - fluctuations={ - mosa: fluctuations(self.fluctuations[mosa], mosa) - for mosa in Instrument.MOSAS - }, - ) - - def reduce(self, function=lambda offsets, fluctuations: 0): - """Compute a new MOSA dictionary from offsets and fluctuations. + self.dict = {mosa: transformation(mosa, self[mosa]) for mosa in self.indices()} + + def transformed(self, transformation): + """Return a new dictionary from transformed objects. Args: - function: function of (offsets, fluctuations) returning new value + transformation: function (mosa, value -> new_value) """ - return { - mosa: function(self.offsets[mosa], self.fluctuations[mosa]) - for mosa in Instrument.MOSAS - } - - @property - def totalfreq(self): - """Return total frequencies, as the sum of offsets and fluctuations.""" - return self.reduce(lambda offsets, fluctuations: offsets + fluctuations) - + return self.__class__(lambda mosa: transformation(mosa, self[mosa])) -class ModulatedBeam: - """Represent a modulated beam, with a carrier, an upper sideband, and optionally timer deviations.""" - - def __init__(self, carrier, usb, timer_deviations=None): - """Initialize a modulated beam from carrier and upper sideband signals. + def write(self, hdf5, dataset): + """Write values in dataset on HDF5 file. Args: - carrier: carrier signal - usb: upper sideband signal - timer_deviations: timer deviations [s] + hdf5: HDF5 file in which to write + dataset: dataset name or path """ - if not isinstance(carrier) or not isinstance(usb, Signal): - raise TypeError("carrier and upper sideband should be instances of `Signal`") - - self.carrier = carrier - self.usb = usb - self.timer_deviations = Instrument.mosa_dict(timer_deviations) + # Retreive the maximum size of data + size = 1 + for value in self.values(): + if numpy.isscalar(value): + continue + if size != 1 and len(value) != size: + raise ValueError(f"incompatible sizes in dictionary '{size}' and '{len(size)}'") + size = max(size, len(value)) + logging.debug("Writing dataset of size '%s' with '%s' columns", size, len(self.indices())) + # Write dataset + dtype = numpy.dtype({'names': self.indices(), 'formats': len(self.indices()) * [numpy.float64]}) + hdf5.create_dataset(dataset, (size,), dtype=dtype) + for index in self.indices(): + hdf5[dataset][index] = self[index] + + def __getitem__(self, key): + return self.dict[key] + + def __setitem__(self, key, item): + self.dict[key] = item + + def values(self): + """Return dictionary values.""" + return self.dict.values() + + def keys(self): + """Return dictionary keys.""" + return self.dict.keys() + + def items(self): + """Return dictionary items.""" + return self.dict.items() + + def __repr__(self): + return repr(self.dict) + + +class ForEachSC(ForEachObject): + """Represents a dictionary of values for each spacecraft.""" + + @classmethod + def indices(cls): + return ['1', '2', '3'] + + @staticmethod + def distant_left(sc): + """Return index of distant left spacecraft.""" + if sc not in ForEachSC.indices(): + raise ValueError(f"invalid spacecraft index '{sc}'") + return f'{(int(sc) - 2) % 3 + 1}' + + @staticmethod + def distant_right(sc): + """Return index of distant right spacecraft.""" + if sc not in ForEachSC.indices(): + raise ValueError(f"invalid spacecraft index '{sc}'") + return f'{int(sc) % 3 + 1}' + + @staticmethod + def left_mosa(sc): + """Return index of left MOSA.""" + if sc not in ForEachSC.indices(): + raise ValueError(f"invalid spacecraft index '{sc}'") + return f'{sc}{ForEachSC.distant_left(sc)}' + + @staticmethod + def right_mosa(sc): + """Return index of right MOSA.""" + if sc not in ForEachSC.indices(): + raise ValueError(f"invalid spacecraft index '{sc}'") + return f'{sc}{ForEachSC.distant_right(sc)}' + + +class ForEachMOSA(ForEachObject): + """Represents a dictionary of values for each moveable optical subassembly (MOSA).""" + + @classmethod + def indices(cls): + return ['12', '23', '31', '13', '32', '21'] + + @staticmethod + def sc(mosa): + """Return index of spacecraft hosting MOSA.""" + return f'{mosa[0]}' + + @staticmethod + def distant(mosa): + """Return index of distant MOSA. + + In practive, we invert the indices to swap emitter and receiver. + """ + if mosa not in ForEachMOSA.indices(): + raise ValueError(f"invalid MOSA index '{mosa}'") + return f'{mosa[1]}{mosa[0]}' - def transformed(self, - carrier_offsets=lambda x, mosa: x, carrier_fluctuations=lambda x, mosa: x, - usb_offsets=lambda x, mosa: x, usb_fluctuations=lambda x, mosa: x, - timer_deviations=lambda x, mosa: x): - """Return a new modulated beam after applying transformations. + @staticmethod + def adjacent(mosa): + """Return index of adjacent MOSA. - Args: - carrier_offsets: function of (offsets, mosa) returning new carrier offsets [Hz] - carrier_fluctuations: function of (fluctuations, mosa) returning new carrier fluctuations [Hz] - usb_offsets: function of (offsets, mosa) returning new upper sideband offsets [Hz] - usb_fluctuations: function of (fluctuations, mosa) returning new upper sideband fluctuations [Hz] - timer_deviations: function of (deviations, mosa) return new timer deviations [s] - - Returns: - A new `ModulatedBeam` isntance where signals have been transformed. + In practice, we replace the second index by the only unused spacecraft index. """ - return self.__class__( - carrier=self.carrier.transformed(carrier_offsets, carrier_fluctuations), - usb=self.usb.transformed(usb_offsets, usb_fluctuations), - timer_deviations={ - mosa: timer_deviations(self.timer_deviations[mosa], mosa) - for mosa in Instrument.MOSAS - } - ) + if mosa not in ForEachMOSA.indices(): + raise ValueError(f"invalid MOSA index '{mosa}'") + unused = [sc for sc in ForEachSC.indices() if sc not in mosa] + if len(unused) != 1: + raise RuntimeError(f"cannot find adjacent MOSA for '{mosa}'") + return f'{mosa[0]}{unused[0]}' diff --git a/lisainstrument/instrument.py b/lisainstrument/instrument.py new file mode 100644 index 0000000000000000000000000000000000000000..c59a8f723ff2c592f307d38878c3bdd7336588d3 --- /dev/null +++ b/lisainstrument/instrument.py @@ -0,0 +1,1051 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# pylint: disable=too-many-lines +""" +LISA Instrument module. + +Authors: + Jean-Baptiste Bayle <j2b.bayle@gmail.com> +""" + +import logging +import h5py +import scipy.interpolate +import numpy + +from .containers import ForEachSC +from .containers import ForEachMOSA + +from . import meta +from . import dsp +from . import noises + + +class Instrument: + """Represents an instrumental simulation.""" + # pylint: disable=attribute-defined-outside-init + + # Indexing conventions + SCS = ForEachSC.indices() + MOSAS = ForEachMOSA.indices() + + def __init__(self, size=2592000, dt=0.3, t0=0, + # Inter-spacecraft propagation + orbits=None, gws=None, interpolation=('lagrange', 31), + # Lasers + laser_asds=28.2, modulation_asds=1E-14, modulation_fknees=1.5E-2, + central_freq=2.816E14, modulation_freqs=None, offsets_freqs=None, three_lasers=False, + # Clocks + clock_asds=6.32E-14, clock_offsets=0, clock_freqoffsets=None, clock_freqlindrifts=None, + clock_freqquaddrifts=None, clockinv_tolerance=1E-10, clockinv_maxiter=5, + # Optical pathlength noises + backlink_asds=3E-12, backlink_fknees=2E-3, testmass_asds=2.4E-15, testmass_fknees=0.4E-3, + # Pseudo-ranging + ranging_biases=0, ranging_asds=3E-9, + # Physics simulation sampling and filtering + physics_upsampling=10, aafilter=('kaiser', 240, 0.3, 1.35)): + """Initialize an instrumental simulation. + + Args: + size: number of samples to generate + dt: sampling period [s] + t0: initial time [s] + orbits: path to orbit file or None for default set of proper pseudo-ranges and derivatives thereof + gws: dictionary of gravitational-wave responses or path to gravitational-wave file + interpolation: interpolation function or interpolation method and parameters; when using 'lagrange', + use a tuple ('lagrange', order) with `order` the odd interpolation order; an arbitrary function should + take (x, shift [number of samples]) as parameter + laser_asds: dictionary of amplitude spectral densities for laser noise [Hz/sqrt(Hz)] + modulation_asds: dictionary of amplitude spectral densities for modulation noise [s/sqrt(Hz)] + modulation_fknees: dictionary of cutoff frequencies for modulation noise [Hz] + central_freq: laser central frequency from which all offsets are computed [Hz] + modulation_freqs: dictionary of modulation frequencies [Hz] + offsets_freqs: dictionary of laser frequency offsets [Hz] + three_lasers: whether the only simulate one laser per spacecraft + clock_asds: dictionary of clock noise amplitude spectral densities + clock_offsets: dictionary of clock offsets + clock_freqoffsets: dictionary of clock frequency offsets [s^-1] + clock_freqlindrifts: dictionary of clock frequency linear drifts [s^-2] + clock_freqquaddrifts: dictionary of clock frequency quadratic drifts [s^-2] + clockinv_tolerance: convergence tolerance for timer deviation inversion [s] + clockinv_maxiter: maximum number of iterations for timer deviation inversion + backlink_asds: dictionary of amplitude spectral densities for backlink noise [m/sqrt(Hz)] + backlink_fknees: dictionary of cutoff frequencied for backlink noise [Hz] + testmass_asds: dictionary of amplitude spectral densities for test-mass noise [ms^(-2)/sqrt(Hz)] + testmass_fknees: dictionary of cutoff frequencied for test-mass noise [Hz] + ranging_biases: dictionary of ranging noise bias [s] + ranging_asds: dictionary of ranging noise amplitude spectral densities [s/sqrt(Hz)] + physics_upsampling: ratio of sampling frequencies for physics vs. measurement simulation + aafilter: antialiasing filter function or filter design method; when using 'kaiser', use a tuple + ('kaiser', attenuation [dB], f1 [Hz], f2 [Hz]) with attenuation the attenuation in the stopband, + and f1 < f2 the frequencies defining the transition band; use `None` for no filter + """ + # pylint: disable=too-many-arguments + logging.info("Initializing instrumental simulation") + self.git_url = 'https://gitlab.in2p3.fr/lisa-simulation/instrument' + self.version = meta.__version__ + self.simulated = False + + # Measurement sampling + self.size = int(size) + self.dt = float(dt) + self.t0 = float(t0) + self.fs = 1 / self.dt + logging.info("Computing measurement time vector (size=%s, dt=%s)", self.size, self.dt) + self.t = self.t0 + numpy.arange(self.size, dtype=numpy.float64) * self.dt + + # Physics sampling + self.physics_upsampling = int(physics_upsampling) + self.physics_size = self.size * self.physics_upsampling + self.physics_dt = self.dt / self.physics_upsampling + self.physics_fs = self.fs * self.physics_upsampling + logging.info("Computing physics time vector (size=%s, dt=%s)", self.physics_size, self.physics_dt) + self.physics_t = self.t0 + numpy.arange(self.physics_size, dtype=numpy.float64) * self.physics_dt + + # Instrument topology + self.central_freq = float(central_freq) + self.three_lasers = bool(three_lasers) + + # Laser and modulation noise + self.laser_asds = ForEachMOSA(laser_asds) + self.modulation_asds = ForEachMOSA(modulation_asds) + self.modulation_fknees = ForEachMOSA(modulation_fknees) + self.modulation_freqs = ForEachMOSA(modulation_freqs) if modulation_freqs is not None \ + else ForEachMOSA({ # Default based on mission baseline 2.4 MHz/2.401 MHz for left and right MOSAs + '12': 2.4E9, '23': 2.4E9, '31': 2.4E9, + '13': 2.401E9, '32': 2.401E9, '21': 2.401E9, + }) + + # Clocks + self.clock_asds = ForEachSC(clock_asds) + self.clock_offsets = ForEachSC(clock_offsets) + self.clock_freqoffsets = ForEachSC(clock_freqoffsets) if clock_freqoffsets is not None \ + else ForEachSC({ # Default based on LISANode + '1': 5E-8, '2': 6.25E-7, '3': -3.75E-7 + }) + self.clock_freqlindrifts = ForEachSC(clock_freqlindrifts) if clock_freqoffsets is not None \ + else ForEachSC({ # Default based on LISANode + '1': 1.6E-15, '2': 2E-14, '3': -1.2E-14 + }) + self.clock_freqquaddrifts = ForEachSC(clock_freqquaddrifts) if clock_freqoffsets is not None \ + else ForEachSC({ # Default based on LISANode + '1': 9E-24, '2': 6.75E-23, '3': -1.125e-22 + }) + + # Clock-noise inversion + self.clockinv_tolerance = float(clockinv_tolerance) + self.clockinv_maxiter = int(clockinv_maxiter) + + # Ranging noise + self.ranging_biases = ForEachMOSA(ranging_biases) + self.ranging_asds = ForEachMOSA(ranging_asds) + + # Backlink and test-mass acceleration noise + self.backlink_asds = ForEachMOSA(backlink_asds) + self.backlink_fknees = ForEachMOSA(backlink_fknees) + self.testmass_asds = ForEachMOSA(testmass_asds) + self.testmass_fknees = ForEachMOSA(testmass_fknees) + + # Frequency plan + self.offsets_freqs = ForEachMOSA(offsets_freqs) if offsets_freqs is not None \ + else ForEachMOSA({ # Default based on default for LISANode + '12': 8.1E6, '23': 9.2E6, '31': 10.3E6, '13': 1.4E6, '32': -11.6E6, '21': -9.5E6, + }) + + # Orbits and gravitational waves + self.gws = None + self.pprs = None + self.d_pprs = None + self.init_orbits(orbits) + self.init_gws(gws) + + # Interpolation and antialiasing filter + self.init_interpolation(interpolation) + self.init_aafilter(aafilter) + + def init_interpolation(self, interpolation): + """Initialize interpolation.""" + if interpolation is None: + logging.info("Disabling interpolation") + designed_interpolation = dsp.identity + elif callable(interpolation): + logging.info("Using user-provided interpolation function") + designed_interpolation = interpolation + else: + logging.info("Designing interpolation function") + designed_interpolation = Instrument.design_interpolation(interpolation) + self.interpolate = lambda x, shift: x if numpy.isscalar(x) else designed_interpolation(x, shift) + + @staticmethod + def design_interpolation(parameters): + """Design the interpolation function. + + We currently support Lagrande interpolation. + + Args: + parameters: see `interpolation` docstring in `__init__()` + + Returns: + A function which interpolates data. + """ + method = parameters[0] + if method == 'lagrange': + logging.debug("Using Lagrange interpolations") + order = parameters[1] + logging.debug("Lagrande interpolation order is %s", order) + return lambda x, shift: dsp.timeshift(x, shift, order=order) + + raise ValueError(f"invalid interpolation parameters '{parameters}'") + + def init_aafilter(self, aafilter): + """Initialize antialiasing filter and downsampling.""" + if aafilter is None: + logging.info("Disabling antialiasing filter") + designed_aafilter = dsp.identity + elif callable(aafilter): + logging.info("Using user-provided antialiasing filter function") + designed_aafilter = aafilter + else: + logging.info("Design antialiasing filter (%s)", aafilter) + designed_aafilter = self.design_aafilter(aafilter) + self.aafilter = lambda _, x: x if numpy.isscalar(x) else designed_aafilter(x) + self.downsampled = lambda _, x: x if numpy.isscalar(x) else x[::self.physics_upsampling] + + def design_aafilter(self, parameters): + """Design the antialiasing filter. + + We currently support finite-impulse response filter designed from a Kaiser window. + The order and beta parameters of the Kaiser window are deduced from the desired attenuation + and transition bandwidth of the filter. + + Args: + parameters: see `aafilter` docstring in `__init__()` + + Returns: + A function that filters data. + """ + method = parameters[0] + nyquist = self.physics_fs / 2 + + if method == 'kaiser': + logging.debug("Designing finite-impulse response filter from Kaiser window") + attenuation, freq1, freq2 = parameters[1], parameters[2], parameters[3] + if attenuation == 0: + logging.debug("Vanishing filter attenuation, disabling filtering") + return lambda x: x + logging.debug("Filter attenuation is %s dB", attenuation) + logging.debug("Filter transition band is [%s Hz, %s Hz]", freq1, freq2) + numtaps, beta = scipy.signal.kaiserord(attenuation, (freq2 - freq1) / nyquist) + logging.debug("Kaiser window has %s taps and beta is %s", numtaps, beta) + taps = scipy.signal.firwin(numtaps, (freq1 + freq2) / (2 * nyquist), window=('kaiser', beta)) + logging.debug("Filter taps are %s", taps) + return lambda x: scipy.signal.lfilter(taps, 1, x) + + raise ValueError(f"invalid filter parameters '{parameters}'") + + def init_orbits(self, orbits): + """Initialize orbits.""" + if isinstance(orbits, str): + logging.info("Interpolating orbits from orbit file '%s'", orbits) + self.orbit_file = orbits + orbitf = h5py.File(self.orbit_file, 'r') + logging.debug("Interpolating proper pseudo-ranges") + self.pprs = ForEachMOSA(lambda mosa: scipy.interpolate.InterpolatedUnivariateSpline( + orbitf['tcb']['t'][:], orbitf['tcb'][f'l_{mosa}']['ppr'], k=1, ext='raise')(self.physics_t) + ) + logging.debug("Interpolating proper pseudo-range derivatives") + self.d_pprs = ForEachMOSA(lambda mosa: scipy.interpolate.InterpolatedUnivariateSpline( + orbitf['tcb']['t'][:], orbitf['tcb'][f'l_{mosa}']['d_ppr'], k=1, ext='raise')(self.physics_t) + ) + orbitf.close() + elif orbits is None: + logging.info("No orbit file provided, using default set of proper pseudo-ranges and derivatives thereof") + self.orbit_file = None + self.pprs = ForEachMOSA({ # Default PPRs based on first samples of Keplerian orbits (v1.0) + '12': 8.3324, '23': 8.3028, '31': 8.3324, + '13': 8.3315, '32': 8.3044, '21': 8.3315, + }) + self.d_pprs = ForEachMOSA({ # Default PPR derivatives based on first samples of Keplerian orbits (v1.0) + '12': 3.1977E-9, '23': -5.0241E-9, '31': -3.1977E-9, + '13': -3.1977E-9, '32': -5.0252E-9, '21': 3.1977E-9, + }) + else: + raise TypeError(f"invalid orbits '{orbits}', should be path to orbit file or None") + + def init_gws(self, gws): + """Initialize gravitational-wave responses.""" + if isinstance(gws, str): + self.gw_file = gws + logging.info("Interpolating gravitational-wave responses from GW file '%s'", self.gw_file) + gwf = h5py.File(self.gw_file, 'r') + self.gws = ForEachMOSA(lambda mosa: scipy.interpolate.InterpolatedUnivariateSpline( + gwf['t'][:], gwf[f'l_{mosa}'][:], k=1, ext='raise')(self.physics_t) + ) + gwf.close() + elif gws is None: + logging.debug("No gravitational-wave responses") + self.gw_file = None + self.gws = ForEachMOSA(0) + else: + logging.info("Using user-provided gravitational-wave responses") + self.gw_file = None + self.gws = ForEachMOSA(gws) + + def disable_all_noises(self, but=None): + """Turn off all instrumental noises. + + Args: + but: optional category of noises to keep on ['laser', 'modulation', 'clock', 'pathlength', 'ranging'] + """ + if but != 'laser': + self.laser_asds = ForEachMOSA(0) + if but != 'modulation': + self.modulation_asds = ForEachMOSA(0) + if but != 'clock': + self.disable_clock_noises() + if but != 'pathlength': + self.disable_pathlength_noises() + if but != 'ranging': + self.disable_ranging_noises() + + def disable_clock_noises(self): + """Turn off all imperfections on clocks. + + This includes offsets, and frequency offsets and deviations. + """ + self.clock_asds = ForEachSC(0) + self.clock_offsets = ForEachSC(0) + self.clock_freqoffsets = ForEachSC(0) + self.clock_freqlindrifts = ForEachSC(0) + self.clock_freqquaddrifts = ForEachSC(0) + + def disable_pathlength_noises(self): + """Turn off all optical pathlength noises.""" + self.backlink_asds = ForEachMOSA(0) + self.testmass_asds = ForEachMOSA(0) + + def disable_ranging_noises(self): + """Turn off all pseudo-ranging noises.""" + self.ranging_biases = ForEachMOSA(0) + self.ranging_asds = ForEachMOSA(0) + + def disable_dopplers(self): + """Set proper pseudo-range derivatives to zero to turn off Doppler effects.""" + self.d_pprs = ForEachMOSA(0) + + def simulate(self, keep_all=False): + """Run a simulation, and generate all intermediary signals. + + Args: + keep_all: whether to keep all quantities in memory + """ + # pylint: disable=too-many-locals,too-many-statements,too-many-branches + + logging.info("Starting simulation") + self.keep_all = keep_all + self.simulated = True + self.simulate_noises() + + ## Local beams + + self.local_carrier_offsets = self.offsets_freqs + + logging.info("Computing carrier frequency fluctuations for local beams") + self.local_carrier_fluctuations = self.laser_noises + + logging.info("Computing upper sideband frequency offsets for local beams") + self.local_usb_offsets = ForEachMOSA(lambda mosa: + self.offsets_freqs[mosa] + self.modulation_freqs[mosa] * (1 + self.clock_noise_offsets[mosa[0]]) + ) + + logging.info("Computing upper sideband frequency fluctuations for local beams") + self.local_usb_fluctuations = ForEachMOSA(lambda mosa: + self.laser_noises[mosa] + self.modulation_freqs[mosa] \ + * (self.clock_noise_fluctuations[mosa[0]] + self.modulation_noises[mosa]) + ) + + logging.info("Computing local timer deviations") + self.local_timer_deviations = ForEachSC(lambda sc: + self.clock_offsets[sc] + numpy.cumsum( + numpy.broadcast_to(self.clock_noise_offsets[sc] + self.clock_noise_fluctuations[sc], \ + (self.physics_size,)) * self.dt) + ) + + ## Propagation to distant MOSA + + logging.info("Propagating carrier frequency offsets to distant MOSA") + self.distant_carrier_offsets = ForEachMOSA(lambda mosa: + (1 - self.d_pprs[mosa]) * \ + self.interpolate(self.local_carrier_offsets[ForEachMOSA.distant(mosa)], + -self.pprs[mosa] * self.physics_fs) \ + - self.d_pprs[mosa] * self.central_freq + ) + + logging.info("Propagating carrier frequency fluctuations to distant MOSA") + self.distant_carrier_fluctuations = ForEachMOSA(lambda mosa: + self.gws[mosa] + (1 - self.d_pprs[mosa]) * \ + self.interpolate(self.local_carrier_fluctuations[ForEachMOSA.distant(mosa)], + -self.pprs[mosa] * self.physics_fs) + ) + + logging.info("Propagating upper sideband frequency offsets to distant MOSA") + self.distant_usb_offsets = ForEachMOSA(lambda mosa: + (1 - self.d_pprs[mosa]) * \ + self.interpolate(self.local_usb_offsets[ForEachMOSA.distant(mosa)], + -self.pprs[mosa] * self.physics_fs) \ + - self.d_pprs[mosa] * self.central_freq + ) + + logging.info("Propagating upper sideband frequency fluctuations to distant MOSA") + self.distant_usb_fluctuations = ForEachMOSA(lambda mosa: + self.gws[mosa] + (1 - self.d_pprs[mosa]) * \ + self.interpolate(self.local_usb_fluctuations[ForEachMOSA.distant(mosa)], + -self.pprs[mosa] * self.physics_fs) + ) + + logging.info("Propagating timer deviations to distant MOSA") + self.distant_timer_deviations = ForEachMOSA(lambda mosa: + self.interpolate(self.local_timer_deviations[mosa[1]], + -self.pprs[mosa] * self.physics_fs) \ + - self.pprs[mosa] + ) + + ## Propagation to adjacent MOSA + + logging.info("Propagating carrier frequency offsets to adjacent MOSA") + self.adjacent_carrier_offsets = ForEachMOSA(lambda mosa: + self.local_carrier_offsets[ForEachMOSA.adjacent(mosa)] + ) + + logging.info("Propagating carrier frequency fluctuations to adjacent MOSA") + self.adjacent_carrier_fluctuations = ForEachMOSA(lambda mosa: + self.local_carrier_fluctuations[ForEachMOSA.adjacent(mosa)] \ + + self.central_freq * self.backlink_noises[mosa] + ) + + logging.info("Propagating upper sideband frequency offsets to adjacent MOSA") + self.adjacent_usb_offsets = ForEachMOSA(lambda mosa: + self.local_usb_offsets[ForEachMOSA.adjacent(mosa)] + ) + + logging.info("Propagating upper sideband frequency fluctuations to adjacent MOSA") + self.adjacent_usb_fluctuations = ForEachMOSA(lambda mosa: + self.local_usb_fluctuations[ForEachMOSA.adjacent(mosa)] \ + + self.central_freq * self.backlink_noises[mosa] + ) + + ## Inter-spacecraft interferometer local beams + + logging.info("Propagating local carrier frequency offsets to inter-spacecraft interferometer") + self.local_isc_carrier_offsets = self.local_carrier_offsets + + logging.info("Propagating local carrier frequency fluctuations to inter-spacecraft interferometer") + self.local_isc_carrier_fluctuations = self.local_carrier_fluctuations + + logging.info("Propagating local upper sideband frequency offsets to inter-spacecraft interferometer") + self.local_isc_usb_offsets = self.local_usb_offsets + + logging.info("Propagating local upper sideband frequency fluctuations to inter-spacecraft interferometer") + self.local_isc_usb_fluctuations = self.local_usb_fluctuations + + ## Inter-spacecraft interferometer distant beams + + logging.info("Propagating distant carrier frequency offsets to inter-spacecraft interferometer") + self.distant_isc_carrier_offsets = self.distant_carrier_offsets + + logging.info("Propagating distant carrier frequency fluctuations to inter-spacecraft interferometer") + self.distant_isc_carrier_fluctuations = self.distant_carrier_fluctuations + + logging.info("Propagating distant upper sideband frequency offsets to inter-spacecraft interferometer") + self.distant_isc_usb_offsets = self.distant_usb_offsets + + logging.info("Propagating distant upper sideband frequency fluctuations to inter-spacecraft interferometer") + self.distant_isc_usb_fluctuations = self.distant_usb_fluctuations + + ## Inter-spacecraft interferometer beatnotes on TPS (high-frequency) + + logging.info("Computing inter-spacecraft carrier beatnote frequency offsets on TPS") + self.tps_isc_carrier_offsets = ForEachMOSA(lambda mosa: + self.distant_isc_carrier_offsets[mosa] - self.local_isc_carrier_offsets[mosa] + ) + + logging.info("Computing inter-spacecraft carrier beatnote frequency fluctuations on TPS") + self.tps_isc_carrier_fluctuations = ForEachMOSA(lambda mosa: + self.distant_isc_carrier_fluctuations[mosa] - self.local_isc_carrier_fluctuations[mosa] + ) + + logging.info("Computing inter-spacecraft upper sideband beatnote frequency offsets on TPS") + self.tps_isc_usb_offsets = ForEachMOSA(lambda mosa: + self.distant_isc_usb_offsets[mosa] - self.local_isc_usb_offsets[mosa] + ) + + logging.info("Computing inter-spacecraft upper sideband beatnote frequency fluctuations on TPS") + self.tps_isc_usb_fluctuations = ForEachMOSA(lambda mosa: + self.distant_isc_usb_fluctuations[mosa] - self.local_isc_usb_fluctuations[mosa] + ) + + ## Measured pseudo-ranging on TPS grid (high-frequency) + + logging.info("Computing measured pseudo-ranges on TPS") + self.tps_mprs = ForEachMOSA(lambda mosa: + self.local_timer_deviations[ForEachMOSA.sc(mosa)] \ + - self.distant_timer_deviations[mosa] + self.ranging_noises[mosa] + ) + + ## Test-mass interferometer local beams + + logging.info("Propagating local carrier frequency offsets to test-mass interferometer") + self.local_tm_carrier_offsets = self.local_carrier_offsets + + logging.info("Propagating local carrier frequency fluctuations to test-mass interferometer") + self.local_tm_carrier_fluctuations = ForEachMOSA(lambda mosa: + self.local_carrier_fluctuations[mosa] + self.central_freq * self.testmass_noises[mosa] + ) + + logging.info("Propagating local upper sideband frequency offsets to test-mass interferometer") + self.local_tm_usb_offsets = self.local_usb_offsets + + logging.info("Propagating local upper sideband frequency fluctuations to test-mass interferometer") + self.local_tm_usb_fluctuations = ForEachMOSA(lambda mosa: + self.local_usb_fluctuations[mosa] + self.central_freq * self.testmass_noises[mosa] + ) + + ## Test-mass interferometer adjacent beams + + logging.info("Propagating adjacent carrier frequency offsets to test-mass interferometer") + self.adjacent_tm_carrier_offsets = self.adjacent_carrier_offsets + + logging.info("Propagating adjacent carrier frequency fluctuations to test-mass interferometer") + self.adjacent_tm_carrier_fluctuations = self.adjacent_carrier_fluctuations + + logging.info("Propagating adjacent upper sideband frequency offsets to test-mass interferometer") + self.adjacent_tm_usb_offsets = self.adjacent_usb_offsets + + logging.info("Propagating adjacent upper sideband frequency fluctuations to test-mass interferometer") + self.adjacent_tm_usb_fluctuations = self.adjacent_usb_fluctuations + + ## Test-mass interferometer beatnotes on TPS (high-frequency) + + logging.info("Computing test-mass carrier beatnote frequency offsets on TPS") + self.tps_tm_carrier_offsets = ForEachMOSA(lambda mosa: + self.adjacent_tm_carrier_offsets[mosa] - self.local_tm_carrier_offsets[mosa] + ) + + logging.info("Computing test-mass carrier beatnote frequency fluctuations on TPS") + self.tps_tm_carrier_fluctuations = ForEachMOSA(lambda mosa: + self.adjacent_tm_carrier_fluctuations[mosa] - self.local_tm_carrier_fluctuations[mosa] + ) + + logging.info("Computing test-mass upper sideband beatnote frequency offsets on TPS") + self.tps_tm_usb_offsets = ForEachMOSA(lambda mosa: + self.adjacent_tm_usb_offsets[mosa] - self.local_tm_usb_offsets[mosa] + ) + + logging.info("Computing test-mass upper sideband beatnote frequency fluctuations on TPS") + self.tps_tm_usb_fluctuations = ForEachMOSA(lambda mosa: + self.adjacent_tm_usb_fluctuations[mosa] - self.local_tm_usb_fluctuations[mosa] + ) + + ## Reference interferometer local beams + + logging.info("Propagating local carrier frequency offsets to reference interferometer") + self.local_ref_carrier_offsets = self.local_carrier_offsets + + logging.info("Propagating local carrier frequency fluctuations to reference interferometer") + self.local_ref_carrier_fluctuations = self.local_carrier_fluctuations + + logging.info("Propagating local upper sideband frequency offsets to reference interferometer") + self.local_ref_usb_offsets = self.local_usb_offsets + + logging.info("Propagating local upper sideband frequency fluctuations to reference interferometer") + self.local_ref_usb_fluctuations = self.local_usb_fluctuations + + ## Reference interferometer adjacent beams + + logging.info("Propagating adjacent carrier frequency offsets to reference interferometer") + self.adjacent_ref_carrier_offsets = self.adjacent_carrier_offsets + + logging.info("Propagating adjacent carrier frequency fluctuations to reference interferometer") + self.adjacent_ref_carrier_fluctuations = self.adjacent_carrier_fluctuations + + logging.info("Propagating adjacent upper sideband frequency offsets to reference interferometer") + self.adjacent_ref_usb_offsets = self.adjacent_usb_offsets + + logging.info("Propagating adjacent upper sideband frequency fluctuations to reference interferometer") + self.adjacent_ref_usb_fluctuations = self.adjacent_usb_fluctuations + + ## Reference interferometer beatnotes on TPS (high-frequency) + + logging.info("Computing reference carrier beatnote frequency offsets on TPS") + self.tps_ref_carrier_offsets = ForEachMOSA(lambda mosa: + self.adjacent_ref_carrier_offsets[mosa] - self.local_ref_carrier_offsets[mosa] + ) + + logging.info("Computing reference carrier beatnote frequency fluctuations on TPS") + self.tps_ref_carrier_fluctuations = ForEachMOSA(lambda mosa: + self.adjacent_ref_carrier_fluctuations[mosa] - self.local_ref_carrier_fluctuations[mosa] + ) + + logging.info("Computing reference upper sideband beatnote frequency offsets on TPS") + self.tps_ref_usb_offsets = ForEachMOSA(lambda mosa: + self.adjacent_ref_usb_offsets[mosa] - self.local_ref_usb_offsets[mosa] + ) + + logging.info("Computing reference upper sideband beatnote frequency fluctuations on TPS") + self.tps_ref_usb_fluctuations = ForEachMOSA(lambda mosa: + self.adjacent_ref_usb_fluctuations[mosa] - self.local_ref_usb_fluctuations[mosa] + ) + + ## Sampling beatnotes and measured pseudo-ranges to THE grid + + self.inverse_timer_deviations = ForEachSC(lambda sc: + self.invert_timer_deviations(self.local_timer_deviations[sc], sc) + ) + + timestamp = lambda x, sc: self.interpolate(x, -self.inverse_timer_deviations[sc] * self.physics_fs) + + logging.info("Sampling inter-spacecraft carrier beatnote frequency offsets to THE grid") + self.the_isc_carrier_offsets = ForEachMOSA(lambda mosa: + timestamp(self.tps_isc_carrier_offsets[mosa] / (1 + self.clock_noise_offsets[mosa[0]]), mosa[0]) + ) + logging.info("Sampling inter-spacecraft carrier beatnote frequency fluctuations to THE grid") + self.the_isc_carrier_fluctuations = ForEachMOSA(lambda mosa: + timestamp(self.tps_isc_carrier_fluctuations[mosa] / (1 + self.clock_noise_offsets[mosa[0]]) + - self.tps_isc_carrier_offsets[mosa] * self.clock_noise_fluctuations[mosa[0]] + / (1 + self.clock_noise_offsets[mosa[0]])**2, mosa[0]) + ) + + logging.info("Sampling inter-spacecraft upper sideband beatnote frequency offsets to THE grid") + self.the_isc_usb_offsets = ForEachMOSA(lambda mosa: + timestamp(self.tps_isc_usb_offsets[mosa] / (1 + self.clock_noise_offsets[mosa[0]]), mosa[0]) + ) + + logging.info("Sampling inter-spacecraft upper sideband beatnote frequency fluctuations to THE grid") + self.the_isc_usb_fluctuations = ForEachMOSA(lambda mosa: + timestamp(self.tps_isc_usb_fluctuations[mosa] / (1 + self.clock_noise_offsets[mosa[0]]) + - self.tps_isc_usb_offsets[mosa] * self.clock_noise_fluctuations[mosa[0]] + / (1 + self.clock_noise_offsets[mosa[0]])**2, mosa[0]) + ) + + logging.info("Sampling measured pseudo-ranges to THE grid") + self.the_mprs = ForEachMOSA(lambda mosa: + timestamp(self.tps_mprs[mosa], mosa[0]) + ) + + logging.info("Sampling test-mass beatnotes to THE grid") + self.the_tm_carrier_offsets = ForEachMOSA(lambda mosa: + timestamp(self.tps_tm_carrier_offsets[mosa] / (1 + self.clock_noise_offsets[mosa[0]]), mosa[0]) + ) + self.the_tm_carrier_fluctuations = ForEachMOSA(lambda mosa: + timestamp(self.tps_tm_carrier_fluctuations[mosa] / (1 + self.clock_noise_offsets[mosa[0]]) + - self.tps_tm_carrier_offsets[mosa] * self.clock_noise_fluctuations[mosa[0]] + / (1 + self.clock_noise_offsets[mosa[0]])**2, mosa[0]) + ) + self.the_tm_usb_offsets = ForEachMOSA(lambda mosa: + timestamp(self.tps_tm_usb_offsets[mosa] / (1 + self.clock_noise_offsets[mosa[0]]), mosa[0]) + ) + self.the_tm_usb_fluctuations = ForEachMOSA(lambda mosa: + timestamp(self.tps_tm_usb_fluctuations[mosa] / (1 + self.clock_noise_offsets[mosa[0]]) + - self.tps_tm_usb_offsets[mosa] * self.clock_noise_fluctuations[mosa[0]] + / (1 + self.clock_noise_offsets[mosa[0]])**2, mosa[0]) + ) + + logging.info("Sampling reference beatnotes to THE grid") + self.the_ref_carrier_offsets = ForEachMOSA(lambda mosa: + timestamp(self.tps_ref_carrier_offsets[mosa] / (1 + self.clock_noise_offsets[mosa[0]]), mosa[0]) + ) + self.the_ref_carrier_fluctuations = ForEachMOSA(lambda mosa: + timestamp(self.tps_ref_carrier_fluctuations[mosa] / (1 + self.clock_noise_offsets[mosa[0]]) + - self.tps_ref_carrier_offsets[mosa] * self.clock_noise_fluctuations[mosa[0]] + / (1 + self.clock_noise_offsets[mosa[0]])**2, mosa[0]) + ) + self.the_ref_usb_offsets = ForEachMOSA(lambda mosa: + timestamp(self.tps_ref_usb_offsets[mosa] / (1 + self.clock_noise_offsets[mosa[0]]), mosa[0]) + ) + self.the_ref_usb_fluctuations = ForEachMOSA(lambda mosa: + timestamp(self.tps_ref_usb_fluctuations[mosa] / (1 + self.clock_noise_offsets[mosa[0]]) + - self.tps_ref_usb_offsets[mosa] * self.clock_noise_fluctuations[mosa[0]] + / (1 + self.clock_noise_offsets[mosa[0]])**2, mosa[0]) + ) + + ## Antialiasing filtering + + logging.info("Filtering inter-spacecraft beatnotes") + self.filtered_isc_carrier_offsets = self.the_isc_carrier_offsets.transformed(self.aafilter) + self.filtered_isc_carrier_fluctuations = self.the_isc_carrier_fluctuations.transformed(self.aafilter) + self.filtered_isc_usb_offsets = self.the_isc_usb_offsets.transformed(self.aafilter) + self.filtered_isc_usb_fluctuations = self.the_isc_usb_fluctuations.transformed(self.aafilter) + + + logging.info("Filtering measured pseudo-ranges") + self.filtered_mprs = self.the_mprs.transformed(self.aafilter) + + logging.info("Filtering test-mass beatnotes") + self.filtered_tm_carrier_offsets = self.the_tm_carrier_offsets.transformed(self.aafilter) + self.filtered_tm_carrier_fluctuations = self.the_tm_carrier_fluctuations.transformed(self.aafilter) + self.filtered_tm_usb_offsets = self.the_tm_usb_offsets.transformed(self.aafilter) + self.filtered_tm_usb_fluctuations = self.the_tm_usb_fluctuations.transformed(self.aafilter) + + logging.info("Filtering reference beatnotes") + self.filtered_ref_carrier_offsets = self.the_ref_carrier_offsets.transformed(self.aafilter) + self.filtered_ref_carrier_fluctuations = self.the_ref_carrier_fluctuations.transformed(self.aafilter) + self.filtered_ref_usb_offsets = self.the_ref_usb_offsets.transformed(self.aafilter) + self.filtered_ref_usb_fluctuations = self.the_ref_usb_fluctuations.transformed(self.aafilter) + + ## Downsampling filtering + + logging.info("Downsampling inter-spacecraft beatnotes") + self.isc_carrier_offsets = self.filtered_isc_carrier_offsets.transformed(self.downsampled) + self.isc_carrier_fluctuations = self.filtered_isc_carrier_fluctuations.transformed(self.downsampled) + self.isc_usb_offsets = self.filtered_isc_usb_offsets.transformed(self.downsampled) + self.isc_usb_fluctuations = self.filtered_isc_usb_fluctuations.transformed(self.downsampled) + + logging.info("Filtering and downsampling measured pseudo-ranges") + self.mprs = self.filtered_mprs.transformed(self.downsampled) + + logging.info("Filtering and downsampling test-mass beatnotes") + self.tm_carrier_offsets = self.filtered_tm_carrier_offsets.transformed(self.downsampled) + self.tm_carrier_fluctuations = self.filtered_tm_carrier_fluctuations.transformed(self.downsampled) + self.tm_usb_offsets = self.filtered_tm_usb_offsets.transformed(self.downsampled) + self.tm_usb_fluctuations = self.filtered_tm_usb_fluctuations.transformed(self.downsampled) + + logging.info("Filtering and downsampling reference beatnotes") + self.ref_carrier_offsets = self.filtered_ref_carrier_offsets.transformed(self.downsampled) + self.ref_carrier_fluctuations = self.filtered_ref_carrier_fluctuations.transformed(self.downsampled) + self.ref_usb_offsets = self.filtered_ref_usb_offsets.transformed(self.downsampled) + self.ref_usb_fluctuations = self.filtered_ref_usb_fluctuations.transformed(self.downsampled) + + ## Total frequencies + + logging.info("Computing inter-spacecraft beatnote total frequencies") + self.isc_carriers = ForEachMOSA(lambda mosa: + self.isc_carrier_offsets[mosa] + self.isc_carrier_fluctuations[mosa] + ) + self.isc_usbs = ForEachMOSA(lambda mosa: + self.isc_usb_offsets[mosa] + self.isc_usb_fluctuations[mosa] + ) + + logging.info("Computing test-mass beatnote total frequencies") + self.tm_carriers = ForEachMOSA(lambda mosa: + self.tm_carrier_offsets[mosa] + self.tm_carrier_fluctuations[mosa] + ) + self.tm_usbs = ForEachMOSA(lambda mosa: + self.tm_usb_offsets[mosa] + self.tm_usb_fluctuations[mosa] + ) + + logging.info("Computing reference beatnote total frequencies") + self.ref_carriers = ForEachMOSA(lambda mosa: + self.ref_carrier_offsets[mosa] + self.ref_carrier_fluctuations[mosa] + ) + self.ref_usbs = ForEachMOSA(lambda mosa: + self.ref_usb_offsets[mosa] + self.ref_usb_fluctuations[mosa] + ) + + ## Closing simulation + logging.info("Simulation complete") + + def simulate_noises(self): + """Generate noise time series.""" + ## Laser noise + # TODO: optimize in case of three lasers + + logging.info("Generating laser noise time series") + self.laser_noises = ForEachMOSA(lambda mosa: + noises.laser(self.physics_fs, self.physics_size, self.laser_asds[mosa]) + ) + + if self.three_lasers: + for mosa in self.MOSAS[:3]: + self.laser_noises[ForEachMOSA.adjacent(mosa)] = self.laser_noises[mosa] + + ## Clock noise + # TODO: better optimize when clock_freqlindrifts and clock_freqquaddrifts == 0 + + logging.info("Generating clock noise time series") + t = self.physics_t + self.clock_noise_offsets = ForEachSC(lambda sc: + self.clock_freqoffsets[sc] + self.clock_freqlindrifts[sc] * t + self.clock_freqquaddrifts[sc] * t**2 + ) + + for sc in self.SCS: + if self.physics_size and numpy.all(self.clock_noise_offsets[sc] == self.clock_noise_offsets[sc][0]): + self.clock_noise_offsets[sc] = self.clock_noise_offsets[sc][0] + + + logging.info("Generating clock noise fluctuations time series") + self.clock_noise_fluctuations = ForEachSC(lambda sc: + noises.clock(self.physics_fs, self.physics_size, self.clock_asds[sc]) + ) + + ## Modulation noise + + logging.info("Generating modulation noise time series") + self.modulation_noises = ForEachMOSA(lambda mosa: + noises.modulation(self.physics_fs, self.physics_size, + self.modulation_asds[mosa], self.modulation_fknees[mosa]) + ) + + ## Backlink noise + + logging.info("Generating backlink noise time series") + self.backlink_noises = ForEachMOSA(lambda mosa: + noises.backlink(self.physics_fs, self.physics_size, self.backlink_asds[mosa], self.backlink_fknees[mosa]) + ) + + ## Test-mass acceleration noise + + logging.info("Generating test-mass acceleration noise time series") + self.testmass_noises = ForEachMOSA(lambda mosa: + noises.testmass(self.physics_fs, self.physics_size, + self.testmass_asds[mosa], self.testmass_fknees[mosa]) + ) + + ## Ranging noise + + logging.info("Generating ranging noise time series") + self.ranging_noises = ForEachMOSA(lambda mosa: + self.ranging_biases[mosa] + noises.ranging(self.physics_fs, self.physics_size, self.ranging_asds[mosa]) + ) + + def invert_timer_deviations(self, timer_deviations, sc): + """Invert timer deviations of a given spacecraft. + + We recursively solve the implicit equation dtau(tau) = dtau_hat(tau - dtau(tau)) until the + convergence criteria (tolerance) is met, or we exceed the maximum number of iterations. + + Args: + timer_deviations: array of timer deviations + sc: spacecraft index + """ + logging.info("Inverting timer deviations for spacecraft %s", sc) + logging.debug("Solving iteratively (tolerance=%s s, maxiter=%s)", + self.clockinv_tolerance, self.clockinv_maxiter) + + niter = 0 + error = 0 + next_inverse = timer_deviations + while not niter or error > self.clockinv_tolerance: + if niter >= self.clockinv_maxiter: + logging.warning("Maximum number of iterations '%s' reached (error=%.2E)", niter, error) + break + logging.debug("Starting iteration #%s", niter) + inverse = next_inverse + next_inverse = self.interpolate(timer_deviations, -inverse * self.physics_fs) + error = numpy.max(numpy.abs(inverse - next_inverse)) + logging.debug("End of iteration %s, with an error of %.2E s", niter, error) + niter += 1 + logging.debug("End of timer deviation inversion after %s iterations with an error of %.2E s", niter, error) + return inverse + + def write_metadata(self, hdf5): + """Set all properties as HDF5 attributes on `object`. + + Args: + hdf5: an HDF5 file, or a dataset + """ + for key, value in self.__dict__.items(): + try: + hdf5.attrs[key] = value + except (TypeError, RuntimeError): + try: + hdf5.attrs[key] = str(value) + except RuntimeError: + logging.warning("Cannot write metadata '%s' on '%s'", key, hdf5) + + def write(self, output='measurements.h5', mode='x', write_all=False): + """Run a simulation. + + Args: + output: path to measurement file + mode: measurement file opening mode + write_all: whether to write all quantities to file + """ + # pylint: disable=too-many-statements + if not self.simulated: + self.simulate(keep_all=write_all) + + logging.info("Writing metadata and physics time dataset to '%s'", output) + hdf5 = h5py.File(output, mode) + self.write_metadata(hdf5) + hdf5['physics_t'] = self.physics_t + + if write_all: + logging.info("Writing laser noise to '%s'", output) + self.laser_noises.write(hdf5, 'laser_noises') + + logging.info("Writing clock noise to '%s'", output) + self.clock_noise_offsets.write(hdf5, 'clock_noise_offsets') + self.clock_noise_fluctuations.write(hdf5, 'clock_noise_fluctuations') + + logging.info("Writing modulation noise to '%s'", output) + self.modulation_noises.write(hdf5, 'modulation_noises') + + logging.info("Writing local beams to '%s'", output) + self.local_carrier_offsets.write(hdf5, 'local_carrier_offsets') + self.local_carrier_fluctuations.write(hdf5, 'local_carrier_fluctuations') + self.local_usb_offsets.write(hdf5, 'local_usb_offsets') + self.local_usb_fluctuations.write(hdf5, 'local_usb_fluctuations') + self.local_timer_deviations.write(hdf5, 'local_timer_deviations') + + logging.info("Writing proper pseudo-ranges to '%s'", output) + self.pprs.write(hdf5, 'pprs') + + logging.info("Writing proper pseudo-range derivatives to '%s'", output) + self.d_pprs.write(hdf5, 'd_pprs') + + logging.info("Writing propagated distant beams to '%s'", output) + self.distant_carrier_offsets.write(hdf5, 'distant_carrier_offsets') + self.distant_carrier_fluctuations.write(hdf5, 'distant_carrier_fluctuations') + self.distant_usb_offsets.write(hdf5, 'distant_usb_offsets') + self.distant_usb_fluctuations.write(hdf5, 'distant_usb_fluctuations') + self.distant_timer_deviations.write(hdf5, 'distant_timer_deviations') + + logging.info("Writing backlink noise to '%s'", output) + self.backlink_noises.write(hdf5, 'backlink_noises') + + logging.info("Writing propagated adjacent beams to '%s'", output) + self.adjacent_carrier_offsets.write(hdf5, 'adjacent_carrier_offsets') + self.adjacent_carrier_fluctuations.write(hdf5, 'adjacent_carrier_fluctuations') + self.adjacent_usb_offsets.write(hdf5, 'adjacent_usb_offsets') + self.adjacent_usb_fluctuations.write(hdf5, 'adjacent_usb_fluctuations') + + logging.info("Writing local beams at inter-spacecraft interferometer to '%s'", output) + self.local_isc_carrier_offsets.write(hdf5, 'local_isc_carrier_offsets') + self.local_isc_carrier_fluctuations.write(hdf5, 'local_isc_carrier_fluctuations') + self.local_isc_usb_offsets.write(hdf5, 'local_isc_usb_offsets') + self.local_isc_usb_fluctuations.write(hdf5, 'local_isc_usb_fluctuations') + + logging.info("Writing distant beams at inter-spacecraft interferometer to '%s'", output) + self.distant_isc_carrier_offsets.write(hdf5, 'distant_isc_carrier_offsets') + self.distant_isc_carrier_fluctuations.write(hdf5, 'distant_isc_carrier_fluctuations') + self.distant_isc_usb_offsets.write(hdf5, 'distant_isc_usb_offsets') + self.distant_isc_usb_fluctuations.write(hdf5, 'distant_isc_usb_fluctuations') + + logging.info("Writing inter-spacecraft beatnotes on TPS to '%s'", output) + self.tps_isc_carrier_offsets.write(hdf5, 'tps_isc_carrier_offsets') + self.tps_isc_carrier_fluctuations.write(hdf5, 'tps_isc_carrier_fluctuations') + self.tps_isc_usb_offsets.write(hdf5, 'tps_isc_usb_offsets') + self.tps_isc_usb_fluctuations.write(hdf5, 'tps_isc_usb_fluctuations') + + logging.info("Writing ranging noise to '%s'", output) + self.ranging_noises.write(hdf5, 'ranging_noises') + + logging.info("Writing measured pseudo-ranges on TPS to '%s'", output) + self.tps_mprs.write(hdf5, 'tps_mprs') + + logging.info("Writing test-mass acceleration noise to '%s'", output) + self.testmass_noises.write(hdf5, 'tm_noises') + + logging.info("Writing local beams at test-mass interferometer to '%s'", output) + self.local_tm_carrier_offsets.write(hdf5, 'local_tm_carrier_offsets') + self.local_tm_carrier_fluctuations.write(hdf5, 'local_tm_carrier_fluctuations') + self.local_tm_usb_offsets.write(hdf5, 'local_tm_usb_offsets') + self.local_tm_usb_fluctuations.write(hdf5, 'local_tm_usb_fluctuations') + + logging.info("Writing adjacent beams at test-mass interferometer to '%s'", output) + self.adjacent_tm_carrier_offsets.write(hdf5, 'adjacent_tm_carrier_offsets') + self.adjacent_tm_carrier_fluctuations.write(hdf5, 'adjacent_tm_carrier_fluctuations') + self.adjacent_tm_usb_offsets.write(hdf5, 'adjacent_tm_usb_offsets') + self.adjacent_tm_usb_fluctuations.write(hdf5, 'adjacent_tm_usb_fluctuations') + + logging.info("Writing test-mass beatnotes on TPS to '%s'", output) + self.tps_tm_carrier_offsets.write(hdf5, 'tps_tm_carrier_offsets') + self.tps_tm_carrier_fluctuations.write(hdf5, 'tps_tm_carrier_fluctuations') + self.tps_tm_usb_offsets.write(hdf5, 'tps_tm_usb_offsets') + self.tps_tm_usb_fluctuations.write(hdf5, 'tps_tm_usb_fluctuations') + + logging.info("Writing local beams at reference interferometer to '%s'", output) + self.local_ref_carrier_offsets.write(hdf5, 'local_ref_carrier_offsets') + self.local_ref_carrier_fluctuations.write(hdf5, 'local_ref_carrier_fluctuations') + self.local_ref_usb_offsets.write(hdf5, 'local_ref_usb_offsets') + self.local_ref_usb_fluctuations.write(hdf5, 'local_ref_usb_fluctuations') + + logging.info("Writing adjacent beams at reference interferometer to '%s'", output) + self.adjacent_ref_carrier_offsets.write(hdf5, 'adjacent_ref_carrier_offsets') + self.adjacent_ref_carrier_fluctuations.write(hdf5, 'adjacent_ref_carrier_fluctuations') + self.adjacent_ref_usb_offsets.write(hdf5, 'adjacent_ref_usb_offsets') + self.adjacent_ref_usb_fluctuations.write(hdf5, 'adjacent_ref_usb_fluctuations') + + logging.info("Writing reference beatnotes on TPS to '%s'", output) + self.tps_ref_carrier_offsets.write(hdf5, 'tps_ref_carrier_offsets') + self.tps_ref_carrier_fluctuations.write(hdf5, 'tps_ref_carrier_fluctuations') + self.tps_ref_usb_offsets.write(hdf5, 'tps_ref_usb_offsets') + self.tps_ref_usb_fluctuations.write(hdf5, 'tps_ref_usb_fluctuations') + + logging.info("Writing inter-spacecraft beatnotes sampled to THE grid to '%s'", output) + self.the_isc_carrier_offsets.write(hdf5, 'the_isc_carrier_offsets') + self.the_isc_carrier_fluctuations.write(hdf5, 'the_isc_carrier_fluctuations') + self.the_isc_usb_offsets.write(hdf5, 'the_isc_usb_offsets') + self.the_isc_usb_fluctuations.write(hdf5, 'the_isc_usb_fluctuations') + + logging.info("Writing measured pseudo-ranges sampled to THE grid to '%s'", output) + self.the_mprs.write(hdf5, 'the_mprs') + + logging.info("Writing test-mass beatnotes sampled to THE grid to '%s'", output) + self.the_tm_carrier_offsets.write(hdf5, 'the_tm_carrier_offsets') + self.the_tm_carrier_fluctuations.write(hdf5, 'the_tm_carrier_fluctuations') + self.the_tm_usb_offsets.write(hdf5, 'the_tm_usb_offsets') + self.the_tm_usb_fluctuations.write(hdf5, 'the_tm_usb_fluctuations') + + logging.info("Writing reference beatnotes sampled to THE grid to '%s'", output) + self.the_ref_carrier_offsets.write(hdf5, 'the_ref_carrier_offsets') + self.the_ref_carrier_fluctuations.write(hdf5, 'the_ref_carrier_fluctuations') + self.the_ref_usb_offsets.write(hdf5, 'the_ref_usb_offsets') + self.the_ref_usb_fluctuations.write(hdf5, 'the_ref_usb_fluctuations') + + logging.info("Writing filtered inter-spacecraft beatnotes to '%s'", output) + self.filtered_isc_carrier_offsets.write(hdf5, 'filtered_isc_carrier_offsets') + self.filtered_isc_carrier_fluctuations.write(hdf5, 'filtered_isc_carrier_fluctuations') + self.filtered_isc_usb_offsets.write(hdf5, 'filtered_isc_usb_offsets') + self.filtered_isc_usb_fluctuations.write(hdf5, 'filtered_isc_usb_fluctuations') + + logging.info("Writing filtered measured pseudo-ranges to '%s'", output) + self.filtered_mprs.write(hdf5, 'filtered_mprs') + + logging.info("Writing filtered test-mass beatnotes to '%s'", output) + self.filtered_tm_carrier_offsets.write(hdf5, 'filtered_tm_carrier_offsets') + self.filtered_tm_carrier_fluctuations.write(hdf5, 'filtered_tm_carrier_fluctuations') + self.filtered_tm_usb_offsets.write(hdf5, 'filtered_tm_usb_offsets') + self.filtered_tm_usb_fluctuations.write(hdf5, 'filtered_tm_usb_fluctuations') + + logging.info("Writing filtered reference beatnotes to '%s'", output) + self.filtered_ref_carrier_offsets.write(hdf5, 'filtered_ref_carrier_offsets') + self.filtered_ref_carrier_fluctuations.write(hdf5, 'filtered_ref_carrier_fluctuations') + self.filtered_ref_usb_offsets.write(hdf5, 'filtered_ref_usb_offsets') + self.filtered_ref_usb_fluctuations.write(hdf5, 'filtered_ref_usb_fluctuations') + + logging.info("Writing downsampled inter-spacecraft beatnotes to '%s'", output) + self.isc_carrier_offsets.write(hdf5, 'isc_carrier_offsets') + self.isc_carrier_fluctuations.write(hdf5, 'isc_carrier_fluctuations') + self.isc_usb_offsets.write(hdf5, 'isc_usb_offsets') + self.isc_usb_fluctuations.write(hdf5, 'isc_usb_fluctuations') + + logging.info("Writing downsampled measured pseudo-ranges to '%s'", output) + self.mprs.write(hdf5, 'mprs') + + logging.info("Writing downsampled test-mass beatnotes to '%s'", output) + self.tm_carrier_offsets.write(hdf5, 'tm_carrier_offsets') + self.tm_carrier_fluctuations.write(hdf5, 'tm_carrier_fluctuations') + self.tm_usb_offsets.write(hdf5, 'tm_usb_offsets') + self.tm_usb_fluctuations.write(hdf5, 'tm_usb_fluctuations') + + logging.info("Writing downsampled reference beatnotes to '%s'", output) + self.ref_carrier_offsets.write(hdf5, 'ref_carrier_offsets') + self.ref_carrier_fluctuations.write(hdf5, 'ref_carrier_fluctuations') + self.ref_usb_offsets.write(hdf5, 'ref_usb_offsets') + self.ref_usb_fluctuations.write(hdf5, 'ref_usb_fluctuations') + + logging.info("Writing inter-spacecraft beatnote total frequencies to '%s'", output) + self.isc_carriers.write(hdf5, 'isc_carriers') + self.isc_usbs.write(hdf5, 'isc_usbs') + + logging.info("Writing test-mass beatnote total frequencies to '%s'", output) + self.tm_carriers.write(hdf5, 'tm_carriers') + self.tm_usbs.write(hdf5, 'tm_usbs') + + logging.info("Writing reference beatnote total frequencies to '%s'", output) + self.ref_carriers.write(hdf5, 'ref_carriers') + self.ref_usbs.write(hdf5, 'ref_usbs') + + logging.info("Closing output file '%s'", output) + hdf5.close() diff --git a/lisainstrument/lisa.py b/lisainstrument/lisa.py deleted file mode 100644 index d0fd9804a4fd5b45f1b79e8d22b243434c0d3603..0000000000000000000000000000000000000000 --- a/lisainstrument/lisa.py +++ /dev/null @@ -1,1277 +0,0 @@ -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- -# pylint: disable=too-many-lines -""" -LISA Instrument module. - -Authors: - Jean-Baptiste Bayle <j2b.bayle@gmail.com> -""" - -import logging -import h5py -import scipy.interpolate -import numpy - -from . import meta -from . import dsp -from . import noises - - -class Instrument: - """Represents an instrumental simulation.""" - - # Indexing conventions - SCS = ['1', '2', '3'] - MOSAS = ['12', '23', '31', '13', '32', '21'] - - def __init__(self, size=2592000, dt=0.3, t0=0, - # Inter-spacecraft propagation - orbits=None, pprs=None, d_pprs=None, gws=None, interpolation=('lagrange', 31), - # Lasers - laser_asds=28.2, modulation_asds=1E-14, modulation_fknees=1.5E-2, - central_freq=2.816E14, modulation_freqs=None, offsets_freqs=None, three_lasers=False, - # Clocks - clock_asds=6.32E-14, clock_offsets=None, clock_freqoffsets=None, clock_freqlindrifts=None, - clock_freqquaddrifts=None, clockinv_tolerance=1E-10, clockinv_maxiter=5, - # Optical pathlength noises - backlink_asds=3E-12, backlink_fknees=2E-3, testmass_asds=2.4E-15, testmass_fknees=0.4E-3, - # Pseudo-ranging - ranging_biases=0, ranging_asds=3E-9, - # Physics simulation sampling and filtering - physics_upsampling=10, aafilter=('kaiser', 240, 0.3, 1.35)): - """Initialize an instrumental simulation. - - Args: - size: number of samples to generate - dt: sampling period [s] - t0: initial time [s] - orbits: path to orbit file (used for proper pseudo-ranges and derivatives therefore) - pprs: dictionary of proper pseudo-ranges, overrides values from orbit file [s] - d_pprs: dictionary of proper pseudo-range derivatives, overrides values from orbit file [s/s] - gws: dictionary of gravitational-wave responses - interpolation: interpolation function or interpolation method and parameters; when using 'lagrande', - use a tuple ('lagrange', order) with `order` the odd interpolation order; an arbitrary function should - take (x, shift [number of samples]) as parameter - laser_asds: dictionary of amplitude spectral densities for laser noise [Hz/sqrt(Hz)] - modulation_asds: dictionary of amplitude spectral densities for modulation noise [s/sqrt(Hz)] - modulation_fknees: dictionary of cutoff frequencies for modulation noise [Hz] - central_freq: laser central frequency from which all offsets are computed [Hz] - modulation_freqs: dictionary of modulation frequencies [Hz] - offsets_freqs: dictionary of laser frequency offsets [Hz] - three_lasers: whether the only simulate one laser per spacecraft - clock_asds: dictionary of clock noise amplitude spectral densities - clock_offsets: dictionary of clock offsets - clock_freqoffsets: dictionary of clock frequency offsets [s^-1] - clock_freqlindrifts: dictionary of clock frequency linear drifts [s^-2] - clock_freqquaddrifts: dictionary of clock frequency quadratic drifts [s^-2] - clockinv_tolerance: convergence tolerance for timer deviation inversion [s] - clockinv_maxiter: maximum number of iterations for timer deviation inversion - backlink_asds: dictionary of amplitude spectral densities for backlink noise [m/sqrt(Hz)] - backlink_fknees: dictionary of cutoff frequencied for backlink noise [Hz] - testmass_asds: dictionary of amplitude spectral densities for test-mass noise [ms^(-2)/sqrt(Hz)] - testmass_fknees: dictionary of cutoff frequencied for test-mass noise [Hz] - ranging_biases: dictionary of ranging noise bias [s] - ranging_asds: dictionary of ranging noise amplitude spectral densities [s/sqrt(Hz)] - physics_upsampling: ratio of sampling frequencies for physics vs. measurement simulation - aafilter: antialiasing filter function or filter design method; when using 'kaiser', use a tuple - ('kaiser', attenuation [dB], f1 [Hz], f2 [Hz]) with attenuation the attenuation in the stopband, - and f1 < f2 the frequencies defining the transition band; use `None` for no filter - """ - # pylint: disable=too-many-arguments - self.git_url = 'https://gitlab.in2p3.fr/lisa-simulation/instrument' - self.version = meta.__version__ - logging.info("Initializing instrumental simulation (dt=%.2f, t0=%.2f, size=%d)", dt, t0, size) - - # Measurement sampling - self.size = int(size) - self.dt = float(dt) - self.t0 = float(t0) - self.fs = 1 / self.dt - - # Physics sampling - self.physics_upsampling = int(physics_upsampling) - self.physics_size = self.size * self.physics_upsampling - self.physics_dt = self.dt / self.physics_upsampling - self.physics_fs = self.fs * self.physics_upsampling - - # Noise parameters - self.init_noises(laser_asds, modulation_asds, modulation_fknees, central_freq, modulation_freqs, - offsets_freqs, three_lasers, clock_asds, clock_offsets, clock_freqoffsets, clock_freqlindrifts, - clock_freqquaddrifts, backlink_asds, backlink_fknees, testmass_asds, testmass_fknees, - ranging_biases, ranging_asds) - - # Clock-noise inversion - self.clockinv_tolerance = float(clockinv_tolerance) - self.clockinv_maxiter = int(clockinv_maxiter) - - # Interpolation, antialiasing filter and orbits - self.init_interpolation(interpolation) - self.init_aafilter(aafilter) - self.init_orbits(orbits, pprs, d_pprs) - - # Orbits, gravitational waves - self.gws = self.mosa_dict(gws, default=0) - - def init_noises(self, laser_asds, modulation_asds, modulation_fknees, central_freq, modulation_freqs, - offsets_freqs, three_lasers, clock_asds, clock_offsets, clock_freqoffsets, clock_freqlindrifts, - clock_freqquaddrifts, backlink_asds, backlink_fknees, testmass_asds, testmass_fknees, - ranging_biases, ranging_asds): - """Initialize noise parameters. - - See docstring of `__init__()` for more details. - """ - # pylint: disable=too-many-arguments,too-many-locals - self.central_freq = float(central_freq) - self.three_lasers = bool(three_lasers) - - self.laser_asds = self.mosa_dict(laser_asds) - self.modulation_asds = self.mosa_dict(modulation_asds) - self.modulation_fknees = self.mosa_dict(modulation_fknees) - - self.clock_asds = self.sc_dict(clock_asds) - self.clock_offsets = self.sc_dict(clock_offsets, default=0) - self.clock_freqoffsets = self.sc_dict(clock_freqoffsets, default={ - # Default based on LISANode - '1': 5E-8, '2': 6.25E-7, '3': -3.75E-7 - }) - self.clock_freqlindrifts = self.sc_dict(clock_freqlindrifts, default={ - # Default based on LISANode - '1': 1.6E-15, '2': 2E-14, '3': -1.2E-14 - }) - self.clock_freqquaddrifts = self.sc_dict(clock_freqquaddrifts, default={ - # Default based on LISANode - '1': 9E-24, '2': 6.75E-23, '3': -1.125e-22 - }) - - self.ranging_biases = self.mosa_dict(ranging_biases) - self.ranging_asds = self.mosa_dict(ranging_asds) - - self.backlink_asds = self.mosa_dict(backlink_asds) - self.backlink_fknees = self.mosa_dict(backlink_fknees) - self.testmass_asds = self.mosa_dict(testmass_asds) - self.testmass_fknees = self.mosa_dict(testmass_fknees) - - self.modulation_freqs = self.mosa_dict(modulation_freqs, default={ - # Default based on mission baseline of 2.4 MHz and 2.401 MHz for left and right MOSAs respect - '12': 2.4E9, '23': 2.4E9, '31': 2.4E9, - '13': 2.401E9, '32': 2.401E9, '21': 2.401E9, - }) - - self.offsets_freqs = self.mosa_dict(offsets_freqs, default={ - # Default based on default for LISANode - '12': 8.1E6, '23': 9.2E6, '31': 10.3E6, '13': 1.4E6, '32': -11.6E6, '21': -9.5E6, - }) - - def init_interpolation(self, interpolation): - """Initialize interpolation.""" - if interpolation is None: - logging.info("Disabling interpolation") - designed_interpolation = lambda x: x - elif callable(interpolation): - logging.info("Using user-provided interpolation function") - designed_interpolation = interpolation - else: - logging.info("Designing interpolation function") - designed_interpolation = Instrument.design_interpolation(interpolation) - self.interpolate = lambda x, shift: x if numpy.isscalar(x) else designed_interpolation(x, shift) - - def init_aafilter(self, aafilter): - """Initialize antialiasing filter and downsampling.""" - if aafilter is None: - logging.info("Disabling antialiasing filter") - designed_aafilter = lambda x: x - elif callable(aafilter): - logging.info("Using user-provided antialiasing filter function") - designed_aafilter = aafilter - else: - logging.info("Design antialiasing filter (%s)", aafilter) - designed_aafilter = self.design_aafilter(aafilter) - self.aafilter = lambda x: x if numpy.isscalar(x) else designed_aafilter(x) - - self.downsampled = lambda x: x if numpy.isscalar(x) else x[::self.physics_upsampling] - - def init_orbits(self, orbits, pprs, d_pprs): - """Initialize orbits.""" - if orbits is not None: - self.orbits_path = str(orbits) - self.orbits = h5py.File(self.orbits_path, 'r') - self.pprs = self.mosa_dict(pprs, default=None) - self.d_pprs = self.mosa_dict(d_pprs, default=None) - else: - self.orbits_path = None - self.orbits = None - self.pprs = self.mosa_dict(pprs, default={ - # Default PPRs based on first samples of Keplerian orbits (v1.0) - '12': 8.3324, '23': 8.3028, '31': 8.3324, - '13': 8.3315, '32': 8.3044, '21': 8.3315, - }) - self.d_pprs = self.mosa_dict(d_pprs, default={ - # Default PPR derivatives based on first samples of Keplerian orbits (v1.0) - '12': 3.1977E-9, '23': -5.0241E-9, '31': -3.1977E-9, - '13': -3.1977E-9, '32': -5.0252E-9, '21': 3.1977E-9, - }) - - @classmethod - def mosa_dict(cls, value, default=None): - """Make sure that value is turned into a dictionary with the MOSAs as keys. - - Args: - value: a scalar or a dictionary - default: default value, or None - """ - if value is None and default is not None: - value = default - elif value is None and default is None: - return None - if isinstance(value, dict): - return {mosa: float(value[mosa]) for mosa in cls.MOSAS} - return {mosa: value for mosa in cls.MOSAS} - - @classmethod - def sc_dict(cls, value, default=None): - """Make sure that value is turned into a dictionary with the spacecraft as keys. - - Args: - value: a scalar or a dictionary - default: default value, or None - """ - if value is None and default is not None: - value = default - elif value is None and default is None: - return None - if isinstance(value, dict): - return {sc: float(value[sc]) for sc in cls.SCS} - return {sc: float(value) for sc in cls.SCS} - - def disable_all_noises(self, but=None): - """Turn off all instrumental noises. - - Args: - but: optional category of noises to keep on ['laser', 'modulation', 'clock', 'pathlength', 'ranging'] - """ - if but != 'laser': - self.laser_asds = self.mosa_dict(0) - if but != 'modulation': - self.modulation_asds = self.mosa_dict(0) - if but != 'clock': - self.disable_clock_noises() - if but != 'pathlength': - self.disable_pathlength_noises() - if but != 'ranging': - self.disable_ranging_noises() - return self - - def disable_clock_noises(self): - """Turn off all imperfections on clocks. - - This includes offsets, and frequency offsets and deviations. - """ - self.clock_asds = self.sc_dict(0) - self.clock_offsets = self.sc_dict(0) - self.clock_freqoffsets = self.sc_dict(0) - self.clock_freqlindrifts = self.sc_dict(0) - self.clock_freqquaddrifts = self.sc_dict(0) - return self - - def disable_pathlength_noises(self): - """Turn off all optical pathlength noises.""" - self.backlink_asds = self.mosa_dict(0) - self.testmass_asds = self.mosa_dict(0) - return self - - def disable_ranging_noises(self): - """Turn off all pseudo-ranging noises.""" - self.ranging_biases = self.mosa_dict(0) - self.ranging_asds = self.mosa_dict(0) - return self - - def disable_dopplers(self): - """Set proper pseudo-range derivatives to zero to turn off Doppler effects.""" - self.d_pprs = self.mosa_dict(0) - return self - - @classmethod - def distant(cls, mosa): - """Return index of distant MOSA. - - In practive, we invert the indices to swap emitter and receiver. - """ - if mosa not in cls.MOSAS: - raise ValueError(f"invalid MOSA index '{mosa}'") - return f'{mosa[1]}{mosa[0]}' - - @classmethod - def adjacent(cls, mosa): - """Return index of adjacent MOSA. - - In practice, we replace the second index by the only unused spacecraft index. - """ - if mosa not in cls.MOSAS: - raise ValueError(f"invalid MOSA index '{mosa}'") - unused = [sc for sc in cls.SCS if sc not in mosa] - if len(unused) != 1: - raise RuntimeError(f"cannot find adjacent MOSA for '{mosa}'") - return f'{mosa[0]}{unused[0]}' - - @staticmethod - def design_interpolation(parameters): - """Design the interpolation function. - - We currently support Lagrande interpolation. - - Args: - parameters: see `interpolation` docstring in `__init__()` - - Returns: - A function which interpolates data. - """ - method = parameters[0] - if method == 'lagrange': - logging.debug("Using Lagrange interpolations") - order = parameters[1] - logging.debug("Lagrande interpolation order is %s", order) - return lambda x, shift: dsp.timeshift(x, shift, order=order) - - raise ValueError(f"invalid interpolation parameters '{parameters}'") - - def design_aafilter(self, parameters): - """Design the antialiasing filter. - - We currently support finite-impulse response filter designed from a Kaiser window. - The order and beta parameters of the Kaiser window are deduced from the desired attenuation - and transition bandwidth of the filter. - - Args: - parameters: see `aafilter` docstring in `__init__()` - - Returns: - A function that filters data. - """ - method = parameters[0] - nyquist = self.physics_fs / 2 - - if method == 'kaiser': - logging.debug("Designing finite-impulse response filter from Kaiser window") - attenuation, freq1, freq2 = parameters[1], parameters[2], parameters[3] - if attenuation == 0: - logging.debug("Vanishing filter attenuation, disabling filtering") - return lambda x: x - logging.debug("Filter attenuation is %s dB", attenuation) - logging.debug("Filter transition band is [%s Hz, %s Hz]", freq1, freq2) - numtaps, beta = scipy.signal.kaiserord(attenuation, (freq2 - freq1) / nyquist) - logging.debug("Kaiser window has %s taps and beta is %s", numtaps, beta) - taps = scipy.signal.firwin(numtaps, (freq1 + freq2) / (2 * nyquist), window=('kaiser', beta)) - logging.debug("Filter taps are %s", taps) - return lambda x: scipy.signal.lfilter(taps, 1, x) - - raise ValueError(f"invalid filter parameters '{parameters}'") - - def invert_timer_deviations(self, timer_deviations, sc): - """Invert timer deviations of a given spacecraft. - - We recursively solve the implicit equation dtau(tau) = dtau_hat(tau - dtau(tau)) until the - convergence criteria (tolerance) is met, or we exceed the maximum number of iterations. - - Args: - timer_deviations: array of timer deviations - sc: spacecraft index - """ - logging.info("Inverting timer deviations for spacecraft %s", sc) - logging.debug("Solving iteratively (tolerance=%s s, maxiter=%s)", - self.clockinv_tolerance, self.clockinv_maxiter) - - niter = 0 - error = 0 - next_inverse = timer_deviations - while not niter or error > self.clockinv_tolerance: - if niter >= self.clockinv_maxiter: - logging.warning("Maximum number of iterations '%s' reached (error=%.2E)", niter, error) - break - logging.debug("Starting iteration #%s", niter) - inverse = next_inverse - next_inverse = self.interpolate(timer_deviations, -inverse * self.physics_fs) - error = numpy.max(numpy.abs(inverse - next_inverse)) - logging.debug("End of iteration %s, with an error of %.2E s", niter, error) - niter += 1 - logging.debug("End of timer deviation inversion after %s iterations with an error of %.2E s", niter, error) - return inverse - - def write_metadata(self, hdf5): - """Set all properties as HDF5 attributes on `object`. - - Args: - hdf5: an HDF5 file, or a dataset - """ - for key, value in self.__dict__.items(): - try: - hdf5.attrs[key] = value - except (TypeError, RuntimeError): - try: - hdf5.attrs[key] = str(value) - except RuntimeError: - logging.warning("Cannot write metadata '%s' on '%s'", key, hdf5) - - @staticmethod - def write_dset(hdf5, dname, data): - """Write a dataset `dname` on `hdf5` for each MOSA. - - Args: - hdf5: HDF5 file in which to write - dname: dataset name or path - data: dictionary of data to write - """ - # No-op if no data - if not data: - return - # Retreive the maximum size of data - size = 1 - for datum in data.values(): - if numpy.isscalar(datum): - continue - if size != 1 and len(datum) != size: - raise ValueError(f"incompatible sizes in dictionary '{size}' and '{len(datum)}'") - size = max(size, len(datum)) - logging.debug("Writing dataset of size '%s' with '%s' columns", size, len(data)) - # Write dataset - dtype = numpy.dtype({'names': list(data.keys()), 'formats': len(data) * [numpy.float64]}) - hdf5.create_dataset(dname, (size,), dtype=dtype) - for key, datum in data.items(): - hdf5[dname][key] = datum - - def write(self, output='measurements.h5', mode='x', write_all=False): - """Run a simulation. - - Args: - output: path to measurement file - mode: measurement file opening mode - write_all: whether to write all quantities to file - """ - # pylint: disable=too-many-locals,too-many-statements,too-many-branches - - logging.info("Starting simulation") - hdf5 = h5py.File(output, mode) - - ## Physics time vector - - logging.info("Computing physics time vector (size=%s, dt=%s)", self.physics_size, self.physics_dt) - t = self.t0 + numpy.arange(self.physics_size, dtype=numpy.float64) * self.physics_dt - - logging.info("Writing metadata and physics time dataset") - self.write_metadata(hdf5) - hdf5['physics_t'] = t - - ## Laser noise - - logging.info("Generating laser noise time series") - laser_noises = { - mosa: noises.laser(self.physics_fs, self.physics_size, self.laser_asds[mosa]) - for mosa in (self.MOSAS[:3] if self.three_lasers else self.MOSAS) - } - - if self.three_lasers: - for mosa in self.MOSAS[:3]: - laser_noises[self.adjacent(mosa)] = laser_noises[mosa] - - if write_all: - logging.info("Writing laser noise to '%s'", output) - self.write_dset(hdf5, 'laser_noises', laser_noises) - - ## Clock noise - - logging.info("Generating clock noise time series") - clock_noise_offsets = { - sc: self.clock_freqoffsets[sc] + self.clock_freqlindrifts[sc] * t + self.clock_freqquaddrifts[sc] * t**2 - for sc in self.SCS - } - - for sc in self.SCS: - if self.physics_size and numpy.all(clock_noise_offsets[sc] == clock_noise_offsets[sc][0]): - clock_noise_offsets[sc] = clock_noise_offsets[sc][0] - - logging.info("Generating clock noise fluctuations time series") - clock_noise_fluctuations = { - sc: noises.clock(self.physics_fs, self.physics_size, self.clock_asds[sc]) - for sc in self.SCS - } - - if write_all: - logging.info("Writing clock noise to '%s'", output) - self.write_dset(hdf5, 'clock_noise_offsets', clock_noise_offsets) - self.write_dset(hdf5, 'clock_noise_fluctuations', clock_noise_fluctuations) - - ## Modulation noise - - logging.info("Generating modulation noise time series") - modulation_noises = { - mosa: noises.modulation(self.physics_fs, self.physics_size, - self.modulation_asds[mosa], self.modulation_fknees[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing modulation noise to '%s'", output) - self.write_dset(hdf5, 'modulation_noises', modulation_noises) - - ## Local beams - - local_carrier_offsets = self.offsets_freqs - - logging.info("Computing carrier frequency fluctuations for local beams") - local_carrier_fluctuations = laser_noises - - logging.info("Computing upper sideband frequency offsets for local beams") - local_usb_offsets = { - mosa: self.offsets_freqs[mosa] + self.modulation_freqs[mosa] * (1 + clock_noise_offsets[mosa[0]]) # 2.67 - for mosa in self.MOSAS - } - - logging.info("Computing upper sideband frequency fluctuations for local beams") - local_usb_fluctuations = { - mosa: laser_noises[mosa] \ - + self.modulation_freqs[mosa] * (clock_noise_fluctuations[mosa[0]] + modulation_noises[mosa]) # 2.68 - for mosa in self.MOSAS - } - - logging.info("Computing local timer deviations") - local_timer_deviations = { - sc: self.clock_offsets[sc] + numpy.cumsum( - numpy.broadcast_to(clock_noise_offsets[sc] + clock_noise_fluctuations[sc], \ - (self.physics_size,)) * self.dt) - for sc in self.SCS - } - - if write_all: - logging.info("Writing local beams to '%s'", output) - self.write_dset(hdf5, 'local_carrier_offsets', local_carrier_offsets) - self.write_dset(hdf5, 'local_carrier_fluctuations', local_carrier_fluctuations) - self.write_dset(hdf5, 'local_usb_offsets', local_usb_offsets) - self.write_dset(hdf5, 'local_usb_fluctuations', local_usb_fluctuations) - self.write_dset(hdf5, 'local_timer_deviations', local_timer_deviations) - - ## Proper pseudo-ranges - if self.pprs is None: - - logging.info("Interpolating proper pseudo-ranges from orbit file '%s'", self.orbits_path) - self.pprs = { - mosa: scipy.interpolate.InterpolatedUnivariateSpline( - self.orbits['tcb']['t'][:], self.orbits['tcb'][f'l_{mosa}']['ppr'], k=1, ext='raise')(t) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing proper pseudo-ranges to '%s'", output) - self.write_dset(hdf5, 'pprs', self.pprs) - - ## Proper pseudo-range derivatives - if self.d_pprs is None: - - logging.info("Interpolating proper pseudo-range derivatives from orbit file '%s'", self.orbits_path) - self.d_pprs = { - mosa: scipy.interpolate.InterpolatedUnivariateSpline( - self.orbits['tcb']['t'][:], self.orbits['tcb'][f'l_{mosa}']['d_ppr'], k=1, ext='raise')(t) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing proper pseudo-range derivatives to '%s'", output) - self.write_dset(hdf5, 'd_pprs', self.d_pprs) - - ## Propagation to distant MOSA - - logging.info("Propagating carrier frequency offsets to distant MOSA") - distant_carrier_offsets = { - mosa: (1 - self.d_pprs[mosa]) * \ - self.interpolate(local_carrier_offsets[self.distant(mosa)], -self.pprs[mosa] * self.physics_fs) \ - - self.d_pprs[mosa] * self.central_freq - for mosa in self.MOSAS - } - - logging.info("Propagating carrier frequency fluctuations to distant MOSA") - distant_carrier_fluctuations = { - mosa: self.gws[mosa] + (1 - self.d_pprs[mosa]) * \ - self.interpolate(local_carrier_fluctuations[self.distant(mosa)], -self.pprs[mosa] * self.physics_fs) - for mosa in self.MOSAS - } - - logging.info("Propagating upper sideband frequency offsets to distant MOSA") - distant_usb_offsets = { - mosa: (1 - self.d_pprs[mosa]) * \ - self.interpolate(local_usb_offsets[self.distant(mosa)], -self.pprs[mosa] * self.physics_fs) \ - - self.d_pprs[mosa] * self.central_freq - for mosa in self.MOSAS - } - - logging.info("Propagating upper sideband frequency fluctuations to distant MOSA") - distant_usb_fluctuations = { - mosa: self.gws[mosa] + (1 - self.d_pprs[mosa]) * \ - self.interpolate(local_usb_fluctuations[self.distant(mosa)], -self.pprs[mosa] * self.physics_fs) - for mosa in self.MOSAS - } - - logging.info("Propagating timer deviations to distant MOSA") - distant_timer_deviations = { - mosa: self.interpolate(local_timer_deviations[mosa[1]], -self.pprs[mosa] * self.physics_fs) \ - - self.pprs[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing propagated distant beams to '%s'", output) - self.write_dset(hdf5, 'distant_carrier_offsets', distant_carrier_offsets) - self.write_dset(hdf5, 'distant_carrier_fluctuations', distant_carrier_fluctuations) - self.write_dset(hdf5, 'distant_usb_offsets', distant_usb_offsets) - self.write_dset(hdf5, 'distant_usb_fluctuations', distant_usb_fluctuations) - self.write_dset(hdf5, 'distant_timer_deviations', distant_timer_deviations) - - ## Backlink noise - - logging.info("Generating backlink noise time series") - backlink_noises = { - mosa: noises.backlink(self.physics_fs, self.physics_size, - self.backlink_asds[mosa], self.backlink_fknees[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing backlink noise to '%s'", output) - self.write_dset(hdf5, 'backlink_noises', backlink_noises) - - ## Propagation to adjacent MOSA - - logging.info("Propagating carrier frequency offsets to adjacent MOSA") - adjacent_carrier_offsets = { - mosa: local_carrier_offsets[self.adjacent(mosa)] - for mosa in self.MOSAS - } - - logging.info("Propagating carrier frequency fluctuations to adjacent MOSA") - adjacent_carrier_fluctuations = { - mosa: local_carrier_fluctuations[self.adjacent(mosa)] + self.central_freq * backlink_noises[mosa] - for mosa in self.MOSAS - } - - logging.info("Propagating upper sideband frequency offsets to adjacent MOSA") - adjacent_usb_offsets = { - mosa: local_usb_offsets[self.adjacent(mosa)] - for mosa in self.MOSAS - } - - logging.info("Propagating upper sideband frequency fluctuations to adjacent MOSA") - adjacent_usb_fluctuations = { - mosa: local_usb_fluctuations[self.adjacent(mosa)] + self.central_freq * backlink_noises[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing propagated adjacent beams to '%s'", output) - self.write_dset(hdf5, 'adjacent_carrier_offsets', adjacent_carrier_offsets) - self.write_dset(hdf5, 'adjacent_carrier_fluctuations', adjacent_carrier_fluctuations) - self.write_dset(hdf5, 'adjacent_usb_offsets', adjacent_usb_offsets) - self.write_dset(hdf5, 'adjacent_usb_fluctuations', adjacent_usb_fluctuations) - - ## Inter-spacecraft interferometer local beams - - logging.info("Propagating local carrier frequency offsets to inter-spacecraft interferometer") - local_isc_carrier_offsets = local_carrier_offsets - - logging.info("Propagating local carrier frequency fluctuations to inter-spacecraft interferometer") - local_isc_carrier_fluctuations = local_carrier_fluctuations - - logging.info("Propagating local upper sideband frequency offsets to inter-spacecraft interferometer") - local_isc_usb_offsets = local_usb_offsets - - logging.info("Propagating local upper sideband frequency fluctuations to inter-spacecraft interferometer") - local_isc_usb_fluctuations = local_usb_fluctuations - - if write_all: - logging.info("Writing local beams at inter-spacecraft interferometer to '%s'", output) - self.write_dset(hdf5, 'local_isc_carrier_offsets', local_isc_carrier_offsets) - self.write_dset(hdf5, 'local_isc_carrier_fluctuations', local_isc_carrier_fluctuations) - self.write_dset(hdf5, 'local_isc_usb_offsets', local_isc_usb_offsets) - self.write_dset(hdf5, 'local_isc_usb_fluctuations', local_isc_usb_fluctuations) - - ## Inter-spacecraft interferometer distant beams - - logging.info("Propagating distant carrier frequency offsets to inter-spacecraft interferometer") - distant_isc_carrier_offsets = distant_carrier_offsets - - logging.info("Propagating distant carrier frequency fluctuations to inter-spacecraft interferometer") - distant_isc_carrier_fluctuations = distant_carrier_fluctuations - - logging.info("Propagating distant upper sideband frequency offsets to inter-spacecraft interferometer") - distant_isc_usb_offsets = distant_usb_offsets - - logging.info("Propagating distant upper sideband frequency fluctuations to inter-spacecraft interferometer") - distant_isc_usb_fluctuations = distant_usb_fluctuations - - if write_all: - logging.info("Writing distant beams at inter-spacecraft interferometer to '%s'", output) - self.write_dset(hdf5, 'distant_isc_carrier_offsets', distant_isc_carrier_offsets) - self.write_dset(hdf5, 'distant_isc_carrier_fluctuations', distant_isc_carrier_fluctuations) - self.write_dset(hdf5, 'distant_isc_usb_offsets', distant_isc_usb_offsets) - self.write_dset(hdf5, 'distant_isc_usb_fluctuations', distant_isc_usb_fluctuations) - - ## Inter-spacecraft interferometer beatnotes on TPS (high-frequency) - - logging.info("Computing inter-spacecraft carrier beatnote frequency offsets on TPS") - tps_isc_carrier_offsets = { - mosa: distant_isc_carrier_offsets[mosa] - local_isc_carrier_offsets[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing inter-spacecraft carrier beatnote frequency fluctuations on TPS") - tps_isc_carrier_fluctuations = { - mosa: distant_isc_carrier_fluctuations[mosa] - local_isc_carrier_fluctuations[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing inter-spacecraft upper sideband beatnote frequency offsets on TPS") - tps_isc_usb_offsets = { - mosa: distant_isc_usb_offsets[mosa] - local_isc_usb_offsets[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing inter-spacecraft upper sideband beatnote frequency fluctuations on TPS") - tps_isc_usb_fluctuations = { - mosa: distant_isc_usb_fluctuations[mosa] - local_isc_usb_fluctuations[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing inter-spacecraft beatnotes on TPS to '%s'", output) - self.write_dset(hdf5, 'tps_isc_carrier_offsets', tps_isc_carrier_offsets) - self.write_dset(hdf5, 'tps_isc_carrier_fluctuations', tps_isc_carrier_fluctuations) - self.write_dset(hdf5, 'tps_isc_usb_offsets', tps_isc_usb_offsets) - self.write_dset(hdf5, 'tps_isc_usb_fluctuations', tps_isc_usb_fluctuations) - - ## Ranging noise - - logging.info("Generating ranging noise time series") - ranging_noises = { - mosa: self.ranging_biases[mosa] \ - + noises.ranging(self.physics_fs, self.physics_size, self.ranging_asds[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing ranging noise to '%s'", output) - self.write_dset(hdf5, 'ranging_noises', ranging_noises) - - ## Measured pseudo-ranging on TPS grid (high-frequency) - - logging.info("Computing measured pseudo-ranges on TPS") - tps_mprs = { - mosa: local_timer_deviations[mosa[0]] - distant_timer_deviations[mosa] + ranging_noises[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing measured pseudo-ranges on TPS to '%s'", output) - self.write_dset(hdf5, 'tps_mprs', tps_mprs) - - ## Test-mass acceleration noise - - logging.info("Generating test-mass acceleration noise time series") - testmass_noises = { - mosa: noises.testmass(self.physics_fs, self.physics_size, - self.testmass_asds[mosa], self.testmass_fknees[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing test-mass acceleration noise to '%s'", output) - self.write_dset(hdf5, 'tm_noises', testmass_noises) - - ## Test-mass interferometer local beams - - logging.info("Propagating local carrier frequency offsets to test-mass interferometer") - local_tm_carrier_offsets = local_carrier_offsets - - logging.info("Propagating local carrier frequency fluctuations to test-mass interferometer") - local_tm_carrier_fluctuations = { - mosa: local_carrier_fluctuations[mosa] + self.central_freq * testmass_noises[mosa] - for mosa in self.MOSAS - } - - logging.info("Propagating local upper sideband frequency offsets to test-mass interferometer") - local_tm_usb_offsets = local_usb_offsets - - logging.info("Propagating local upper sideband frequency fluctuations to test-mass interferometer") - local_tm_usb_fluctuations = { - mosa: local_usb_fluctuations[mosa] + self.central_freq * testmass_noises[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing local beams at test-mass interferometer to '%s'", output) - self.write_dset(hdf5, 'local_tm_carrier_offsets', local_tm_carrier_offsets) - self.write_dset(hdf5, 'local_tm_carrier_fluctuations', local_tm_carrier_fluctuations) - self.write_dset(hdf5, 'local_tm_usb_offsets', local_tm_usb_offsets) - self.write_dset(hdf5, 'local_tm_usb_fluctuations', local_tm_usb_fluctuations) - - ## Test-mass interferometer adjacent beams - - logging.info("Propagating adjacent carrier frequency offsets to test-mass interferometer") - adjacent_tm_carrier_offsets = adjacent_carrier_offsets - - logging.info("Propagating adjacent carrier frequency fluctuations to test-mass interferometer") - adjacent_tm_carrier_fluctuations = adjacent_carrier_fluctuations - - logging.info("Propagating adjacent upper sideband frequency offsets to test-mass interferometer") - adjacent_tm_usb_offsets = adjacent_usb_offsets - - logging.info("Propagating adjacent upper sideband frequency fluctuations to test-mass interferometer") - adjacent_tm_usb_fluctuations = adjacent_usb_fluctuations - - if write_all: - logging.info("Writing adjacent beams at test-mass interferometer to '%s'", output) - self.write_dset(hdf5, 'adjacent_tm_carrier_offsets', adjacent_tm_carrier_offsets) - self.write_dset(hdf5, 'adjacent_tm_carrier_fluctuations', adjacent_tm_carrier_fluctuations) - self.write_dset(hdf5, 'adjacent_tm_usb_offsets', adjacent_tm_usb_offsets) - self.write_dset(hdf5, 'adjacent_tm_usb_fluctuations', adjacent_tm_usb_fluctuations) - - ## Test-mass interferometer beatnotes on TPS (high-frequency) - - logging.info("Computing test-mass carrier beatnote frequency offsets on TPS") - tps_tm_carrier_offsets = { - mosa: adjacent_tm_carrier_offsets[mosa] - local_tm_carrier_offsets[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing test-mass carrier beatnote frequency fluctuations on TPS") - tps_tm_carrier_fluctuations = { - mosa: adjacent_tm_carrier_fluctuations[mosa] - local_tm_carrier_fluctuations[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing test-mass upper sideband beatnote frequency offsets on TPS") - tps_tm_usb_offsets = { - mosa: adjacent_tm_usb_offsets[mosa] - local_tm_usb_offsets[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing test-mass upper sideband beatnote frequency fluctuations on TPS") - tps_tm_usb_fluctuations = { - mosa: adjacent_tm_usb_fluctuations[mosa] - local_tm_usb_fluctuations[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing test-mass beatnotes on TPS to '%s'", output) - self.write_dset(hdf5, 'tps_tm_carrier_offsets', tps_tm_carrier_offsets) - self.write_dset(hdf5, 'tps_tm_carrier_fluctuations', tps_tm_carrier_fluctuations) - self.write_dset(hdf5, 'tps_tm_usb_offsets', tps_tm_usb_offsets) - self.write_dset(hdf5, 'tps_tm_usb_fluctuations', tps_tm_usb_fluctuations) - - ## Reference interferometer local beams - - logging.info("Propagating local carrier frequency offsets to reference interferometer") - local_ref_carrier_offsets = local_carrier_offsets - - logging.info("Propagating local carrier frequency fluctuations to reference interferometer") - local_ref_carrier_fluctuations = local_carrier_fluctuations - - logging.info("Propagating local upper sideband frequency offsets to reference interferometer") - local_ref_usb_offsets = local_usb_offsets - - logging.info("Propagating local upper sideband frequency fluctuations to reference interferometer") - local_ref_usb_fluctuations = local_usb_fluctuations - - if write_all: - logging.info("Writing local beams at reference interferometer to '%s'", output) - self.write_dset(hdf5, 'local_ref_carrier_offsets', local_ref_carrier_offsets) - self.write_dset(hdf5, 'local_ref_carrier_fluctuations', local_ref_carrier_fluctuations) - self.write_dset(hdf5, 'local_ref_usb_offsets', local_ref_usb_offsets) - self.write_dset(hdf5, 'local_ref_usb_fluctuations', local_ref_usb_fluctuations) - - ## Reference interferometer adjacent beams - - logging.info("Propagating adjacent carrier frequency offsets to reference interferometer") - adjacent_ref_carrier_offsets = adjacent_carrier_offsets - - logging.info("Propagating adjacent carrier frequency fluctuations to reference interferometer") - adjacent_ref_carrier_fluctuations = adjacent_carrier_fluctuations - - logging.info("Propagating adjacent upper sideband frequency offsets to reference interferometer") - adjacent_ref_usb_offsets = adjacent_usb_offsets - - logging.info("Propagating adjacent upper sideband frequency fluctuations to reference interferometer") - adjacent_ref_usb_fluctuations = adjacent_usb_fluctuations - - if write_all: - logging.info("Writing adjacent beams at reference interferometer to '%s'", output) - self.write_dset(hdf5, 'adjacent_ref_carrier_offsets', adjacent_ref_carrier_offsets) - self.write_dset(hdf5, 'adjacent_ref_carrier_fluctuations', adjacent_ref_carrier_fluctuations) - self.write_dset(hdf5, 'adjacent_ref_usb_offsets', adjacent_ref_usb_offsets) - self.write_dset(hdf5, 'adjacent_ref_usb_fluctuations', adjacent_ref_usb_fluctuations) - - ## Reference interferometer beatnotes on TPS (high-frequency) - - logging.info("Computing reference carrier beatnote frequency offsets on TPS") - tps_ref_carrier_offsets = { - mosa: adjacent_ref_carrier_offsets[mosa] - local_ref_carrier_offsets[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing reference carrier beatnote frequency fluctuations on TPS") - tps_ref_carrier_fluctuations = { - mosa: adjacent_ref_carrier_fluctuations[mosa] - local_ref_carrier_fluctuations[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing reference upper sideband beatnote frequency offsets on TPS") - tps_ref_usb_offsets = { - mosa: adjacent_ref_usb_offsets[mosa] - local_ref_usb_offsets[mosa] - for mosa in self.MOSAS - } - - logging.info("Computing reference upper sideband beatnote frequency fluctuations on TPS") - tps_ref_usb_fluctuations = { - mosa: adjacent_ref_usb_fluctuations[mosa] - local_ref_usb_fluctuations[mosa] - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing reference beatnotes on TPS to '%s'", output) - self.write_dset(hdf5, 'tps_ref_carrier_offsets', tps_ref_carrier_offsets) - self.write_dset(hdf5, 'tps_ref_carrier_fluctuations', tps_ref_carrier_fluctuations) - self.write_dset(hdf5, 'tps_ref_usb_offsets', tps_ref_usb_offsets) - self.write_dset(hdf5, 'tps_ref_usb_fluctuations', tps_ref_usb_fluctuations) - - ## Sampling beatnotes and measured pseudo-ranges to THE grid - - inverse_timer_deviations = { - sc: self.invert_timer_deviations(local_timer_deviations[sc], sc) - for sc in self.SCS - } - - timestamp = lambda x, sc: self.interpolate(x, -inverse_timer_deviations[sc] * self.physics_fs) - - logging.info("Sampling inter-spacecraft carrier beatnote frequency offsets to THE grid") - the_isc_carrier_offsets = { - mosa: timestamp(tps_isc_carrier_offsets[mosa] / (1 + clock_noise_offsets[mosa[0]]), mosa[0]) - for mosa in self.MOSAS - } - logging.info("Sampling inter-spacecraft carrier beatnote frequency fluctuations to THE grid") - the_isc_carrier_fluctuations = { - mosa: timestamp(tps_isc_carrier_fluctuations[mosa] / (1 + clock_noise_offsets[mosa[0]]) - - tps_isc_carrier_offsets[mosa] * clock_noise_fluctuations[mosa[0]] - / (1 + clock_noise_offsets[mosa[0]])**2, mosa[0]) - for mosa in self.MOSAS - } - - logging.info("Sampling inter-spacecraft upper sideband beatnote frequency offsets to THE grid") - the_isc_usb_offsets = { - mosa: timestamp(tps_isc_usb_offsets[mosa] / (1 + clock_noise_offsets[mosa[0]]), mosa[0]) - for mosa in self.MOSAS - } - - logging.info("Sampling inter-spacecraft upper sideband beatnote frequency fluctuations to THE grid") - the_isc_usb_fluctuations = { - mosa: timestamp(tps_isc_usb_fluctuations[mosa] / (1 + clock_noise_offsets[mosa[0]]) - - tps_isc_usb_offsets[mosa] * clock_noise_fluctuations[mosa[0]] - / (1 + clock_noise_offsets[mosa[0]])**2, mosa[0]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing inter-spacecraft beatnotes sampled to THE grid to '%s'", output) - self.write_dset(hdf5, 'the_isc_carrier_offsets', the_isc_carrier_offsets) - self.write_dset(hdf5, 'the_isc_carrier_fluctuations', the_isc_carrier_fluctuations) - self.write_dset(hdf5, 'the_isc_usb_offsets', the_isc_usb_offsets) - self.write_dset(hdf5, 'the_isc_usb_fluctuations', the_isc_usb_fluctuations) - - logging.info("Sampling measured pseudo-ranges to THE grid") - the_mprs = { - mosa: timestamp(tps_mprs[mosa], mosa[0]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing measured pseudo-ranges sampled to THE grid to '%s'", output) - self.write_dset(hdf5, 'the_mprs', the_mprs) - - logging.info("Sampling test-mass beatnotes to THE grid") - the_tm_carrier_offsets = { - mosa: timestamp(tps_tm_carrier_offsets[mosa] / (1 + clock_noise_offsets[mosa[0]]), mosa[0]) - for mosa in self.MOSAS - } - the_tm_carrier_fluctuations = { - mosa: timestamp(tps_tm_carrier_fluctuations[mosa] / (1 + clock_noise_offsets[mosa[0]]) - - tps_tm_carrier_offsets[mosa] * clock_noise_fluctuations[mosa[0]] - / (1 + clock_noise_offsets[mosa[0]])**2, mosa[0]) - for mosa in self.MOSAS - } - the_tm_usb_offsets = { - mosa: timestamp(tps_tm_usb_offsets[mosa] / (1 + clock_noise_offsets[mosa[0]]), mosa[0]) - for mosa in self.MOSAS - } - the_tm_usb_fluctuations = { - mosa: timestamp(tps_tm_usb_fluctuations[mosa] / (1 + clock_noise_offsets[mosa[0]]) - - tps_tm_usb_offsets[mosa] * clock_noise_fluctuations[mosa[0]] - / (1 + clock_noise_offsets[mosa[0]])**2, mosa[0]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing test-mass beatnotes sampled to THE grid to '%s'", output) - self.write_dset(hdf5, 'the_tm_carrier_offsets', the_tm_carrier_offsets) - self.write_dset(hdf5, 'the_tm_carrier_fluctuations', the_tm_carrier_fluctuations) - self.write_dset(hdf5, 'the_tm_usb_offsets', the_tm_usb_offsets) - self.write_dset(hdf5, 'the_tm_usb_fluctuations', the_tm_usb_fluctuations) - - logging.info("Sampling reference beatnotes to THE grid") - the_ref_carrier_offsets = { - mosa: timestamp(tps_ref_carrier_offsets[mosa] / (1 + clock_noise_offsets[mosa[0]]), mosa[0]) - for mosa in self.MOSAS - } - the_ref_carrier_fluctuations = { - mosa: timestamp(tps_ref_carrier_fluctuations[mosa] / (1 + clock_noise_offsets[mosa[0]]) - - tps_ref_carrier_offsets[mosa] * clock_noise_fluctuations[mosa[0]] - / (1 + clock_noise_offsets[mosa[0]])**2, mosa[0]) - for mosa in self.MOSAS - } - the_ref_usb_offsets = { - mosa: timestamp(tps_ref_usb_offsets[mosa] / (1 + clock_noise_offsets[mosa[0]]), mosa[0]) - for mosa in self.MOSAS - } - the_ref_usb_fluctuations = { - mosa: timestamp(tps_ref_usb_fluctuations[mosa] / (1 + clock_noise_offsets[mosa[0]]) - - tps_ref_usb_offsets[mosa] * clock_noise_fluctuations[mosa[0]] - / (1 + clock_noise_offsets[mosa[0]])**2, mosa[0]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing reference beatnotes sampled to THE grid to '%s'", output) - self.write_dset(hdf5, 'the_ref_carrier_offsets', the_ref_carrier_offsets) - self.write_dset(hdf5, 'the_ref_carrier_fluctuations', the_ref_carrier_fluctuations) - self.write_dset(hdf5, 'the_ref_usb_offsets', the_ref_usb_offsets) - self.write_dset(hdf5, 'the_ref_usb_fluctuations', the_ref_usb_fluctuations) - - ## Measurement time vector - - logging.info("Computing measurement time vector (size=%s, dt=%s)", self.size, self.dt) - logging.info("Writing measurement time dataset to '%s'", output) - hdf5['t'] = t[::self.physics_upsampling] - - ## Antialiasing filtering - - logging.info("Filtering inter-spacecraft beatnotes") - filtered_isc_carrier_offsets = { - mosa: self.aafilter(the_isc_carrier_offsets[mosa]) - for mosa in self.MOSAS - } - filtered_isc_carrier_fluctuations = { - mosa: self.aafilter(the_isc_carrier_fluctuations[mosa]) - for mosa in self.MOSAS - } - filtered_isc_usb_offsets = { - mosa: self.aafilter(the_isc_usb_offsets[mosa]) - for mosa in self.MOSAS - } - filtered_isc_usb_fluctuations = { - mosa: self.aafilter(the_isc_usb_fluctuations[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing filtered inter-spacecraft beatnotes to '%s'", output) - self.write_dset(hdf5, 'filtered_isc_carrier_offsets', filtered_isc_carrier_offsets) - self.write_dset(hdf5, 'filtered_isc_carrier_fluctuations', filtered_isc_carrier_fluctuations) - self.write_dset(hdf5, 'filtered_isc_usb_offsets', filtered_isc_usb_offsets) - self.write_dset(hdf5, 'filtered_isc_usb_fluctuations', filtered_isc_usb_fluctuations) - - logging.info("Filtering measured pseudo-ranges") - filtered_mprs = { - mosa: self.aafilter(the_mprs[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing filtered measured pseudo-ranges to '%s'", output) - self.write_dset(hdf5, 'filtered_mprs', filtered_mprs) - - logging.info("Filtering test-mass beatnotes") - filtered_tm_carrier_offsets = { - mosa: self.aafilter(the_tm_carrier_offsets[mosa]) - for mosa in self.MOSAS - } - filtered_tm_carrier_fluctuations = { - mosa: self.aafilter(the_tm_carrier_fluctuations[mosa]) - for mosa in self.MOSAS - } - filtered_tm_usb_offsets = { - mosa: self.aafilter(the_tm_usb_offsets[mosa]) - for mosa in self.MOSAS - } - filtered_tm_usb_fluctuations = { - mosa: self.aafilter(the_tm_usb_fluctuations[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing filtered test-mass beatnotes to '%s'", output) - self.write_dset(hdf5, 'filtered_tm_carrier_offsets', filtered_tm_carrier_offsets) - self.write_dset(hdf5, 'filtered_tm_carrier_fluctuations', filtered_tm_carrier_fluctuations) - self.write_dset(hdf5, 'filtered_tm_usb_offsets', filtered_tm_usb_offsets) - self.write_dset(hdf5, 'filtered_tm_usb_fluctuations', filtered_tm_usb_fluctuations) - - logging.info("Filtering reference beatnotes") - filtered_ref_carrier_offsets = { - mosa: self.aafilter(the_ref_carrier_offsets[mosa]) - for mosa in self.MOSAS - } - filtered_ref_carrier_fluctuations = { - mosa: self.aafilter(the_ref_carrier_fluctuations[mosa]) - for mosa in self.MOSAS - } - filtered_ref_usb_offsets = { - mosa: self.aafilter(the_ref_usb_offsets[mosa]) - for mosa in self.MOSAS - } - filtered_ref_usb_fluctuations = { - mosa: self.aafilter(the_ref_usb_fluctuations[mosa]) - for mosa in self.MOSAS - } - - if write_all: - logging.info("Writing filtered reference beatnotes to '%s'", output) - self.write_dset(hdf5, 'filtered_ref_carrier_offsets', filtered_ref_carrier_offsets) - self.write_dset(hdf5, 'filtered_ref_carrier_fluctuations', filtered_ref_carrier_fluctuations) - self.write_dset(hdf5, 'filtered_ref_usb_offsets', filtered_ref_usb_offsets) - self.write_dset(hdf5, 'filtered_ref_usb_fluctuations', filtered_ref_usb_fluctuations) - - ## Downsampling filtering - - logging.info("Downsampling inter-spacecraft beatnotes") - isc_carrier_offsets = { - mosa: self.downsampled(filtered_isc_carrier_offsets[mosa]) - for mosa in self.MOSAS - } - isc_carrier_fluctuations = { - mosa: self.downsampled(filtered_isc_carrier_fluctuations[mosa]) - for mosa in self.MOSAS - } - isc_usb_offsets = { - mosa: self.downsampled(filtered_isc_usb_offsets[mosa]) - for mosa in self.MOSAS - } - isc_usb_fluctuations = { - mosa: self.downsampled(filtered_isc_usb_fluctuations[mosa]) - for mosa in self.MOSAS - } - - logging.info("Writing downsampled inter-spacecraft beatnotes to '%s'", output) - self.write_dset(hdf5, 'isc_carrier_offsets', isc_carrier_offsets) - self.write_dset(hdf5, 'isc_carrier_fluctuations', isc_carrier_fluctuations) - self.write_dset(hdf5, 'isc_usb_offsets', isc_usb_offsets) - self.write_dset(hdf5, 'isc_usb_fluctuations', isc_usb_fluctuations) - - logging.info("Filtering and downsampling measured pseudo-ranges") - mprs = { - mosa: self.downsampled(filtered_mprs[mosa]) - for mosa in self.MOSAS - } - - logging.info("Writing downsampled measured pseudo-ranges to '%s'", output) - self.write_dset(hdf5, 'mprs', mprs) - - logging.info("Filtering and downsampling test-mass beatnotes") - tm_carrier_offsets = { - mosa: self.downsampled(filtered_tm_carrier_offsets[mosa]) - for mosa in self.MOSAS - } - tm_carrier_fluctuations = { - mosa: self.downsampled(filtered_tm_carrier_fluctuations[mosa]) - for mosa in self.MOSAS - } - tm_usb_offsets = { - mosa: self.downsampled(filtered_tm_usb_offsets[mosa]) - for mosa in self.MOSAS - } - tm_usb_fluctuations = { - mosa: self.downsampled(filtered_tm_usb_fluctuations[mosa]) - for mosa in self.MOSAS - } - - logging.info("Writing downsampled test-mass beatnotes to '%s'", output) - self.write_dset(hdf5, 'tm_carrier_offsets', tm_carrier_offsets) - self.write_dset(hdf5, 'tm_carrier_fluctuations', tm_carrier_fluctuations) - self.write_dset(hdf5, 'tm_usb_offsets', tm_usb_offsets) - self.write_dset(hdf5, 'tm_usb_fluctuations', tm_usb_fluctuations) - - logging.info("Filtering and downsampling reference beatnotes") - ref_carrier_offsets = { - mosa: self.downsampled(filtered_ref_carrier_offsets[mosa]) - for mosa in self.MOSAS - } - ref_carrier_fluctuations = { - mosa: self.downsampled(filtered_ref_carrier_fluctuations[mosa]) - for mosa in self.MOSAS - } - ref_usb_offsets = { - mosa: self.downsampled(filtered_ref_usb_offsets[mosa]) - for mosa in self.MOSAS - } - ref_usb_fluctuations = { - mosa: self.downsampled(filtered_ref_usb_fluctuations[mosa]) - for mosa in self.MOSAS - } - - logging.info("Writing downsampled reference beatnotes to '%s'", output) - self.write_dset(hdf5, 'ref_carrier_offsets', ref_carrier_offsets) - self.write_dset(hdf5, 'ref_carrier_fluctuations', ref_carrier_fluctuations) - self.write_dset(hdf5, 'ref_usb_offsets', ref_usb_offsets) - self.write_dset(hdf5, 'ref_usb_fluctuations', ref_usb_fluctuations) - - ## Total frequencies - - logging.info("Computing inter-spacecraft beatnote total frequencies") - isc_carriers = { - mosa: isc_carrier_offsets[mosa] + isc_carrier_fluctuations[mosa] - for mosa in self.MOSAS - } - isc_usbs = { - mosa: isc_usb_offsets[mosa] + isc_usb_fluctuations[mosa] - for mosa in self.MOSAS - } - - logging.info("Writing inter-spacecraft beatnote total frequencies to '%s'", output) - self.write_dset(hdf5, 'isc_carriers', isc_carriers) - self.write_dset(hdf5, 'isc_usbs', isc_usbs) - - logging.info("Computing test-mass beatnote total frequencies") - tm_carriers = { - mosa: tm_carrier_offsets[mosa] + tm_carrier_fluctuations[mosa] - for mosa in self.MOSAS - } - tm_usbs = { - mosa: tm_usb_offsets[mosa] + tm_usb_fluctuations[mosa] - for mosa in self.MOSAS - } - - logging.info("Writing test-mass beatnote total frequencies to '%s'", output) - self.write_dset(hdf5, 'tm_carriers', tm_carriers) - self.write_dset(hdf5, 'tm_usbs', tm_usbs) - - logging.info("Computing reference beatnote total frequencies") - ref_carriers = { - mosa: ref_carrier_offsets[mosa] + ref_carrier_fluctuations[mosa] - for mosa in self.MOSAS - } - ref_usbs = { - mosa: ref_usb_offsets[mosa] + ref_usb_fluctuations[mosa] - for mosa in self.MOSAS - } - - logging.info("Writing reference beatnote total frequencies to '%s'", output) - self.write_dset(hdf5, 'ref_carriers', ref_carriers) - self.write_dset(hdf5, 'ref_usbs', ref_usbs) - - ## Closing simulation - - logging.info("Closing output file '%s'", output) - hdf5.close() - logging.info("Simulation complete")