import numpy as np import pandas as pd import matplotlib import os from scipy import interpolate from PyQt5.QtCore import QObject, pyqtSignal import cfatools.processor.flow as flow import cfatools.processor.collector as collector import cfatools.processor.encoder as encoder from cfatools.logreader.dataset import DatasetReader matplotlib.use('TkAgg') class AnalyseController(QObject): sig_simple_step_finished = pyqtSignal(str, name="simple_step_finished") def __init__(self): super(AnalyseController, self).__init__() self._current_dataset = None self._encoder_df = pd.DataFrame() self._iceblock_df = pd.DataFrame() self._conduct_df = pd.DataFrame() self._pump_df = pd.DataFrame() self._picarro_df = pd.DataFrame() self._collector_df = pd.DataFrame() def get_dataset_reader(self, dataset_path: str) -> DatasetReader: root_directory = os.path.dirname(dataset_path) dataset_name = os.path.basename(dataset_path) valid, error_msg = DatasetReader.dataset_name_is_valid(dataset_name) if not valid: raise ValueError(error_msg) self._current_dataset = DatasetReader( base_path=root_directory, dataset=dataset_name ) self.sig_simple_step_finished.emit("Dataset [" + self._current_dataset.dataset_name + "] loaded.") return self._current_dataset def analyse(self, override_arrival_pkl: bool = False): dataset = self._current_dataset self._encoder_df, self._iceblock_df, self._conduct_df, self._pump_df, self._picarro_df, self._collector_df = \ flow.get_datasets_data(dataset) self.__apply_special_corrections__() # Dataset's "processed" directory processed_dir = os.path.join(dataset.dataset_path, "processed") if not os.path.exists(processed_dir): os.mkdir(processed_dir) self.sig_simple_step_finished.emit("New 'processed' subdirectory created in " + dataset.dataset_path) self._collector_df = collector.renumber_flasks(self._collector_df) self.sig_simple_step_finished.emit("Collector's flasks renumbered.") ################################################################################################################ # Tubing volumes volumes_dict = flow.get_tubing_volume_dict("../config/tubing_volumes.csv", max_datetime=self._encoder_df.index[0]) ################################################################################################################ # Arrival datetimes if override_arrival_pkl: pkl_filepath = os.path.join(dataset.dataset_path, "binary", "arrival_nearest.pkl") if os.path.exists(pkl_filepath): os.remove(pkl_filepath) self.sig_simple_step_finished.emit("Previous arrival_nearest.pkl file deleted.") arrival_df = flow.get_arrival_df(self._encoder_df, self._pump_df, volumes_dict, parallel=False, dataset=dataset, pkl_name="arrival_nearest.pkl") self.sig_simple_step_finished.emit("Arrival_df computed.") arrival_df = arrival_df.dropna() arrival_df = flow.add_iceblock_info(arrival_df, self._iceblock_df) arrival_df = flow.add_melted_height(arrival_df, self._encoder_df, self._iceblock_df) arrival_df = flow.add_flask_info(arrival_df, self._collector_df) # Save as CSV arrival_filepath = os.path.join(processed_dir, dataset.dataset_name + "_arrival_df.csv") arrival_df.to_csv(arrival_filepath, date_format="%Y-%m-%d %H:%M:%S") self.sig_simple_step_finished.emit("Arrival_df saved as CSV in " + arrival_filepath) ################################################################################################################ # Conducti rescaled by melting time conduct_rescaled_df = flow.get_conduct_by_melt_time(arrival_df, self._conduct_df, export_in_dataset=dataset) ################################################################################################################ # Picarro rescaled by melting time if not self._picarro_df.index.is_monotonic_increasing: self._picarro_df.reset_index(inplace=True) self._picarro_df['diff_time'] = self._picarro_df["datetime"] - self._picarro_df["datetime"].shift(1) self._picarro_df = self._picarro_df.loc[ self._picarro_df.index > self._picarro_df.loc[self._picarro_df["diff_time"] .dt.total_seconds() < 0].index[0]] self._picarro_df = self._picarro_df.set_index("datetime") self._picarro_df = self._picarro_df.drop(columns={'diff_time'}) picarro_rescaled_df = flow.get_picarro_by_melt_time(arrival_df, self._picarro_df, export_in_dataset=dataset) ################################################################################################################ # Flask flask_df = flow.get_flask_content(arrival_df) # Save as CSV flask_df_filepath = os.path.join(processed_dir, dataset.dataset_name + "_flask_df.csv") flask_df.to_csv(flask_df_filepath, index=False) # Plot mm per flask if not all(flask_df["flask"] == 0): flask_df["mm_diff"] = flask_df["max"] - flask_df["min"] mm_per_flask_df = flask_df[["flask", "mm_diff"]].groupby("flask").agg(sum) mm_per_flask_df.plot.bar() mm_per_flask_df.plot.hist(bins=round(len(mm_per_flask_df.index)/3)) ################################################################################################################ # Conducti # self._conduct_df.plot() ################################################################################################################ # Picarro & conducti vs. melted height pic_conduc_height_filepath = os.path.join(processed_dir, dataset.dataset_name + "_pic_conduc_height.csv") pic_conduc_height_df = arrival_df[["icbk_code", "icbk_name", "melted_height", "melted_height_icbk"]] pic_conduc_height_df = pd.merge(pic_conduc_height_df, picarro_rescaled_df, left_index=True, right_index=True) pic_conduc_height_df = pd.merge(pic_conduc_height_df, conduct_rescaled_df, left_index=True, right_index=True) pic_conduc_height_df.to_csv(pic_conduc_height_filepath) ################################################################################################################ # Picarro arrival_df = arrival_df[arrival_df["icbk_code"] > 0] picarro2_df = pd.merge_asof(left=arrival_df[["icbk_code", "icbk_name", "melted_height", "picarro"]], right=self._picarro_df, left_on="picarro", right_index=True) picarro2_df["iceblock"] = picarro2_df["icbk_code"].astype(str) + '-' + picarro2_df["icbk_name"] # print(gg.ggplot() # + gg.geom_line(data=picarro2_df, # mapping=gg.aes(x='picarro', y='Delta_18_16', color="iceblock")) # ) # # print(gg.ggplot() # + gg.geom_path(data=picarro2_df, # mapping=gg.aes(x='Delta_18_16', y='melted_height', color="iceblock")) # ) ################################################################################################################ # Calibrate calib_name = "cal1" calibrated_dir = os.path.join(dataset.dataset_path, "calibrated") if not os.path.exists(calibrated_dir): os.mkdir(calibrated_dir) calib_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_" + calib_name + ".csv") if not os.path.exists(calib_filepath): template_calib_df = self._iceblock_df[["id", "name", "initial_height"]].copy() template_calib_df["start"] = 0 template_calib_df = template_calib_df\ .melt(["id", "name"])\ .sort_values(["id", "value"])\ .drop(columns=["id", "variable"]) template_calib_df = template_calib_df.rename(columns={"name": "icbk_name", "initial_height": "melted_height"}) template_calib_df[["depth", "Delta_18_16_slope", "Delta_18_16_intercept", "Delta_D_H_slope", "Delta_D_H_intercept"]] = [np.NaN, np.NaN, np.NaN, np.NaN, np.NaN] template_calib_df.to_csv(calib_filepath, index=False, float_format='%.2f') raise ValueError("Calib file not found: " + calib_filepath) calib_df = pd.read_csv(calib_filepath, sep=",") ################################################################################################################ # Calibrate pic_conduct_height # Absolute depth calibration for icbk_name in calib_df["icbk_name"].unique(): icbk_calib_df = calib_df.loc[calib_df["icbk_name"] == icbk_name] depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"]) try: pic_conduc_height_df.loc[pic_conduc_height_df["icbk_name"] == icbk_name, "depth"] \ = depth_interp_func(pic_conduc_height_df .loc[pic_conduc_height_df["icbk_name"] == icbk_name]["melted_height_icbk"]) except: print(icbk_name) # Picarro calibration if len(calib_df.groupby(["Delta_18_16_slope", "Delta_18_16_intercept", "Delta_D_H_slope", "Delta_D_H_intercept"]).count()) > 1: raise ValueError("More than one calibration set for isotope is not yet supported") isotopic_calib_df = calib_df.iloc[0] pic_conduc_height_df["Delta_18_16_calib"] = \ pic_conduc_height_df["Delta_18_16"] * isotopic_calib_df.Delta_18_16_slope\ + isotopic_calib_df.Delta_18_16_intercept pic_conduc_height_df["Delta_D_H_calib"] = \ pic_conduc_height_df["Delta_D_H"] * isotopic_calib_df.Delta_D_H_slope\ + isotopic_calib_df.Delta_D_H_intercept # Save as CSV pic_conduct_calibrated_filepath = os.path.join( calibrated_dir, dataset.dataset_name + "_pic_conduct_calibrated_" + calib_name + ".csv") pic_conduc_height_df.to_csv(pic_conduct_calibrated_filepath, float_format='%.3f') ################################################################################################################ # Calibrate flasks for icbk_name in calib_df["icbk_name"].unique(): icbk_calib_df = calib_df.loc[calib_df["icbk_name"] == icbk_name] depth_interp_func = interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"]) flask_df.loc[flask_df["icbk_name"] == icbk_name, "min_depth"] \ = depth_interp_func(flask_df.loc[flask_df["icbk_name"] == icbk_name]["min"]) flask_df.loc[flask_df["icbk_name"] == icbk_name, "max_depth"] \ = depth_interp_func(flask_df.loc[flask_df["icbk_name"] == icbk_name]["max"]) flask_df["diff_depth"] = flask_df["max_depth"] - flask_df["min_depth"] flask_calibrated_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_flask_calibrated_" + calib_name + ".csv") flask_df.to_csv(flask_calibrated_filepath, float_format='%.2f', index=False) def __apply_special_corrections__(self) -> None: """Some datasets require some manual correction before being processed, due to errors during the recording""" if self._current_dataset.dataset_name == "20210428_ASUMA2016_8_19sq": t1 = pd.Timestamp("2021-04-28 11:02:50", tz="UTC") t2 = pd.Timestamp("2021-04-28 12:57:56", tz="UTC") t3 = pd.Timestamp("2021-04-28 13:20:28", tz="UTC") t4 = pd.Timestamp("2021-04-28 15:19:37", tz="UTC") self._encoder_df = pd.concat([self._encoder_df.loc[t1:t2], self._encoder_df.loc[t3:t4]]) if self._current_dataset.dataset_name in ["20210826_ASUMA2016_6-1_sq", "20210827_ASUMA2016_6_11sq"]: self._picarro_df.index = self._picarro_df.index - pd.Timedelta(115, unit="s") if self._current_dataset.dataset_name == "20220317_ASUMA2016_25_19sq": self._encoder_df = encoder.shift_stacking_event( self._encoder_df, old_peak_start=pd.Timestamp("2022-03-17 13:28:27.446578+00:00"), old_peak_end=pd.Timestamp("2022-03-17 13:28:41.721330+00:00"), shift=pd.Timedelta(seconds=60)) if self._current_dataset.dataset_name == "20220329_ASUMA2016_23_12sq": self._encoder_df = encoder.shift_stacking_event( self._encoder_df, old_peak_start=pd.Timestamp("2022-03-29 16:58:11.552587+00:00"), old_peak_end=pd.Timestamp("2022-03-29 16:58:48.006613+00:00"), shift=pd.Timedelta(seconds=80))