Commit 40b2b0dc authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

Merge remote-tracking branch 'origin/master'

parents 002f9c29 81b5e83b
Pipeline #149473 passed with stages
in 1 minute and 6 seconds
The encoder processor module provides functions to manipulate encoder's raw data.
import typing
import numpy as np
import pandas as pd
......@@ -32,3 +33,81 @@ def add_cmmin_conversion(encoder_df: pd.DataFrame) -> pd.DataFrame:
encoder_df["speed_avg_cmmin"] = encoder_df["speed_avg_mmsec"] * 60 / 10
return encoder_df
def get_moveup_events(encoder_df: pd.DataFrame,
normal_speed_range: typing.Tuple[float, float]) -> pd.DataFrame:
"""Get encoder's upwards movement events.
In normal melting, the encoder's speed is within ``normal_speed_range``. But it also occur some fast upwards
movement due to a manual operation related to either the reloading with new ice blocks, the trial to unblock the ice
core, or the end of the melting session. This function gives the type and time boundaries of all these events.
encoder_df: pd.DataFrame
normal_speed_range: tuple of float
Min and max encoder speed for "normal melting". Negative speed means encoder is going down. Speed unit: mm/s
One line per event, with the following columns: ``start_datetime``, ``end_datetime``, ``start_position`` and
``end_position`` (in mm) and ``event_type``.
encoder_df = encoder_df.copy()
encoder_df["pos_diff"] = encoder_df["position"].diff()
# Get the datetime of the start of each "encoder moves up" event
starts_df = encoder_df[encoder_df["pos_diff"] > 5]
starts_df = starts_df.copy()
starts_df["diff"] = starts_df.index.to_series().diff().fillna(pd.Timedelta(days=100))
start_datetimes = starts_df[starts_df["diff"] > pd.Timedelta(seconds=5)].index.to_series()
events_list = list()
for start_datetime in start_datetimes:
# If the start of this event occurs before the end of the previous event, skip it, as it is probably a manual
# upward move belonging to the previous event.
if len(events_list) > 0:
if start_datetime < events_list[-1]["end_datetime"]:
# Find end of event (back to normal melting)
after_df = encoder_df[start_datetime:]
in_range_df = after_df[np.logical_and(after_df["speed"] >= min(normal_speed_range),
after_df["speed"] <= max(normal_speed_range))]
# Use the moment just before the beginning of the upward movement as start_datetime
start_datetime = encoder_df.index[encoder_df.index.get_loc(start_datetime) - 1]
start_position = encoder_df.loc[start_datetime]["position"]
if len(in_range_df.index) == 0:
event_type = "end_of_melting"
end_datetime = None
end_position = None
# Compute the time difference to the 5th next speed-in-range, to make sure it is not just a few isolate value
# which are in range.
in_range_df = in_range_df.copy()
in_range_df["diff_time"] = in_range_df.index.to_series().diff(periods=5).shift(-5)
end_df = in_range_df[in_range_df["diff_time"] < pd.Timedelta(seconds=5)].head(1)
end_datetime = end_df.index[0]
end_position = end_df["position"][0]
if end_position > (start_position + 50): # New iceblocks added
event_type = "stacking"
else: # No new iceblock. Encoder probably moved to un-block ice core.
event_type = "unblocking"
events_list.append({"event_type": event_type,
"start_datetime": start_datetime,
"start_position": start_position,
"end_datetime": end_datetime,
"end_position": end_position})
events_df = pd.DataFrame(events_list)
return events_df
......@@ -257,11 +257,7 @@ def get_datasets_data(dataset: DatasetReader)\
# Iceblock
iceblock_df = dataset.get_timeseries("ICBKCTRL_instant")
iceblock_df = iceblock.get_melting_timeseries(iceblock_df)
iceblock_df = iceblock_df.sort_index()
iceblock_df["icbk_datetime"] = iceblock_df.index
iceblock_df = iceblock_df.rename(columns={"value_int": "icbk_code",
"value": "icbk_name"})
iceblock_df = iceblock.get_clean_iceblock_df(iceblock_df)
# Conductivity
conduct_df = dataset.get_timeseries("CONDUCTI_periodic")
......@@ -436,6 +432,68 @@ def __compute_mm__(df: pd.DataFrame) -> pd.DataFrame:
return df
def get_absolute_melted_height(encoder_df: pd.DataFrame,
stacked_iceblocks_df: pd.DataFrame,
moveup_event_df: pd.DataFrame,
starting_on_plexi: bool) -> pd.DataFrame:
"""Get the total absolute melting height, for each encoder's timestep.
encoder_df: pd.DataFrame
``ENCODER_periodic``'s dataset.
stacked_iceblocks_df: pd.DataFrame
Datetime-indexed DataFrame containing the total height of ice blocks stacked. In other words, the height of all
the ice blocks stacked at the same time should be summed.
moveup_event_df: pd.DataFrame
Output of :func:`cfatools.processor.encoder.get_moveup_events` function.
starting_on_plexi: bool
If ``True``, the plexi-plate is in ``PLACED`` position at the beginning of the ``encoder_df`` dataset. In this
case, 7.5mm (plexi-plate height) are removed from the ``melted_height``.
Same as ``encoder_df``, with an additional ``melted_height`` column.
stack_events_df = moveup_event_df[moveup_event_df["event_type"] == "stacking"]
# Associate the stack events to the total height of ice blocks stacked at this event
stack_events_df = stack_events_df.copy()
stack_events_df = pd.merge_asof(stack_events_df, stacked_iceblocks_df,
left_on="start_datetime", right_on="datetime_stacked",
# Compute the height of already-stacked ice which has been melted during the stacking event, while the encoder was
# moving or at its parking position
stack_events_df["melted_while_event"] = \
stack_events_df["tot_stacked_height"] - (stack_events_df["end_position"] - stack_events_df["start_position"])
# Compute the average melting speed of the ice block on the melting surface, during the stacking event.
stack_events_df["avg_melting_speed"] = \
-stack_events_df["melted_while_event"] / (stack_events_df["end_datetime"] - stack_events_df["start_datetime"]).dt.total_seconds()
encoder_df["time_diff"] = encoder_df.index.to_series().diff().dt.total_seconds()
encoder_df["ice_speed"] = encoder_df["position"].diff() / encoder_df["time_diff"]
# Compute speed for each timestep of each event
for index, stack_event in stack_events_df.iterrows():
encoder_df.loc[stack_event["start_datetime"]:stack_event["end_datetime"], "ice_speed"] = \
encoder_df["melted_height"] = (-encoder_df["ice_speed"] * encoder_df["time_diff"]).cumsum()
if starting_on_plexi:
encoder_df["melted_height"] = encoder_df["melted_height"] - 7.5
encoder_df.loc[encoder_df["melted_height"] < 0, "melted_height"] = 0
# Set all-but-last 0-value melted height to NaN
if len(encoder_df[encoder_df["melted_height"] == 0].index) > 2:
encoder_df.loc[:encoder_df[encoder_df["melted_height"] == 0].tail(2).index[0], "melted_height"] = np.nan
return encoder_df
def get_tubing_volume_dict(filepath: str,
max_datetime: datetime.datetime = -> dict:
"""Get the characteristic tubing volumes
......@@ -2,7 +2,7 @@
The iceblock processor module provides functions to manipulate ice block controller's raw data (from
``yyyymmdd_dataset_name_ICBKCTRL_instant.log``-like log files).
import numpy as np
import pandas as pd
......@@ -33,3 +33,51 @@ def get_melting_timeseries(iceblock_df: pd.DataFrame) -> pd.DataFrame:
melting_df = melting_df.set_index("datetime")
return melting_df
def get_clean_iceblock_df(iceblock_df: pd.DataFrame) -> pd.DataFrame:
"""Get "cleaned" iceblocks' information, keeping only relevant data.
iceblock_df: pd.DataFrame
``ICBKCTRL_instant`` dataset.
Same as input DataFrame, without the lines with the intermediate data modification.
# Find first appearance to determine which blocks were stacked at the same time
first_df = iceblock_df.groupby("id").head(1).copy()
first_df["time_diff"] = first_df.index.to_series().diff().dt.total_seconds()
first_df["new_stack"] = 0
first_df.loc[first_df["time_diff"] > 30, "new_stack"] = 1
first_df["stack_id"] = first_df["new_stack"].cumsum()
first_df = first_df.reset_index().rename(columns={ 'datetime_stacked'})
first_df = first_df[["id", "datetime_stacked", "stack_id"]]
iceblock_df = iceblock_df.reset_index(drop=True)
# Find datetime when the block started to melt
start_df = iceblock_df[np.logical_not(iceblock_df["datetime_start"].isnull())].groupby("id").head(1)
start_df = start_df[["id", "datetime_start"]]
# Find the last data modification for each iceblock as it is supposed to be the valid one.
final_df = iceblock_df.groupby("id").tail(1)
final_df = final_df.drop(columns="datetime_start")
clean_iceblock_df = pd.merge(start_df, final_df, on="id", how="outer")
clean_iceblock_df = pd.merge(clean_iceblock_df, first_df, on="id", how="outer")
return clean_iceblock_df
def get_total_stacked_height(clean_iceblock_df: pd.DataFrame) -> pd.DataFrame:
sum_df = clean_iceblock_df.groupby("stack_id")[["initial_height"]].sum()
sum_df = sum_df.rename(columns={"initial_height": "tot_stacked_height"})
datetime_df = clean_iceblock_df.groupby("stack_id")["datetime_stacked"].max()
total_stacked_height_df = pd.merge(sum_df, datetime_df, left_index=True, right_index=True)
total_stacked_height_df = total_stacked_height_df.reset_index()
return total_stacked_height_df
from unittest import TestCase
import pandas as pd
from configobj import ConfigObj
from logreader.instrument import InstrumentReader
......@@ -20,3 +22,25 @@ class TestEncoder(TestCase):
except:"Exception raised!")
def test_get_moveup_events(self):
inst_reader = InstrumentReader(self.base_path)
encoder_df = inst_reader.get_timeseries("20210506_test_encoder_mock_core4", "ENCODER_periodic")
events_df = encoder.get_moveup_events(encoder_df, (-3.0, -0.01))
except:"Exception raised!")
expected_events_list = [{'event_type': "stacking",
'start_datetime': pd.Timestamp("2021-05-06 12:53:57.699291+00:00"),
'start_position': 191.4,
'end_datetime': pd.Timestamp("2021-05-06T12:54:43.455791000+00:00"),
'end_position': 409.75},
{'event_type': 'end_of_melting',
'start_datetime': pd.Timestamp("2021-05-06 13:25:14.941711+00:00"),
'start_position': 136.65,
'end_datetime': None,
'end_position': None}]
expected_events_df = pd.DataFrame(expected_events_list)
......@@ -9,7 +9,8 @@ import pstats
from processor import flow
from processor import pump
from logreader.dataset import DatasetReader
from processor import encoder
from logreader.dataset import DatasetReader, InstrumentReader
class TestFlow(TestCase):
......@@ -138,3 +139,18 @@ class TestFlow(TestCase):
df = flow.add_flask_info(df, collector_df)
except:"Exception raised!")
def test_get_continuous_melting_height(self):
data = [[pd.Timestamp("2021-05-07 12:34:22.133011+00:00"), 715],
[pd.Timestamp("2021-05-07 13:03:49.215460+00:00"), 677],
[pd.Timestamp("2021-05-07 13:29:03.557948+00:00"), 895],
[pd.Timestamp("2021-05-07 14:01:15.109480+00:00"), 680],
[pd.Timestamp("2021-05-07 14:30:03.054819+00:00"), 72]]
stacked_iceblock_df = pd.DataFrame(data, columns=['datetime', 'tot_stacked_height'])
stacked_iceblock_df = stacked_iceblock_df.set_index("datetime")
inst_reader = InstrumentReader(self.base_path)
encoder_df = inst_reader.get_timeseries("20210507_ASUMA2016_8_14", "ENCODER_periodic")
moveup_event_df = encoder.get_moveup_events(encoder_df, (-3.0, -0.01))
flow.get_absolute_melted_height(encoder_df, stacked_iceblock_df, moveup_event_df)
......@@ -14,8 +14,16 @@ class TestIceblock(TestCase):
def test_get_melting_timeseries(self):
reader = InstrumentReader(self.base_path)
iceblock_df = reader.get_timeseries("20191104_test_temperature_10", "ICBKCTRL_instant")
iceblock_df = reader.get_timeseries("20210507_ASUMA2016_8_14", "ICBKCTRL_instant")
except:"Exception raised!")
def test_get_clean_iceblock_df(self):
reader = InstrumentReader(self.base_path)
iceblock_df = reader.get_timeseries("20210506_test_encoder_mock_core4", "ICBKCTRL_instant")
except:"Exception raised!")
# Absolute path of the base directory where the data files produced by the CFA are be stored.
base_data_path = /homel/ojossoud/_temp/data_cfa
base_data_path = /homel/ojossoud/Data/data_cfa
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment