import pandas as pd
import datetime
import os
import re
import xmltodict
import xml.etree.cElementTree as ET
from io import StringIO
import utils
from dataprovider.picarroprovider import PicarroProvider
class ExploProvider:
def __init__(self, picarro_prvd: PicarroProvider):
self.datasets_root_directory = ""
self.datasets = {}
self.picarro_prvd = picarro_prvd
def explore_root_directory(self, root_directory: str) -> list:
"""Get the names of the datasets directories.
Parameters
----------
root_directory: str
Full path of the directory containing the datasets directories.
Returns
-------
list
List of dataset directories name (without full path)
"""
directories = []
# Find all directories in datasets root directory (not recursive)
for element in os.listdir(root_directory):
if os.path.isdir(root_directory + "/" + element):
directories.append(element)
        # Keep only dataset directories (ignore others like pump_calibration, conduct_calib, old, etc.)
regex = re.compile(r'[0-9]{8}_.*')
dataset_directories = list(filter(regex.search, directories))
# Sort list in alphabetical order (in this case, by ascending date)
dataset_directories.sort()
self.datasets_root_directory = root_directory
self.datasets_dirs = dataset_directories
for directory in dataset_directories:
dataset = Dataset(root_directory, directory, self.picarro_prvd)
self.datasets[directory] = dataset
return dataset_directories
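
# A minimal usage sketch (hypothetical root path; assumes a PicarroProvider instance is already available):
#
#     prvd = ExploProvider(picarro_prvd=picarro_prvd)
#     dataset_dirs = prvd.explore_root_directory("/data/cfa_datasets")
#     dataset = prvd.datasets[dataset_dirs[0]]  # a Dataset instance, already explored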
class Dataset:
def __init__(self, root_directory: str, directory_name: str, picarro_prvd: PicarroProvider):
self.root_directory = root_directory
self.directory_name = directory_name
self.full_directory_name = root_directory + "/" + directory_name
self.picarro_prvd = picarro_prvd
# Get dataset name
self.dataset_text = directory_name[-9:]
self.first_data_datetime = datetime.datetime.now(tz=datetime.timezone.utc)
self.last_data_datetime = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
self.instlogs = {}
self.manual_event_log = None
# Setup save/load
self.saved_setup_dir = self.full_directory_name + "/saved_setups/"
self.saved_setup_ext = ".xml"
self.explore_dataset()
def explore_dataset(self) -> None:
filenames = os.listdir(self.full_directory_name)
for filename in filenames:
try:
inst_and_type = re.search("^" + self.directory_name + '_(.+?).log$', filename).group(1)
except AttributeError:
                # The file name does not match the usual instrument log file pattern
print("File [" + filename + "] does not appear to be a valid CFA log file")
continue
instrument_name = inst_and_type.split("_")[0]
if len(inst_and_type.split("_")) == 2:
log_type = inst_and_type.split("_")[1]
if log_type == "instant":
if instrument_name == "ICBKCTRL":
instrument_log = IceblockInstantLog(self.full_directory_name, filename, instrument_name)
else:
instrument_log = InstrumentInstantLog(self.full_directory_name, filename, instrument_name)
elif log_type == "periodic":
instrument_log = InstrumentPeriodicLog(self.full_directory_name, filename, instrument_name)
self.first_data_datetime = min(self.first_data_datetime, instrument_log.df["datetime"].min())
self.last_data_datetime = max(self.last_data_datetime, instrument_log.df["datetime"].max())
else:
raise ValueError("Unknown log type: [" + log_type + "]")
self.instlogs[inst_and_type] = instrument_log
elif instrument_name == "manual-event":
self.manual_event_log = ManualEventLog(self.full_directory_name, filename, instrument_name)
        # Picarro data are not logged the same way as the other instruments': they are recorded directly by the
        # Picarro instrument itself. To get comparable data files, create an "artificial" PICARRO_periodic log
        # file from the Picarro log files.
picarro_filename = self.directory_name + "_PICARRO_periodic.log"
if picarro_filename not in filenames:
try:
picarro_df = self.picarro_prvd.get_df(self.first_data_datetime,
self.last_data_datetime,
["H2O", "Delta_D_H", "Delta_18_16"])
except ValueError as e:
print("Failed to get Picarro data: " + str(e))
return
picarro_df.to_csv(path_or_buf=self.full_directory_name + "/" + picarro_filename,
sep="\t",
index=False,
mode='w', # Always override file content
date_format=utils.datetime_format
)
picarro_log = InstrumentPeriodicLog(self.full_directory_name, picarro_filename, "PICARRO")
self.instlogs["PICARRO_periodic"] = picarro_log
def save_setup(self, setup_name: str, variable_df: pd.DataFrame, view_range: list) -> None:
# Build 'saved setup' full file name
if not os.path.exists(self.saved_setup_dir):
os.mkdir(self.saved_setup_dir)
filename = self.saved_setup_dir + setup_name + self.saved_setup_ext
# Variables table
variables_str = variable_df.to_csv(sep=";",
index=False,
mode='w')
# Create XML file
root_elmt = ET.Element("save")
ET.SubElement(root_elmt, "variables").text = variables_str
view_range_elmt = ET.SubElement(root_elmt, "view_range")
ET.SubElement(view_range_elmt, "xmin").text = "{:.2f}".format(view_range[0][0])
ET.SubElement(view_range_elmt, "xmax").text = "{:.2f}".format(view_range[0][1])
ET.SubElement(view_range_elmt, "ymin").text = "{:.4f}".format(view_range[1][0])
ET.SubElement(view_range_elmt, "ymax").text = "{:.4f}".format(view_range[1][1])
tree = ET.ElementTree(root_elmt)
tree.write(filename)
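
    # For illustration, a setup file written by save_setup() has the following shape (made-up values;
    # the <variables> element holds the semicolon-separated CSV dump of variable_df):
    #
    #     <save>
    #         <variables>...CSV text...</variables>
    #         <view_range>
    #             <xmin>0.00</xmin>
    #             <xmax>120.50</xmax>
    #             <ymin>-2.0000</ymin>
    #             <ymax>3.5000</ymax>
    #         </view_range>
    #     </save>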
def load_setup(self, filename: str) -> tuple:
full_filename = self.saved_setup_dir + filename + self.saved_setup_ext
# Open XML file
tree = ET.parse(full_filename)
root = tree.getroot()
# Variable CSV table as pd.Dataframe
variables_str = root.findall("variables")[0].text
variable_io = StringIO(variables_str)
variable_df = pd.read_csv(variable_io, sep=";")
# View range
view_range_elmt = root.findall("view_range")[0]
view_range_dict = {"xmin": float(view_range_elmt.findall("xmin")[0].text),
"xmax": float(view_range_elmt.findall("xmax")[0].text),
"ymin": float(view_range_elmt.findall("ymin")[0].text),
"ymax": float(view_range_elmt.findall("ymax")[0].text)}
return variable_df, view_range_dict
def setup_filename_is_valid(self, filename: str) -> tuple:
"""Check if the file name is valid: no special characters, file does not already exists.
Parameters
----------
filename: str
filename (without extension) to be tested.
Returns
-------
bool:
True if the file name is valid, False otherwise
str:
The error message explaining why the file name is not valid ; an empty string if file name is valid.
"""
if not re.match("^[A-Za-z0-9_-]*$", filename):
error_msg = "File name can only contain letters, digits and '-' or '_'. File extension is automatically set."
return False, error_msg
elif filename in self.get_setup_saved_files():
error_msg = "File already exists."
return False, error_msg
else:
return True, ""
def get_setup_saved_files(self) -> list:
"""Get a list of the 'setup' file names (without extension) existing in the 'saved_setups' directory."""
if not os.path.exists(self.saved_setup_dir):
return []
filenames = os.listdir(self.saved_setup_dir)
files_without_ext = [os.path.splitext(filename)[0] for filename in filenames]
return files_without_ext
class InstrumentLog:
def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
self.full_directory_name = full_directory_name
self.filename = filename
self.full_file_name = full_directory_name + "/" + filename
self.instrument_name = instrument_name
self.df = self.__get_df__()
def get_variables(self):
raise NotImplementedError("Subclasses should implement this.")
def get_timeseries(self, variable: str) -> pd.DataFrame:
raise NotImplementedError("Subclasses should implement this.")
def __get_df__(self) -> pd.DataFrame:
raise NotImplementedError("Subclasses should implement this.")
class InstrumentInstantLog(InstrumentLog):
def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)
def __get_df__(self) -> pd.DataFrame:
df = pd.read_csv(self.full_file_name, sep=",", parse_dates=["datetime"])
df["datetime"] = df["datetime"].dt.tz_localize('UTC')
return df
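
    # The "instant" log is assumed to be a comma-separated file with one row per recorded value change,
    # and at least "datetime", "name" and "value" columns, e.g. (made-up values):
    #
    #     datetime,name,value
    #     2020-01-01 12:00:00,pump_speed,12.5
    #     2020-01-01 12:00:03,valve_state,open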
def get_variables(self):
return self.df.name.unique()
def get_timeseries(self, variable: str) -> pd.DataFrame:
timeseries_df = self.df[self.df["name"] == variable]
timeseries_df = timeseries_df.drop(columns=['name'])
try:
timeseries_df["value"] = timeseries_df["value"].astype(float)
        except ValueError:
            # Non-numeric values: encode them as integer category codes so they can still be plotted.
            timeseries_df["value_int"] = timeseries_df["value"].astype("category").cat.codes
return timeseries_df
class IceblockInstantLog(InstrumentLog):
def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)
def __get_df__(self) -> pd.DataFrame:
df = pd.read_csv(self.full_file_name, sep=",", parse_dates=["datetime"])
df["datetime"] = df["datetime"].dt.tz_localize('UTC')
return df
def get_variables(self):
return ["melting"]
def get_timeseries(self, variable: str) -> pd.DataFrame:
if variable == "melting":
timeseries_df = self.__get_melting_timeseries__()
else:
raise ValueError("Variable name [" + variable + "] not yet managed.")
return timeseries_df
def __get_melting_timeseries__(self) -> pd.DataFrame:
        # Get the mapping between iceblock id and iceblock name (assuming the last modification of the name
        # is the correct one).
mapping_df = self.df[["datetime", "id", "name"]].copy()
        mapping_df = mapping_df.groupby("id")[["id", "name"]].tail(1)
        mapping_df = pd.concat([mapping_df, pd.DataFrame([{"id": 0, "name": "None"}])], ignore_index=True)
        mapping_df = mapping_df.set_index("id")
mapping_dict = mapping_df["name"].to_dict()
# Get the datetime of the beginning of each iceblock's melting
melting_df = self.df[["datetime", "id", "status"]].copy()
        start_df = melting_df[melting_df["status"] == "Melting"].groupby("id")[["datetime", "id"]].head(1)
        # Get the end of the last iceblock's melting; after that point, the currently melting block is 0/None.
        end_df = melting_df[melting_df["status"] == "Done"].groupby("id").head(1)
        melting_df = pd.concat([start_df, pd.DataFrame([{"datetime": end_df.iloc[-1]["datetime"], "id": 0}])],
                               ignore_index=True)
        # Build the "value" column (iceblock name) and the "value_int" column (the coded value, here the iceblock id).
melting_df.rename(columns={"id": 'value_int'}, inplace=True)
melting_df["value"] = melting_df["value_int"].map(mapping_dict)
return melting_df
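
    # The resulting melting timeseries has one row per iceblock melting start, plus a final row marking
    # the end of the last melting, e.g. (made-up values):
    #
    #     datetime             value_int  value
    #     2020-01-01 12:00:00  1          block_A
    #     2020-01-01 13:30:00  2          block_B
    #     2020-01-01 15:00:00  0          None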
class ManualEventLog(InstrumentLog):
def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)
def __get_df__(self) -> pd.DataFrame:
        # The manual-event log file is not valid XML: the root tag is missing. So read the file's content
        # and wrap it in root tags.
with open(self.full_file_name) as f:
xml_str = f.read()
xml_str = "<root>" + xml_str + "</root>"
# Convert the XML to dict, then convert the dict to pd.Dataframe
xml_dict = xmltodict.parse(xml_str)
if "datetime" in xml_dict["root"]["event"]: # Only 1 event -> one less level in dict tree
df = pd.DataFrame([xml_dict["root"]["event"]])
else:
df = pd.DataFrame.from_dict(xml_dict["root"]["event"])
# Rename "description" column
df.rename(columns={"description": 'event'}, inplace=True)
# Format datetime column.
df["datetime"] = pd.to_datetime(df["datetime"]).dt.tz_localize('UTC')
return df
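
    # The raw manual-event file is assumed to be a flat sequence of <event> elements (hence the missing
    # root tag), e.g. (made-up values):
    #
    #     <event><datetime>2020-01-01 12:00:00</datetime><description>tubing flushed</description></event>
    #     <event><datetime>2020-01-01 14:00:00</datetime><description>filter changed</description></event>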
def get_variables(self):
return ["event"]
def get_timeseries(self, variable: str) -> pd.DataFrame:
        timeseries_df = self.df[["datetime", variable]].rename(columns={variable: 'value'})
return timeseries_df
class InstrumentPeriodicLog(InstrumentLog):
def __init__(self, full_directory_name: str, filename: str, instrument_name: str):
InstrumentLog.__init__(self, full_directory_name, filename, instrument_name)
def __get_df__(self) -> pd.DataFrame:
df = pd.read_csv(self.full_file_name, sep="\t", parse_dates=["datetime"])
if not df.empty:
df["datetime"] = df["datetime"].dt.tz_localize('UTC')
return df
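
    # A "periodic" log is a tab-separated file with a "datetime" column plus one column per logged variable;
    # this is also the layout of the artificial PICARRO_periodic file written by Dataset.explore_dataset,
    # e.g. (made-up values, tabs rendered as spaces):
    #
    #     datetime             H2O      Delta_D_H  Delta_18_16
    #     2020-01-01 12:00:00  15000.2  -250.1     -32.5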
def get_variables(self):
all_cols = list(self.df)
variable_cols = [colname for colname in all_cols if colname != "datetime"]
return variable_cols
def get_timeseries(self, variable: str) -> pd.DataFrame:
        timeseries_df = self.df[["datetime", variable]].rename(columns={variable: 'value'})
return timeseries_df