Commit 2beb23a0 authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

Flow Processor. Parallel processing for datetime_out computation.

parent eb489ba1
Pipeline #101209 failed with stage
in 52 seconds
......@@ -3,6 +3,7 @@ The flow processor module provides functions to determine where each ice core wa
"""
import datetime
import pandas as pd
from pandarallel import pandarallel
from logreader.dataset import DatasetReader
from processor import iceblock
......@@ -38,7 +39,7 @@ def get_flow_timeseries(dataset: DatasetReader):
pass
def get_datetime_out(datetime_in: pd.Series, mlmin_df: pd.DataFrame, tube_volume_ml: float) -> pd.Series:
def get_datetime_out(datetime_in: pd.Series, mlmin_df: pd.DataFrame, tube_volume_ml: float, parallel: bool = True) -> pd.Series:
"""Get the date/time the fluid reaches the end of the tube.
Considering a tube whose internal volume is ``tube_volume_ml``, in which a fluid flows at variable flow rates
......@@ -53,6 +54,11 @@ def get_datetime_out(datetime_in: pd.Series, mlmin_df: pd.DataFrame, tube_volume
Datetime-indexed dataframe containing a Series of fluid flow rate, in ml/min (data column must be named
``mlmin``). In most cases, this comes from the pump's flow rate settings time series.
tube_volume_ml: float
Volume of the tube.
parallel: bool
If ``True``, the datetime_out computation is parallelized, which is 2-3x faster for big
``datetime_in`` Series but makes debugging more complicated. Should be enabled in production and disabled while
debugging.
Returns
-------
......@@ -64,7 +70,11 @@ def get_datetime_out(datetime_in: pd.Series, mlmin_df: pd.DataFrame, tube_volume
mlmin_df = mlmin_df[["mlmin"]].ffill()
mlmin_df["ml_cumul"] = pump.add_cumulated_volume(mlmin_df["mlmin"])
datetime_out = datetime_in.apply(__get_single_datetime_out__, args=(mlmin_df, tube_volume_ml))
if parallel:
pandarallel.initialize()
datetime_out = datetime_in.parallel_apply(__get_single_datetime_out__, args=(mlmin_df, tube_volume_ml))
else:
datetime_out = datetime_in.apply(__get_single_datetime_out__, args=(mlmin_df, tube_volume_ml))
return datetime_out
......
sphinx==2.2.*
sphinx_rtd_theme==0.4.*
numpydoc==0.9.*
\ No newline at end of file
sphinx~=2.2
sphinx_rtd_theme~=0.4
numpydoc~=0.9
\ No newline at end of file
pandas~=1.2
configobj~=5.0
csaps~=0.7
matplotlib~=3.3 # Linux package python3-tk must be also installed
numpy~=1.18
configobj==5.0.*
xmltodict==0.12.*
csaps==0.7.*
matplotlib>=3.1.* # Linux package python3-tk must be also installed
\ No newline at end of file
pandas~=1.2
pandarallel~=1.5
xmltodict~=0.12
......@@ -58,6 +58,7 @@ class TestFlow(TestCase):
self.assertEqual(icecore_df.iloc[3]["datetime_out"], pd.to_datetime("2021-01-01 00:32:00"))
def test_get_datetime_out_realdateset(self):
monitor_processing_time = False
tube_volume_ml = 30
dataset_name = "20210122_mock_ice_bronkgas_debub"
......@@ -67,13 +68,16 @@ class TestFlow(TestCase):
encoder_df["datetime_in"] = encoder_df.index
profile = cProfile.Profile()
profile.enable()
if monitor_processing_time:
profile = cProfile.Profile()
profile.enable()
encoder_df["datetime_out"] = flow.get_datetime_out(encoder_df["datetime_in"], colpump_df, tube_volume_ml)
profile.disable()
ps = pstats.Stats(profile)
ps.print_stats()
encoder_df["datetime_out"] = flow.get_datetime_out(encoder_df["datetime_in"], colpump_df, tube_volume_ml,
parallel=False)
if monitor_processing_time:
profile.disable()
ps = pstats.Stats(profile)
ps.print_stats()
self.assertIsInstance(encoder_df, pd.DataFrame)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment