Skip to content
Snippets Groups Projects
picarrocorrectionuim.py 12.04 KiB
import datetime
import os
import pyqtgraph as pg
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import QColor
import pandas as pd
import numpy as np
import statsmodels.api as sm
from math import atan, degrees

import utils
from dataprovider.picarroprovider import PicarroProvider
from gui.uimainwindow import Ui_MainWindow
from uim.picarrocalibuim import PicarroCalibUim


class PicarroCorrectionUim:
    """Class managing the 'Injections identification' sub-tab of the 'Picarro calibration' tab."""

    # "Variables" tableWidget columns identifiers.
    DATETIME_COL = 0
    STANDARD_COL = 1
    H2O_COL = 2
    D18O_COL = 3
    D18OSD_COL = 4
    D2H_COL = 5
    D2HSD_COL = 6
    KEEP_COL = 7

    STD_COLORS = [QColor(228, 26, 28),  # Red
                  QColor(55, 126, 184),  # Blue
                  QColor(77, 175, 74),  # Green
                  QColor(152, 78, 163),  # Violet
                  QColor(255, 127, 0),  # Orange
                  QColor(255, 255, 51),  # Yellow
                  QColor(166, 86, 40),  # Brown
                  QColor(247, 129, 191)]  # Pink

    def __init__(self, picarrocalib_uim: PicarroCalibUim):
        self.main_ui = picarrocalib_uim.main_ui

        self.__init_tablewidget__()
        self.standard_colors = dict()

        self.picarro_prvd = None
        picarrocalib_uim.sig_dataset_loaded.connect(self.update_dataset)

        self.calibs_df = pd.DataFrame()
        picarrocalib_uim.sig_calibs_loaded.connect(self.__load_calibrations__)

        self.stdtrue_df = picarrocalib_uim.stdtrue_df
        picarrocalib_uim.sig_stdtrue_loaded.connect(lambda stdtrue_df: setattr(self, "stdtrue_df", stdtrue_df))

        # Init calibrations_file
        calibrations_file = self.main_ui.picarro_lineedit_calib_filename.text()
        if calibrations_file != "":
            picarrocalib_uim.read_calibrations_file(calibrations_file)

    def update_dataset(self, picarro_prvd: PicarroProvider):
        self.picarro_prvd = picarro_prvd
        self.update_subset_calib_df()

    def __init_colors__(self, calibs_df: pd.DataFrame):
        colors = dict()
        standard_names = calibs_df["standard"].unique()
        for i in range(len(standard_names)):
            colors[standard_names[i]] = self.STD_COLORS[i % len(self.STD_COLORS)]
        self.standard_colors = colors

    def __load_calibrations__(self, calibs_df: pd.DataFrame):
        if len(calibs_df.index) == 0:
            return

        calibs_df["keep"] = True

        self.calibs_df = calibs_df
        self.__init_colors__(self.calibs_df)
        self.update_subset_calib_df()

    ####################################################################
    # Calibrations' table

    def __init_tablewidget__(self):
        table = self.main_ui.picarro_tablewidget_calibrations

        table.verticalHeader().setStyleSheet("QHeaderView { font-size: 10pt; }")
        table.horizontalHeader().setStyleSheet("QHeaderView { font-size: 10pt; }")

        table.verticalHeader().setFixedWidth(30)  # Width of the Row header
        table.setColumnWidth(self.DATETIME_COL, 150)
        table.setColumnWidth(self.STANDARD_COL, 70)
        table.setColumnWidth(self.H2O_COL, 70)
        table.setColumnWidth(self.D18O_COL, 70)
        table.setColumnWidth(self.D18OSD_COL, 60)
        table.setColumnWidth(self.D2H_COL, 70)
        table.setColumnWidth(self.D2HSD_COL, 60)
        table.setColumnWidth(self.KEEP_COL, 50)

        table.horizontalHeader().setFixedHeight(24)  # Height of the Column header

    def update_subset_calib_df(self):
        if not isinstance(self.picarro_prvd, PicarroProvider):
            return
        if len(self.calibs_df.index) == 0:
            return

        min_date = self.picarro_prvd.picarro_df.index.min() - datetime.timedelta(days=5)
        max_date = self.picarro_prvd.picarro_df.index.max() + datetime.timedelta(days=5)
        subset_calibs_df = self.calibs_df[(self.calibs_df["valid_start"] > min_date) &
                                          (self.calibs_df["valid_end"] < max_date)]

        self.subset_calibs_df = subset_calibs_df.reset_index(drop=True)

        self.update_table(self.subset_calibs_df)
        self.__update_calibs_timeseries_plot__()
        self.__update_calibs_equation_plot__()

    def update_table(self, subset_calibs_df: pd.DataFrame):
        # Reset Standards' colors
        self.__init_colors__(subset_calibs_df)

        # Clear previous table's content
        self.main_ui.picarro_tablewidget_calibrations.clearContents()
        self.main_ui.picarro_tablewidget_calibrations.model().removeRows(
            0, self.main_ui.picarro_tablewidget_calibrations.rowCount())

        # Add rows for the new subset
        for index, row in subset_calibs_df.iterrows():
            self.__add_row__(index, row)

    def __add_row__(self, row_id: int, row_content: pd.Series):
        table = self.main_ui.picarro_tablewidget_calibrations

        table.insertRow(row_id)

        # Datetime
        datetime_item = QDateTimeEdit(row_content["valid_start"])
        datetime_item.setDisplayFormat("dd/MM/yyyy HH:mm")
        datetime_item.setReadOnly(True)
        table.setCellWidget(row_id, self.DATETIME_COL, datetime_item)

        # Standard name
        standard_item = QTableWidgetItem()
        standard_item.setFlags(Qt.ItemIsEnabled)  # Read only
        standard_item.setBackground(self.standard_colors[row_content["standard"]])
        standard_item.setText(row_content["standard"])
        table.setItem(row_id, self.STANDARD_COL, standard_item)

        # Injections' values
        table.setItem(row_id, self.H2O_COL, self.__get_number_item__(row_content["H2O_mean"]))
        table.setItem(row_id, self.D18O_COL, self.__get_number_item__(row_content["Delta_18_16_mean"]))
        table.setItem(row_id, self.D18OSD_COL, self.__get_number_item__(row_content["Delta_18_16_std"]))
        table.setItem(row_id, self.D2H_COL, self.__get_number_item__(row_content["Delta_D_H_mean"]))
        table.setItem(row_id, self.D2HSD_COL, self.__get_number_item__(row_content["Delta_D_H_std"]))

        # Keep
        keep_item = QCheckBox()
        keep_item.setChecked(row_content["keep"])
        table.setCellWidget(row_id, self.KEEP_COL, keep_item)
        table.cellWidget(row_id, self.KEEP_COL).stateChanged.connect(
            lambda state, row_id=row_id: self.__apply_keep_state__(row_id, state))

    def __get_number_item__(self, value: float) -> QTableWidgetItem:
        number_item = QTableWidgetItem()
        number_item.setFlags(Qt.ItemIsEnabled)  # Read only
        number_item.setText("{:.2f}".format(value))
        return number_item

    def __apply_keep_state__(self, row_id: int, state: int):
        keep = bool(state)
        self.subset_calibs_df.at[row_id, 'keep'] = keep

        self.__update_calibs_timeseries_plot__()
        self.__update_calibs_equation_plot__()

    ####################################################################
    # Calibrations' timeseries plot

    def __update_calibs_timeseries_plot__(self):
        # Create a layout
        layout = pg.GraphicsLayout()
        layout.layout.setSpacing(0.)
        layout.setContentsMargins(0., 0., 0., 0.)

        # Add it to the graphics view
        self.main_ui.picarro_graphicsview_time.setCentralItem(layout)

        # Create plot items and add them to the layout
        plot_delta_18_16 = self.__get_timeseries_plot_item__(layout, 0, "Delta_18_16")
        plot_delta_D_H = self.__get_timeseries_plot_item__(layout, 1, "Delta_D_H")
        plot_delta_D_H.setXLink(plot_delta_18_16)

    def __get_timeseries_plot_item__(self, layout: pg.GraphicsLayout, rank: int, species: str):
        plot_item = layout.addPlot(rank, 0, axisItems={'bottom': utils.TimeAxisItem(orientation='bottom')})

        # Add runs points
        for standard in self.subset_calibs_df["standard"].unique():
            standard_df = self.subset_calibs_df[(self.subset_calibs_df["standard"] == standard)
                                                & (self.subset_calibs_df["keep"])]
            centered_mean = standard_df[species + "_mean"] - standard_df[species + "_mean"].mean()
            run_points = pg.ScatterPlotItem(x=utils.pd_time_to_epoch_ms(standard_df["valid_start"]),
                                            y=centered_mean,
                                            size=6,
                                            brush=self.standard_colors[standard],
                                            name=standard)
            plot_item.addItem(run_points)

            err = pg.ErrorBarItem(x=np.array(utils.pd_time_to_epoch_ms(standard_df["valid_start"])),
                                  y=np.array(centered_mean),
                                  height=np.array(standard_df[species + "_std"]) * 2,
                                  beam=60*5,  # 5 minutes
                                  pen=self.standard_colors[standard])
            plot_item.addItem(err)

        plot_item.setLabel('left', species)
        return plot_item

    ####################################################################
    # Calibrations' correction line plot

    def __update_calibs_equation_plot__(self):
        # Create a layout
        layout = pg.GraphicsLayout()
        layout.layout.setSpacing(0.)
        layout.setContentsMargins(0., 0., 0., 0.)

        # Add it to the graphics view
        self.main_ui.picarro_graphicsview_equation.setCentralItem(layout)

        # Create plot items and add them to the layout
        plot_delta_18_16 = self.__get_equation_plot_item__(layout, 0, "Delta_18_16")
        plot_delta_D_H = self.__get_equation_plot_item__(layout, 1, "Delta_D_H")

    def __get_equation_plot_item__(self, layout: pg.GraphicsLayout, rank: int, species: str):
        plot_item = layout.addPlot(rank, 0)

        # Add runs points
        for standard in self.subset_calibs_df["standard"].unique():

            standard_df = self.subset_calibs_df[(self.subset_calibs_df["standard"] == standard)
                                                & (self.subset_calibs_df["keep"])]

            # Get the standard's true value
            stdtrue_df = self.stdtrue_df[self.stdtrue_df["name"] == standard]
            stdtrue_mean = stdtrue_df[species + "_mean"].to_list() * len(standard_df.index)
            stdtrue_std = stdtrue_df[species + "_std"].to_list() * len(standard_df.index)

            run_points = pg.ScatterPlotItem(x=stdtrue_mean,
                                            y=standard_df[species + "_mean"],
                                            size=6,
                                            brush=self.standard_colors[standard],
                                            name=standard)
            plot_item.addItem(run_points)

            err = pg.ErrorBarItem(x=np.array(stdtrue_mean),
                                  y=np.array(standard_df[species + "_mean"]),
                                  width=np.array(stdtrue_std) * 2,
                                  height=np.array(standard_df[species + "_std"]) * 2,
                                  pen=self.standard_colors[standard])
            plot_item.addItem(err)

        slope, intercept = self.get_linear_model(species,
                                                 self.subset_calibs_df[self.subset_calibs_df["keep"]],
                                                 self.stdtrue_df)
        reg_line = pg.InfiniteLine(pos=QPointF(0, intercept),
                                   angle=degrees(atan(slope)))
        plot_item.addItem(reg_line)

        plot_item.setLabel('left', species)
        return plot_item

    def get_linear_model(self, species: str, calibs_df: pd.DataFrame, truevalue_df: pd.DataFrame) -> tuple:
        true = []
        meas = []
        for standard in self.subset_calibs_df["standard"].unique():
            std_meas = calibs_df.loc[calibs_df["standard"] == standard, species + "_mean"].to_list()
            std_true = truevalue_df.loc[truevalue_df["name"] == standard, species + "_mean"].to_list() * len(std_meas)
            meas += std_meas
            true += std_true

        model = sm.OLS(meas, sm.tools.add_constant(true)).fit()

        intercept = model.params[0]
        slope = model.params[1]

        return slope, intercept