Commit 1ea3e726 authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

Flow Processor. Tubing volumes.

parent 1fdbf876
Pipeline #115126 passed with stages
in 2 minutes and 31 seconds
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CsvFileAttributes">
<option name="attributeMap">
<map>
<entry key="/cfatools/processor/flow.py">
<value>
<Attribute>
<option name="separator" value=":" />
</Attribute>
</value>
</entry>
<entry key="/cfatools/tests/test_flow/tubing_volumes.csv">
<value>
<Attribute>
<option name="separator" value="," />
</Attribute>
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
......@@ -3,6 +3,7 @@ The flow processor module provides functions to determine where each ice core wa
"""
import datetime
import pandas as pd
import numpy as np
from pandarallel import pandarallel
from cfatools.logreader.dataset import DatasetReader
......@@ -232,6 +233,41 @@ def get_bubble_density_from_conduct(dataset: DatasetReader):
return compump_df
def get_tubing_volume_dict(filepath: str,
max_datetime: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)) -> dict:
"""Get the characteristic tubing volumes
Parameters
----------
filepath: str
Full path of the CSV file containing the tubing volumes, one `datetime` column and one column per volume. One
line per volume update (in case the physical setup and/or tubing changes).
max_datetime: datetime.datetime
Last date of the data which will be used with these volumes. The volumes returned are the most recent measured
volumes, recorded just before this `max_datetime`.
Returns
-------
dict
Tubing volumes as a dict
"""
# Read the tubing volumes CSV file.
volume_df = pd.read_csv(filepath, parse_dates=["datetime"], index_col="datetime").sort_index()
# Remove timezone info as np.datetime64 does not manage timezone. For this application and date granularity,
# it is OK to ignore timezone, it is anyway supposed to be UTC everywhere.
max_datetime = max_datetime.replace(tzinfo=None)
# Get the most recent volumes, available before the `max_datetime`
volume_df = volume_df[volume_df.index < np.datetime64(max_datetime)][-1:]
if len(volume_df.index) == 0:
raise ValueError("No tubing volume available before " + max_datetime.strftime("%Y-%m-%d"))
volume_dict = volume_df.to_dict('records')[0]
return volume_dict
def get_datetime_out(datetime_in_series: pd.Series, mlmin_series: pd.Series, tube_volume_ml: float, parallel: bool = True) -> pd.Series:
"""Get the date/time the fluid reaches the end of the tube.
......
from unittest import TestCase
import datetime
from cfatools.processor import flow
class TestFlow(TestCase):
def test_get_tubing_volume_dict(self):
vol_dict = flow.get_tubing_volume_dict("tubing_volumes.csv")
self.assertEqual(vol_dict["melt_debub"], 3.5)
vol_dict = flow.get_tubing_volume_dict("tubing_volumes.csv", datetime.datetime(2021, 1, 1))
self.assertEqual(vol_dict["T1_needle"], 7)
with self.assertRaises(ValueError):
flow.get_tubing_volume_dict("tubing_volumes.csv", datetime.datetime(2010, 1, 1))
datetime,melt_debub,debub_T1,T1_needle,T1_T2
2019-12-01,1,2,3,4
2021-04-01,3.5,0.43,1.8,0.35
2020-12-01,5,6,7,8
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment