Commit fdc94e09 authored by JOSSOUD Olivier

All. Improve logger, add tests, factorise common functions.

parent 156c7620
Pipeline #57808 passed with stages in 2 minutes and 41 seconds
@@ -13,6 +13,7 @@ class Logger:
self.__create_log_dir__()
self.__delete_old_files__()
self.object_id = object_id
self.date_str = None
def write(self, msg: str, level: str = "INFO", date_str: str = None, object_id: str = None, exception_type=Exception) -> None:
if object_id is None:
@@ -24,6 +25,8 @@ class Logger:
message = "[" + now_str + "] " + level + " [" + object_id + "]"
if date_str is not None:
message += "[" + date_str + "]"
elif self.date_str is not None:
message += "[" + self.date_str + "]"
message += " " + msg + "\n"
# Write log message in log file
......
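The new `date_str` attribute lets a caller tag a whole session of log messages with the date of the processed data, instead of passing `date_str` on every call. A minimal sketch of the behaviour, assuming `Logger` is constructed with an `object_id` as the hunk above suggests:

log = logger.Logger("FTPAERIS")
log.write("Connecting...")                     # [<now>] INFO [FTPAERIS] Connecting...
log.date_str = "20191218"                      # set once per download session
log.write("File downloaded.")                  # [<now>] INFO [FTPAERIS][20191218] File downloaded.
log.write("Older file.", date_str="20191217")  # an explicit date_str still takes precedence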
@@ -9,6 +9,7 @@ import wimcollect.common.logger as logger
class LogConfig:
"""To-be-inherited mother class for any class which requires a configuration file parser and a debug logger."""
def __init__(self, object_id: str, config_parser: configobj.ConfigObj = None, log: logger = None):
# Config
if config_parser is None:
@@ -23,8 +24,50 @@ class LogConfig:
else:
self.logger = log
# Local base directory
self.local_base_dir = self.config["LOCAL"]["base_dir"]
def get_standard_filepath(base_dir: str, site_id: str,
instrument_type: str, instrument_id: str, date_str: str,
extension: str) -> str:
"""
Parameters
----------
base_dir
site_id
instrument_type
instrument_id
date_str
extension
Returns
-------
"""
# Build destination directory
dest_dir = os.path.join(base_dir, site_id, instrument_type, instrument_id)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
# Build destination filename
filename = site_id + "_" + instrument_id + "_" + date_str + "." + extension
# Build full file path
dest_filepath = os.path.join(dest_dir, filename)
return dest_filepath
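A usage sketch of the new helper (the base directory is illustrative):

path = get_standard_filepath("/data", "REU", "meteo", "maidomf1h", "201912", "csv")
# Creates /data/REU/meteo/maidomf1h/ if it does not exist, and returns
# "/data/REU/meteo/maidomf1h/REU_maidomf1h_201912.csv"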
def get_config_parser() -> configobj.ConfigObj:
"""Get parser for `settings.ini` configuration file.
Returns
-------
configobj.ConfigObj
Configuration file parser.
"""
# Configuration file
pkgpath = os.path.dirname(pkgutil.get_loader("wimcollect").path)
conf_file_path = os.path.join(pkgpath, "config", "settings.ini")
@@ -36,7 +79,7 @@ def get_config_parser() -> configobj.ConfigObj:
return config_parser
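Typical use, reading the local base directory the same way `LogConfig.__init__` does:

config = get_config_parser()
local_base_dir = config["LOCAL"]["base_dir"]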
def recompress_file(zip_filepath: str) -> str:
def recompress_file(zip_filepath: str, log: logger.Logger = None) -> str:
"""Recompress archive file in LZMA.
LZMA compression algorithm produces smaller files than usual ZIP, and the decompression is faster.
@@ -47,6 +90,8 @@ def recompress_file(zip_filepath: str) -> str:
----------
zip_filepath: str
Full file path of the source ZIP file.
log: logger.Logger, optional
Logger used to report the progress, success or failure of the recompression. If `None`, a failure raises `FileNotFoundError` directly.
Returns
-------
@@ -54,9 +99,8 @@ def recompress_file(zip_filepath: str) -> str:
Full file path of the LZMA-compressed output file. Basically the same as `zip_filepath`, with `.zip` replaced
by `.lzma`.
"""
# Directory where the original ZIP is, where the ZIP content will temporarily be extracted and where the final
# LZMA file will be created.
current_directory = os.path.dirname(zip_filepath)
if log is not None:
log.info("Re-compressing from ZIP to LZMA: " + zip_filepath + "...")
with tempfile.TemporaryDirectory() as tmp_dirpath:
# Extract zip file and delete it
@@ -72,12 +116,35 @@ def recompress_file(zip_filepath: str) -> str:
zipf.close()
if not os.path.exists(compressed_filepath):
raise Exception("Failed to recompress " + zip_filepath)
msg = "Failed to recompress " + zip_filepath
if log is not None:
log.error(msg, exception_type=FileNotFoundError)
else:
raise FileNotFoundError(msg)
else:
if log is not None:
log.info("Done. Archive file: " + compressed_filepath)
return compressed_filepath
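A hedged usage sketch (the file path and object id are illustrative):

log = logger.Logger("HTTPDDU")
lzma_filepath = recompress_file("/data/DDU/picarro/PIC1/DDU_PIC1_20191218.zip", log)
# lzma_filepath == "/data/DDU/picarro/PIC1/DDU_PIC1_20191218.lzma" on success.
# Without a logger, a failed recompression raises FileNotFoundError directly.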
def extract_compressed_file(compressed_filepath: str, dest_dir: str, delete_compressed: bool = False) -> list:
"""Extract the compressed file in the given destination directory.
Parameters
----------
compressed_filepath: str
Full path of the compressed file.
dest_dir: str
Full path of the local directory where the compressed file should be extracted.
delete_compressed: bool, optional
If `True`, the original compressed file will be deleted once its content has been extracted. Default is `False`.
Returns
-------
list
Full paths of all the extracted files.
"""
source_zip = zipfile.ZipFile(compressed_filepath, 'r')
files_in_zip = [os.path.join(dest_dir, filename) for filename in source_zip.namelist()]
source_zip.extractall(dest_dir)
@@ -87,7 +154,9 @@ def extract_compressed_file(compressed_filepath: str, dest_dir: str, delete_comp
return files_in_zip
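A short sketch of the new extraction helper (paths are illustrative):

extracted_files = extract_compressed_file("/tmp/DMC_PIC1_20191218.zip", "/tmp/extracted", delete_compressed=True)
for filepath in extracted_files:
    print(filepath)  # full path of each file that was inside the archive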
def compress_file(uncompressed_filepath: str, delete_uncompressed_if_success: bool = True) -> str:
def compress_file(uncompressed_filepath: str,
log: logger.Logger = None,
delete_uncompressed_if_success: bool = True) -> str:
"""Compress a single file, using LZMA algorithm.
- The output compressed file is created in the same directory as the uncompressed file.
@@ -97,6 +166,8 @@ def compress_file(uncompressed_filepath: str, delete_uncompressed_if_success: bo
----------
uncompressed_filepath: str
Full file path of the to-be-compressed file.
log: logger.Logger, optional
Logger used to report the progress, success or failure of the compression. If `None`, a failure raises `FileNotFoundError` directly.
delete_uncompressed_if_success: bool
If `True` delete the source uncompressed file once the compression is successfully done.
@@ -106,6 +177,9 @@ def compress_file(uncompressed_filepath: str, delete_uncompressed_if_success: bo
Full file path of the LZMA-compressed output file. Basically the same as `uncompressed_filepath`, with
`.lzma` appended.
"""
if log is not None:
log.info("Compressing " + uncompressed_filepath)
compressed_filepath = uncompressed_filepath + ".lzma"
zipf = zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA)
zipf.write(uncompressed_filepath, arcname=os.path.basename(uncompressed_filepath))
@@ -113,7 +187,13 @@ def compress_file(uncompressed_filepath: str, delete_uncompressed_if_success: bo
if os.path.exists(compressed_filepath) and delete_uncompressed_if_success:
os.remove(uncompressed_filepath)
if log is not None:
log.info("Done. Archive file: " + compressed_filepath)
else:
raise FileNotFoundError("Failed to compress " + uncompressed_filepath)
msg = "Failed to compress " + uncompressed_filepath
if log is not None:
log.error(msg, exception_type=FileNotFoundError)
else:
raise FileNotFoundError(msg)
return compressed_filepath
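Usage sketch, matching the way the collectors below call it (path and object id are illustrative):

compressed = compress_file("/data/REU/meteo/maidoftir/REU_maidoftir_20191218.tsv", log=logger.Logger("FTPOPAR"))
# compressed == "/data/REU/meteo/maidoftir/REU_maidoftir_20191218.tsv.lzma"
# The source .tsv is deleted on success.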
import os
import configobj
import datetime
import wimcollect.common.logger as logger
import wimcollect.common.utils as utils
import wimcollect.common.ftp as ftp
class Collector(utils.LogConfig):
class MaidoMf1hCollector(utils.LogConfig):
"""Class to collect Maïdo's Meteo-France data: one file per month (released every 18th of the next month), 1 data
line per hour."""
def __init__(self, config_parser: configobj.ConfigObj = None, log: logger = None):
self.object_id = "FTPAERIS"
utils.LogConfig.__init__(self, self.object_id, config_parser, log)
self.distant_base_dir = self.config[self.object_id]["distant_base_dir"]
def download_maido_mf_1h(self, yyyymm: str):
def download(self, yyyymm: str) -> bool:
"""Download hourly monthly-released meteo data of Meteo-France's Maïdo station.
Parameters
----------
yyyymm: str
Year and month of the to-be-downloaded data, as a `yyyymm` string.
Returns
-------
bool
`True` if everything went well; otherwise an exception is raised.
"""
self.logger.info("Download Maido's Meteo-France meteo data.", yyyymm)
self.logger.date_str = yyyymm
self.logger.info("Download Maido's Meteo-France '1hour' meteo data.")
# Build source and destination file paths.
source_filepath = self.distant_base_dir + "/" + "PMAIDO_1h_" + yyyymm + ".csv"
dest_filepath = self.__get_dest_filepath__(yyyymm)
dest_filepath = utils.get_standard_filepath(self.local_base_dir, "REU", "meteo", "maidomf1h", yyyymm, "csv")
# Download
success = ftp.download_file(source_filepath, dest_filepath, self.logger, ftp_config=self.config[self.object_id])
# Compress
if success:
self.logger.info("Compressing ...", yyyymm)
utils.compress_file(dest_filepath)
self.logger.info("Done. Archive file: " + dest_filepath, yyyymm)
utils.compress_file(dest_filepath, self.logger)
return success
def __get_dest_filepath__(self, yyyymm: str) -> str:
# Build destination directory
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"], "REU", "meteo", "maidomf_1h")
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
class MaidoMf1mCollector(utils.LogConfig):
"""Class to collect Maïdo's Meteo-France data: one file per day (theoretically released every day, however data can
be available few days after expected release...), 1 data line per minute."""
# Build destination filename
filename = "REU_maidomf_" + yyyymm + ".csv"
def __init__(self, config_parser: configobj.ConfigObj = None, log: logger = None):
self.object_id = "FTPAERIS"
utils.LogConfig.__init__(self, self.object_id, config_parser, log)
self.distant_base_dir = self.config[self.object_id]["distant_base_dir"]
def download(self, day: datetime.date) -> bool:
"""Download minutely daily-released meteo data of Meteo-France's Maïdo station.
Parameters
----------
day: datetime.date
Date of the data which should be downloaded.
# Build full file path
dest_filepath = os.path.join(dest_dir, filename)
Returns
-------
bool
`True` if everything went well; otherwise an exception is raised.
"""
date_str = day.strftime("%Y%m%d")
self.logger.date_str = date_str
self.logger.info("Download Maido's Meteo-France '1minute' meteo data.")
# Build source and destination file paths.
source_filepath = self.distant_base_dir + "/" + "PMAIDO_1mn_" + date_str + ".csv"
dest_filepath = utils.get_standard_filepath(self.local_base_dir, "REU", "meteo", "maidomf1m", date_str, "csv")
# Download
success = ftp.download_file(source_filepath, dest_filepath, self.logger, ftp_config=self.config[self.object_id])
# Compress
if success:
utils.compress_file(dest_filepath, self.logger)
return dest_filepath
return success
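A sketch of how the refactored per-source collectors are driven (the `wimcollect.ftpaeris` import path is assumed from the updated `main()` below):

import datetime
import wimcollect.ftpaeris as ftpaeris

ftpaeris.MaidoMf1hCollector().download("201912")
ftpaeris.MaidoMf1mCollector().download(datetime.date(2019, 12, 18))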
@@ -8,17 +8,16 @@ import wimcollect.common.ftp as ftp
import wimcollect.common.utils as utils
class Collector(utils.LogConfig):
class PicarroCollector(utils.LogConfig):
"""Collector for Picarro data stored on CEA's FTP server. These data archives are probably produced on Picarro with
the `wimpush` package. """
def __init__(self, config_parser: configobj.ConfigObj = None, log: logger = None):
self.object_id = "FTPCEA"
utils.LogConfig.__init__(self, self.object_id, config_parser, log)
self.distant_base_dir = self.config[self.object_id]["distant_base_dir"]
####################################################################################################################
# Picarro
def download_picarro(self, site_id: str, picarro_id: str, day: datetime.date):
def download(self, site_id: str, picarro_id: str, day: datetime.date):
"""Download Picarro file from FTP server.
Parameters
@@ -30,57 +29,43 @@ class Collector(utils.LogConfig):
day: datetime.date
Date of the data which should be downloaded.
"""
date_str = day.strftime("%Y%m%d")
self.logger.date_str = date_str
ftp_session = ftp.connect(self.config[self.object_id], self.logger)
# Build source file path
source_dir = self.distant_base_dir + "/" + site_id + "/picarro/"
source_filename = site_id + "_" + picarro_id + day.strftime("%Y%m%d") + ".lzma"
source_filepath = source_dir + source_filename
source_filepath = self.get_source_filepath(self.distant_base_dir, site_id, picarro_id, date_str)
# Check that to-be-downloaded file exists
source_filepaths = ftp.list_files(ftp_session, source_dir, self.logger)
if source_filepath not in source_filepaths:
self.logger.error("File not found: " + source_filepath, day.strftime("%Y-%m-%d"),
exception_type=FileNotFoundError)
# Build destination file path
dest_filepath = utils.get_standard_filepath(self.local_base_dir, site_id, "picarro", picarro_id, date_str, "lzma")
# Download file
self.logger.info("Download picarro data from " + site_id, day.strftime("%Y-%m-%d"))
dest_filepath = self.__get_picarro_dest_filepath__(source_filepath)
success = ftp.download_file(source_filepath, dest_filepath, self.logger,
ftp_session=ftp_session)
success = ftp.download_file(source_filepath, dest_filepath, self.logger, ftp_session=ftp_session)
ftp_session.quit()
return success
def __get_picarro_dest_filepath__(self, source_filepath: str) -> str:
"""Get the full path where the Picarro data file should be downloaded.
Parameters
----------
source_filepath: str
Full path of the distant (FTP) to-be-downloaded Picarro data file.
Returns
-------
str
Local destination file path.
"""
filename = os.path.basename(source_filepath)
site_id, picarro_id, date_str = re.split('_|\.', filename)[0:3]
date = datetime.datetime.strptime(date_str, "%Y%m%d").date()
@staticmethod
def get_source_filepath(base_dir: str, site_id: str, picarro_id: str, date_str: str) -> str:
source_dir = base_dir + "/" + site_id + "/picarro/"
source_filename = site_id + "_" + picarro_id + "_" + date_str + ".lzma"
source_filepath = source_dir + source_filename
return source_filepath
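For example (site and instrument ids are hypothetical):

path = PicarroCollector.get_source_filepath("/remote", "SITE", "PIC1", "20191218")
# path == "/remote/SITE/picarro/SITE_PIC1_20191218.lzma"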
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"], site_id, "picarro", picarro_id, str(date.year))
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
dest_filepath = os.path.join(dest_dir, filename)
return dest_filepath
class HoboCollector(utils.LogConfig):
"""Collector for Hobo meteo data stored on CEA's FTP server. These data archives are probably produced on the local
Picarro instrument with the `wimpush` package: the local operator manually extracts data from Hobo and saves the
file on the Picarro."""
####################################################################################################################
# Hobo
def __init__(self, config_parser: configobj.ConfigObj = None, log: logger = None):
self.object_id = "FTPCEA"
utils.LogConfig.__init__(self, self.object_id, config_parser, log)
self.distant_base_dir = self.config[self.object_id]["distant_base_dir"]
def download_hobo(self, site_id: str):
def download(self, site_id: str):
"""Download all Hobo files from FTP server.
The distant files will be deleted from the server if the transfer is successful.
@@ -97,13 +82,13 @@ class Collector(utils.LogConfig):
source_filepaths = ftp.list_files(ftp_session, hobo_distant_path, self.logger)
for source_filepath in source_filepaths:
dest_filepath = self.__get_hobo_dest_filepath__(source_filepath)
dest_filepath = self.__get_dest_filepath__(source_filepath)
ftp.download_file(source_filepath, dest_filepath, self.logger,
ftp_session=ftp_session)
ftp_session.quit()
def __get_hobo_dest_filepath__(self, source_filepath: str):
def __get_dest_filepath__(self, source_filepath: str):
"""Get the full path where the Hobo data file should be downloaded.
Parameters
......
import os
import configobj
import datetime
@@ -7,17 +6,16 @@ import wimcollect.common.ftp as ftp
import wimcollect.common.utils as utils
class Collector(utils.LogConfig):
class MaidoFtirCollector(utils.LogConfig):
"""Collector for Maïdo's FTIR meteo data file"""
def __init__(self, config_parser: configobj.ConfigObj = None, log: logger = None):
self.object_id = "FTPOPAR"
utils.LogConfig.__init__(self, self.object_id, config_parser, log)
self.distant_base_dir = self.config[self.object_id]["distant_base_dir"]
def download_maido_ftir(self, day: datetime.date):
"""Download all Picarro files from FTP server.
The distant files will be deleted from the server if the transfer is successful.
def download(self, day: datetime.date):
"""Download Maïdo's FTIR meteo data file.
Parameters
----------
@@ -25,32 +23,19 @@ class Collector(utils.LogConfig):
Date of the data which should be downloaded.
"""
date_str = day.strftime("%Y%m%d")
self.logger.date_str = date_str
# Build source and destination file paths
source_filepath = self.distant_base_dir + "/" + date_str + "Meteo125HR.xls"
dest_filepath = self.__get_dest_filepath__(day)
dest_filepath = utils.get_standard_filepath(self.local_base_dir, "REU", "meteo", "maidoftir", date_str, "tsv")
# Download
self.logger.info("Download Maido's FTIR meteo data.", date_str)
self.logger.info("Download Maido's FTIR meteo data.")
success = ftp.download_file(source_filepath, dest_filepath, self.logger,
ftp_config=self.config[self.object_id])
# Compress
if success:
self.logger.info("Compressing " + dest_filepath, date_str)
compressed_filepath = utils.compress_file(dest_filepath)
self.logger.info(self.object_id, date_str + ": Done. Archive file: " + compressed_filepath)
def __get_dest_filepath__(self, day: datetime.date) -> str:
# Build destination directory
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"], "REU", "meteo", "maidoftir")
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
# Build destination filename
filename = "REU_maidoftir_" + day.strftime("%Y%m%d") + ".tsv"
# Build full file path
dest_filepath = os.path.join(dest_dir, filename)
utils.compress_file(dest_filepath, self.logger)
return dest_filepath
return True
@@ -23,42 +23,25 @@ class Collector(utils.LogConfig):
day: datetime.date
Date of the data which should be downloaded.
"""
day_str = day.strftime("%Y-%m-%d")
date_str = day.strftime("%Y%m%d")
self.logger.date_str = date_str
# Build source file url
source_fileurl = self.base_url + "/DDU_" + picarro_id + "_" + day.strftime("%Y%m%d") + ".zip"
source_fileurl = self.base_url + "/DDU_" + picarro_id + "_" + date_str + ".zip"
# Build destination file path
dest_filepath = self.__get_dest_filepath__(picarro_id, day, source_fileurl)
dest_filepath = utils.get_standard_filepath(self.local_base_dir, "DDU", "picarro", picarro_id, date_str, "zip")
# Download file
self.logger.info("Downloading DDU Picarro file from " + source_fileurl, day_str)
self.logger.info("Downloading DDU Picarro file from " + source_fileurl)
response = urllib.request.urlretrieve(source_fileurl, dest_filepath)
success = response[0] == dest_filepath and os.path.exists(dest_filepath)
if not success:
self.logger.error("FAILED to download file in " + dest_filepath, day_str)
self.logger.error("FAILED to download file in " + dest_filepath)
else:
self.logger.info("File downloaded in " + dest_filepath, day_str)
self.logger.info("File downloaded in " + dest_filepath)
# Recompress file, from ZIP to LZMA
self.logger.info("Re-compressing from ZIP to LZMA...", day_str)
lzma_filepath = utils.recompress_file(dest_filepath)
if lzma_filepath is None:
self.logger.error("FAILED to re-compress " + dest_filepath, day_str)
else:
self.logger.info("Done. Archive file: " + lzma_filepath, day_str)
return True
def __get_dest_filepath__(self, picarro_id: str, day: datetime.date, source_filepath: str) -> str:
# Build destination directory
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"], "DDU", "picarro", picarro_id, str(day.year))
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
# Build destination filename
dest_filename = os.path.basename(source_filepath)
# Build full file path
dest_filepath = os.path.join(dest_dir, dest_filename)
utils.recompress_file(dest_filepath, self.logger)
return dest_filepath
return True
@@ -31,8 +31,8 @@ def main():
yesterday = datetime.datetime.now() - datetime.timedelta(1)
ftpaeris_col = ftpaeris.Collector()
ftpaeris_col.download_maido_mf_1h("201913")
ftpaeris_col = ftpaeris.MaidoMf1hCollector()
ftpaeris_col.download("201913")
httpddu_col = httpddu.Collector()
httpddu_col.download_picarro(yesterday)
......
import os
import re
import configobj
import datetime
@@ -8,7 +7,7 @@ import wimcollect.common.logger as logger
import wimcollect.common.sftp as sftp
class Collector(utils.LogConfig):
class PicarroCollector(utils.LogConfig):
"""Collector for Dome C / Concordia station (trigram: DMC) data, hosted on Hermes server, managed by italian's PNRA.
"""
@@ -16,8 +15,8 @@ class Collector(utils.LogConfig):
utils.LogConfig.__init__(self, "SFTPDMC", config_parser, log)
self.distant_base_dir = self.config[self.object_id]["distant_base_dir"]
def download_picarro(self, picarro_id: str, day: datetime.date):
"""Download Picarro data file from SFTP server.
def download(self, picarro_id: str, day: datetime.date):
"""Download Picarro data file from DMC's SFTP server.
Parameters
----------
@@ -26,48 +25,23 @@ class Collector(utils.LogConfig):
day: datetime.date
Date of the data which should be downloaded.
"""
day_str = day.strftime("%Y-%m-%d")
date_str = day.strftime("%Y%m%d")
self.logger.date_str = date_str
# Build source file path
picarro_number = re.sub("[^0-9]", "", picarro_id)
source_filepath = self.distant_base_dir + "/" + picarro_number \
+ "/DMC_" + picarro_id + "_" + day.strftime("%Y%m%d") + ".zip"
source_filepath = self.distant_base_dir + "/" + picarro_number + "/DMC_" + picarro_id + "_" + date_str + ".zip"
# Build destination file path
dest_filepath = self.__get_dest_filepath__(picarro_id, day, source_filepath)
dest_filepath = utils.get_standard_filepath(self.local_base_dir, "DMC", "picarro", picarro_id, date_str, "zip")
# Download file
self.logger.info("Downloading DMC Picarro file from SFTP server.", day_str)
sftp_client = sftp.connect(self.config[self.object_id], self.logger)
success = sftp.download_file(sftp_client, source_filepath, dest_filepath, self.logger)
sftp_client.close()
if not success:
self.logger.error("FAILED to download file in " + dest_filepath, day_str)
else:
self.logger.info("File downloaded in " + dest_filepath, day_str)
# Re-compress, from ZIP to LZMA
if success:
self.logger.info("Re-compressing from ZIP to LZMA...", day_str)
lzma_filepath = utils.recompress_file(dest_filepath)
if lzma_filepath is None:
self.logger.error("FAILED to create archive file", day_str)
else:
self.logger.info("Done. Archive file: " + lzma_filepath, day_str)
def __get_dest_filepath__(self, picarro_id: str, day: datetime.date, source_filepath: str) -> str:
# Build destination directory
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"],
"DMC", "picarro", picarro_id, str(day.year))
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
# Build destination filename
dest_filename = os.path.basename(source_filepath)
# Build full file path