Commit de558d31 authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

Flow Processor. Get arrival & flask info & iceblock info

parent 364d5329
Pipeline #115570 passed with stages
in 5 minutes and 1 second
......@@ -24,6 +24,13 @@
</Attribute>
</value>
</entry>
<entry key="/config/tubing_volumes.csv">
<value>
<Attribute>
<option name="separator" value="," />
</Attribute>
</value>
</entry>
</map>
</option>
</component>
......
......@@ -235,7 +235,7 @@ def get_bubble_density_from_conduct(dataset: DatasetReader):
def get_datasets_data(dataset: DatasetReader)\
-> typing.Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame,]:
-> typing.Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Get the dataframes useful for flow analysis
Parameters
......@@ -259,12 +259,6 @@ def get_datasets_data(dataset: DatasetReader)\
iceblock_df = iceblock_df.rename(columns={"value_int": "icbk_code",
"value": "icbk_name"})
# Melting
melting_df = iceblock_df.copy()
melting_df = melting_df.drop(columns="icbk_datetime")
melting_df = pd.merge_asof(encoder_df, melting_df, left_index=True, right_index=True)
melting_df = melting_df.groupby("icbk_code").apply(__compute_mm__)
# Conductivity
conduct_df = dataset.get_timeseries("CONDUCTI_periodic")
......@@ -279,7 +273,106 @@ def get_datasets_data(dataset: DatasetReader)\
collector_df = dataset.get_timeseries("COLLECTR_instant", "flask")
collector_df = collector_df.rename(columns={"value": "flask"})
return encoder_df, iceblock_df, melting_df, conduct_df, pump_df, picarro_df, collector_df
return encoder_df, iceblock_df, conduct_df, pump_df, picarro_df, collector_df
def get_arrival_df(encoder_df: pd.DataFrame,
                   pump_df: pd.DataFrame,
                   volumes_dict: dict,
                   keep_intermediate: bool = False,
                   parallel: bool = True) -> pd.DataFrame:
    """Compute, for each encoder measure, the arrival datetime of the corresponding
    melted water at the collector's needle and at the Picarro.

    Parameters
    ----------
    encoder_df: pd.DataFrame
        Encoder's timestamped data, as obtained with :func:`get_datasets_data` function.
    pump_df: pd.DataFrame
        Pumps' timestamped data, as obtained with :func:`get_datasets_data` function.
    volumes_dict: dict
        Tubing volumes, as obtained with :func:`get_tubing_volume_dict` function.
    keep_intermediate: bool, default: False
        If `True` keep arrival datetime at intermediate position: debubbler, T1 and T2.
    parallel: bool, default: True
        If `True` compute arrival datetime in parallel. This speeds up the process but
        can mess up the debugger.

    Returns
    -------
    pd.DataFrame
        Dataframe with arrival datetime at needle and Picarro.
    """
    df = encoder_df.drop(columns="speed").copy()
    df["melt"] = df.index

    # Common section: melting base -> debubbler -> T1.
    df["debubbler"] = get_datetime_out(df["melt"], pump_df["com"],
                                       volumes_dict["melt_debub"], parallel=parallel)
    df["T1"] = get_datetime_out(df["debubbler"], pump_df["colpic"],
                                volumes_dict["debub_T1"], parallel=parallel)

    # Collector branch: T1 -> needle.
    df["needle"] = get_datetime_out(df["T1"], pump_df["col"],
                                    volumes_dict["T1_needle"], parallel=parallel)

    # Picarro branch: T1 -> T2, then a fixed transit time (in seconds) from T2
    # to the Picarro, given directly by `volumes_dict["T2_pic_sec"]`.
    df["T2"] = get_datetime_out(df["T1"], pump_df["pic"],
                                volumes_dict["T1_T2"], parallel=parallel)
    df["picarro"] = df["T2"] + pd.Timedelta(seconds=volumes_dict["T2_pic_sec"])

    if not keep_intermediate:
        df = df.drop(columns=["debubbler", "T1", "T2"])
    return df
def add_iceblock_info(arrival_df: pd.DataFrame, encoder_df: pd.DataFrame, iceblock_df: pd.DataFrame) -> pd.DataFrame:
    """Add iceblock information: melted block height and iceblock code & name.

    Parameters
    ----------
    arrival_df: pd.DataFrame
        Output of :func:`get_arrival_df` function.
    encoder_df: pd.DataFrame
        Encoder's timestamped data, as obtained with :func:`get_datasets_data` function.
    iceblock_df: pd.DataFrame
        Iceblock's timestamped data, as obtained with :func:`get_datasets_data` function.

    Returns
    -------
    pd.DataFrame
        Same as input `arrival_df`, with melted block height and iceblock code & name.
    """
    # Bring the encoder speed back alongside the arrival datetimes.
    merged = arrival_df.merge(encoder_df, left_index=True, right_index=True)

    # Tag each encoder step with the iceblock melted at that time: match every
    # row with the most recent iceblock change (as-of merge on the index).
    merged = pd.merge_asof(merged,
                           iceblock_df.drop(columns="icbk_datetime"),
                           left_index=True, right_index=True)

    # Compute the melted block height per iceblock, for each encoder step.
    return merged.groupby("icbk_code").apply(__compute_mm__)
def add_flask_info(arrival_df: pd.DataFrame, collector_df: pd.DataFrame) -> pd.DataFrame:
    """Add flask information: in which flask the needle was when iceblock segment reached the needle.

    Parameters
    ----------
    arrival_df: pd.DataFrame
        Output of :func:`get_arrival_df` function. Must contain a ``needle`` datetime
        column, sorted ascending (required by ``pd.merge_asof``).
    collector_df: pd.DataFrame
        Collector's timestamped data, as obtained with :func:`get_datasets_data`
        function. Must contain a ``flask`` column, indexed by datetime.

    Returns
    -------
    pd.DataFrame
        Same as input `arrival_df`, with an additional ``flask`` column: the flask
        under the needle at the segment's arrival datetime.
    """
    # For each arrival datetime at the needle, pick the most recent flask position
    # (backward as-of match on the collector's timestamps).
    arrival_df = pd.merge_asof(arrival_df, collector_df[["flask"]],
                               left_on="needle", right_index=True)
    return arrival_df
def __compute_mm__(df: pd.DataFrame) -> pd.DataFrame:
......
......@@ -110,7 +110,7 @@ class TestFlow(TestCase):
dataset = DatasetReader(self.base_path, dataset_name)
try:
encoder_df, iceblock_df, melting_df, conduct_df, pump_df, picarro_df, collector_df = \
encoder_df, iceblock_df, conduct_df, pump_df, picarro_df, collector_df = \
flow.get_datasets_data(dataset)
except:
self.fail("Exception raised!")
......@@ -123,4 +123,18 @@ class TestFlow(TestCase):
self.assertEqual(vol_dict["T1_needle"], 7)
with self.assertRaises(ValueError):
flow.get_tubing_volume_dict("../test_processor/tubing_volumes.csv", datetime.datetime(2010, 1, 1))
\ No newline at end of file
flow.get_tubing_volume_dict("../test_processor/tubing_volumes.csv", datetime.datetime(2010, 1, 1))
def test_get_arrival_df(self):
    """Smoke test: arrival, iceblock and flask info pipeline on a mock dataset."""
    dataset_name = "20210202_mock_core_flasks"
    dataset = DatasetReader(self.base_path, dataset_name)
    vol_dict = flow.get_tubing_volume_dict("../test_processor/tubing_volumes.csv")
    encoder_df, iceblock_df, conduct_df, pump_df, picarro_df, collector_df = \
        flow.get_datasets_data(dataset)
    try:
        df = flow.get_arrival_df(encoder_df, pump_df, vol_dict, parallel=False)
        df = flow.add_iceblock_info(df, encoder_df, iceblock_df)
        df = flow.add_flask_info(df, collector_df)
    except Exception:
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit;
        # catch only real errors raised by the pipeline under test.
        self.fail("Exception raised!")
datetime,melt_debub,debub_T1,T1_needle,T1_T2
2019-12-01,1,2,3,4
2021-04-01,3.5,0.43,1.8,0.35
2020-12-01,5,6,7,8
\ No newline at end of file
datetime,melt_debub,debub_T1,T1_needle,T1_T2,T2_pic_sec
2019-12-01,1,2,3,4,100
2021-04-01,3.5,0.43,1.8,0.35,50
2020-12-01,5,6,7,8,200
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment