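"""Controller for the CFA dataset analysis pipeline.

Loads a dataset through cfatools, computes sample arrival datetimes,
rescales Conducti and Picarro measurements by melting time, computes the
flasks' content, and writes processed/calibrated CSV files, reporting
progress to the GUI through Qt signals.
"""
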
import numpy as np
import pandas as pd
import matplotlib
import os
from scipy import interpolate
from PyQt5.QtCore import QObject, pyqtSignal
import cfatools.processor.flow as flow
import cfatools.processor.collector as collector
import cfatools.processor.encoder as encoder
from cfatools.logreader.dataset import DatasetReader
class AnalyseController(QObject):
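    """Qt controller running the CFA dataset analysis pipeline.

    `sig_simple_step_finished` carries a plain status message;
    `sig_action_step_finished` carries a message plus an action label and the
    name of the slot the GUI should call when the action is triggered.
    """
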
sig_simple_step_finished = pyqtSignal(str, name="simple_step_finished")
sig_action_step_finished = pyqtSignal(str, str, str, name="action_step_finished")
def __init__(self):
super(AnalyseController, self).__init__()
self.current_dataset = None
        self.volumes_dict = dict()
self.encoder_df = pd.DataFrame()
self.iceblock_df = pd.DataFrame()
self.conduct_df = pd.DataFrame()
self.pump_df = pd.DataFrame()
self.picarro_df = pd.DataFrame()
self.collector_df = pd.DataFrame()
self.arrival_df_raw = pd.DataFrame()
self.arrival_df = pd.DataFrame()
self.arrival_filepath = ""
self.flask_df = pd.DataFrame()
self.flask_df_filepath = ""
self.pic_conduc_height_filepath = ""
        self.calib_df = pd.DataFrame()
        self.calib_filepath = ""
        self.pic_conduct_calibrated_filepath = ""
self.flask_calibrated_filepath = ""
def get_dataset_reader(self, dataset_path: str) -> DatasetReader:
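        """Create and store a DatasetReader for the dataset at `dataset_path`.

        The last path component is the dataset name, which is validated first;
        a ValueError is raised if it is invalid.
        """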
root_directory = os.path.dirname(dataset_path)
dataset_name = os.path.basename(dataset_path)
valid, error_msg = DatasetReader.dataset_name_is_valid(dataset_name)
if not valid:
raise ValueError(error_msg)
self.current_dataset = DatasetReader(base_path=root_directory, dataset=dataset_name)
self.sig_simple_step_finished.emit("Dataset [" + self.current_dataset.dataset_name + "] loaded.")
return self.current_dataset
def analyse(self, override_arrival_pkl: bool = False):
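        """Run the full analysis pipeline on the current dataset.

        Steps: load the raw dataframes, apply dataset-specific corrections,
        compute arrival datetimes, rescale Conducti and Picarro data by
        melting time, compute the flasks' content, then calibrate depths and
        isotopic ratios. Results are written as CSV files in the dataset's
        "processed" and "calibrated" subdirectories.

        :param override_arrival_pkl: if True, delete the cached
            arrival_nearest.pkl so the arrival datetimes are recomputed.
        """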
dataset = self.current_dataset
(
self.encoder_df,
self.iceblock_df,
self.conduct_df,
self.pump_df,
self.picarro_df,
self.collector_df,
) = flow.get_datasets_data(dataset)
self.__apply_special_corrections__()
# Dataset's "processed" directory
processed_dir = os.path.join(dataset.dataset_path, "processed")
if not os.path.exists(processed_dir):
os.mkdir(processed_dir)
self.sig_simple_step_finished.emit("New 'processed' subdirectory created in " + dataset.dataset_path)
self.collector_df = collector.renumber_flasks(self.collector_df)
self.sig_action_step_finished.emit("Collector's flasks renumbered.", "View", "view_collector_df")
################################################################################################################
# Tubing volumes
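        # The tubing-volumes table is read from a relative path, so this assumes
        # the working directory is a sibling of the config/ directory.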
self.volumes_dict = flow.get_tubing_volume_dict(
"../config/tubing_volumes.csv", max_datetime=self.encoder_df.index[0]
)
self.sig_action_step_finished.emit("Volume table loaded.", "View", "view_volume_dict")
################################################################################################################
# Arrival datetimes
if override_arrival_pkl:
pkl_filepath = os.path.join(dataset.dataset_path, "binary", "arrival_nearest.pkl")
if os.path.exists(pkl_filepath):
os.remove(pkl_filepath)
self.sig_simple_step_finished.emit("Previous arrival_nearest.pkl file deleted.")
self.arrival_df_raw = flow.get_arrival_df(
self.encoder_df,
self.pump_df,
self.volumes_dict,
parallel=False,
dataset=dataset,
pkl_name="arrival_nearest.pkl",
)
if override_arrival_pkl:
self.sig_action_step_finished.emit("Arrival_df computed.", "View", "view_arrival_df")
else:
self.sig_action_step_finished.emit("Arrival_df loaded from binary file.", "View", "view_arrival_df_raw")
self.arrival_df = self.arrival_df_raw.dropna()
self.arrival_df = flow.add_iceblock_info(self.arrival_df, self.iceblock_df)
self.arrival_df = flow.add_melted_height(self.arrival_df, self.encoder_df, self.iceblock_df)
self.arrival_df = flow.add_flask_info(self.arrival_df, self.collector_df)
self.sig_action_step_finished.emit(
"Arrival_df augmented with iceblock, encoder and flask info.",
"View",
"view_arrival_df",
)
# Save as CSV
self.arrival_filepath = os.path.join(processed_dir, dataset.dataset_name + "_arrival_df.csv")
self.arrival_df.to_csv(self.arrival_filepath, date_format="%Y-%m-%d %H:%M:%S")
self.sig_action_step_finished.emit(
"Arrival_df saved as CSV in [" + self.arrival_filepath + "]",
"Open",
"open_arrival_csv",
)
################################################################################################################
# Conducti rescaled by melting time
conduct_rescaled_df = flow.get_conduct_by_melt_time(self.arrival_df, self.conduct_df, export_in_dataset=dataset)
self.sig_simple_step_finished.emit("Conducti-R(escaled) created.")
################################################################################################################
# Picarro rescaled by melting time
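        # Picarro timestamps occasionally jump backwards; if so, drop every row
        # up to and including the first out-of-order timestamp so that the index
        # is monotonically increasing again.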
if not self.picarro_df.index.is_monotonic_increasing:
self.picarro_df.reset_index(inplace=True)
self.picarro_df["diff_time"] = self.picarro_df["datetime"] - self.picarro_df["datetime"].shift(1)
self.picarro_df = self.picarro_df.loc[
self.picarro_df.index
> self.picarro_df.loc[self.picarro_df["diff_time"].dt.total_seconds() < 0].index[0]
]
self.picarro_df = self.picarro_df.set_index("datetime")
            self.picarro_df = self.picarro_df.drop(columns=["diff_time"])
picarro_rescaled_df = flow.get_picarro_by_melt_time(self.arrival_df, self.picarro_df, export_in_dataset=dataset)
self.sig_simple_step_finished.emit("Picarro-R(escaled) created.")
################################################################################################################
# Flask
self.flask_df = flow.get_flask_content(self.arrival_df)
self.sig_action_step_finished.emit("Flasks' content computed.", "View", "view_flask_df")
# Save as CSV
self.flask_df_filepath = os.path.join(processed_dir, dataset.dataset_name + "_flask_df.csv")
self.flask_df.to_csv(self.flask_df_filepath, index=False)
self.sig_action_step_finished.emit(
"flask_df saved as CSV in [" + self.arrival_filepath + "]",
"Open",
"open_flask_csv",
)
# Plot mm per flask
# if not all(self.flask_df["flask"] == 0):
# self.flask_df["mm_diff"] = self.flask_df["max"] - self.flask_df["min"]
# mm_per_flask_df = self.flask_df[["flask", "mm_diff"]].groupby("flask").agg(sum)
# mm_per_flask_df.plot.bar()
# mm_per_flask_df.plot.hist(bins=round(len(mm_per_flask_df.index)/3))
################################################################################################################
# Conducti
# self.conduct_df.plot()
################################################################################################################
# Picarro & conducti vs. melted height
self.pic_conduc_height_filepath = os.path.join(processed_dir, dataset.dataset_name + "_pic_conduc_height.csv")
pic_conduc_height_df = self.arrival_df[["icbk_code", "icbk_name", "melted_height", "melted_height_icbk"]]
pic_conduc_height_df = pd.merge(pic_conduc_height_df, picarro_rescaled_df, left_index=True, right_index=True)
pic_conduc_height_df = pd.merge(pic_conduc_height_df, conduct_rescaled_df, left_index=True, right_index=True)
pic_conduc_height_df.to_csv(self.pic_conduc_height_filepath)
self.sig_action_step_finished.emit(
"pic_conduc_height_df saved as CSV in [" + self.pic_conduc_height_filepath + "]",
"Open",
"open_pic_conduc_height_df",
)
################################################################################################################
# Picarro
# self.arrival_df = self.arrival_df[self.arrival_df["icbk_code"] > 0]
# picarro2_df = pd.merge_asof(left=self.arrival_df[["icbk_code", "icbk_name", "melted_height", "picarro"]],
# right=self.picarro_df,
# left_on="picarro",
# right_index=True)
# picarro2_df["iceblock"] = picarro2_df["icbk_code"].astype(str) + '-' + picarro2_df["icbk_name"]
# print(gg.ggplot()
# + gg.geom_line(data=picarro2_df,
# mapping=gg.aes(x='picarro', y='Delta_18_16', color="iceblock"))
# )
#
# print(gg.ggplot()
# + gg.geom_path(data=picarro2_df,
# mapping=gg.aes(x='Delta_18_16', y='melted_height', color="iceblock"))
# )
################################################################################################################
# Calibrate
calib_name = "cal1"
calibrated_dir = os.path.join(dataset.dataset_path, "calibrated")
if not os.path.exists(calibrated_dir):
os.mkdir(calibrated_dir)
self.calib_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_" + calib_name + ".csv")
if not os.path.exists(self.calib_filepath):
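            # No calibration file yet: write a template with one row per iceblock
            # boundary (melted_height 0 and initial_height), to be filled in
            # manually with depths and isotope slopes/intercepts.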
template_calib_df = self.iceblock_df[["id", "name", "initial_height"]].copy()
template_calib_df["start"] = 0
template_calib_df = (
template_calib_df.melt(["id", "name"]).sort_values(["id", "value"]).drop(columns=["id", "variable"])
)
template_calib_df = template_calib_df.rename(columns={"name": "icbk_name", "value": "melted_height"})
template_calib_df[
[
"depth",
"Delta_18_16_slope",
"Delta_18_16_intercept",
"Delta_D_H_slope",
"Delta_D_H_intercept",
]
            ] = [np.nan, np.nan, np.nan, np.nan, np.nan]
template_calib_df.to_csv(self.calib_filepath, index=False, float_format="%.2f", sep=";")
self.calib_df = pd.read_csv(self.calib_filepath, sep=";")
        if self.calib_df["depth"].isna().all():
self.sig_action_step_finished.emit(
"Calibration file is empty: [" + self.calib_filepath + "]",
"Edit",
"edit_calib_df",
)
self.sig_simple_step_finished.emit("Abort calibration.")
else:
self.sig_action_step_finished.emit("Calibration file loaded.", "View", "view_calib_df")
################################################################################################################
# Calibrate pic_conduct_height
# Absolute depth calibration
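        # Convert each iceblock's melted height to absolute core depth by linear
        # interpolation between the manually entered calibration points.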
for icbk_name in self.calib_df["icbk_name"].unique():
icbk_calib_df = self.calib_df.loc[self.calib_df["icbk_name"] == icbk_name]
depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"])
try:
pic_conduc_height_df.loc[pic_conduc_height_df["icbk_name"] == icbk_name, "depth"] = depth_interp_func(
pic_conduc_height_df.loc[pic_conduc_height_df["icbk_name"] == icbk_name]["melted_height_icbk"]
)
            except ValueError:
                # interp1d raises when the melted heights fall outside the
                # calibrated range; report the iceblock and carry on.
                print("Depth interpolation failed for iceblock " + icbk_name)
# Picarro calibration
if (
len(
self.calib_df.groupby(
["Delta_18_16_slope", "Delta_18_16_intercept", "Delta_D_H_slope", "Delta_D_H_intercept"]
).count()
)
> 1
):
raise ValueError("More than one calibration set for isotope is not yet supported")
isotopic_calib_df = self.calib_df.iloc[0]
calib_pic_conduc_height_df = pic_conduc_height_df.copy()
calib_pic_conduc_height_df["Delta_18_16_calib"] = (
calib_pic_conduc_height_df["Delta_18_16"] * isotopic_calib_df.Delta_18_16_slope
+ isotopic_calib_df.Delta_18_16_intercept
)
calib_pic_conduc_height_df["Delta_D_H_calib"] = (
calib_pic_conduc_height_df["Delta_D_H"] * isotopic_calib_df.Delta_D_H_slope
JOSSOUD Olivier
committed
+ isotopic_calib_df.Delta_D_H_intercept
JOSSOUD Olivier
committed
# Save as CSV
self.pic_conduct_calibrated_filepath = os.path.join(
calibrated_dir, dataset.dataset_name + "_pic_conduct_calibrated_" + calib_name + ".csv"
)
calib_pic_conduc_height_df.to_csv(self.pic_conduct_calibrated_filepath, float_format="%.3f")
self.sig_action_step_finished.emit(
"Calibrated Picarro & Conducti saved as CSV in [" + self.pic_conduct_calibrated_filepath + "]",
"Open",
"open_calib_pic_conduc_height_df",
)
################################################################################################################
# Calibrate flasks
calib_flask_df = self.flask_df.copy()
for icbk_name in self.calib_df["icbk_name"].unique():
icbk_calib_df = self.calib_df.loc[self.calib_df["icbk_name"] == icbk_name]
depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"])
calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name, "min_depth"] = depth_interp_func(
calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name]["min"]
)
calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name, "max_depth"] = depth_interp_func(
calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name]["max"]
)
calib_flask_df["diff_depth"] = calib_flask_df["max_depth"] - calib_flask_df["min_depth"]
self.flask_calibrated_filepath = os.path.join(
calibrated_dir, dataset.dataset_name + "_flask_calibrated_" + calib_name + ".csv"
)
calib_flask_df.to_csv(self.flask_calibrated_filepath, float_format="%.2f", index=False)
self.sig_action_step_finished.emit(
"Calibrated Flask saved as CSV in [" + self.flask_calibrated_filepath + "]", "Open", "open_calib_flask_df"
)
def __apply_special_corrections__(self) -> None:
"""Some datasets require some manual correction before being processed, due to errors during the recording"""
if self.current_dataset.dataset_name == "20210428_ASUMA2016_8_19sq":
t1 = pd.Timestamp("2021-04-28 11:02:50", tz="UTC")
t2 = pd.Timestamp("2021-04-28 12:57:56", tz="UTC")
t3 = pd.Timestamp("2021-04-28 13:20:28", tz="UTC")
t4 = pd.Timestamp("2021-04-28 15:19:37", tz="UTC")
self.encoder_df = pd.concat([self.encoder_df.loc[t1:t2], self.encoder_df.loc[t3:t4]])
self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")
if self.current_dataset.dataset_name in ["20210826_ASUMA2016_6-1_sq", "20210827_ASUMA2016_6_11sq"]:
self.picarro_df.index = self.picarro_df.index - pd.Timedelta(115, unit="s")
self.sig_simple_step_finished.emit("Special correction applied on picarro_df.")
if self.current_dataset.dataset_name == "20220317_ASUMA2016_25_19sq":
self.encoder_df = encoder.shift_stacking_event(
self.encoder_df,
old_peak_start=pd.Timestamp("2022-03-17 13:28:27.446578+00:00"),
old_peak_end=pd.Timestamp("2022-03-17 13:28:41.721330+00:00"),
shift=pd.Timedelta(seconds=60),
)
self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")
if self.current_dataset.dataset_name == "20220329_ASUMA2016_23_12sq":
self.encoder_df = encoder.shift_stacking_event(
self.encoder_df,
old_peak_start=pd.Timestamp("2022-03-29 16:58:11.552587+00:00"),
old_peak_end=pd.Timestamp("2022-03-29 16:58:48.006613+00:00"),
shift=pd.Timedelta(seconds=80),
)
self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")

    def update_calib_csv(self, calib_df: pd.DataFrame):
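        """Overwrite the calibration CSV with the manually edited dataframe."""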
calib_df.to_csv(self.calib_filepath, index=False, float_format="%.5f", sep=";")


class Worker(QObject):
"""Worker class used to move long "analyse" computations to another thread, to avoid GUI freezing."""
finished = pyqtSignal()
def __init__(self, controller: AnalyseController):
super(Worker, self).__init__()
self.ctrl = controller
def run(self):
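        """Run the controller's `analyse` method, then signal completion.

        Exceptions are caught and printed so that `finished` is always emitted
        and the caller's thread can be cleaned up.
        """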
try:
self.ctrl.analyse()
except Exception as e:
print(e)
self.finished.emit()
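

if __name__ == "__main__":
    # Minimal usage sketch for running the pipeline without the GUI. The
    # dataset path below is a placeholder and must point at a real dataset
    # directory whose name passes DatasetReader.dataset_name_is_valid().
    controller = AnalyseController()
    controller.sig_simple_step_finished.connect(print)
    controller.sig_action_step_finished.connect(lambda msg, label, slot: print(msg))
    controller.get_dataset_reader("/path/to/data/20210428_ASUMA2016_8_19sq")
    controller.analyse(override_arrival_pkl=False)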