Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 81beebdd authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

FTP CEA.

parent 527fa4bb
import configobj
import ftplib
import codecs
import wimcollect.logger as logger
class FtpCollector:
def __init__(self, config_parser: configobj.ConfigObj, log: logger, object_id: str):
# Logger
self.logger = log
self.object_id = object_id
# Config
self.config = config_parser
self.base_dir = self.config[self.object_id]["root_dir"]
def _ftp_connect_(self):
try:
self.session = ftplib.FTP(host=self.config[self.object_id]["host"],
user=self.config[self.object_id]["user"],
passwd=codecs.decode(self.config[self.object_id]["password"], "rot_13"))
except ftplib.error_perm as e:
self.logger.write(self.object_id, "Failed to connect to FTP: " + str(e))
except BlockingIOError as e:
self.logger.write(self.object_id, "Host [" + self.config[self.object_id]["host"] + "] seems to be unreachable.")
else:
self.logger.write(self.object_id,
"Connected to [" + self.config[self.object_id]["host"] + "] as user: "
+ self.config[self.object_id]["user"])
def _ftp_list_files_(self, ftp_session: ftplib.FTP, path: str) -> list:
filepaths = []
try:
filepaths = ftp_session.nlst(path)
except ftplib.error_perm as resp:
if str(resp) == "550 No files found":
self.logger.write(self.object_id, "No files in directory " + path)
else:
raise
return filepaths
def _ftp_download_file(self, ftp_session: ftplib.FTP,
source_filepath: str, dest_filepath: str,
delete_if_success: bool = False) -> None:
self.logger.write(self.object_id, "Downloading " + source_filepath + " ...")
with open(dest_filepath, 'wb') as file:
response = ftp_session.retrbinary('RETR %s' % source_filepath, file.write)
if response == "226 Transfer complete.":
if delete_if_success:
ftp_session.delete(source_filepath)
self.logger.write(self.object_id, "Done. Destination file: " + dest_filepath)
else:
self.logger.write(self.object_id, "FAILED. Destination file: " + dest_filepath)
import os
import re
import configobj
import datetime
import wimcollect.logger as logger
import wimcollect.ftp as ftp
class Collector(ftp.FtpCollector):
def __init__(self, config_parser: configobj.ConfigObj, log: logger):
self.object_id = "FTPCEA"
ftp.FtpCollector.__init__(self, config_parser, log, self.object_id)
####################################################################################################################
# Picarro
def download_picarro(self, site_id: str):
"""Download all Picarro files from FTP server.
The distant files will be deleted from the server if the transfer is successful.
Parameters
----------
site_id: str
Site's trigram
"""
self.logger.write(self.object_id, "Download picarro data from " + site_id)
self._ftp_connect_()
picarro_distant_path = self.base_dir + "/" + site_id + "/picarro/"
self.session.cwd(picarro_distant_path)
source_filepaths = self._ftp_list_files_(self.session, picarro_distant_path)
for source_filepath in source_filepaths:
dest_filepath = self.__get_picarro_dest_filepath__(source_filepath)
self._ftp_download_file(self.session, source_filepath, dest_filepath, delete_if_success=True)
self.session.quit()
def __get_picarro_dest_filepath__(self, source_filepath: str) -> str:
"""Get the full path where the Picarro data file should be downloaded.
Parameters
----------
source_filepath: str
Full path of the distant (FTP) to-be-downloaded Picarro data file.
Returns
-------
str
Local destination file path.
"""
filename = os.path.basename(source_filepath)
site_id, picarro_id, date_str = re.split('_|\.', filename)[0:3]
date = datetime.datetime.strptime(date_str, "%Y%m%d").date()
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"], site_id, "picarro", picarro_id, str(date.year))
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
dest_filepath = os.path.join(dest_dir, filename)
return dest_filepath
####################################################################################################################
# Hobo
def download_hobo(self, site_id: str):
"""Download all Hobo files from FTP server.
The distant files will be deleted from the server if the transfer is successful.
Parameters
----------
site_id: str
Site's trigram
"""
self.logger.write(self.object_id, "Download hobo data from " + site_id)
self._ftp_connect_()
hobo_distant_path = self.base_dir + "/" + site_id + "/hobo/"
self.session.cwd(hobo_distant_path)
source_filepaths = self._ftp_list_files_(self.session, hobo_distant_path)
for source_filepath in source_filepaths:
dest_filepath = self.__get_hobo_dest_filepath__(source_filepath)
self._ftp_download_file(self.session, source_filepath, dest_filepath, delete_if_success=True)
self.session.quit()
def __get_hobo_dest_filepath__(self, source_filepath: str):
"""Get the full path where the Hobo data file should be downloaded.
Parameters
----------
source_filepath: str
Full path of the distant (FTP) to-be-downloaded Hobo data file.
Returns
-------
str
Local destination file path.
"""
filename = os.path.basename(source_filepath)
filename_parts = re.split('_|\.|-', filename)
site_id = filename_parts[0]
hobo_id = filename_parts[2]
dest_dir = os.path.join(self.config["LOCAL"]["base_dir"], site_id, "meteo", "hobo_" + hobo_id)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
dest_filepath = os.path.join(dest_dir, filename)
return dest_filepath
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment