Commit 445327b7 authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

Flow Processor. Improve datetime_out computation.

parent 93a56898
Pipeline #101063 passed with stages
in 1 minute and 31 seconds
......@@ -39,6 +39,30 @@ def get_flow_timeseries(dataset: DatasetReader):
pass
def add_datetime_out(event_df: pd.DataFrame, mlmin_df: pd.DataFrame, tube_volume_ml: float) -> pd.DataFrame:
    """Add a ``datetime_out`` column to ``event_df``.

    The flow-rate timeline is first extended with one row per event datetime
    (outer join on the index, flow rate forward-filled), then completed with a
    cumulated-volume column. ``get_datetime_out`` is finally applied to every
    ``datetime_in`` value of ``event_df``.

    Parameters
    ----------
    event_df: pd.DataFrame
        Events table. Must contain a ``datetime_in`` column; its index is
        joined with the index of ``mlmin_df``. It is modified in place.
    mlmin_df: pd.DataFrame
        Flow-rate timeline. Must contain a ``mlmin`` column.
    tube_volume_ml: float
        Tube volume, forwarded unchanged to ``get_datetime_out``.

    Returns
    -------
    pd.DataFrame
        The same ``event_df`` object, with the ``datetime_out`` column added.
    """
    # Outer-join on the index so that the flow-rate timeline also gets a row at each event's datetime
    timeline_df = mlmin_df.merge(event_df, left_index=True, right_index=True, how="outer")

    # Only the flow rate is needed afterwards; rows coming from events inherit the last known rate
    timeline_df = timeline_df[["mlmin"]].ffill()
    timeline_df["ml_cumul"] = pump.add_cumulated_volume(timeline_df["mlmin"])

    event_df["datetime_out"] = event_df["datetime_in"].apply(
        lambda datetime_in: get_datetime_out(datetime_in, timeline_df, tube_volume_ml))
    return event_df
def get_datetime_out(datetime_in: pd.Timestamp, mlmin_df: pd.DataFrame, tube_volume_ml: float) -> pd.Timestamp:
"""Get the date/time the fluid reaches the end of the tube.
......@@ -61,28 +85,20 @@ def get_datetime_out(datetime_in: pd.Timestamp, mlmin_df: pd.DataFrame, tube_vol
pd.Timestamp
Date/time when the fluid of interest exits the tube.
"""
# Determine the pump flow rate (in ml/min) at `datetime_in`
insertion_mlmin = mlmin_df[
mlmin_df.index == max(mlmin_df.index[mlmin_df.index <= datetime_in])]["mlmin"].to_list()
# Create a single-point Dataframe with this insertion flow rate
insertion_df = pd.DataFrame({'mlmin': insertion_mlmin}, index=[datetime_in])
if not isinstance(datetime_in, pd.Timestamp):
return
# Create a DataFrame of flow rate's evolution, starting from insertion (`datetime_in`)
object_mlmin_df = insertion_df.append(mlmin_df[mlmin_df.index > datetime_in])
object_mlmin_df = mlmin_df[mlmin_df.index >= datetime_in]
# Compute the cumulated volume (in ml), for each time step
object_mlmin_df["delta_t_min"] = object_mlmin_df.index.to_series().diff().fillna(pd.Timedelta(seconds=0)).astype(
'timedelta64[m]').astype('int') # Compute time difference between 2 flow rate settings (in minutes)
object_mlmin_df["ml"] = object_mlmin_df["mlmin"].shift(1, fill_value=0) * object_mlmin_df["delta_t_min"]
object_mlmin_df["ml_cumul"] = object_mlmin_df["ml"].cumsum()
# Get already-cumulated volume at datetime_in
initial_volume = object_mlmin_df["ml_cumul"].iloc[0]
# Keep only the data lines where the cumulated volume flown does not exceed the tube's volume
object_mlmin_df = object_mlmin_df[object_mlmin_df["ml_cumul"] < tube_volume_ml]
object_mlmin_df = object_mlmin_df[object_mlmin_df["ml_cumul"] < (tube_volume_ml + initial_volume)]
# Compute the duration between the last data line and the moment when the tube's volume is reached
missing_volume = tube_volume_ml - object_mlmin_df['ml_cumul'].iloc[-1]
missing_volume = (tube_volume_ml + initial_volume) - object_mlmin_df['ml_cumul'].iloc[-1]
last_speed = object_mlmin_df['mlmin'].iloc[-1]
missing_duration_min = float(missing_volume * last_speed)
......
......@@ -123,6 +123,13 @@ def get_mlmin_df(dataset: DatasetReader, pump_id: str) -> pd.DataFrame:
return mlmin_df
def add_cumulated_volume(mlmin_series: pd.Series) -> pd.Series:
    """Compute the cumulated volume (in ml) at each time step of a flow-rate series.

    The volume added at a given row is the *previous* row's flow rate
    multiplied by the time elapsed since that previous row (in minutes).

    Parameters
    ----------
    mlmin_series: pd.Series
        Flow rate values (``mlmin``), indexed by date/time.

    Returns
    -------
    pd.Series
        Cumulated volume, with the same index as ``mlmin_series``.
        The first value is 0: no volume has flown before the first time step.
    """
    # Time elapsed since the previous row, in minutes.
    # The first row has no predecessor, so `diff()` yields NaT there: treat it as a zero-length interval,
    # otherwise the first cumulated volume would be NaN instead of 0.
    delta_t_min = mlmin_series.index.to_series().diff().dt.total_seconds().fillna(0) / 60

    # Volume flown during each interval, at the flow rate which was set at the beginning of the interval
    instant_ml = mlmin_series.shift(1, fill_value=0) * delta_t_min
    return instant_ml.cumsum()
def add_mlmin_to_rpm_history_df(rpm_df: pd.DataFrame, coef_df: pd.DataFrame, interp_method: str = "closest",
rpm_col_name: str = "rpm") -> pd.DataFrame:
"""Add a ml/min column to the round-per-minute timeseries.
......@@ -160,9 +167,9 @@ def add_mlmin_to_rpm_history_df(rpm_df: pd.DataFrame, coef_df: pd.DataFrame, int
# to [calibration_time + time_delta_to_next_calib / 2]
coef_df["application_start_datetime"] = \
coef_df["apply_datetime"] - (coef_df["apply_datetime"] - coef_df["apply_datetime"].shift()) /2
coef_df.at[coef_df.index[0], "application_start_datetime"] = pd.Timestamp('1970-01-01', tz="GMT")
coef_df.at[coef_df.index[0], "application_start_datetime"] = pd.Timestamp('1970-01-01', tz="UTC")
coef_df["application_end_datetime"] = \
coef_df["application_start_datetime"].shift(-1, fill_value=pd.Timestamp.now(tz="GMT"))
coef_df["application_start_datetime"].shift(-1, fill_value=pd.Timestamp.now(tz="UTC"))
# Remove calibrations whose applicability range does not overlap rpm_df's data.
coef_df = coef_df.loc[coef_df["application_end_datetime"] > rpm_df.index.min()]
......
pandas==0.25.*
numpy==1.18.*
pandas~=1.2
numpy~=1.18
configobj==5.0.*
xmltodict==0.12.*
csaps==0.7.*
......
import datetime
import pandas as pd
import numpy as np
from unittest import TestCase
from configobj import ConfigObj
import os
import cProfile
import pstats
from processor import flow
from processor import pump
from logreader.dataset import DatasetReader
......@@ -42,13 +47,48 @@ class TestFlow(TestCase):
"2021-01-01 00:11:00"])
icecore_df = pd.DataFrame({'icecore_id': icecore_id, 'datetime_in': datetime_in})
try:
icecore_df["datetime_out"] = icecore_df["datetime_in"].apply(flow.get_datetime_out,
args=(mlmin_df, tube_volume_ml))
except:
self.fail("Exception raised!")
icecore_df.set_index("datetime_in", inplace=True)
icecore_df["datetime_in"] = icecore_df.index
# icecore_df["datetime_out"] = icecore_df["datetime_in"].apply(flow.get_datetime_out,
# args=(mlmin_df, tube_volume_ml))
flow.add_datetime_out(icecore_df, mlmin_df, tube_volume_ml)
# try:
#
# except:
# self.fail("Exception raised!")
self.assertEqual(icecore_df.iloc[0]["datetime_out"], pd.to_datetime("2021-01-01 00:25:00"))
self.assertEqual(icecore_df.iloc[1]["datetime_out"], pd.to_datetime("2021-01-01 00:29:00"))
self.assertEqual(icecore_df.iloc[2]["datetime_out"], pd.to_datetime("2021-01-01 00:30:00"))
self.assertEqual(icecore_df.iloc[3]["datetime_out"], pd.to_datetime("2021-01-01 00:32:00"))
########################
# Real dataset
def test_add_datetime_out(self):
    """Run ``flow.add_datetime_out`` on a real dataset, profile it and check its output.

    The call is profiled with ``cProfile`` and its wall-clock duration is printed,
    so that the cost of the ``datetime_out`` computation can be monitored.
    """
    tube_volume_ml = 30
    dataset_name = "20210122_mock_ice_bronkgas_debub"
    dataset = DatasetReader(self.base_path, dataset_name)
    colpump_df = pump.get_mlmin_df(dataset, "COLPUMP")
    encoder_df = dataset.get_timeseries("ENCODER_periodic")
    encoder_df["datetime_in"] = encoder_df.index

    # Time and profile only the call under test (not the dataset loading, nor the stats printing)
    profile = cProfile.Profile()
    start = datetime.datetime.now()
    profile.enable()
    encoder_df = flow.add_datetime_out(encoder_df, colpump_df, tube_volume_ml)
    profile.disable()
    elapsed = datetime.datetime.now() - start

    pstats.Stats(profile).print_stats()
    print(elapsed)

    # The function is expected to return the events table completed with the output date/time
    self.assertIn("datetime_out", encoder_df.columns)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment