Commit d9b85fbf authored by POLLET Vincent

Fix dl1 to dl2

parent bcaff8dc
Pipeline #339324 failed
@@ -8,5 +8,5 @@ dependencies:
 - pandas
 - numpy
 - python>=3.11
-- scikit-learn
+- scikit-learn<1.3  # required to load test models with pickle TODO: update test models !
 - pytables
\ No newline at end of file
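The scikit-learn<1.3 pin only exists because the test models are pickled estimators trained with an older scikit-learn release, and unpickling them with a newer one is unreliable. Below is a minimal, hypothetical guard one could place next to the joblib.load calls (not part of this commit; the path and the use of the packaging library are illustrative):

import joblib
import sklearn
from packaging.version import Version

# Pickled sklearn estimators are only reliably loadable with a compatible
# scikit-learn version, hence the environment pin above.
if Version(sklearn.__version__) >= Version("1.3"):
    raise RuntimeError("test models were pickled with scikit-learn < 1.3")
energy_regressor = joblib.load("tests/resources/energy_regressor.sav")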
@@ -6,9 +6,11 @@ import numpy as np
 import pandas as pd
 from astropy.coordinates import Angle
+
+from rta_reconstruction.utils.optics import OpticsDescription
 
 # TODO: put key in configuration, not hard-coded
-# TODO: make sure likelihood fit is only used in source dependent analysis, therefore really not needed
+# TODO: make sure likelihood fit is only used in source dependent analysis, therefore really not needed, otherwise need to implement it here
 # TODO: do we need to create the "delta_t" column (lstchain does it from dragon time, which we don't have)
 def read_dl1(dl1_file_path: Path, dl1_param_key: str = "/dl1/event/telescope/parameters/LST_LSTCam") -> pd.DataFrame:
     return pd.read_hdf(dl1_file_path, key=dl1_param_key)
@@ -56,13 +58,21 @@ def convert_az_and_sin_az_angle_to_degree(dl1_df: pd.DataFrame):
 # TODO: make key configurable
 # TODO: make sure we get the right optic per tel ID
 # TODO: make this able to work for other tel ID
-def read_effective_focal_length(
-    dl1_file_path: Path, optics_table_key: str = "/configuration/instrument/telescope/optics"
-):
-    optics_table = pd.read_hdf(dl1_file_path, key=optics_table_key)
-    layout_table = pd.read_hdf(dl1_file_path, key="/configuration/instrument/subarray/layout")
-    lst1_layout_row = [row for row in layout_table if row["tel_id"] == 1][0]
-    return optics_table[lst1_layout_row["optics_index"]]
+def read_telescope_optics(dl1_file_path: Path, optics_table_key: str = "/configuration/instrument/telescope/optics"):
+    optics_df = pd.read_hdf(dl1_file_path, key=optics_table_key)
+    layout_df = pd.read_hdf(dl1_file_path, key="/configuration/instrument/subarray/layout")
+    lst1_layout_row = layout_df.loc[layout_df["tel_id"] == 1].iloc[0]
+    optics_row = optics_df.iloc[lst1_layout_row["optics_index"]]
+    return OpticsDescription(
+        name=optics_row.name,
+        size_type=optics_row.size_type,
+        n_mirrors=optics_row.n_mirrors,
+        equivalent_focal_length=optics_row.equivalent_focal_length * u.meter,  # TODO: read units from file ?
+        effective_focal_length=optics_row.effective_focal_length * u.meter,
+        mirror_area=optics_row.mirror_area * u.meter * u.meter,
+        n_mirror_tiles=optics_row.n_mirror_tiles,
+        reflector_shape=optics_row.reflector_shape,
+    )
 
 
 # TODO: configuration object for filtering ranges
......
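For orientation, a minimal usage sketch of read_telescope_optics as dl1_to_dl2.main() calls it further below (the DL1 file name is a placeholder):

from pathlib import Path

tel_optics = read_telescope_optics(Path("dl1_LST_run.h5"))
# OpticsDescription carries astropy Quantities; the effective focal length is
# what the downstream aberration correction uses.
focal_length = tel_optics.effective_focal_length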
@@ -16,8 +16,9 @@ from rta_reconstruction.dl1_reader import (
     filter_dl1,
     interpolate_missing_alt_az,
     read_dl1,
-    read_effective_focal_length,
+    read_telescope_optics,
 )
+from rta_reconstruction.dl2_io import init_dl2_file, write_dl2_df
 from rta_reconstruction.image_displacement import (
     disp_to_pos,
     disp_vector_pol2cart,
@@ -27,6 +28,7 @@ from rta_reconstruction.utils.coordinates import camera_to_altaz, camera_to_show
 from rta_reconstruction.utils.logging import init_logging
 
+# TODO: move somewhere else ?
 def reco_source_position_sky(cog_x, cog_y, disp_dx, disp_dy, focal_length, pointing_alt, pointing_az):
     """
     Compute the reconstructed source position in the sky
@@ -51,6 +53,7 @@ def reco_source_position_sky(cog_x, cog_y, disp_dx, disp_dy, focal_length, point
 # TODO: use pydantic configuration instead of json to Dict
 # TODO: use more generic type hints than specific class (sklearn regressor/classifier maybe ?)
+# TODO: refactor in several steps to isolate the "predict" calls, to make it simpler to implement training
 def dl1_to_dl2(
     dl1_df: pd.DataFrame,
     dl1_dl2_config: Dict,
@@ -74,16 +77,20 @@ def dl1_to_dl2(
     # load everything as a fortran order rec-array (pre-allocated with dl2 columns.)
     # that way we can index with column names, and avoid a pandas copy
     # pytables read has an "out" parameter, but it doesn't check the types between out and data on disk (only copies)
-    dl2_df["log_reco_energy"] = energy_regressor.predict(dl2_df.loc[:, *dl1_dl2_config["energy_features"]])
-    dl2_df["reco_energy"] = 10 ** dl2_df["log_energy"]
+    dl2_df["log_reco_energy"] = energy_regressor.predict(dl2_df.loc[:, dl1_dl2_config["energy_regressor_features"]])
+    dl2_df["reco_energy"] = 10 ** dl2_df["log_reco_energy"]
     if image_displacement_vector_regressor is not None:
         disp_vector = image_displacement_vector_regressor.predict(
-            dl2_df.loc[:, *dl1_dl2_config["disp_vector_features"]]
+            dl2_df.loc[:, dl1_dl2_config["disp_vector_regressor_features"]]
         )
     else:
-        dl2_df["reco_disp_norm"] = disp_norm_regressor.predict(dl2_df.loc[:, *dl1_dl2_config["disp_norm_features"]])
-        disp_sign_proba = disp_sign_classifier.predict_proba(dl2_df[:, *dl1_dl2_config["disp_sign_features"]])
+        dl2_df["reco_disp_norm"] = disp_norm_regressor.predict(
+            dl2_df.loc[:, dl1_dl2_config["disp_norm_regressor_features"]]
+        )
+        disp_sign_proba = disp_sign_classifier.predict_proba(
+            dl2_df.loc[:, dl1_dl2_config["disp_sign_classifier_features"]]
+        )
         # TODO: since we only care about something being gamma or not, 1 probability should be enough
         # we need to change the models for it to predict "gamma" or "not-gamma" with single proba.
         col = list(disp_sign_classifier.classes_).index(1)
@@ -143,7 +150,7 @@ def dl1_to_dl2(
     dl2_df["reco_alt"] = src_pos_reco.alt.rad
     dl2_df["reco_az"] = src_pos_reco.az.rad
 
-    gammaness = gamma_classifier.predict_proba(dl2_df.loc[:, *dl1_dl2_config["classification_features"]])
+    gammaness = gamma_classifier.predict_proba(dl2_df.loc[:, dl1_dl2_config["gamma_classifier_features"]])
     # TODO: replace this with a single proba predictor like disp sign, so no hardcoded class values!
     # gammaness is the prediction probability for the class 0 (proton: class 101)
@@ -156,6 +163,7 @@
 def main():
+    # TODO: init logging with log files passed as argument or in config
     init_logging(log_filename="dl1_to_dl2.log")
 
     parser = argparse.ArgumentParser(
@@ -243,10 +251,10 @@ def main():
     args = parser.parse_args()
 
-    if len(args.dl1_files) != len(args.dl2_file_paths):
+    if len(args.dl1_file_paths) != len(args.dl2_file_paths):
         raise argparse.ArgumentTypeError(
             "The number {} of input dl1 files must match the number {} of output DL2 files.".format(
-                len(args.dl1_files), len(args.dl2_files)
+                len(args.dl1_file_paths), len(args.dl2_file_paths)
             )
         )
@@ -283,44 +291,44 @@ def main():
     energy_regressor = joblib.load(args.energy_regressor_model_path)
     gamma_classifier = joblib.load(args.gamma_classifier_path)
+    all_features = config["energy_regressor_features"] + config["gamma_classifier_features"]
 
     using_disp_vector = args.disp_vector_regressor_model_path is not None
     if using_disp_vector:
         disp_vector_regressor = joblib.load(args.disp_vector_regressor_model_path)
         disp_norm_regressor = None
         disp_sign_classifier = None
+        all_features += config["disp_vector_regressor_features"]
     else:
         disp_vector_regressor = None
         disp_norm_regressor = joblib.load(args.disp_norm_regressor_model_path)
         disp_sign_classifier = joblib.load(args.disp_sign_classifier_model_path)
+        all_features += config["disp_norm_regressor_features"] + config["disp_sign_classifier_features"]
 
     for dl1_file_path, dl2_file_path in zip(args.dl1_file_paths, args.dl2_file_paths):
         # TODO: read dl1 straight in a numpy array (avoid pandas to avoid copy before passing to sklearn)
         # should be Fortran order ? -> benchmark
         dl1_df = read_dl1(dl1_file_path)  # TODO: make table "key" configurable
         interpolate_missing_alt_az(dl1_df)  # TODO: required after dl1 alt-az maker ? what about simple dropna
-        effective_focal_length = read_effective_focal_length(dl1_file_path)  # TODO: make sure correct
-        convert_az_and_sin_az_angle_to_degree(dl1_df)  # TODO: keep angles in radian...
+        tel_optics = read_telescope_optics(dl1_file_path)  # TODO: make sure correct
+        convert_az_and_sin_az_angle_to_degree(
+            dl1_df
+        )  # TODO: is this required ? (angles come from arctan2, and sin is not used ?)
         dl1_df = filter_dl1(
-            dl1_df,
-            filters=config["events_filters"],
-            finite_params=config["energy_regression_features"]
-            + config["disp_regression_features"]
-            + config["particle_classification_features"]
-            + config["disp_classification_features"],
+            dl1_df, filters=config["events_filters"], finite_params=all_features
         )  # TODO: config ? (filter events based on column names, filters, ~isnan, etc.)
         # TODO: is this required ?
         # Update parameters related to target direction on camera frame for MC data
         # taking into account the aberration effect using the effective focal length
         if "disp_norm" in dl1_df.columns:
-            update_disp_with_effective_focal_length(dl1_df, effective_focal_length=effective_focal_length)
+            update_disp_with_effective_focal_length(dl1_df, effective_focal_length=tel_optics.effective_focal_length)
 
         # TODO: make dl1_to_dl2 not return anything - inplace operations in the df
         dl2_df = dl1_to_dl2(
             dl1_df,
             config,
-            effective_focal_length,
+            tel_optics.effective_focal_length,
             energy_regressor,
             gamma_classifier,
             disp_vector_regressor,
@@ -328,7 +336,12 @@ def main():
             disp_sign_classifier,
         )
 
-        write_dl2(dl2_df, dl2_file_path)
+        init_dl2_file(dl2_file_path, dl1_file_path)
+        write_dl2_df(
+            dl2_file_path,
+            dl2_df,
+            attributes={"config": config},
+        )
 
 
 if __name__ == "__main__":
......
from pathlib import Path

import tables
from pandas import DataFrame


def init_dl2_file(dl2_file_path: Path, dl1_file_path: Path):
    if dl2_file_path.exists():
        raise FileExistsError("{} already exists, refusing to overwrite it.".format(dl2_file_path))
    # TODO: make these configurable somehow
    not_copy_list = [
        "/dl1/event/telescope/image/LST_LSTCam",
        "/dl1/event/telescope/parameters/LST_LSTCam",
        "/dl1/event/telescope/parameters_src_dependent/LST_LSTCam",
        "/dl1/event/telescope/likelihood_parameters/LST_LSTCam",
    ]
    # copy every DL1 node except the per-event tables listed above into the new DL2 file,
    # creating parent groups on the fly when they do not exist yet
    with tables.open_file(dl1_file_path, "r") as dl1_f, tables.open_file(dl2_file_path, "w") as dl2_f:
        for node in dl1_f.walk_nodes():
            if node._v_pathname not in not_copy_list:
                if node._v_parent._v_pathname not in dl2_f:
                    parent = dl2_f.create_group(
                        node._v_parent._v_parent._v_pathname, node._v_parent._v_name, createparents=True
                    )
                else:
                    parent = dl2_f.get_node(node._v_parent._v_pathname)
                dl2_f.copy_node(node, parent)


# TODO: global metadata in file global attributes ? (soft version, contact)
def write_dl2_df(
    dl2_file_path: Path,
    dl2_df: DataFrame,
    dl2_table_path: str = "/dl2/event/telescope/parameters/LST_LSTCam",
    attributes=None,
    filters=tables.Filters(  # TODO: make those configurable, benchmark performances and compare with pandas "to_hdf"
        complevel=1,
        complib="blosc:zstd",
        fletcher32=True,
        bitshuffle=False,
    ),
):
    with tables.open_file(dl2_file_path, "a") as dl2_f:
        path, table_name = dl2_table_path.rsplit("/", maxsplit=1)  # TODO: use pathlib split ?
        dl2_table = dl2_f.create_table(
            path,
            table_name,
            dl2_df.to_records(index=False),
            createparents=True,
            filters=filters,
        )
        if attributes is not None:
            for key, value in attributes.items():
                dl2_table.attrs[key] = value
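A short usage sketch of the two helpers above, mirroring the call sequence in dl1_to_dl2.main() (paths, dl2_df and config are placeholders):

from pathlib import Path

dl1_path = Path("dl1_run.h5")
dl2_path = Path("dl2_run.h5")
# create the DL2 file and copy over every DL1 node that is not in the skip list
init_dl2_file(dl2_path, dl1_path)
# then append the DL2 parameters table, storing the configuration as a table attribute
write_dl2_df(dl2_path, dl2_df, attributes={"config": config})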
@@ -5,16 +5,23 @@ import astropy.units as u
 import numpy as np
 from astropy.coordinates import (
     AltAz,
+    Angle,
     BaseCoordinateFrame,
     BaseRepresentation,
     CartesianRepresentation,
     CoordinateAttribute,
+    DynamicMatrixTransform,
     EarthLocation,
     EarthLocationAttribute,
+    FunctionTransform,
     QuantityAttribute,
+    RepresentationMapping,
     SkyCoord,
     TimeAttribute,
+    UnitSphericalRepresentation,
+    frame_transform_graph,
 )
+from astropy.coordinates.matrix_utilities import matrix_transpose, rotation_matrix
 from astropy.time import Time
 from numpy import broadcast_arrays
@@ -145,6 +152,85 @@ class PlanarRepresentation(BaseRepresentation):
         return "x", "y"
+class TelescopeFrame(BaseCoordinateFrame):
+    """
+    Telescope coordinate frame.
+
+    A Frame using a UnitSphericalRepresentation.
+
+    This is basically the same as a HorizonCoordinate, but the
+    origin is at the telescope's pointing direction.
+    This is used to specify coordinates in the field of view of a
+    telescope that is independent of the optical properties of the telescope.
+
+    ``fov_lon`` is aligned with azimuth and ``fov_lat`` is aligned with altitude
+    of the horizontal coordinate frame as implemented in ``astropy.coordinates.AltAz``.
+
+    This is what astropy calls a SkyOffsetCoordinate.
+
+    Attributes
+    ----------
+    telescope_pointing: SkyCoord[AltAz]
+        Coordinate of the telescope pointing in AltAz
+    obstime: Time
+        Observation time
+    location: EarthLocation
+        Location of the telescope
+    """
+
+    frame_specific_representation_info = {
+        UnitSphericalRepresentation: [
+            RepresentationMapping("lon", "fov_lon"),
+            RepresentationMapping("lat", "fov_lat"),
+        ]
+    }
+    default_representation = UnitSphericalRepresentation
+
+    telescope_pointing = CoordinateAttribute(default=None, frame=AltAz)
+    obstime = TimeAttribute(default=None)
+    location = EarthLocationAttribute(default=None)
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # make sure telescope coordinate is in range [-180°, 180°]
+        if isinstance(self._data, UnitSphericalRepresentation):
+            self._data.lon.wrap_angle = Angle(180, unit=u.deg)
+@frame_transform_graph.transform(FunctionTransform, TelescopeFrame, TelescopeFrame)
+def telescope_to_telescope(from_telescope_coord, to_telescope_frame):
+    """Transform between two skyoffset frames."""
+    intermediate_from = from_telescope_coord.transform_to(from_telescope_coord.telescope_pointing)
+    intermediate_to = intermediate_from.transform_to(to_telescope_frame.telescope_pointing)
+    return intermediate_to.transform_to(to_telescope_frame)
+
+
+@frame_transform_graph.transform(DynamicMatrixTransform, AltAz, TelescopeFrame)
+def altaz_to_telescope(altaz_coord, telescope_frame):
+    """Convert a reference coordinate to a sky offset frame."""
+    # Define rotation matrices along the position angle vector, and
+    # relative to the telescope_pointing.
+    telescope_pointing = telescope_frame.telescope_pointing.represent_as(UnitSphericalRepresentation)
+    mat1 = rotation_matrix(-telescope_pointing.lat, "y")
+    mat2 = rotation_matrix(telescope_pointing.lon, "z")
+    return mat1 @ mat2
+
+
+@frame_transform_graph.transform(DynamicMatrixTransform, TelescopeFrame, AltAz)
+def telescope_to_altaz(telescope_coord, altaz_frame):
+    """Convert a sky offset frame coordinate to the reference frame"""
+    # use the forward transform, but just invert it
+    mat = altaz_to_telescope(altaz_frame, telescope_coord)
+    # transpose is the inverse because mat is a rotation matrix
+    return matrix_transpose(mat)
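A minimal usage sketch of the frame defined above (values are placeholders; a bare AltAz frame without obstime and location is assumed to be enough for a pure pointing-offset transform):

import astropy.units as u
from astropy.coordinates import AltAz, SkyCoord

pointing = SkyCoord(alt=70 * u.deg, az=180 * u.deg, frame=AltAz())
telescope_frame = TelescopeFrame(telescope_pointing=pointing)
star = SkyCoord(alt=70.5 * u.deg, az=180.2 * u.deg, frame=AltAz())
# fov_lon / fov_lat are the offsets from the pointing direction
fov_coord = star.transform_to(telescope_frame)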
 class CameraFrame(BaseCoordinateFrame):
     """
     Camera coordinate frame.
@@ -187,6 +273,82 @@ class CameraFrame(BaseCoordinateFrame):
     location = EarthLocationAttribute(default=None)
+@frame_transform_graph.transform(FunctionTransform, CameraFrame, TelescopeFrame)
+def camera_to_telescope(camera_coord, telescope_frame):
+    """
+    Transformation between CameraFrame and TelescopeFrame.
+    Is called when a SkyCoord is transformed from CameraFrame into TelescopeFrame
+    """
+    x_pos = camera_coord.cartesian.x
+    y_pos = camera_coord.cartesian.y
+    rot = camera_coord.rotation
+    if rot == 0:  # if no rotation applied save a few cycles
+        x_rotated = x_pos
+        y_rotated = y_pos
+    else:
+        cosrot = np.cos(rot)
+        sinrot = np.sin(rot)
+        x_rotated = x_pos * cosrot - y_pos * sinrot
+        y_rotated = x_pos * sinrot + y_pos * cosrot
+
+    focal_length = camera_coord.focal_length
+    if focal_length.shape == () and focal_length.value == 0:
+        raise ValueError("Coordinate in CameraFrame is missing focal_length information")
+
+    # this assumes an equidistant mapping function of the telescope optics
+    # or a small angle approximation of most other mapping functions
+    # this could be replaced by actually defining the mapping function
+    # as an Attribute of `CameraFrame` that maps f(r, focal_length) -> theta,
+    # where theta is the angle to the optical axis and r is the distance
+    # to the camera center in the focal plane
+    fov_lat = u.Quantity((x_rotated / focal_length).to_value(u.dimensionless_unscaled), u.rad, copy=False)
+    fov_lon = u.Quantity((y_rotated / focal_length).to_value(u.dimensionless_unscaled), u.rad, copy=False)
+
+    representation = UnitSphericalRepresentation(lat=fov_lat, lon=fov_lon)
+    return telescope_frame.realize_frame(representation)
+@frame_transform_graph.transform(FunctionTransform, TelescopeFrame, CameraFrame)
+def telescope_to_camera(telescope_coord, camera_frame):
+    """
+    Transformation between TelescopeFrame and CameraFrame
+    Is called when a SkyCoord is transformed from TelescopeFrame into CameraFrame
+    """
+    x_pos = telescope_coord.fov_lat
+    y_pos = telescope_coord.fov_lon
+    # reverse the rotation applied to get to this system
+    rot = -camera_frame.rotation
+
+    if rot.value == 0.0:  # if no rotation applied save a few cycles
+        x_rotated = x_pos
+        y_rotated = y_pos
+    else:  # or else rotate all positions around the camera centre
+        cosrot = np.cos(rot)
+        sinrot = np.sin(rot)
+        x_rotated = x_pos * cosrot - y_pos * sinrot
+        y_rotated = x_pos * sinrot + y_pos * cosrot
+
+    focal_length = camera_frame.focal_length
+    if focal_length.shape == () and focal_length.value == 0:
+        raise ValueError("CameraFrame is missing focal_length information")
+
+    # this assumes an equidistant mapping function of the telescope optics
+    # or a small angle approximation of most other mapping functions
+    # this could be replaced by actually defining the mapping function
+    # as an Attribute of `CameraFrame` that maps f(theta, focal_length) -> r,
+    # where theta is the angle to the optical axis and r is the distance
+    # to the camera center in the focal plane
+    x_rotated = x_rotated.to_value(u.rad) * focal_length
+    y_rotated = y_rotated.to_value(u.rad) * focal_length
+
+    representation = CartesianRepresentation(x_rotated, y_rotated, 0 * u.m)
+    return camera_frame.realize_frame(representation)
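To illustrate the two transforms above, a round-trip sketch between the focal plane and the field of view (it assumes CameraFrame exposes focal_length and rotation attributes in the part of its definition elided above; the numbers are placeholders):

import astropy.units as u
from astropy.coordinates import AltAz, SkyCoord

pointing = SkyCoord(alt=70 * u.deg, az=180 * u.deg, frame=AltAz())
telescope_frame = TelescopeFrame(telescope_pointing=pointing)
camera_frame = CameraFrame(focal_length=29.3 * u.m, rotation=0 * u.deg)

cog = SkyCoord(x=0.1 * u.m, y=-0.05 * u.m, frame=camera_frame)
fov = cog.transform_to(telescope_frame)  # small-angle mapping: offset is roughly r / focal_length
back = fov.transform_to(camera_frame)    # back to metres in the focal plane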
 def sky_to_camera(alt, az, focal, pointing_alt, pointing_az):
     """
     Coordinate transform from sky position (alt, az) (in angles)
......
from enum import Enum, StrEnum, unique

import astropy.units as u
import numpy as np


@unique
class SizeType(StrEnum):
    """
    Enumeration of different telescope sizes (LST, MST, SST)
    """

    #: Unknown
    UNKNOWN = "UNKNOWN"
    #: A telescope with a mirror diameter larger than 16m
    LST = "LST"
    #: A telescope with a mirror diameter larger than 8m
    MST = "MST"
    #: Telescopes with a mirror diameter smaller than 8m
    SST = "SST"


@unique
class ReflectorShape(Enum):
    """
    Enumeration of the different reflector shapes
    """

    #: Unknown
    UNKNOWN = "UNKNOWN"
    #: A telescope with a parabolic dish
    PARABOLIC = "PARABOLIC"
    #: A telescope with a Davies--Cotton dish
    DAVIES_COTTON = "DAVIES_COTTON"
    #: A telescope with a hybrid between parabolic and Davies--Cotton dish
    HYBRID = "HYBRID"
    #: A dual mirror Schwarzschild-Couder reflector
    SCHWARZSCHILD_COUDER = "SCHWARZSCHILD_COUDER"
class OpticsDescription:
    """
    Describes the optics of a Cherenkov Telescope mirror

    The string representation of an `OpticsDescription` will be a combination
    of the telescope-type and sub-type as follows: "type-subtype". You can
    also get each individually.

    Parameters
    ----------
    name : str
        Name of this optical system
    n_mirrors : int
        Number of mirrors, i.e. 2 for Schwarzschild-Couder else 1
    equivalent_focal_length : astropy.units.Quantity[length]
        Equivalent focal-length of telescope, independent of which type of
        optics (as in the Monte-Carlo). This is the nominal focal length
        for single mirror telescopes and the equivalent focal length for dual
        mirror telescopes.
    effective_focal_length : astropy.units.Quantity[length]
        The effective_focal_length is the focal length estimated from
        ray tracing to correct for coma aberration. It is thus not automatically
        available for all simulations, but only if it was set beforehand
        in the simtel configuration. This is the focal length that should be
        used for transforming from camera frame to telescope frame for all
        reconstruction tasks to correct for the mean aberration.
    mirror_area : astropy.units.Quantity[area]
        total reflective surface area of the optical system (in m^2)
    n_mirror_tiles : int
        number of mirror facets

    Raises
    ------
    ValueError:
        if tel_type or mirror_type are not one of the accepted values
    TypeError, astropy.units.UnitsError:
        if the units of one of the inputs are missing or incompatible
    """

    CURRENT_TAB_VERSION = "4.0"
    COMPATIBLE_VERSIONS = {"4.0"}

    __slots__ = (
        "name",
        "size_type",
        "effective_focal_length",
        "equivalent_focal_length",
        "mirror_area",
        "n_mirrors",
        "n_mirror_tiles",
        "reflector_shape",
    )
    @u.quantity_input(
        mirror_area=u.m**2,
        equivalent_focal_length=u.m,
        effective_focal_length=u.m,
    )
    def __init__(
        self,
        name,
        size_type,
        n_mirrors,
        equivalent_focal_length,
        effective_focal_length,
        mirror_area,
        n_mirror_tiles,
        reflector_shape,
    ):
        self.name = name
        self.size_type = SizeType(size_type)
        self.reflector_shape = ReflectorShape(reflector_shape)
        self.equivalent_focal_length = equivalent_focal_length.to(u.m)
        self.effective_focal_length = effective_focal_length.to(u.m)
        self.mirror_area = mirror_area
        self.n_mirrors = n_mirrors
        self.n_mirror_tiles = n_mirror_tiles
    def __hash__(self):
        """Make this hashable, so it can be used as dict keys or in sets"""
        # From python >= 3.10, hash of nan is random, we want a fixed hash also for
        # unknown effective focal length:
        if np.isnan(self.effective_focal_length.value):
            effective_focal_length = -1
        else:
            effective_focal_length = self.effective_focal_length.to_value(u.m)

        return hash(
            (
                round(self.equivalent_focal_length.to_value(u.m), 4),
                round(effective_focal_length, 4),
                round(self.mirror_area.to_value(u.m**2)),
                self.size_type.value,
                self.reflector_shape.value,
                self.n_mirrors,
                self.n_mirror_tiles,
            )
        )

    def __eq__(self, other):
        """For eq, we just compare equal hash"""
        return hash(self) == hash(other)

    def __str__(self):
        return self.name
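A construction sketch for the class above (the numbers are illustrative LST-like values, not read from any file):

import astropy.units as u

lst_optics = OpticsDescription(
    name="LST",
    size_type="LST",
    n_mirrors=1,
    equivalent_focal_length=28.0 * u.m,
    effective_focal_length=29.3 * u.m,
    mirror_area=386.0 * u.m**2,
    n_mirror_tiles=198,
    reflector_shape="PARABOLIC",
)
# enum coercion and unit conversion happen in __init__
assert lst_optics.size_type is SizeType.LST
print(lst_optics.effective_focal_length)  # 29.3 m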
+import sys
 from pathlib import Path
 
 import pytest
 
 from rta_reconstruction.dl1_to_dl2 import main
+from tests.utils.hdf import compare_hdf5
 
 
-@pytest.mark.skip(reason="Not implemented yet!")
-def test_dl1_to_dl2(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
-    raise NotImplementedError()
+def test_dl1_to_dl2(monkeypatch: pytest.MonkeyPatch, tmp_path: Path, request: pytest.FixtureRequest):
+    current_dir = Path(request.path).resolve().parent
+    dl1_file_path = current_dir / "resources/dl1_LST_Run06892_gammaness_cut_70_sample_500.h5"
+    config_path = current_dir / "resources/lstchain_configuration.json"
+    energy_regressor_model_path = current_dir / "resources/energy_regressor.sav"
+    gamma_classifier_model_path = current_dir / "resources/gamma_classifier.sav"
+    image_displacement_vector_regressor_model_path = current_dir / "resources/image_displacement_vector_regressor.sav"
+    dl2_file_path = tmp_path / "dl2_LST_Run06892_gammaness_cut_70_sample_500.h5"
+
+    with monkeypatch.context() as mpc:
+        mpc.setattr(
+            sys,
+            "argv",
+            [
+                sys.argv[0],
+                "-c",
+                str(config_path),
+                "-i",
+                str(dl1_file_path),
+                "-e",
+                str(energy_regressor_model_path),
+                "-g",
+                str(gamma_classifier_model_path),
+                "-v",
+                str(image_displacement_vector_regressor_model_path),
+                "-o",
+                str(dl2_file_path),
+            ],
+        )
+        main()
+
+    assert dl2_file_path.exists()
+    compare_hdf5(current_dir / "resources/dl1_LST_Run06892_gammaness_cut_70_sample_500.h5", dl2_file_path)
from pathlib import Path

import numpy as np
import tables


def compare_hdf5(file1_pathname: Path, file2_pathname: Path):
    with tables.open_file(file1_pathname, "r") as f1, tables.open_file(file2_pathname, "r") as f2:
        for ref_node in f1.walk_nodes():
            # every node of the reference file must exist in the compared file
            assert ref_node._v_pathname in f2
            if isinstance(ref_node, tables.Table):
                # look up the corresponding table in the second file and compare column by column
                output_node = f2.get_node(ref_node._v_pathname)
                for name in ref_node.colnames:
                    ref_values = ref_node.cols._f_col(name)
                    output_values = output_node.cols._f_col(name)
                    if np.issubdtype(ref_values.dtype, np.number):
                        assert np.allclose(ref_values[:], output_values[:], equal_nan=True)
                    else:
                        assert all([ref_values[idx] == output_values[idx] for idx in range(len(ref_values))])
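A short usage sketch (paths are placeholders): the helper walks every node of the first file and asserts that a matching node with close values exists in the second.

from pathlib import Path

compare_hdf5(Path("resources/reference_dl2.h5"), Path("produced_dl2.h5"))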