import numpy as np import pandas as pd import matplotlib import os from scipy import interpolate from PyQt5.QtCore import QObject, pyqtSignal import cfatools.processor.flow as flow import cfatools.processor.collector as collector import cfatools.processor.encoder as encoder from cfatools.logreader.dataset import DatasetReader matplotlib.use('TkAgg') class AnalyseController(QObject): sig_simple_step_finished = pyqtSignal(str, name="simple_step_finished") sig_action_step_finished = pyqtSignal(str, str, str, name="action_step_finished") def __init__(self): super(AnalyseController, self).__init__() self.current_dataset = None self.volume_dict = dict() self.encoder_df = pd.DataFrame() self.iceblock_df = pd.DataFrame() self.conduct_df = pd.DataFrame() self.pump_df = pd.DataFrame() self.picarro_df = pd.DataFrame() self.collector_df = pd.DataFrame() self.arrival_df_raw = pd.DataFrame() self.arrival_df = pd.DataFrame() self.arrival_filepath = "" self.flask_df = pd.DataFrame() self.flask_df_filepath = "" self.pic_conduc_height_filepath = "" self.calib_df = pd.DataFrame() self.pic_conduct_calibrated_filepath = "" self.flask_calibrated_filepath = "" def get_dataset_reader(self, dataset_path: str) -> DatasetReader: root_directory = os.path.dirname(dataset_path) dataset_name = os.path.basename(dataset_path) valid, error_msg = DatasetReader.dataset_name_is_valid(dataset_name) if not valid: raise ValueError(error_msg) self.current_dataset = DatasetReader( base_path=root_directory, dataset=dataset_name ) self.sig_simple_step_finished.emit("Dataset [" + self.current_dataset.dataset_name + "] loaded.") return self.current_dataset def analyse(self, override_arrival_pkl: bool = False): dataset = self.current_dataset self.encoder_df, self.iceblock_df, self.conduct_df, self.pump_df, self.picarro_df, self.collector_df = \ flow.get_datasets_data(dataset) self.__apply_special_corrections__() # Dataset's "processed" directory processed_dir = os.path.join(dataset.dataset_path, "processed") if not os.path.exists(processed_dir): os.mkdir(processed_dir) self.sig_simple_step_finished.emit("New 'processed' subdirectory created in " + dataset.dataset_path) self.collector_df = collector.renumber_flasks(self.collector_df) self.sig_action_step_finished.emit("Collector's flasks renumbered.", "View", "view_collector_df") ################################################################################################################ # Tubing volumes self.volumes_dict = flow.get_tubing_volume_dict("../config/tubing_volumes.csv", max_datetime=self.encoder_df.index[0]) self.sig_action_step_finished.emit("Volume table loaded.", "View", "view_volume_dict") ################################################################################################################ # Arrival datetimes if override_arrival_pkl: pkl_filepath = os.path.join(dataset.dataset_path, "binary", "arrival_nearest.pkl") if os.path.exists(pkl_filepath): os.remove(pkl_filepath) self.sig_simple_step_finished.emit("Previous arrival_nearest.pkl file deleted.") self.arrival_df_raw = flow.get_arrival_df(self.encoder_df, self.pump_df, self.volumes_dict, parallel=False, dataset=dataset, pkl_name="arrival_nearest.pkl") if override_arrival_pkl: self.sig_action_step_finished.emit("Arrival_df computed.", "View", "view_arrival_df") else: self.sig_action_step_finished.emit("Arrival_df loaded from binary file.", "View", "view_arrival_df_raw") self.arrival_df = self.arrival_df_raw.dropna() self.arrival_df = flow.add_iceblock_info(self.arrival_df, self.iceblock_df) self.arrival_df = flow.add_melted_height(self.arrival_df, self.encoder_df, self.iceblock_df) self.arrival_df = flow.add_flask_info(self.arrival_df, self.collector_df) self.sig_action_step_finished.emit("Arrival_df augmented with iceblock, encoder and flask info.", "View", "view_arrival_df") # Save as CSV self.arrival_filepath = os.path.join(processed_dir, dataset.dataset_name + "_arrival_df.csv") self.arrival_df.to_csv(self.arrival_filepath, date_format="%Y-%m-%d %H:%M:%S") self.sig_action_step_finished.emit("Arrival_df saved as CSV in [" + self.arrival_filepath + "]", "Open", "open_arrival_csv") ################################################################################################################ # Conducti rescaled by melting time conduct_rescaled_df = flow.get_conduct_by_melt_time(self.arrival_df, self.conduct_df, export_in_dataset=dataset) self.sig_simple_step_finished.emit("Conducti-R(escaled) created.") ################################################################################################################ # Picarro rescaled by melting time if not self.picarro_df.index.is_monotonic_increasing: self.picarro_df.reset_index(inplace=True) self.picarro_df['diff_time'] = self.picarro_df["datetime"] - self.picarro_df["datetime"].shift(1) self.picarro_df = self.picarro_df.loc[ self.picarro_df.index > self.picarro_df.loc[self.picarro_df["diff_time"] .dt.total_seconds() < 0].index[0]] self.picarro_df = self.picarro_df.set_index("datetime") self.picarro_df = self.picarro_df.drop(columns={'diff_time'}) picarro_rescaled_df = flow.get_picarro_by_melt_time(self.arrival_df, self.picarro_df, export_in_dataset=dataset) self.sig_simple_step_finished.emit("Picarro-R(escaled) created.") ################################################################################################################ # Flask self.flask_df = flow.get_flask_content(self.arrival_df) self.sig_action_step_finished.emit("Flasks' content computed.", "View", "view_flask_df") # Save as CSV self.flask_df_filepath = os.path.join(processed_dir, dataset.dataset_name + "_flask_df.csv") self.flask_df.to_csv(self.flask_df_filepath, index=False) self.sig_action_step_finished.emit("flask_df saved as CSV in [" + self.arrival_filepath + "]", "Open", "open_flask_csv") # Plot mm per flask # if not all(self.flask_df["flask"] == 0): # self.flask_df["mm_diff"] = self.flask_df["max"] - self.flask_df["min"] # mm_per_flask_df = self.flask_df[["flask", "mm_diff"]].groupby("flask").agg(sum) # mm_per_flask_df.plot.bar() # mm_per_flask_df.plot.hist(bins=round(len(mm_per_flask_df.index)/3)) ################################################################################################################ # Conducti # self.conduct_df.plot() ################################################################################################################ # Picarro & conducti vs. melted height self.pic_conduc_height_filepath = os.path.join(processed_dir, dataset.dataset_name + "_pic_conduc_height.csv") pic_conduc_height_df = self.arrival_df[["icbk_code", "icbk_name", "melted_height", "melted_height_icbk"]] pic_conduc_height_df = pd.merge(pic_conduc_height_df, picarro_rescaled_df, left_index=True, right_index=True) pic_conduc_height_df = pd.merge(pic_conduc_height_df, conduct_rescaled_df, left_index=True, right_index=True) pic_conduc_height_df.to_csv(self.pic_conduc_height_filepath) self.sig_action_step_finished.emit( "pic_conduc_height_df saved as CSV in [" + self.pic_conduc_height_filepath + "]", "Open", "open_pic_conduc_height_df") ################################################################################################################ # Picarro # self.arrival_df = self.arrival_df[self.arrival_df["icbk_code"] > 0] # picarro2_df = pd.merge_asof(left=self.arrival_df[["icbk_code", "icbk_name", "melted_height", "picarro"]], # right=self.picarro_df, # left_on="picarro", # right_index=True) # picarro2_df["iceblock"] = picarro2_df["icbk_code"].astype(str) + '-' + picarro2_df["icbk_name"] # print(gg.ggplot() # + gg.geom_line(data=picarro2_df, # mapping=gg.aes(x='picarro', y='Delta_18_16', color="iceblock")) # ) # # print(gg.ggplot() # + gg.geom_path(data=picarro2_df, # mapping=gg.aes(x='Delta_18_16', y='melted_height', color="iceblock")) # ) ################################################################################################################ # Calibrate calib_name = "cal1" calibrated_dir = os.path.join(dataset.dataset_path, "calibrated") if not os.path.exists(calibrated_dir): os.mkdir(calibrated_dir) calib_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_" + calib_name + ".csv") if not os.path.exists(calib_filepath): template_calib_df = self.iceblock_df[["id", "name", "initial_height"]].copy() template_calib_df["start"] = 0 template_calib_df = template_calib_df\ .melt(["id", "name"])\ .sort_values(["id", "value"])\ .drop(columns=["id", "variable"]) template_calib_df = template_calib_df.rename(columns={"name": "icbk_name", "initial_height": "melted_height"}) template_calib_df[["depth", "Delta_18_16_slope", "Delta_18_16_intercept", "Delta_D_H_slope", "Delta_D_H_intercept"]] = [np.NaN, np.NaN, np.NaN, np.NaN, np.NaN] template_calib_df.to_csv(calib_filepath, index=False, float_format='%.2f') self.calib_df = pd.read_csv(calib_filepath, sep=",") if len(self.calib_df.index) < 1: self.sig_simple_step_finished.emit("Calibration file is empty: [" + calib_filepath + "]") self.sig_simple_step_finished.emit("Abort calibration.") else: self.sig_action_step_finished.emit("Calibration file loaded.", "View", "view_calib_df") ################################################################################################################ # Calibrate pic_conduct_height # Absolute depth calibration for icbk_name in self.calib_df["icbk_name"].unique(): icbk_calib_df = self.calib_df.loc[self.calib_df["icbk_name"] == icbk_name] depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"]) try: pic_conduc_height_df.loc[pic_conduc_height_df["icbk_name"] == icbk_name, "depth"] \ = depth_interp_func(pic_conduc_height_df .loc[pic_conduc_height_df["icbk_name"] == icbk_name]["melted_height_icbk"]) except: print(icbk_name) # Picarro calibration if len(self.calib_df.groupby(["Delta_18_16_slope", "Delta_18_16_intercept", "Delta_D_H_slope", "Delta_D_H_intercept"]).count()) > 1: raise ValueError("More than one calibration set for isotope is not yet supported") isotopic_calib_df = self.calib_df.iloc[0] calib_pic_conduc_height_df = pic_conduc_height_df.copy() calib_pic_conduc_height_df["Delta_18_16_calib"] = \ calib_pic_conduc_height_df["Delta_18_16"] * isotopic_calib_df.Delta_18_16_slope\ + isotopic_calib_df.Delta_18_16_intercept calib_pic_conduc_height_df["Delta_D_H_calib"] = \ calib_pic_conduc_height_df["Delta_D_H"] * isotopic_calib_df.Delta_D_H_slope\ + isotopic_calib_df.Delta_D_H_intercept # Save as CSV self.pic_conduct_calibrated_filepath = os.path.join( calibrated_dir, dataset.dataset_name + "_pic_conduct_calibrated_" + calib_name + ".csv") calib_pic_conduc_height_df.to_csv(self.pic_conduct_calibrated_filepath, float_format='%.3f') self.sig_action_step_finished.emit( "Calibrated Picarro & Conducti saved as CSV in [" + self.pic_conduct_calibrated_filepath + "]", "Open", "open_calib_pic_conduc_height_df") ################################################################################################################ # Calibrate flasks calib_flask_df = self.flask_df.copy() for icbk_name in self.calib_df["icbk_name"].unique(): icbk_calib_df = self.calib_df.loc[self.calib_df["icbk_name"] == icbk_name] depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"]) calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name, "min_depth"] \ = depth_interp_func(calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name]["min"]) calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name, "max_depth"] \ = depth_interp_func(calib_flask_df.loc[calib_flask_df["icbk_name"] == icbk_name]["max"]) calib_flask_df["diff_depth"] = calib_flask_df["max_depth"] - calib_flask_df["min_depth"] self.flask_calibrated_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_flask_calibrated_" + calib_name + ".csv") calib_flask_df.to_csv(self.flask_calibrated_filepath, float_format='%.2f', index=False) self.sig_action_step_finished.emit( "Calibrated Flask saved as CSV in [" + self.flask_calibrated_filepath + "]", "Open", "open_calib_flask_df") def __apply_special_corrections__(self) -> None: """Some datasets require some manual correction before being processed, due to errors during the recording""" if self.current_dataset.dataset_name == "20210428_ASUMA2016_8_19sq": t1 = pd.Timestamp("2021-04-28 11:02:50", tz="UTC") t2 = pd.Timestamp("2021-04-28 12:57:56", tz="UTC") t3 = pd.Timestamp("2021-04-28 13:20:28", tz="UTC") t4 = pd.Timestamp("2021-04-28 15:19:37", tz="UTC") self.encoder_df = pd.concat([self.encoder_df.loc[t1:t2], self.encoder_df.loc[t3:t4]]) self.sig_simple_step_finished.emit("Special correction applied on encoder_df.") if self.current_dataset.dataset_name in ["20210826_ASUMA2016_6-1_sq", "20210827_ASUMA2016_6_11sq"]: self.picarro_df.index = self.picarro_df.index - pd.Timedelta(115, unit="s") self.sig_simple_step_finished.emit("Special correction applied on picarro_df.") if self.current_dataset.dataset_name == "20220317_ASUMA2016_25_19sq": self.encoder_df = encoder.shift_stacking_event( self.encoder_df, old_peak_start=pd.Timestamp("2022-03-17 13:28:27.446578+00:00"), old_peak_end=pd.Timestamp("2022-03-17 13:28:41.721330+00:00"), shift=pd.Timedelta(seconds=60)) self.sig_simple_step_finished.emit("Special correction applied on encoder_df.") if self.current_dataset.dataset_name == "20220329_ASUMA2016_23_12sq": self.encoder_df = encoder.shift_stacking_event( self.encoder_df, old_peak_start=pd.Timestamp("2022-03-29 16:58:11.552587+00:00"), old_peak_end=pd.Timestamp("2022-03-29 16:58:48.006613+00:00"), shift=pd.Timedelta(seconds=80)) self.sig_simple_step_finished.emit("Special correction applied on encoder_df.") class Worker(QObject): """Worker class used to move long "analyse" computations to another thread, to avoid GUI freezing.""" finished = pyqtSignal() def __init__(self, controller: AnalyseController): super(Worker, self).__init__() self.ctrl = controller def run(self): self.ctrl.analyse() self.finished.emit()