import numpy as np
import pandas as pd
import matplotlib
import os
from scipy import interpolate
from PyQt5.QtCore import QObject, pyqtSignal
import cfatools.processor.flow as flow
import cfatools.processor.collector as collector
import cfatools.processor.encoder as encoder
from cfatools.logreader.dataset import DatasetReader
matplotlib.use('TkAgg')
class AnalyseController(QObject):
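    """Controller running the analysis pipeline of a CFA dataset.

    Progress is reported through two Qt signals: ``sig_simple_step_finished``
    carries a status message; ``sig_action_step_finished`` additionally carries
    an action label and the name of the view method to call.
    """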
sig_simple_step_finished = pyqtSignal(str, name="simple_step_finished")
sig_action_step_finished = pyqtSignal(str, str, str, name="action_step_finished")
    def __init__(self):
        super().__init__()
        self.current_dataset = None
        self.volumes_dict = dict()
self.encoder_df = pd.DataFrame()
self.iceblock_df = pd.DataFrame()
self.conduct_df = pd.DataFrame()
self.pump_df = pd.DataFrame()
self.picarro_df = pd.DataFrame()
self.collector_df = pd.DataFrame()
self.arrival_df_raw = pd.DataFrame()
self.arrival_df = pd.DataFrame()
def get_dataset_reader(self, dataset_path: str) -> DatasetReader:
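        """Build a DatasetReader for the dataset located at ``dataset_path``.

        The last component of the path is taken as the dataset name and
        validated; a ValueError is raised if it is not a valid name.
        """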
root_directory = os.path.dirname(dataset_path)
dataset_name = os.path.basename(dataset_path)
valid, error_msg = DatasetReader.dataset_name_is_valid(dataset_name)
if not valid:
raise ValueError(error_msg)
        self.current_dataset = DatasetReader(base_path=root_directory, dataset=dataset_name)
self.sig_simple_step_finished.emit("Dataset [" + self.current_dataset.dataset_name + "] loaded.")
return self.current_dataset
def analyse(self, override_arrival_pkl: bool = False):
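        """Run the full processing pipeline on the currently loaded dataset.

        The pipeline loads the raw log data, computes the melt-water arrival
        datetimes, rescales the conductivity and Picarro series by melting
        time, aggregates per-flask statistics and applies the depth and
        isotope calibrations. Results are written to the dataset's
        ``processed`` and ``calibrated`` subdirectories.

        :param override_arrival_pkl: if True, delete the cached
            arrival_nearest.pkl file and recompute the arrival datetimes.
        """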
dataset = self.current_dataset
        self.encoder_df, self.iceblock_df, self.conduct_df, self.pump_df, self.picarro_df, self.collector_df = \
            flow.get_datasets_data(dataset)
        self._apply_special_corrections()
# Dataset's "processed" directory
processed_dir = os.path.join(dataset.dataset_path, "processed")
if not os.path.exists(processed_dir):
os.mkdir(processed_dir)
self.sig_simple_step_finished.emit("New 'processed' subdirectory created in " + dataset.dataset_path)
self.collector_df = collector.renumber_flasks(self.collector_df)
self.sig_action_step_finished.emit("Collector's flasks renumbered.", "View", "view_collector_df")
################################################################################################################
# Tubing volumes
self.volumes_dict = flow.get_tubing_volume_dict("../config/tubing_volumes.csv",
max_datetime=self.encoder_df.index[0])
self.sig_action_step_finished.emit("Volume table loaded.", "View", "view_volume_dict")
################################################################################################################
# Arrival datetimes
if override_arrival_pkl:
pkl_filepath = os.path.join(dataset.dataset_path, "binary", "arrival_nearest.pkl")
if os.path.exists(pkl_filepath):
os.remove(pkl_filepath)
self.sig_simple_step_finished.emit("Previous arrival_nearest.pkl file deleted.")
self.arrival_df_raw = flow.get_arrival_df(self.encoder_df, self.pump_df, self.volumes_dict,
parallel=False, dataset=dataset, pkl_name="arrival_nearest.pkl")
if override_arrival_pkl:
self.sig_action_step_finished.emit("Arrival_df computed.", "View", "view_arrival_df")
else:
self.sig_action_step_finished.emit("Arrival_df loaded from binary file.", "View", "view_arrival_df_raw")
self.arrival_df = self.arrival_df_raw.dropna()
self.arrival_df = flow.add_iceblock_info(self.arrival_df, self.iceblock_df)
self.arrival_df = flow.add_melted_height(self.arrival_df, self.encoder_df, self.iceblock_df)
self.arrival_df = flow.add_flask_info(self.arrival_df, self.collector_df)
self.sig_action_step_finished.emit("Arrival_df augmented with iceblock, encoder and flask info.",
"View", "view_arrival_df")
# Save as CSV
arrival_filepath = os.path.join(processed_dir, dataset.dataset_name + "_arrival_df.csv")
self.arrival_df.to_csv(arrival_filepath, date_format="%Y-%m-%d %H:%M:%S")
self.sig_simple_step_finished.emit("Arrival_df saved as CSV in " + arrival_filepath)
################################################################################################################
# Conducti rescaled by melting time
conduct_rescaled_df = flow.get_conduct_by_melt_time(self.arrival_df, self.conduct_df, export_in_dataset=dataset)
################################################################################################################
# Picarro rescaled by melting time
        # The Picarro clock occasionally jumps backwards during acquisition, leaving a
        # non-monotonic datetime index. Keep only the rows recorded strictly after the
        # first backward time step.
        if not self.picarro_df.index.is_monotonic_increasing:
            self.picarro_df.reset_index(inplace=True)
            self.picarro_df["diff_time"] = self.picarro_df["datetime"] - self.picarro_df["datetime"].shift(1)
            first_backward_idx = self.picarro_df.loc[self.picarro_df["diff_time"].dt.total_seconds() < 0].index[0]
            self.picarro_df = self.picarro_df.loc[self.picarro_df.index > first_backward_idx]
            self.picarro_df = self.picarro_df.set_index("datetime")
            self.picarro_df = self.picarro_df.drop(columns=["diff_time"])
picarro_rescaled_df = flow.get_picarro_by_melt_time(self.arrival_df, self.picarro_df, export_in_dataset=dataset)
################################################################################################################
# Flask
flask_df = flow.get_flask_content(self.arrival_df)
# Save as CSV
flask_df_filepath = os.path.join(processed_dir, dataset.dataset_name + "_flask_df.csv")
flask_df.to_csv(flask_df_filepath, index=False)
        # Plot melted millimetres per flask (skipped when no flasks were filled)
        if not all(flask_df["flask"] == 0):
            flask_df["mm_diff"] = flask_df["max"] - flask_df["min"]
            mm_per_flask_df = flask_df[["flask", "mm_diff"]].groupby("flask").sum()
            mm_per_flask_df.plot.bar()
            mm_per_flask_df.plot.hist(bins=round(len(mm_per_flask_df.index) / 3))
################################################################################################################
# Conducti
        # self.conduct_df.plot()
################################################################################################################
# Picarro & conducti vs. melted height
pic_conduc_height_filepath = os.path.join(processed_dir, dataset.dataset_name + "_pic_conduc_height.csv")
pic_conduc_height_df = self.arrival_df[["icbk_code", "icbk_name", "melted_height", "melted_height_icbk"]]
pic_conduc_height_df = pd.merge(pic_conduc_height_df, picarro_rescaled_df, left_index=True, right_index=True)
pic_conduc_height_df = pd.merge(pic_conduc_height_df, conduct_rescaled_df, left_index=True, right_index=True)
pic_conduc_height_df.to_csv(pic_conduc_height_filepath)
################################################################################################################
# Picarro
        self.arrival_df = self.arrival_df[self.arrival_df["icbk_code"] > 0]
        picarro2_df = pd.merge_asof(left=self.arrival_df[["icbk_code", "icbk_name", "melted_height", "picarro"]],
                                    right=self.picarro_df,
                                    left_on="picarro",
                                    right_index=True)
picarro2_df["iceblock"] = picarro2_df["icbk_code"].astype(str) + '-' + picarro2_df["icbk_name"]
# print(gg.ggplot()
# + gg.geom_line(data=picarro2_df,
# mapping=gg.aes(x='picarro', y='Delta_18_16', color="iceblock"))
# )
#
# print(gg.ggplot()
# + gg.geom_path(data=picarro2_df,
# mapping=gg.aes(x='Delta_18_16', y='melted_height', color="iceblock"))
# )
################################################################################################################
# Calibrate
calib_name = "cal1"
calibrated_dir = os.path.join(dataset.dataset_path, "calibrated")
if not os.path.exists(calibrated_dir):
os.mkdir(calibrated_dir)
calib_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_" + calib_name + ".csv")
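        # On the first run the calibration file does not exist yet: write a template
        # (two rows per iceblock, heights pre-filled, depth and isotope coefficients
        # left blank for manual completion) and abort so the user can fill it in.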
if not os.path.exists(calib_filepath):
template_calib_df = self.iceblock_df[["id", "name", "initial_height"]].copy()
template_calib_df["start"] = 0
template_calib_df = template_calib_df\
.melt(["id", "name"])\
.sort_values(["id", "value"])\
.drop(columns=["id", "variable"])
template_calib_df = template_calib_df.rename(columns={"name": "icbk_name",
"initial_height": "melted_height"})
template_calib_df[["depth",
"Delta_18_16_slope", "Delta_18_16_intercept",
"Delta_D_H_slope", "Delta_D_H_intercept"]] = [np.NaN, np.NaN, np.NaN, np.NaN, np.NaN]
template_calib_df.to_csv(calib_filepath, index=False, float_format='%.2f')
raise ValueError("Calib file not found: " + calib_filepath)
calib_df = pd.read_csv(calib_filepath, sep=",")
################################################################################################################
# Calibrate pic_conduct_height
# Absolute depth calibration
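        # For each iceblock, linearly interpolate the manually entered
        # (melted_height, depth) pairs to convert melted heights into absolute depths.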
for icbk_name in calib_df["icbk_name"].unique():
icbk_calib_df = calib_df.loc[calib_df["icbk_name"] == icbk_name]
depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"],
y=icbk_calib_df["depth"])
            try:
                pic_conduc_height_df.loc[pic_conduc_height_df["icbk_name"] == icbk_name, "depth"] \
                    = depth_interp_func(pic_conduc_height_df
                                        .loc[pic_conduc_height_df["icbk_name"] == icbk_name]["melted_height_icbk"])
            except ValueError as e:
                # interp1d raises ValueError when a melted height falls outside the
                # calibrated range; report the offending iceblock and continue.
                print("Depth interpolation failed for iceblock " + icbk_name + ": " + str(e))
# Picarro calibration
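        # Apply the linear isotope calibration delta_calib = delta_raw * slope + intercept;
        # a single (slope, intercept) pair must hold for the whole calibration file.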
if len(calib_df.groupby(["Delta_18_16_slope", "Delta_18_16_intercept",
"Delta_D_H_slope", "Delta_D_H_intercept"]).count()) > 1:
raise ValueError("More than one calibration set for isotope is not yet supported")
isotopic_calib_df = calib_df.iloc[0]
pic_conduc_height_df["Delta_18_16_calib"] = \
pic_conduc_height_df["Delta_18_16"] * isotopic_calib_df.Delta_18_16_slope\
+ isotopic_calib_df.Delta_18_16_intercept
pic_conduc_height_df["Delta_D_H_calib"] = \
pic_conduc_height_df["Delta_D_H"] * isotopic_calib_df.Delta_D_H_slope\
+ isotopic_calib_df.Delta_D_H_intercept
# Save as CSV
pic_conduct_calibrated_filepath = os.path.join(
calibrated_dir,
dataset.dataset_name + "_pic_conduct_calibrated_" + calib_name + ".csv")
pic_conduc_height_df.to_csv(pic_conduct_calibrated_filepath, float_format='%.3f')
################################################################################################################
# Calibrate flasks
for icbk_name in calib_df["icbk_name"].unique():
icbk_calib_df = calib_df.loc[calib_df["icbk_name"] == icbk_name]
depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"],
y=icbk_calib_df["depth"])
flask_df.loc[flask_df["icbk_name"] == icbk_name, "min_depth"] \
= depth_interp_func(flask_df.loc[flask_df["icbk_name"] == icbk_name]["min"])
flask_df.loc[flask_df["icbk_name"] == icbk_name, "max_depth"] \
= depth_interp_func(flask_df.loc[flask_df["icbk_name"] == icbk_name]["max"])
flask_df["diff_depth"] = flask_df["max_depth"] - flask_df["min_depth"]
flask_calibrated_filepath = os.path.join(calibrated_dir,
dataset.dataset_name + "_flask_calibrated_" + calib_name + ".csv")
flask_df.to_csv(flask_calibrated_filepath, float_format='%.2f', index=False)
    def _apply_special_corrections(self) -> None:
"""Some datasets require some manual correction before being processed, due to errors during the recording"""
if self.current_dataset.dataset_name == "20210428_ASUMA2016_8_19sq":
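            # Keep only the two valid acquisition windows of the encoder log,
            # discarding the spurious records outside [t1, t2] and [t3, t4].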
t1 = pd.Timestamp("2021-04-28 11:02:50", tz="UTC")
t2 = pd.Timestamp("2021-04-28 12:57:56", tz="UTC")
t3 = pd.Timestamp("2021-04-28 13:20:28", tz="UTC")
t4 = pd.Timestamp("2021-04-28 15:19:37", tz="UTC")
self.encoder_df = pd.concat([self.encoder_df.loc[t1:t2], self.encoder_df.loc[t3:t4]])
self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")
if self.current_dataset.dataset_name in ["20210826_ASUMA2016_6-1_sq", "20210827_ASUMA2016_6_11sq"]:
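            # The Picarro clock was presumably ahead in these two datasets: shift its
            # timestamps back by 115 s to re-align them with the other logs.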
self.picarro_df.index = self.picarro_df.index - pd.Timedelta(115, unit="s")
self.sig_simple_step_finished.emit("Special correction applied on picarro_df.")
if self.current_dataset.dataset_name == "20220317_ASUMA2016_25_19sq":
            self.encoder_df = encoder.shift_stacking_event(
                self.encoder_df,
                old_peak_start=pd.Timestamp("2022-03-17 13:28:27.446578+00:00"),
                old_peak_end=pd.Timestamp("2022-03-17 13:28:41.721330+00:00"),
                shift=pd.Timedelta(seconds=60))
self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")
if self.current_dataset.dataset_name == "20220329_ASUMA2016_23_12sq":
            self.encoder_df = encoder.shift_stacking_event(
                self.encoder_df,
                old_peak_start=pd.Timestamp("2022-03-29 16:58:11.552587+00:00"),
                old_peak_end=pd.Timestamp("2022-03-29 16:58:48.006613+00:00"),
                shift=pd.Timedelta(seconds=80))
self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")