# analysecontroller.py
import numpy as np
import pandas as pd
import matplotlib
import os
from scipy import interpolate

from PyQt5.QtCore import QObject, pyqtSignal

import cfatools.processor.flow as flow
import cfatools.processor.collector as collector
import cfatools.processor.encoder as encoder
from cfatools.logreader.dataset import DatasetReader

# Select matplotlib's "TkAgg" backend as soon as this module is imported.
# NOTE(review): the classes below are PyQt5-based — confirm that a Tk backend (rather than a Qt one) is intended.
matplotlib.use("TkAgg")


class AnalyseController(QObject):
    """Run the processing pipeline of one dataset and report progress through Qt signals.

    Typical use: select a dataset with :meth:`get_dataset_reader`, then call :meth:`analyse` to
    (re)compute the derived tables and write the CSV exports. Every completed step is announced
    through one of the two signals declared below.
    """

    # Emitted with a plain progress message.
    sig_simple_step_finished = pyqtSignal(str, name="simple_step_finished")
    # Emitted with (message, action label such as "View"/"Open"/"Edit", action identifier).
    # NOTE(review): how the receiver maps the identifier to an action is not visible in this file.
    sig_action_step_finished = pyqtSignal(str, str, str, name="action_step_finished")

    def __init__(self):
        """Create a controller with no dataset loaded and empty result tables."""
        super().__init__()

        self.current_dataset = None
        # Tubing volumes returned by flow.get_tubing_volume_dict(). Historically ``__init__`` declared
        # ``volume_dict`` while ``analyse`` filled ``volumes_dict``; both names are kept and always refer
        # to the same object, so either spelling works for existing callers.
        self.volumes_dict = dict()
        self.volume_dict = self.volumes_dict
        # Raw tables returned by flow.get_datasets_data().
        self.encoder_df = pd.DataFrame()
        self.iceblock_df = pd.DataFrame()
        self.conduct_df = pd.DataFrame()
        self.pump_df = pd.DataFrame()
        self.picarro_df = pd.DataFrame()
        self.collector_df = pd.DataFrame()
        # Arrival table as returned by flow.get_arrival_df(), then its NaN-free, augmented version.
        self.arrival_df_raw = pd.DataFrame()
        self.arrival_df = pd.DataFrame()
        self.arrival_filepath = ""
        self.flask_df = pd.DataFrame()
        self.flask_df_filepath = ""
        self.pic_conduc_height_filepath = ""
        # Calibration table and the paths of the calibrated exports.
        self.calib_filepath = ""
        self.calib_df = pd.DataFrame()
        self.pic_conduct_calibrated_filepath = ""
        self.flask_calibrated_filepath = ""

    def get_dataset_reader(self, dataset_path: str) -> DatasetReader:
        """Open the dataset stored in ``dataset_path`` and make it the current dataset.

        Args:
            dataset_path: Path of the dataset directory; its last component is the dataset name and
                its parent directory is used as the reader's base path.

        Returns:
            The newly created reader (also stored in ``self.current_dataset``).

        Raises:
            ValueError: If ``DatasetReader.dataset_name_is_valid`` rejects the dataset name.
        """
        root_directory = os.path.dirname(dataset_path)
        dataset_name = os.path.basename(dataset_path)
        valid, error_msg = DatasetReader.dataset_name_is_valid(dataset_name)
        if not valid:
            raise ValueError(error_msg)
        self.current_dataset = DatasetReader(base_path=root_directory, dataset=dataset_name)
        self.sig_simple_step_finished.emit("Dataset [" + self.current_dataset.dataset_name + "] loaded.")
        return self.current_dataset

    def analyse(self, override_arrival_pkl: bool = False):
        """Process the current dataset and write the resulting CSV files.

        Steps, in order: load the raw tables, apply dataset-specific corrections, renumber the
        flasks, load the tubing volumes, get the arrival table, rescale conductivity and Picarro
        data, compute the flasks' content, export everything to ``<dataset>/processed``, then apply
        the calibration file of ``<dataset>/calibrated`` (a template is created when it is missing).

        Args:
            override_arrival_pkl: When True, ``<dataset>/binary/arrival_nearest.pkl`` is deleted
                before calling ``flow.get_arrival_df``.

        Raises:
            ValueError: If no dataset has been loaded, or if the calibration file holds more than
                one distinct set of isotopic slope/intercept values.
        """
        dataset = self.current_dataset
        if dataset is None:
            raise ValueError("No dataset loaded: call get_dataset_reader() before analyse().")

        (
            self.encoder_df,
            self.iceblock_df,
            self.conduct_df,
            self.pump_df,
            self.picarro_df,
            self.collector_df,
        ) = flow.get_datasets_data(dataset)

        self.__apply_special_corrections__()

        # Dataset's "processed" directory
        processed_dir = os.path.join(dataset.dataset_path, "processed")
        if not os.path.exists(processed_dir):
            os.mkdir(processed_dir)
            self.sig_simple_step_finished.emit("New 'processed' subdirectory created in " + dataset.dataset_path)

        self.collector_df = collector.renumber_flasks(self.collector_df)
        self.sig_action_step_finished.emit("Collector's flasks renumbered.", "View", "view_collector_df")

        ################################################################################################################
        # Tubing volumes
        # NOTE: this relative path is resolved against the process' current working directory.
        self.volumes_dict = flow.get_tubing_volume_dict(
            "../config/tubing_volumes.csv", max_datetime=self.encoder_df.index[0]
        )
        self.volume_dict = self.volumes_dict  # keep both attribute spellings in sync
        self.sig_action_step_finished.emit("Volume table loaded.", "View", "view_volume_dict")

        ################################################################################################################
        # Arrival datetimes
        pkl_name = "arrival_nearest.pkl"
        pkl_filepath = os.path.join(dataset.dataset_path, "binary", pkl_name)
        if override_arrival_pkl and os.path.exists(pkl_filepath):
            os.remove(pkl_filepath)
            self.sig_simple_step_finished.emit("Previous arrival_nearest.pkl file deleted.")
        # Remember whether a pickle is available *before* the call, so that the message below reports
        # what really happened (presumably flow.get_arrival_df reuses this file when it exists).
        pkl_available = os.path.exists(pkl_filepath)
        self.arrival_df_raw = flow.get_arrival_df(
            self.encoder_df,
            self.pump_df,
            self.volumes_dict,
            parallel=False,
            dataset=dataset,
            pkl_name=pkl_name,
        )
        if pkl_available:
            self.sig_action_step_finished.emit("Arrival_df loaded from binary file.", "View", "view_arrival_df_raw")
        else:
            self.sig_action_step_finished.emit("Arrival_df computed.", "View", "view_arrival_df")

        self.arrival_df = self.arrival_df_raw.dropna()
        self.arrival_df = flow.add_iceblock_info(self.arrival_df, self.iceblock_df)
        self.arrival_df = flow.add_melted_height(self.arrival_df, self.encoder_df, self.iceblock_df)
        self.arrival_df = flow.add_flask_info(self.arrival_df, self.collector_df)
        self.sig_action_step_finished.emit(
            "Arrival_df augmented with iceblock, encoder and flask info.",
            "View",
            "view_arrival_df",
        )
        self.arrival_filepath = os.path.join(processed_dir, dataset.dataset_name + "_arrival_df.csv")
        self.arrival_df.to_csv(self.arrival_filepath, date_format="%Y-%m-%d %H:%M:%S")
        self.sig_action_step_finished.emit(
            "Arrival_df saved as CSV in [" + self.arrival_filepath + "]",
            "Open",
            "open_arrival_csv",
        )

        ################################################################################################################
        # Conducti rescaled by melting time
        conduct_rescaled_df = flow.get_conduct_by_melt_time(self.arrival_df, self.conduct_df, export_in_dataset=dataset)
        self.sig_simple_step_finished.emit("Conducti-R(escaled) created.")

        ################################################################################################################
        # Picarro rescaled by melting time
        if not self.picarro_df.index.is_monotonic_increasing:
            # The Picarro clock went backward at least once: keep only what was recorded after the
            # first backward jump. Relies on the index being named "datetime".
            picarro = self.picarro_df.reset_index()
            time_step = picarro["datetime"] - picarro["datetime"].shift(1)
            first_jump = picarro.loc[time_step.dt.total_seconds() < 0].index[0]
            # Strict ">" as in the historical code: the row where the jump happens is dropped too.
            self.picarro_df = picarro.loc[picarro.index > first_jump].set_index("datetime")
        picarro_rescaled_df = flow.get_picarro_by_melt_time(self.arrival_df, self.picarro_df, export_in_dataset=dataset)
        self.sig_simple_step_finished.emit("Picarro-R(escaled) created.")

        ################################################################################################################
        # Flask
        self.flask_df = flow.get_flask_content(self.arrival_df)
        self.sig_action_step_finished.emit("Flasks' content computed.", "View", "view_flask_df")
        self.flask_df_filepath = os.path.join(processed_dir, dataset.dataset_name + "_flask_df.csv")
        self.flask_df.to_csv(self.flask_df_filepath, index=False)
        self.sig_action_step_finished.emit(
            # Report the flask file itself (the message used to show the arrival CSV path).
            "flask_df saved as CSV in [" + self.flask_df_filepath + "]",
            "Open",
            "open_flask_csv",
        )
        # if not all(self.flask_df["flask"] == 0):
        #     self.flask_df["mm_diff"] = self.flask_df["max"] - self.flask_df["min"]
        #     mm_per_flask_df = self.flask_df[["flask", "mm_diff"]].groupby("flask").agg(sum)
        #     mm_per_flask_df.plot.bar()
        #     mm_per_flask_df.plot.hist(bins=round(len(mm_per_flask_df.index)/3))

        ################################################################################################################
        # Conducti

        ################################################################################################################
        # Picarro & conducti vs. melted height
        self.pic_conduc_height_filepath = os.path.join(processed_dir, dataset.dataset_name + "_pic_conduc_height.csv")
        pic_conduc_height_df = self.arrival_df[["icbk_code", "icbk_name", "melted_height", "melted_height_icbk"]]
        pic_conduc_height_df = pd.merge(pic_conduc_height_df, picarro_rescaled_df, left_index=True, right_index=True)
        pic_conduc_height_df = pd.merge(pic_conduc_height_df, conduct_rescaled_df, left_index=True, right_index=True)
        pic_conduc_height_df.to_csv(self.pic_conduc_height_filepath)
        self.sig_action_step_finished.emit(
            "pic_conduc_height_df saved as CSV in [" + self.pic_conduc_height_filepath + "]",
            "Open",
            "open_pic_conduc_height_df",
        )

        ################################################################################################################
        # Picarro
        # self.arrival_df = self.arrival_df[self.arrival_df["icbk_code"] > 0]
        # picarro2_df = pd.merge_asof(left=self.arrival_df[["icbk_code", "icbk_name", "melted_height", "picarro"]],
        #                             right=self.picarro_df,
        #                             left_on="picarro",
        #                             right_index=True)
        # picarro2_df["iceblock"] = picarro2_df["icbk_code"].astype(str) + '-' + picarro2_df["icbk_name"]

        # print(gg.ggplot()
        #       + gg.geom_line(data=picarro2_df,
        #                      mapping=gg.aes(x='picarro', y='Delta_18_16', color="iceblock"))
        #       )
        #
        # print(gg.ggplot()
        #       + gg.geom_path(data=picarro2_df,
        #                      mapping=gg.aes(x='Delta_18_16', y='melted_height', color="iceblock"))
        #       )

        ################################################################################################################
        # Calibrate
        calib_name = "cal1"
        isotope_columns = ["Delta_18_16_slope", "Delta_18_16_intercept", "Delta_D_H_slope", "Delta_D_H_intercept"]
        calibrated_dir = os.path.join(dataset.dataset_path, "calibrated")
        os.makedirs(calibrated_dir, exist_ok=True)
        self.calib_filepath = os.path.join(calibrated_dir, dataset.dataset_name + "_" + calib_name + ".csv")
        if not os.path.exists(self.calib_filepath):
            # No calibration yet: write a template (to be filled in by the user) listing the ice blocks.
            template_calib_df = self.iceblock_df[["id", "name", "initial_height"]].copy()
            template_calib_df = (
                template_calib_df.melt(["id", "name"]).sort_values(["id", "value"]).drop(columns=["id", "variable"])
            )
            template_calib_df = template_calib_df.rename(columns={"name": "icbk_name", "value": "melted_height"})
            for empty_column in ["depth"] + isotope_columns:
                template_calib_df[empty_column] = np.nan  # np.NaN no longer exists in NumPy >= 2.0
            template_calib_df.to_csv(self.calib_filepath, index=False, float_format="%.2f", sep=";")
        self.calib_df = pd.read_csv(self.calib_filepath, sep=";")

        if self.calib_df["depth"].isna().all():
            self.sig_action_step_finished.emit(
                "Calibration file is empty: [" + self.calib_filepath + "]",
                "Edit",
                "edit_calib_df",
            )
            self.sig_simple_step_finished.emit("Abort calibration.")
            # Actually abort, as announced: without any depth there is nothing to calibrate against.
            return
        self.sig_action_step_finished.emit("Calibration file loaded.", "View", "view_calib_df")

        ################################################################################################################
        # Calibrate pic_conduct_height
        # Absolute depth calibration
        for icbk_name in self.calib_df["icbk_name"].unique():
            depth_interp_func = self._get_depth_interp_func(icbk_name)
            icbk_rows = pic_conduc_height_df["icbk_name"] == icbk_name
            pic_conduc_height_df.loc[icbk_rows, "depth"] = depth_interp_func(
                pic_conduc_height_df.loc[icbk_rows, "melted_height_icbk"]
            )

        # Rows with a missing slope/intercept are ignored, then distinct value sets are counted.
        if len(self.calib_df[isotope_columns].dropna().drop_duplicates()) > 1:
            raise ValueError("More than one calibration set for isotope is not yet supported")

        # Linear isotopic calibration: calibrated = raw * slope + intercept, using the first row's values.
        isotopic_calib_df = self.calib_df.iloc[0]
        calib_pic_conduc_height_df = pic_conduc_height_df.copy()
        calib_pic_conduc_height_df["Delta_18_16_calib"] = (
            calib_pic_conduc_height_df["Delta_18_16"] * isotopic_calib_df.Delta_18_16_slope
            + isotopic_calib_df.Delta_18_16_intercept
        )
        calib_pic_conduc_height_df["Delta_D_H_calib"] = (
            calib_pic_conduc_height_df["Delta_D_H"] * isotopic_calib_df.Delta_D_H_slope
            + isotopic_calib_df.Delta_D_H_intercept
        )
        self.pic_conduct_calibrated_filepath = os.path.join(
            calibrated_dir, dataset.dataset_name + "_pic_conduct_calibrated_" + calib_name + ".csv"
        )
        calib_pic_conduc_height_df.to_csv(self.pic_conduct_calibrated_filepath, float_format="%.3f")
        self.sig_action_step_finished.emit(
            "Calibrated Picarro & Conducti saved as CSV in [" + self.pic_conduct_calibrated_filepath + "]",
            "Open",
            "open_calib_pic_conduc_height_df",
        )

        ################################################################################################################
        # Calibrate flasks
        calib_flask_df = self.flask_df.copy()
        for icbk_name in self.calib_df["icbk_name"].unique():
            depth_interp_func = self._get_depth_interp_func(icbk_name)
            icbk_rows = calib_flask_df["icbk_name"] == icbk_name
            calib_flask_df.loc[icbk_rows, "min_depth"] = depth_interp_func(calib_flask_df.loc[icbk_rows, "min"])
            calib_flask_df.loc[icbk_rows, "max_depth"] = depth_interp_func(calib_flask_df.loc[icbk_rows, "max"])
        calib_flask_df["diff_depth"] = calib_flask_df["max_depth"] - calib_flask_df["min_depth"]
        self.flask_calibrated_filepath = os.path.join(
            calibrated_dir, dataset.dataset_name + "_flask_calibrated_" + calib_name + ".csv"
        )
        calib_flask_df.to_csv(self.flask_calibrated_filepath, float_format="%.2f", index=False)
        self.sig_action_step_finished.emit(
            "Calibrated Flask saved as CSV in [" + self.flask_calibrated_filepath + "]", "Open", "open_calib_flask_df"
        )

    def _get_depth_interp_func(self, icbk_name):
        """Return the melted-height -> depth interpolator built from ``icbk_name``'s calibration rows.

        The returned ``scipy.interpolate.interp1d`` object uses scipy's defaults, i.e. it raises a
        ``ValueError`` when evaluated outside the range of calibrated melted heights.
        """
        icbk_calib_df = self.calib_df.loc[self.calib_df["icbk_name"] == icbk_name]
        return interpolate.interp1d(x=icbk_calib_df["melted_height"], y=icbk_calib_df["depth"])

    def __apply_special_corrections__(self) -> None:
        """Some datasets require some manual correction before being processed, due to errors during the recording"""
        dataset_name = self.current_dataset.dataset_name

        if dataset_name == "20210428_ASUMA2016_8_19sq":
            # Keep only the two time windows [t1, t2] and [t3, t4] of the encoder's records.
            t1 = pd.Timestamp("2021-04-28 11:02:50", tz="UTC")
            t2 = pd.Timestamp("2021-04-28 12:57:56", tz="UTC")
            t3 = pd.Timestamp("2021-04-28 13:20:28", tz="UTC")
            t4 = pd.Timestamp("2021-04-28 15:19:37", tz="UTC")
            self.encoder_df = pd.concat([self.encoder_df.loc[t1:t2], self.encoder_df.loc[t3:t4]])
            self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")

        if dataset_name in ["20210826_ASUMA2016_6-1_sq", "20210827_ASUMA2016_6_11sq"]:
            # Shift every Picarro timestamp 115 seconds earlier.
            self.picarro_df.index = self.picarro_df.index - pd.Timedelta(115, unit="s")
            self.sig_simple_step_finished.emit("Special correction applied on picarro_df.")

        if dataset_name == "20220317_ASUMA2016_25_19sq":
            self.encoder_df = encoder.shift_stacking_event(
                self.encoder_df,
                old_peak_start=pd.Timestamp("2022-03-17 13:28:27.446578+00:00"),
                old_peak_end=pd.Timestamp("2022-03-17 13:28:41.721330+00:00"),
                shift=pd.Timedelta(seconds=60),
            )
            self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")

        if dataset_name == "20220329_ASUMA2016_23_12sq":
            self.encoder_df = encoder.shift_stacking_event(
                self.encoder_df,
                old_peak_start=pd.Timestamp("2022-03-29 16:58:11.552587+00:00"),
                old_peak_end=pd.Timestamp("2022-03-29 16:58:48.006613+00:00"),
                shift=pd.Timedelta(seconds=80),
            )
            self.sig_simple_step_finished.emit("Special correction applied on encoder_df.")

    def update_calib_csv(self, calib_df: pd.DataFrame):
        """Overwrite the current calibration file (``self.calib_filepath``) with ``calib_df``.

        The file is written with ";" as separator, without the index and with 5-decimal floats.
        """
        calib_df.to_csv(self.calib_filepath, index=False, float_format="%.5f", sep=";")


class Worker(QObject):
    """Worker class used to move long "analyse" computations to another thread, to avoid GUI freezing."""

    # Emitted once ``run`` is over, whether the analysis succeeded or failed.
    finished = pyqtSignal()

    def __init__(self, controller: AnalyseController):
        """Store the controller whose ``analyse`` method will be executed by :meth:`run`."""
        super().__init__()
        self.ctrl = controller

    def run(self):
        """Run the controller's analysis, report any failure, then always emit ``finished``.

        Exceptions are not propagated: this method is meant to be the entry point of a worker
        thread, so a failure is printed (message on stdout, full traceback on stderr) instead.
        """
        import traceback

        try:
            self.ctrl.analyse()
        except Exception as e:
            print(e)
            # The message alone rarely tells where the failure happened: keep the traceback too.
            traceback.print_exc()
        finally:
            # Without this the declared signal was never emitted, so listeners waiting for the end
            # of the analysis (e.g. to stop the thread) were never notified.
            self.finished.emit()