import pandas as pd
import datetime
import os
import re
import xmltodict
import xml.etree.ElementTree as ET
from io import StringIO

import utils
from dataprovider.picarroprovider import PicarroProvider


class ExploProvider:

    def __init__(self, picarro_prvd: PicarroProvider):
        self.datasets_root_directory = ""
        self.datasets_dirs = []
        self.datasets = {}
        self.picarro_prvd = picarro_prvd

    def explore_root_directory(self, root_directory: str) -> list:
        """Get the names of the datasets directories.

        Parameters
        ----------
        root_directory: str
            Full path of the directory containing the dataset directories.

        Returns
        -------
        list
            List of dataset directory names (without full path).
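
        Examples
        --------
        Illustrative only; the root path and directory names below are hypothetical::

            provider = ExploProvider(picarro_prvd)
            provider.explore_root_directory("/data/cfa_runs")
            # ['20200114_blockA', '20200115_blockB']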

        """
        directories = []

        # Find all directories in datasets root directory (not recursive)
        for element in os.listdir(root_directory):
            if os.path.isdir(root_directory + "/" + element):
                directories.append(element)

        # Keep only dataset directories (ignore others like pump_calibration, conduct_calib, old, etc.)
        regex = re.compile(r'^[0-9]{8}_.*')
        dataset_directories = list(filter(regex.search, directories))

        # Sort list in alphabetical order (in this case, by ascending date)
        dataset_directories.sort()

        self.datasets_root_directory = root_directory
        self.datasets_dirs = dataset_directories
        for directory in dataset_directories:
            dataset = Dataset(root_directory, directory, self.picarro_prvd)
            self.datasets[directory] = dataset

        return dataset_directories


class Dataset:

    def __init__(self, root_directory: str, directory_name: str, picarro_prvd: PicarroProvider):
        self.root_directory = root_directory
        self.directory_name = directory_name
        self.full_directory_name = root_directory + "/" + directory_name

        self.picarro_prvd = picarro_prvd

        # Dataset display text: keep only the last 9 characters of the directory name
        self.dataset_text = directory_name[-9:]
        self.first_data_datetime = datetime.datetime.now(tz=datetime.timezone.utc)
        self.last_data_datetime = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

        self.instlogs = {}
        self.manual_event_log = None

        # Setup save/load
        self.saved_setup_dir = self.full_directory_name + "/saved_setups/"
        self.saved_setup_ext = ".xml"

        self.explore_dataset()

    def explore_dataset(self) -> None:
        filenames = os.listdir(self.full_directory_name)

        for filename in filenames:
            try:
                inst_and_type = re.search("^" + re.escape(self.directory_name) + r'_(.+?)\.log$', filename).group(1)
            except AttributeError:
                # The file does not match the expected instrument log file pattern
                print("File [" + filename + "] does not appear to be a valid CFA log file")
                continue

            parts = inst_and_type.split("_")
            instrument_name = parts[0]

            if len(parts) == 2:
                log_type = parts[1]
                if log_type == "instant":
                    if instrument_name == "ICBKCTRL":
                        instrument_log = IceblockInstantLog(self.full_directory_name, filename, instrument_name)
                    else:
                        instrument_log = InstrumentInstantLog(self.full_directory_name, filename, instrument_name)
                elif log_type == "periodic":
                    instrument_log = InstrumentPeriodicLog(self.full_directory_name, filename, instrument_name)
                    self.first_data_datetime = min(self.first_data_datetime, instrument_log.df["datetime"].min())
                    self.last_data_datetime = max(self.last_data_datetime, instrument_log.df["datetime"].max())
                else:
                    raise ValueError("Unknown log type: [" + log_type + "]")
                self.instlogs[inst_and_type] = instrument_log
            elif instrument_name == "manual-event":
                self.manual_event_log = ManualEventLog(self.full_directory_name, filename, instrument_name)

        # Picarro data are not logged the same way as the other instruments': they are logged directly on the
        # Picarro instrument itself. To get comparable data files, build an "artificial" PICARRO_periodic log
        # file from the Picarro log files.
        picarro_filename = self.directory_name + "_PICARRO_periodic.log"
        if picarro_filename not in filenames:
            try:
                picarro_df = self.picarro_prvd.get_df(self.first_data_datetime,
                                                      self.last_data_datetime,
                                                      ["H2O", "Delta_D_H", "Delta_18_16"])
            except ValueError as e:
                print("Failed to get Picarro data: " + str(e))
                return

            picarro_df.to_csv(path_or_buf=self.full_directory_name + "/" + picarro_filename,
                              sep="\t",
                              index=False,
                              mode='w',  # Always override file content
                              date_format=utils.datetime_format
                              )
            picarro_log = InstrumentPeriodicLog(self.full_directory_name, picarro_filename, "PICARRO")
            self.instlogs["PICARRO_periodic"] = picarro_log

    def save_setup(self, setup_name: str, variable_df: pd.DataFrame, view_range: list) -> None:
        # Build 'saved setup' full file name
        if not os.path.exists(self.saved_setup_dir):
            os.mkdir(self.saved_setup_dir)
        filename = self.saved_setup_dir + setup_name + self.saved_setup_ext

        # Variables table
        variables_str = variable_df.to_csv(sep=";",
                                           index=False,
                                           mode='w')

        # Create XML file
        root_elmt = ET.Element("save")
        ET.SubElement(root_elmt, "variables").text = variables_str
        view_range_elmt = ET.SubElement(root_elmt, "view_range")
        ET.SubElement(view_range_elmt, "xmin").text = "{:.2f}".format(view_range[0][0])
        ET.SubElement(view_range_elmt, "xmax").text = "{:.2f}".format(view_range[0][1])
        ET.SubElement(view_range_elmt, "ymin").text = "{:.4f}".format(view_range[1][0])
        ET.SubElement(view_range_elmt, "ymax").text = "{:.4f}".format(view_range[1][1])

        tree = ET.ElementTree(root_elmt)
        tree.write(filename)
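
    # For reference, a file written by save_setup() has roughly this shape (indentation and values are
    # illustrative; ElementTree writes it without pretty-printing):
    #
    #   <save>
    #       <variables>...';'-separated CSV dump of variable_df...</variables>
    #       <view_range>
    #           <xmin>0.00</xmin>
    #           <xmax>100.00</xmax>
    #           <ymin>0.0000</ymin>
    #           <ymax>1.0000</ymax>
    #       </view_range>
    #   </save>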

    def load_setup(self, filename: str) -> tuple:
        full_filename = self.saved_setup_dir + filename + self.saved_setup_ext

        # Open XML file
        tree = ET.parse(full_filename)
        root = tree.getroot()

        # Variable CSV table as a pd.DataFrame
        variables_str = root.find("variables").text
        variable_io = StringIO(variables_str)
        variable_df = pd.read_csv(variable_io, sep=";")

        # View range
        view_range_elmt = root.find("view_range")
        view_range_dict = {"xmin": float(view_range_elmt.find("xmin").text),
                           "xmax": float(view_range_elmt.find("xmax").text),
                           "ymin": float(view_range_elmt.find("ymin").text),
                           "ymax": float(view_range_elmt.find("ymax").text)}

        return variable_df, view_range_dict
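
    # Typical round-trip (illustrative; "my_setup" is a hypothetical setup name):
    #   dataset.save_setup("my_setup", variable_df, view_range)
    #   variable_df, view_range_dict = dataset.load_setup("my_setup")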

    def setup_filename_is_valid(self, filename: str) -> tuple:
        """Check if the file name is valid: no special characters, file does not already exists.

        Parameters
        ----------
        filename: str
            filename (without extension) to be tested.

        Returns
        -------
        bool
            True if the file name is valid, False otherwise.
        str
            The error message explaining why the file name is not valid; an empty string if the file name is valid.
        """
        if not re.match("^[A-Za-z0-9_-]*$", filename):
            error_msg = "File name can only contain letters, digits and '-' or '_'. File extension is automatically set."
            return False, error_msg
        elif filename in self.get_setup_saved_files():
            error_msg = "File already exists."
            return False, error_msg
        else:
            return True, ""

    def get_setup_saved_files(self) -> list:
        """Get a list of the 'setup' file names (without extension) existing in the 'saved_setups' directory."""
        if not os.path.exists(self.saved_setup_dir):
            return []

        filenames = os.listdir(self.saved_setup_dir)

        files_without_ext = [os.path.splitext(filename)[0] for filename in filenames]
        return files_without_ext


class InstrumentLog:

    def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
        self.full_directory_name = full_directory_name
        self.filename = filename
        self.full_file_name = full_directory_name + "/" + filename
        self.instrument_name = instrument_name

        self.df = self.__get_df__()

    def get_variables(self):
        raise NotImplementedError("Subclasses should implement this.")

    def get_timeseries(self, variable: str) -> pd.DataFrame:
        raise NotImplementedError("Subclasses should implement this.")

    def __get_df__(self) -> pd.DataFrame:
        raise NotImplementedError("Subclasses should implement this.")


class InstrumentInstantLog(InstrumentLog):

    def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
        InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)

    def __get_df__(self) -> pd.DataFrame:
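        # Instant log files are expected to be comma-separated, with "datetime", "name" and "value"
        # columns (column names inferred from their use below; the sample row is illustrative):
        #   datetime,name,value
        #   2020-01-15 10:03:27,valve1,open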
        df = pd.read_csv(self.full_file_name, sep=",", parse_dates=["datetime"])
        df["datetime"] = df["datetime"].dt.tz_localize('UTC')
        return df

    def get_variables(self):
        return self.df["name"].unique()

    def get_timeseries(self, variable: str) -> pd.DataFrame:
        timeseries_df = self.df[self.df["name"] == variable].copy()
        timeseries_df = timeseries_df.drop(columns=['name'])

        try:
            timeseries_df["value"] = timeseries_df["value"].astype(float)
        except ValueError:
            timeseries_df["value_int"] = timeseries_df["value"].astype("category").cat.codes
        return timeseries_df


class IceblockInstantLog(InstrumentLog):

    def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
        InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)

    def __get_df__(self) -> pd.DataFrame:
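        # Iceblock log files are expected to be comma-separated, with "datetime", "id", "name" and
        # "status" columns (inferred from their use in __get_melting_timeseries__; the sample row is
        # illustrative):
        #   datetime,id,name,status
        #   2020-01-15 10:12:41,1,block_top,Melting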
        df = pd.read_csv(self.full_file_name, sep=",", parse_dates=["datetime"])
        df["datetime"] = df["datetime"].dt.tz_localize('UTC')
        return df

    def get_variables(self):
        return ["melting"]

    def get_timeseries(self, variable: str) -> pd.DataFrame:
        if variable == "melting":
            timeseries_df = self.__get_melting_timeseries__()
        else:
            raise ValueError("Variable name [" + variable + "] not yet managed.")

        return timeseries_df

    def __get_melting_timeseries__(self) -> pd.DataFrame:
        # Get the mapping between iceblock id and iceblock name (assuming the last modification of the
        # name is the correct one).
        mapping_df = self.df[["datetime", "id", "name"]].copy()
        mapping_df = mapping_df.groupby("id")[["id", "name"]].tail(1)
        mapping_df = pd.concat([mapping_df, pd.DataFrame([{"id": 0, "name": "None"}])], ignore_index=True)
        mapping_df = mapping_df.set_index("id")
        mapping_dict = mapping_df["name"].to_dict()

        # Get the datetime of the beginning of each iceblock's melting
        melting_df = self.df[["datetime", "id", "status"]].copy()
        start_df = melting_df[melting_df["status"] == "Melting"].groupby("id")[["datetime", "id"]].head(1)

        # Get the end of the last iceblock's melting: after that point, the current melting block is 0/"None".
        end_df = melting_df[melting_df["status"] == "Done"].groupby("id").head(1)
        if end_df.empty:  # No block has finished melting yet: no trailing 0/"None" sample to add
            melting_df = start_df
        else:
            melting_df = pd.concat([start_df, pd.DataFrame([{"datetime": end_df.iloc[-1]["datetime"], "id": 0}])],
                                   ignore_index=True)

        # Get the value (iceblocks name) and value_int (coded value, iceblock id in this case).
        melting_df.rename(columns={"id": 'value_int'}, inplace=True)
        melting_df["value"] = melting_df["value_int"].map(mapping_dict)

        return melting_df


class ManualEventLog(InstrumentLog):

    def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
        InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)

    def __get_df__(self) -> pd.DataFrame:
        # The manual-event log file is not a valid XML file: the root tag is missing. So read the file
        # content and wrap it in root tags.
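        # A file with two events would look like this (content illustrative, tag names as parsed below):
        #   <event><datetime>2020-01-15 10:30:00</datetime><description>Refilled standard</description></event>
        #   <event><datetime>2020-01-15 11:05:00</datetime><description>Bubble in line</description></event>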
        with open(self.full_file_name) as f:
            xml_str = f.read()
        xml_str = "<root>" + xml_str + "</root>"

        # Convert the XML to a dict, then convert the dict to a pd.DataFrame
        xml_dict = xmltodict.parse(xml_str)
        if "datetime" in xml_dict["root"]["event"]:  # Only 1 event -> one less level in dict tree
            df = pd.DataFrame([xml_dict["root"]["event"]])
        else:
            df = pd.DataFrame.from_dict(xml_dict["root"]["event"])

        # Rename "description" column
        df.rename(columns={"description": 'event'}, inplace=True)

        # Format datetime column.
        df["datetime"] = pd.to_datetime(df["datetime"]).dt.tz_localize('UTC')
        return df

    def get_variables(self):
        return ["event"]

    def get_timeseries(self, variable: str) -> pd.DataFrame:
        timeseries_df = self.df[["datetime", variable]].rename(columns={variable: 'value'})
        return timeseries_df


class InstrumentPeriodicLog(InstrumentLog):

    def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
        InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)

    def __get_df__(self) -> pd.DataFrame:
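        # Periodic log files are tab-separated, with a "datetime" column plus one column per logged
        # variable (e.g. the "H2O", "Delta_D_H" and "Delta_18_16" columns of the artificial Picarro
        # log; the layout below is illustrative, with tabs shown as <TAB>):
        #   datetime<TAB>H2O<TAB>Delta_D_H<TAB>Delta_18_16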
        df = pd.read_csv(self.full_file_name, sep="\t", parse_dates=["datetime"])
        if not df.empty:
            df["datetime"] = df["datetime"].dt.tz_localize('UTC')
        return df

    def get_variables(self):
        all_cols = list(self.df.columns)
        variable_cols = [colname for colname in all_cols if colname != "datetime"]
        return variable_cols

    def get_timeseries(self, variable: str) -> pd.DataFrame:
        timeseries_df = self.df[["datetime", variable]].rename(columns={variable: 'value'})
        return timeseries_df
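

# Minimal usage sketch (comments only, since the PicarroProvider constructor arguments and the data
# path depend on the deployment; "/data/cfa_runs" is hypothetical):
#
#   picarro_prvd = PicarroProvider(...)
#   provider = ExploProvider(picarro_prvd)
#   for directory in provider.explore_root_directory("/data/cfa_runs"):
#       dataset = provider.datasets[directory]
#       for log_name, log in dataset.instlogs.items():
#           print(log_name, log.get_variables())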