Skip to content
Snippets Groups Projects
Commit dd0f3b7e authored by Jean-Baptiste Bayle's avatar Jean-Baptiste Bayle
Browse files

Remove cache class

parent eb0892e5
No related branches found
No related tags found
1 merge request!174Revert changes made for cache system
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Cache system adapted to LISA Instrument.
Authors:
Julian Villeneuve <julian.villeneuve@thalesgroup.com>
Jean-Baptiste Bayle <j2b.bayle@gmail.com>
"""
import logging
import multiprocessing as mp
import os
from typing import Dict, List
from h5py import File
from lisainstrument.containers import ForEachObject
logger = logging.getLogger(__name__)
class Cache:
    """Stores data in a cache, and flushes it to a file when needed.

    Data is first stored in memory, and can be easily retrieved if needed.
    When access to the data is not needed anymore, it can be flushed to a file
    and wiped from memory to reduce memory usage.

    To prevent I/O bottlenecks, writing to file can be done in parallel using
    multiple processes. Each process will write to a temporary file, and when
    all data has been flushed, the temporary files will be merged into the
    final file. Note that this is only useful for hardware which supports
    parallel I/O, such as modern SSDs.

    Args:
        filename: filename to write the data to
        nprocess: number of I/O processes (1 by default)
        mode: writing mode (by default, "x" to create a new file)
    """

    def __init__(self, filename: str, nprocess: int = 1, mode: str = "x") -> None:
        self.filename = filename
        self.nprocess = nprocess
        # In-memory storage, keyed by dataset name
        self.cache: Dict[str, ForEachObject] = {}
        # Writer processes (only populated when nprocess > 1)
        self.processes: List[mp.Process] = []
        # One temporary file per writer process; merged by `finalize()`
        self.temp_filenames = [f"{self.filename}.{i}" for i in range(nprocess)]
        # Work queue feeding (key, data) pairs to the writer processes
        self.queue: mp.Queue = mp.Queue()
        # NOTE(review): `mode` only controls temp-file cleanup below; the final
        # file is always opened in append mode ("a") -- confirm this is intended
        if nprocess > 1:
            for i, temp_filename in enumerate(self.temp_filenames):
                # Remove temporary files if they exist (e.g. if program crashed)
                # only if the file is opened in write or append mode
                if mode in ("w", "a") and os.path.exists(temp_filename):
                    logger.warning("Removing temporary file %s", temp_filename)
                    os.remove(temp_filename)
                # Start process
                kwargs = {
                    "iprocess": i,
                    "nprocess": nprocess,
                    "filename": temp_filename,
                    "queue": self.queue,
                }
                process = mp.Process(target=self._writing_process, kwargs=kwargs)
                self.processes.append(process)
                process.start()

    def __getitem__(self, key: str) -> ForEachObject:
        """Get data from the cache using its key.

        Args:
            key: key of the data to return

        Raises:
            KeyError: if no data is stored under ``key``
        """
        return self.cache[key]

    def __setitem__(self, key: str, value: ForEachObject) -> None:
        """Put data in the cache.

        Args:
            key: key of the data to store
            value: data to be stored in the cache
        """
        # Use the module logger, not the root logger, like the rest of the class
        logger.debug("Putting data %s in the cache", key)
        self.cache[key] = value

    def flush(self, key: str, write: bool) -> None:
        """Remove data from the cache, and write to disk if needed.

        After data has been flushed, it can be wiped from memory by the garbage
        collector to reduce memory usage. If ``write`` is set to ``True``,
        the data will be written to disk by one of the I/O processes.

        Args:
            key: key of the data to flush
            write: whether or not to save data in the file
        """
        logger.debug("Flushing data %s", key)
        if write:
            data = self.cache[key]
            if self.nprocess > 1:
                # Delegate the actual write to one of the writer processes
                logger.debug("Putting %s in the writing queue", key)
                self.queue.put((key, data))
            else:
                # Single-process mode: write synchronously from here
                logger.debug(
                    "Writing data %s to %s by main process", key, self.filename
                )
                with File(self.filename, "a") as hdf5:
                    data.write(hdf5, key)
            del data
        del self.cache[key]

    @staticmethod
    def _writing_process(iprocess, nprocess, filename, queue) -> None:
        """Writing process (can be run in parallel).

        This loops indefinitely, waiting for data to be written to the file.
        When the key is ``None``, the process stops.

        Args:
            iprocess: index of the process
            nprocess: number of processes
            filename: name of the file to write to
            queue: queue to get data from
        """
        process = mp.current_process()
        logger.debug("Starting process %d/%d (%s)", iprocess + 1, nprocess, process)
        with File(filename, "a") as hdf5:
            while True:
                key, data = queue.get()
                # Stop the process if key is None (sentinel sent by finalize())
                if key is None:
                    logger.debug(
                        "Stopping process %d/%d (%s)",
                        iprocess + 1,
                        nprocess,
                        process,
                    )
                    break
                logger.debug(
                    "Writing data %s to %s by process %d/%d (%s)",
                    key,
                    filename,
                    iprocess + 1,
                    nprocess,
                    process,
                )
                data.write(hdf5, key)

    def finalize(self) -> None:
        """Terminate all processes and merge temporary files if needed."""
        if self.nprocess > 1:
            # Signal all processes for termination (one sentinel per process)
            logger.debug("Terminating all processes")
            for _ in range(len(self.processes)):
                self.queue.put((None, None))
            # Wait for all processes to terminate
            for process in self.processes:
                process.join()
        # Merge temporary files
        if self.nprocess > 1:
            self._merge_temp_files()

    def _merge_temp_files(self) -> None:
        """Merge temporary files into the final file, then delete them."""
        logger.info("Merging temporary files into %s", self.filename)
        with File(self.filename, "a") as hdf5:
            for temp_file in self.temp_filenames:
                with File(temp_file, "r") as temp_hdf5:
                    logger.debug("Copying data from %s to %s", temp_file, self.filename)
                    for name in temp_hdf5:
                        temp_hdf5.copy(name, hdf5)
                os.remove(temp_file)
        logger.debug("Merging completed")
[mypy]
ignore_missing_imports = True
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test cache."""
import os
import numpy as np
import pytest
from h5py import File
from lisainstrument.cache import Cache
from lisainstrument.containers import ForEachSC
def test_get_set() -> None:
    """Test cache get/set mechanism.

    The previous ``cache`` parameter was removed: it was resolved by pytest
    as a fixture (which does not exist) and immediately shadowed by the
    local ``Cache`` instance below.
    """
    cache = Cache("test_cache.h5", mode="w")
    cache["key1"] = ForEachSC(np.array([1]))
    cache["key2"] = ForEachSC(np.array([2]))
    assert cache["key1"] == ForEachSC(np.array([1]))
    assert cache["key2"] == ForEachSC(np.array([2]))
    # Accessing a missing key must raise, like a plain dict
    with pytest.raises(KeyError):
        cache["key3"]  # pylint: disable=pointless-statement
def test_flush() -> None:
    """Test cache flush.

    The previous ``cache`` parameter was removed: it was resolved by pytest
    as a fixture (which does not exist) and immediately shadowed by the
    local ``Cache`` instance below.
    """
    try:
        cache = Cache("test_cache.h5", mode="w")
        cache["key1"] = ForEachSC(np.array([1]))
        cache["key2"] = ForEachSC(np.array([2]))
        cache["key3"] = ForEachSC(np.array([3]))
        cache.flush("key1", write=True)
        cache.flush("key2", write=False)
        cache.finalize()
        # Unflushed data is still available, flushed data is gone from memory
        assert cache["key3"] == ForEachSC(np.array([3]))
        assert "key1" not in cache.cache
        assert "key2" not in cache.cache
        with File("test_cache.h5", "r") as f:
            # ForEachSC datasets are written as one column per spacecraft
            columns = [("1", "<f8"), ("2", "<f8"), ("3", "<f8")]
            expected = np.array([(1.0, 1.0, 1.0)], dtype=columns)
            assert np.all(f["key1"] == expected)
            # key2 was flushed without writing, key3 was never flushed
            assert "key2" not in f
            assert "key3" not in f
    finally:
        # Always clean up the output file, even if an assertion failed
        if os.path.exists("test_cache.h5"):
            os.remove("test_cache.h5")
def test_cache_finalize() -> None:
    """Test that temporary files are merged into the final file.

    The previous ``cache`` parameter was removed: it was resolved by pytest
    as a fixture (which does not exist) and immediately shadowed by the
    local ``Cache`` instance below.
    """
    try:
        cache = Cache("test_cache.h5", nprocess=3, mode="w")
        cache["key1"] = ForEachSC([1])
        cache["key2"] = ForEachSC([2])
        cache["key3"] = ForEachSC([3])
        cache["key4"] = ForEachSC([4])
        cache.flush("key1", write=True)
        cache.flush("key2", write=True)
        cache.flush("key3", write=True)
        cache.flush("key4", write=True)
        cache.finalize()
        # Final file exists and every temporary file has been merged and removed
        assert os.path.isfile("test_cache.h5")
        for temp_file in cache.temp_filenames:
            assert not os.path.exists(temp_file)
        with File("test_cache.h5", "r") as f:
            columns = [("1", "<f8"), ("2", "<f8"), ("3", "<f8")]
            assert np.all(f["key1"] == np.array([(1.0, 1.0, 1.0)], dtype=columns))
            assert np.all(f["key2"] == np.array([(2.0, 2.0, 2.0)], dtype=columns))
            assert np.all(f["key3"] == np.array([(3.0, 3.0, 3.0)], dtype=columns))
            assert np.all(f["key4"] == np.array([(4.0, 4.0, 4.0)], dtype=columns))
    finally:
        # Clean up the final file AND all three temporary files; the previous
        # version only removed ".0" and ".1", leaking "test_cache.h5.2" when
        # the test failed before finalize() merged the temporary files
        for leftover in ["test_cache.h5"] + [f"test_cache.h5.{i}" for i in range(3)]:
            if os.path.exists(leftover):
                os.remove(leftover)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment