Skip to content
Snippets Groups Projects
Commit 866e666f authored by Jean-Baptiste Bayle's avatar Jean-Baptiste Bayle
Browse files

Use containers for MOSA and spacecraft signals

parent 86452ad5
No related branches found
No related tags found
1 merge request!2Add instrument simulation
......@@ -15,7 +15,7 @@ max-line-length=120
[DESIGN]
# Maximum number of locals for function / method body
max-locals=20
max-locals=30
# Maximum number of arguments for function / method
max-args=10
......@@ -6,4 +6,4 @@ from .meta import __version__
from .meta import __author__
from .meta import __email__
from .lisa import Instrument
from .instrument import Instrument
......@@ -7,108 +7,163 @@ Authors:
Jean-Baptiste Bayle <j2b.bayle@gmail.com>
"""
# TODO:
# * modulated beam, 4 args (carrier_offsets, carrier_fluctuations, usb_offsets, usb_fluctuations)
# * args can be scalars (repeated in dict), dict (check and convert to float), callable (call with mosa as arg)
import abc
import logging
import numpy
import h5py
class Signal:
"""Represent a signal expressed as frequency offsets and fluctuations."""
class ForEachObject(abc.ABC):
"""Abstract class which represents a dictionary holding a value for each object."""
def __init__(self, offsets=0, fluctuations=0):
"""Initialize a signal from frequency offsets and fluctuations.
def __init__(self, values):
"""Initialize an object with a value or a function of the mosa index.
Args:
offsets: frequency offsets [Hz]
fluctuations: frequency fluctuations [Hz]
values: a value, a dictionary of values, or a function (mosa -> value)
"""
if callable(offsets):
self.offsets = {mosa: offsets(mosa) for mosa in Instrument.MOSAS}
if isinstance(values, dict):
self.dict = {mosa: values[mosa] for mosa in self.indices()}
elif callable(values):
self.dict = {mosa: values(mosa) for mosa in self.indices()}
elif isinstance(values, h5py.Dataset):
self.dict = {mosa: values[mosa] for mosa in self.indices()}
else:
self.offsets = Instrument.mosa_dict(offsets)
self.dict = {mosa: values for mosa in self.indices()}
if callable(fluctuations):
self.fluctuations = {mosa: fluctuations(mosa) for mosa in Instrument.MOSAS}
else:
self.fluctuations = Instrument.mosa_dict(fluctuations)
@classmethod
@abc.abstractmethod
def indices(cls):
"""Return list of object indices."""
raise NotImplementedError
def transformed(self, offsets=lambda x, mosa: x, fluctuations=lambda x, mosa: x):
"""Return a new two-variable signal transforming offsets and fluctuations.
def transform(self, transformation):
"""Transform dictionary on-the-spot.
Args:
offsets: function of (offsets, mosa) returning new offsets [Hz]
fluctuations: function of (fluctuations, mosa) returning new fluctuations [Hz]
Returns:
A new `Signal` isntance where offsets and fluctuations have been transformed.
transformation: function (mosa, value -> new_value)
"""
return self.__class__(
offsets={
mosa: offsets(self.offsets[mosa], mosa)
for mosa in Instrument.MOSAS
},
fluctuations={
mosa: fluctuations(self.fluctuations[mosa], mosa)
for mosa in Instrument.MOSAS
},
)
def reduce(self, function=lambda offsets, fluctuations: 0):
"""Compute a new MOSA dictionary from offsets and fluctuations.
self.dict = {mosa: transformation(mosa, self[mosa]) for mosa in self.indices()}
def transformed(self, transformation):
"""Return a new dictionary from transformed objects.
Args:
function: function of (offsets, fluctuations) returning new value
transformation: function (mosa, value -> new_value)
"""
return {
mosa: function(self.offsets[mosa], self.fluctuations[mosa])
for mosa in Instrument.MOSAS
}
@property
def totalfreq(self):
"""Return total frequencies, as the sum of offsets and fluctuations."""
return self.reduce(lambda offsets, fluctuations: offsets + fluctuations)
return self.__class__(lambda mosa: transformation(mosa, self[mosa]))
class ModulatedBeam:
"""Represent a modulated beam, with a carrier, an upper sideband, and optionally timer deviations."""
def __init__(self, carrier, usb, timer_deviations=None):
"""Initialize a modulated beam from carrier and upper sideband signals.
def write(self, hdf5, dataset):
"""Write values in dataset on HDF5 file.
Args:
carrier: carrier signal
usb: upper sideband signal
timer_deviations: timer deviations [s]
hdf5: HDF5 file in which to write
dataset: dataset name or path
"""
if not isinstance(carrier) or not isinstance(usb, Signal):
raise TypeError("carrier and upper sideband should be instances of `Signal`")
self.carrier = carrier
self.usb = usb
self.timer_deviations = Instrument.mosa_dict(timer_deviations)
# Retreive the maximum size of data
size = 1
for value in self.values():
if numpy.isscalar(value):
continue
if size != 1 and len(value) != size:
raise ValueError(f"incompatible sizes in dictionary '{size}' and '{len(size)}'")
size = max(size, len(value))
logging.debug("Writing dataset of size '%s' with '%s' columns", size, len(self.indices()))
# Write dataset
dtype = numpy.dtype({'names': self.indices(), 'formats': len(self.indices()) * [numpy.float64]})
hdf5.create_dataset(dataset, (size,), dtype=dtype)
for index in self.indices():
hdf5[dataset][index] = self[index]
def __getitem__(self, key):
return self.dict[key]
def __setitem__(self, key, item):
self.dict[key] = item
def values(self):
"""Return dictionary values."""
return self.dict.values()
def keys(self):
"""Return dictionary keys."""
return self.dict.keys()
def items(self):
"""Return dictionary items."""
return self.dict.items()
def __repr__(self):
return repr(self.dict)
class ForEachSC(ForEachObject):
"""Represents a dictionary of values for each spacecraft."""
@classmethod
def indices(cls):
return ['1', '2', '3']
@staticmethod
def distant_left(sc):
"""Return index of distant left spacecraft."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{(int(sc) - 2) % 3 + 1}'
@staticmethod
def distant_right(sc):
"""Return index of distant right spacecraft."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{int(sc) % 3 + 1}'
@staticmethod
def left_mosa(sc):
"""Return index of left MOSA."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{sc}{ForEachSC.distant_left(sc)}'
@staticmethod
def right_mosa(sc):
"""Return index of right MOSA."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{sc}{ForEachSC.distant_right(sc)}'
class ForEachMOSA(ForEachObject):
"""Represents a dictionary of values for each moveable optical subassembly (MOSA)."""
@classmethod
def indices(cls):
return ['12', '23', '31', '13', '32', '21']
@staticmethod
def sc(mosa):
"""Return index of spacecraft hosting MOSA."""
return f'{mosa[0]}'
@staticmethod
def distant(mosa):
"""Return index of distant MOSA.
In practive, we invert the indices to swap emitter and receiver.
"""
if mosa not in ForEachMOSA.indices():
raise ValueError(f"invalid MOSA index '{mosa}'")
return f'{mosa[1]}{mosa[0]}'
def transformed(self,
carrier_offsets=lambda x, mosa: x, carrier_fluctuations=lambda x, mosa: x,
usb_offsets=lambda x, mosa: x, usb_fluctuations=lambda x, mosa: x,
timer_deviations=lambda x, mosa: x):
"""Return a new modulated beam after applying transformations.
@staticmethod
def adjacent(mosa):
"""Return index of adjacent MOSA.
Args:
carrier_offsets: function of (offsets, mosa) returning new carrier offsets [Hz]
carrier_fluctuations: function of (fluctuations, mosa) returning new carrier fluctuations [Hz]
usb_offsets: function of (offsets, mosa) returning new upper sideband offsets [Hz]
usb_fluctuations: function of (fluctuations, mosa) returning new upper sideband fluctuations [Hz]
timer_deviations: function of (deviations, mosa) return new timer deviations [s]
Returns:
A new `ModulatedBeam` isntance where signals have been transformed.
In practice, we replace the second index by the only unused spacecraft index.
"""
return self.__class__(
carrier=self.carrier.transformed(carrier_offsets, carrier_fluctuations),
usb=self.usb.transformed(usb_offsets, usb_fluctuations),
timer_deviations={
mosa: timer_deviations(self.timer_deviations[mosa], mosa)
for mosa in Instrument.MOSAS
}
)
if mosa not in ForEachMOSA.indices():
raise ValueError(f"invalid MOSA index '{mosa}'")
unused = [sc for sc in ForEachSC.indices() if sc not in mosa]
if len(unused) != 1:
raise RuntimeError(f"cannot find adjacent MOSA for '{mosa}'")
return f'{mosa[0]}{unused[0]}'
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment