Commit 7d24c990 authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

Flow Processor. Use Pickle binary file to avoid arrival computation.

parent de558d31
Pipeline #115652 passed with stages
in 2 minutes and 11 seconds
......@@ -24,6 +24,13 @@
</Attribute>
</value>
</entry>
<entry key="/cfatools/tests/test_processor/tubing_volumes.csv">
<value>
<Attribute>
<option name="separator" value="," />
</Attribute>
</value>
</entry>
<entry key="/config/tubing_volumes.csv">
<value>
<Attribute>
......
"""
The flow processor module provides functions to determine where each ice core was at each time step
"""
import os
import pickle
import datetime
import pandas as pd
import numpy as np
......@@ -280,7 +282,9 @@ def get_arrival_df(encoder_df: pd.DataFrame,
pump_df: pd.DataFrame,
volumes_dict: dict,
keep_intermediate: bool = False,
parallel: bool = True) -> pd.DataFrame:
parallel: bool = True,
force_update: bool = False,
dataset: DatasetReader = None) -> pd.DataFrame:
"""Get arrival datetime of each encoder measure, at collector's needle and Picarro
Parameters
......@@ -295,30 +299,52 @@ def get_arrival_df(encoder_df: pd.DataFrame,
If `True` keep arrival datetime at intermediate position: debubbler, T1 and T2.
parallel: bool, default: True
If `True` compute arrival datetime in parallel. This speeds up the process but can mess up the debugger.
    force_update: bool, default: False
        As computing the arrival datetimes is time-consuming, the resulting DataFrame is saved in a binary file for
        faster reuse later (if the `dataset` parameter is not `None`). If `force_update=False`, arrival datetimes are
        read from this binary file if it exists; otherwise they are re-calculated.
dataset: DatasetReader, default: None
Dataset where the binary file will be stored/read from.
Returns
-------
pd.DataFrame
Dataframe with arrival datetime at needle and Picarro.
"""
arrival_df = encoder_df.copy()
arrival_df = arrival_df.drop(columns="speed")
arrival_df["melt"] = arrival_df.index
# Melting base -> to T1
arrival_df["debubbler"] = get_datetime_out(arrival_df["melt"], pump_df["com"],
volumes_dict["melt_debub"], parallel=parallel)
arrival_df["T1"] = get_datetime_out(arrival_df["debubbler"], pump_df["colpic"],
volumes_dict["debub_T1"], parallel=parallel)
# To Collector
arrival_df["needle"] = get_datetime_out(arrival_df["T1"], pump_df["col"], volumes_dict["T1_needle"],
if dataset is not None:
pkl_filename = os.path.join(dataset.binary_base_path, "arrival.pkl")
else:
pkl_filename = "----"
if not force_update and os.path.exists(pkl_filename):
binary_file = open(pkl_filename, 'rb')
arrival_df = pickle.load(binary_file)
binary_file.close()
else:
arrival_df = encoder_df.copy()
arrival_df = arrival_df.drop(columns="speed")
arrival_df["melt"] = arrival_df.index
# Melting base -> to T1
arrival_df["debubbler"] = get_datetime_out(arrival_df["melt"], pump_df["com"],
volumes_dict["melt_debub"], parallel=parallel)
arrival_df["T1"] = get_datetime_out(arrival_df["debubbler"], pump_df["colpic"],
volumes_dict["debub_T1"], parallel=parallel)
# To Collector
arrival_df["needle"] = get_datetime_out(arrival_df["T1"], pump_df["col"], volumes_dict["T1_needle"],
parallel=parallel)
# To Picarro
arrival_df["T2"] = get_datetime_out(arrival_df["T1"], pump_df["pic"], volumes_dict["T1_T2"],
parallel=parallel)
arrival_df["picarro"] = arrival_df["T2"] + pd.Timedelta(seconds=volumes_dict["T2_pic_sec"])
# To Picarro
arrival_df["T2"] = get_datetime_out(arrival_df["T1"], pump_df["pic"], volumes_dict["T1_T2"],
parallel=parallel)
arrival_df["picarro"] = arrival_df["T2"] + pd.Timedelta(seconds=volumes_dict["T2_pic_sec"])
# Create and save binary file for faster later reuse
if dataset is not None:
binary_file = open(pkl_filename, 'wb')
pickle.dump(arrival_df, binary_file)
binary_file.close()
if not keep_intermediate:
arrival_df = arrival_df.drop(columns=["debubbler", "T1", "T2"])
......
......@@ -133,7 +133,7 @@ class TestFlow(TestCase):
flow.get_datasets_data(dataset)
try:
df = flow.get_arrival_df(encoder_df, pump_df, vol_dict, parallel=False)
df = flow.get_arrival_df(encoder_df, pump_df, vol_dict, parallel=False, dataset=dataset)
df = flow.add_iceblock_info(df, encoder_df, iceblock_df)
df = flow.add_flask_info(df, collector_df)
except:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment