Skip to content
Snippets Groups Projects

Draft: Resolve "Use Numpy arrays instead of `ForEachObject` instances"

2 unresolved threads
Files
3
+ 0
333
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Container object for signals.
Authors:
Jean-Baptiste Bayle <j2b.bayle@gmail.com>
"""
import abc
import logging
import numpy
import h5py
import matplotlib.pyplot
logger = logging.getLogger(__name__)
class ForEachObject(abc.ABC):
"""Abstract class which represents a dictionary holding a value for each object."""
def __init__(self, values):
"""Initialize an object with a value or a function of the mosa index.
Args:
values: a value, a dictionary of values, or a function (mosa -> value)
"""
if isinstance(values, dict):
self.dict = {mosa: values[mosa] for mosa in self.indices()}
elif callable(values):
self.dict = {mosa: values(mosa) for mosa in self.indices()}
elif isinstance(values, h5py.Dataset):
self.dict = {mosa: values[mosa] for mosa in self.indices()}
else:
self.dict = {mosa: values for mosa in self.indices()}
@classmethod
@abc.abstractmethod
def indices(cls):
"""Return list of object indices."""
raise NotImplementedError
def transformed(self, transformation):
"""Return a new dictionary from transformed objects.
Args:
transformation: function (mosa, value -> new_value)
"""
return self.__class__(lambda mosa: transformation(mosa, self[mosa]))
def collapsed(self):
"""Turn a numpy arrays containing identical elements into a scalar.
This method can be used to optimize computations when constant time series are involved.
"""
return self.transformed(lambda _, x:
x[0] if isinstance(x, numpy.ndarray) and numpy.all(x == x[0]) else x
)
def write(self, hdf5, dataset):
"""Write values in dataset on HDF5 file.
Args:
hdf5: HDF5 file in which to write
dataset: dataset name or path
"""
# Retreive the maximum size of data
size = 1
for value in self.values():
if numpy.isscalar(value):
continue
if size != 1 and len(value) != size:
raise ValueError(f"incompatible sizes in dictionary '{size}' and '{len(size)}'")
size = max(size, len(value))
logger.debug("Writing dataset of size '%s' with '%s' columns", size, len(self.indices()))
# Write dataset
dtype = numpy.dtype({'names': self.indices(), 'formats': len(self.indices()) * [numpy.float64]})
hdf5.create_dataset(dataset, (size,), dtype=dtype)
for index in self.indices():
hdf5[dataset][index] = self[index]
def __getitem__(self, key):
return self.dict[key]
def __setitem__(self, key, item):
self.dict[key] = item
def values(self):
"""Return dictionary values."""
return self.dict.values()
def keys(self):
"""Return dictionary keys."""
return self.dict.keys()
def items(self):
"""Return dictionary items."""
return self.dict.items()
def plot(self, output=None, dt=1, t0=0, size='auto', title='Signals'):
"""Plot signals for each object.
Args:
output: output file, None to show the plots
dt: sampling period [s]
t0: initial time [s]
size: duration of time series, or 'auto' [samples]
title: plot title
"""
if size == 'auto':
size = len(self) if len(self) > 1 else 100
t = t0 + numpy.arange(size) * dt
# Plot signals
logger.info("Plotting signals for each object")
matplotlib.pyplot.figure(figsize=(12, 4))
for key, signal in self.items():
matplotlib.pyplot.plot(t, numpy.broadcast_to(signal, size), label=key)
matplotlib.pyplot.grid()
matplotlib.pyplot.legend()
matplotlib.pyplot.xlabel("Time [s]")
matplotlib.pyplot.ylabel("Signal")
matplotlib.pyplot.title(title)
# Save or show glitch
if output is not None:
logger.info("Saving plot to %s", output)
matplotlib.pyplot.savefig(output, bbox_inches='tight')
else:
matplotlib.pyplot.show()
def __len__(self):
"""Return maximum size of signals."""
sizes = [1 if numpy.isscalar(signal) else len(signal) for signal in self.values()]
return numpy.max(sizes)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.dict == other.dict
if isinstance(other, dict):
return self.dict == other
return numpy.all([self[index] == other for index in self.indices()])
def __abs__(self):
return self.transformed(lambda index, value: abs(value))
def __neg__(self):
return self.transformed(lambda index, value: -value)
def __add__(self, other):
if isinstance(other, ForEachObject):
if isinstance(other, type(self)):
return self.transformed(lambda index, value: value + other[index])
raise TypeError(f"unsupported operand type for +: '{type(self)}' and '{type(other)}'")
return self.transformed(lambda _, value: value + other)
def __radd__(self, other):
return self + other
def __sub__(self, other):
if isinstance(other, ForEachObject):
if isinstance(other, type(self)):
return self.transformed(lambda index, value: value - other[index])
raise TypeError(f"unsupported operand type for -: '{type(self)}' and '{type(other)}'")
return self.transformed(lambda _, value: value - other)
def __rsub__(self, other):
return -(self - other)
def __mul__(self, other):
if isinstance(other, ForEachObject):
if isinstance(other, type(self)):
return self.transformed(lambda index, value: value * other[index])
raise TypeError(f"unsupported operand type for *: '{type(self)}' and '{type(other)}'")
return self.transformed(lambda _, value: value * other)
def __rmul__(self, other):
return self * other
def __floordiv__(self, other):
if isinstance(other, ForEachObject):
if isinstance(other, type(self)):
return self.transformed(lambda index, value: value // other[index])
raise TypeError(f"unsupported operand type for //: '{type(self)}' and '{type(other)}'")
return self.transformed(lambda _, value: value // other)
def __truediv__(self, other):
if isinstance(other, ForEachObject):
if isinstance(other, type(self)):
return self.transformed(lambda index, value: value / other[index])
raise TypeError(f"unsupported operand type for /: '{type(self)}' and '{type(other)}'")
return self.transformed(lambda _, value: value / other)
def __rtruediv__(self, other):
return (self / other)**(-1)
def __pow__(self, other):
return self.transformed(lambda _, value: value**other)
def __repr__(self):
return repr(self.dict)
class ForEachSC(ForEachObject):
"""Represents a dictionary of values for each spacecraft."""
@classmethod
def indices(cls):
return ['1', '2', '3']
@staticmethod
def distant_left_sc(sc):
"""Return index of distant rleftspacecraft."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{int(sc) % 3 + 1}'
@staticmethod
def distant_right_sc(sc):
"""Return index of distant right spacecraft."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{(int(sc) - 2) % 3 + 1}'
@staticmethod
def left_mosa(sc):
"""Return index of left MOSA."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{sc}{ForEachSC.distant_left_sc(sc)}'
@staticmethod
def right_mosa(sc):
"""Return index of right MOSA."""
if sc not in ForEachSC.indices():
raise ValueError(f"invalid spacecraft index '{sc}'")
return f'{sc}{ForEachSC.distant_right_sc(sc)}'
def for_each_mosa(self):
"""Return a ForEachMOSA instance by sharing the spacecraft values on both MOSAs."""
return ForEachMOSA(lambda mosa: self[ForEachMOSA.sc(mosa)])
def __add__(self, other):
if isinstance(other, ForEachMOSA):
return self.for_each_mosa() + other
return super().__add__(other)
def __sub__(self, other):
if isinstance(other, ForEachMOSA):
return self.for_each_mosa() - other
return super().__sub__(other)
def __mul__(self, other):
if isinstance(other, ForEachMOSA):
return self.for_each_mosa() * other
return super().__mul__(other)
def __floordiv__(self, other):
if isinstance(other, ForEachMOSA):
return self.for_each_mosa() // other
return super().__floordiv__(other)
def __truediv__(self, other):
if isinstance(other, ForEachMOSA):
return self.for_each_mosa() / other
return super().__truediv__(other)
class ForEachMOSA(ForEachObject):
"""Represents a dictionary of values for each moveable optical subassembly (MOSA)."""
@classmethod
def indices(cls):
return ['12', '23', '31', '13', '32', '21']
@staticmethod
def sc(mosa):
"""Return index of spacecraft hosting MOSA."""
return f'{mosa[0]}'
@staticmethod
def distant_mosa(mosa):
"""Return index of distant MOSA.
In practive, we invert the indices to swap emitter and receiver.
"""
if mosa not in ForEachMOSA.indices():
raise ValueError(f"invalid MOSA index '{mosa}'")
return f'{mosa[1]}{mosa[0]}'
@staticmethod
def adjacent_mosa(mosa):
"""Return index of adjacent MOSA.
In practice, we replace the second index by the only unused spacecraft index.
"""
if mosa not in ForEachMOSA.indices():
raise ValueError(f"invalid MOSA index '{mosa}'")
unused = [sc for sc in ForEachSC.indices() if sc not in mosa]
if len(unused) != 1:
raise RuntimeError(f"cannot find adjacent MOSA for '{mosa}'")
return f'{mosa[0]}{unused[0]}'
def distant(self):
"""Return a ForEachMOSA instance for distant MOSAs."""
return ForEachMOSA(lambda mosa: self[ForEachMOSA.distant_mosa(mosa)])
def adjacent(self):
"""Return a ForEachMOSA instance for adjacent MOSAs."""
return ForEachMOSA(lambda mosa: self[ForEachMOSA.adjacent_mosa(mosa)])
def __add__(self, other):
if isinstance(other, ForEachSC):
return self + other.for_each_mosa()
return super().__add__(other)
def __sub__(self, other):
if isinstance(other, ForEachSC):
return self - other.for_each_mosa()
return super().__sub__(other)
def __mul__(self, other):
if isinstance(other, ForEachSC):
return self * other.for_each_mosa()
return super().__mul__(other)
def __floordiv__(self, other):
if isinstance(other, ForEachSC):
return self // other.for_each_mosa()
return super().__floordiv__(other)
def __truediv__(self, other):
if isinstance(other, ForEachSC):
return self / other.for_each_mosa()
return super().__truediv__(other)
Loading