Commit f3d193da authored by Guillaume

Refactoring by Guillaume

parent fbab4179
@@ -58,28 +58,22 @@ class DRSParser(object):
"""
# Check vocabulary.
facets = None
try:
self.dir_parser.parse(path.parent.as_posix())
# Deserialize path.parent into a dict, excluding the project facet.
facets = dict(zip(self.dir_keys, path.parent.parts[1:]))
facets = dict((zip(self.dir_keys, path.parent.parts[1:])))
#return facets
return facets
# Vocabulary error handling.
except TemplateParsingError as e:
print(e)
# Key error handling, raised when the number of facets does not match the number of path parts.
except KeyError as e:
print(e)
finally:
#print("PB sur :",path)
return facets
def get_facets_from_filename(self, basename):
"""
Deserialize a filename string against a DRS.
@@ -104,7 +98,7 @@ class DRSParser(object):
self.fx_file_parser.parse(basename)
# No timerange.
tstart, tend = "", ""
tstart, tend = "none", "none"
# Vocabulary error handling.
except TemplateParsingError as e:
......
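For context, a minimal sketch of how the two parsing helpers above are meant to be combined by a caller; the DRSParser(project) constructor signature is taken from main() further down in this diff, and the CMIP6 path is purely hypothetical.

from pathlib import Path

from DRSParser import DRSParser

drs = DRSParser('CMIP6')
# Hypothetical CMIP6 path, relative to IPSL_DATA_ROOT ('/bdd/').
path = Path('CMIP6/CMIP/IPSL/IPSL-CM6A-LR/historical/r1i1p1f1/Amon/tas/gr/v20180803/'
            'tas_Amon_IPSL-CM6A-LR_historical_r1i1p1f1_gr_185001-201412.nc')
# get_facets_from_path() returns None when the vocabulary or key checks fail.
facets = drs.get_facets_from_path(path) or dict()
if facets:
    # Adds period_start/period_end ("none"/"none" for files without a timerange).
    facets.update(drs.get_facets_from_filename(path.stem))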
@@ -6,7 +6,7 @@ Created on Thu Jan 27 15:03:29 2022
@author: ltroussellier
"""
from esm_cat_generator1 import Context,Process
from esm_cat_generator import Context,Process
from DRSParser import DRSParser
from pathlib import Path
......
@@ -7,20 +7,11 @@
"""
IPSL_DATA_ROOT = '/bdd/'
INPUT_SOURCES = {'CMIP6': ['/home/ltroussellier/dev_catalog/test3_gui/catalog/InputFakeFile/'  # local fake input files (testing)
]}
# INPUT_SOURCES = {'CMIP6': ['/bdd/CMIP6/C4MIP/MOHC/.paths.txt', # CMIP6 replicas (IDRIS)
# '/bdd/CMIP6/C4MIP/IPSL/.paths.txt' # CMIP6 IPSL datasets (TGCC)
# ]}
# INPUT_SOURCES = {'CMIP6': ['/Users/glipsl/Desktop/.paths.txt'],
# 'CMIP5': ['/bdd/CMIP5/output/MOHC/.paths.txt', # CMIP5 replicas (IDRIS)
# '/bdd/CMIP5/output/IPSL/.paths.txt' # CMIP5 IPSL datasets (TGCC)
# ],
# 'CORDEX': ['/bdd/CORDEX/output/EUR-11/MOHC/.paths.txt', # CORDEX replicas (IDRIS)
# '/bdd/CORDEX/output/EUR-11/IPSL/.paths.txt' # CORDEX IPSL datasets (TGCC)
# ]}
INPUT_SOURCES_GRAN = {'CMIP6': "experiment_id", 'CMIP5': "experiment", 'CORDEX': "experiment" }
INPUT_PATTERNS = {'CMIP6': '/bdd/CMIP6/*/*/*/*/.paths.txt',
'CMIP5': '/bdd/CMIP5/output/*/*/*/.paths.txt',
'CORDEX': '/bdd/CORDEX/output/*/*/*/*/.paths.txt'}
ALLOWED_PROJECTS = ['CMIP6', 'CMIP5', 'CORDEX']
CSV_EXTENSION = ".csv"
......
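As a rough illustration of how these constants are consumed downstream (this mirrors the get_sources() helper and the granularity lookup that appear later in this diff):

from glob import iglob

from constants import INPUT_PATTERNS, INPUT_SOURCES_GRAN

project = 'CMIP6'
# Discover every .paths.txt listing under the project archive.
for source in iglob(INPUT_PATTERNS[project]):
    print(source)
# Catalog files are split on this facet, e.g. 'experiment_id' for CMIP6.
granularity = INPUT_SOURCES_GRAN[project]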
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import signal
import subprocess
import os
from argparse import ArgumentParser
from glob import iglob
from multiprocessing import Pool
from pathlib import Path
import lockfile
from tqdm import tqdm
from DRSParser import DRSParser
from constants import *
from utils import *
@@ -27,145 +23,82 @@ class Process(object):
Processing context passed to each process.
"""
# print("init prrocess")
# self.drs = ctx.drs
# print("DRS OK")
# Retrieve processing context args.
self.outcat = ctx.outcat
self.depth = ctx.depth
self.project = ctx.project
#print("init end")
def __call__(self, entry):
# Retrieve DRS parser from global variables forked to the worker.
global drs
# Build CSV header.
self.header = ['path', 'project']
self.header.extend(drs.dir_keys)
self.header.extend(['period_start', 'period_end', 'latest'])
# Set CSV entries list.
self.entries = list()
def __call__(self, source):
"""
Any error switches to the next child process.
It does not stop the main process at all.
"""
# Escape in case of error.
try:
# Split entry into full file path and latest boolean.
#print("coucou du process")
path, latest = map(str.strip, entry.split())
#print(path, latest)
# Convert path into pathlib.Path object.
path = Path(path)
global drs
# Get facets from path.
facets = drs.get_facets_from_path(path)
# Update facets from filename.
facets.update(drs.get_facets_from_filename(path.stem))
# Build CSV entry.
entry_facets = [facets[i] for i in drs.dir_keys]
entry = [IPSL_DATA_ROOT + path.as_posix(), self.project]
entry.extend(entry_facets)
entry.extend([facets['period_start'], facets['period_end'], latest])
# Build catalog filename.
if self.depth == 'project':
catpath = os.path.join(self.outcat,
self.project,
self.project)
else:
catdepth = entry_facets[:drs.dir_keys.index(self.depth) + 1]
catpath = os.path.join(self.outcat,
self.project,
'_'.join(catdepth))
# Lock catalog file to avoid multiprocessing concurrent access.
lock = lockfile.LockFile(catpath + CSV_EXTENSION)
with lock:
# Create catalog files (CSV + JSON) if not exists.
if not os.path.isfile(catpath + CSV_EXTENSION):
# Build CSV header.
header = ['path', 'project']
header.extend(drs.dir_keys)
header.extend(['period_start', 'period_end', 'latest'])
# Open source file.
with open(source, 'r') as f:
# Ensure header and entry have same length.
assert len(header) == len(entry)
for line in f:
# Write CSV header.
make_csv(catpath + CSV_EXTENSION, header)
# Split entry into full file path and latest boolean.
path, latest = map(str.strip, line.split())
# Write JSON catalog in the same time.
make_json(catpath + JSON_EXTENSION, self.project, header)
# Convert path into pathlib.Path object.
path = Path(path)
# Write catalog entry.
with open(catpath + CSV_EXTENSION, 'a+') as f:
f.write(','.join(entry) + '\n')
# Get facets from path.
# Sets empty dict in the case of parsing error raised by DRSParser.
facets = drs.get_facets_from_path(path) or dict()
except Exception:
# Update facets from filename.
facets.update(drs.get_facets_from_filename(path.stem))
raise
# If facet dict is empty, go to next line/path.
if not facets:
continue
# Build CSV entry.
entry_facets = [facets[i] for i in drs.dir_keys]
entry = [IPSL_DATA_ROOT + path.as_posix(), self.project]
entry.extend(entry_facets)
entry.extend([facets['period_start'], facets['period_end'], latest])
class Runner(object):
# Ensure header and entry have same length.
assert len(self.header) == len(entry)
def __init__(self, threads):
#print("coucou")
# Initialize the pool.
self.pool = None
# Append entry to final list.
self.entries.append(entry)
if threads != 1:
self.pool = Pool(processes=threads)
catname = '_'.join(source.split('/')[2:-1])
catpath = os.path.join(self.outcat, self.project, catname)
def _handle_sigterm(self):
# Remove existing catalog files.
try:
for ext in [CSV_EXTENSION, JSON_EXTENSION]:
os.remove(catpath + ext)
except OSError:
pass
# Properly kill the pool in case of SIGTERM.
if self.pool:
self.pool.terminate()
# Write JSON catalog.
make_json(catpath + JSON_EXTENSION, self.project, self.header)
exit(1)
# Write CSV catalog.
make_csv(catpath + CSV_EXTENSION, self.header, self.entries)
def run(self, sources, ctx):
#print("coucou run")
# Instantiate signal handler.
sig_handler = signal.signal(signal.SIGTERM, self._handle_sigterm)
#print(sources)
# Read sources.
for source in sources:
#print(source)
# Count the total entries quickly to size the progress bar.
total = int(subprocess.check_output(["wc", "-l", source]).split()[0])
chunksize = int(total/10)
#total = 5000000
#print("total à traiter", total)
# Instantiate pool of processes.
if self.pool:
#print("TRUE")
# Instantiate pool iterator with progress bar.
processes = tqdm(self.pool.imap(Process(ctx), self.get_entries(source), chunksize=chunksize), desc='Catalog generation',
total=total)
# Sequential processing use basic map function.
else:
#print("FALSE")
# Instantiate processes iterator with progress bar.
processes = tqdm(map(Process(ctx), self.get_entries(source)), desc='Catalog generation', total=total)
#print("ola")
#print(processes)
# Run processes in a dummy variable.
#print(len(processes))
_ = [x for x in processes]
# Terminate pool in case of SIGTERM signal.
signal.signal(signal.SIGTERM, sig_handler)
# Close the pool.
if self.pool:
self.pool.close()
self.pool.join()
@staticmethod
def get_entries(source):
# Iterate over each line of the source file.
with open(source, 'r') as f:
for entry in f:
yield entry
except Exception as e:
print(source, e)
pass
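# For reference, a hedged sketch of the input contract assumed by the line parsing above:
# each line of a .paths.txt source holds a project-relative file path followed by a
# "latest" flag (the values below are hypothetical).
line = 'CMIP6/CMIP/IPSL/IPSL-CM6A-LR/historical/r1i1p1f1/Amon/tas/gr/v20180803/tas_Amon_IPSL-CM6A-LR_historical_r1i1p1f1_gr_185001-201412.nc 1\n'
path, latest = map(str.strip, line.split())
# path is handed to DRSParser; latest ends up as the last CSV column.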
def get_args():
@@ -203,24 +136,17 @@ def get_args():
)
parser.add_argument(
'-t', '--threads',
'-c', '--processes',
metavar='1',
type=int,
default=1,
help="""
Number of threads.
Number of processes.
Set to "1" seems pure sequential processing (default).
Set to "-1" seems all available threads as returned by "multiprocessing.cpu_count()".
Set to "-1" seems all available CPUs as returned by "multiprocessing.cpu_count()".
"""
)
parser.add_argument(
'-d', '--depth',
default='project',
type=str,
help="""Optional DRS depth allowing the generation of catalogues at a certain granularity."""
)
# Return command-line parser & program name.
return parser.prog, parser.parse_args()
@@ -235,30 +161,19 @@ class Context(object):
# Set project.
self.project = args.project
# # Set DRS parser.
# #global drs
# drs = DRSParser(args.project)
# self.drs = drs
# Set output catalog directory.
self.outcat = args.outcat
# Set max pool processes.
self.threads = args.threads
# Set catalog depth.
self.depth = args.depth
# Set sources
self.sources = INPUT_SOURCES[args.project]
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
drs = None
def main():
"""
Run main program
@@ -269,18 +184,42 @@ def main():
# Add program name as argument.
setattr(args, 'prog', prog)
# Setting up the DRS parser:
# Set the DRS parser as a global variable so that it is forked into the pool workers.
# Keeping the DRS parser in the processing context raises a pickling error due to the pyessv library.
global drs
drs = DRSParser(args.project)
# Instantiate processing context
with Context(args) as ctx:
# Instantiate the runner.
r = Runner(ctx.threads)
# Run the pool.
r.run(ctx.sources, ctx)
# If processes is not 1, use multiprocessing.Pool.
if args.processes != 1:
# Instantiate Pool context.
with Pool(processes=args.processes) as pool:
# Instantiate pool iterator with progress bar.
processes = pool.imap(Process(ctx), get_sources(args.project))
# If processes is 1, use basic map function.
else:
# Instantiate processes iterator with progress bar.
processes = map(Process(ctx), get_sources(args.project))
# Run processes in a dummy variable.
_ = [x for x in processes]
def get_sources(project):
"""
Returns an iterator over the input sources matching the project's glob pattern.
"""
pattern = INPUT_PATTERNS[project]
for source in iglob(pattern):
yield source
if __name__ == "__main__":
......
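To make the catalog layout concrete, a hedged sketch of the row each worker builds for the intake-esm CSV, reusing the drs, path, facets and latest names from the sketches above; the facet columns come from drs.dir_keys and therefore depend on the project.

from constants import IPSL_DATA_ROOT

project = 'CMIP6'
header = ['path', 'project'] + list(drs.dir_keys) + ['period_start', 'period_end', 'latest']
row = [IPSL_DATA_ROOT + path.as_posix(), project]
row.extend(facets[k] for k in drs.dir_keys)
row.extend([str(facets['period_start']), str(facets['period_end']), latest])
# Header and row must stay aligned; the code above asserts len(header) == len(entry).
csv_line = ','.join(row) + '\n'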
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import signal
import subprocess
from argparse import ArgumentParser
from multiprocessing import Pool
from pathlib import Path
import lockfile
from tqdm import tqdm
from DRSParser import DRSParser
from constants import *
from utils import *
import os
from glob import iglob
class Process(object):
"""
Child process.
"""
def __init__(self, ctx):
"""
Processing context passed to each process.
"""
# print("init prrocess")
# self.drs = ctx.drs
# print("DRS OK")
self.outcat = ctx.outcat #OutputCatalog/
# self.depth = ctx.depth
self.project = ctx.project
#print("init end")
def __call__(self, entry):  # entry => e.g. fpath1.txt, the file listing the paths to process
"""
Any error switches to the next child process.
It does not stop the main process at all.
"""
#print("FROM PROCESS", entry)
# Escape in case of error.
try:
global drs
res = ""
entry_facets = None
debug = ""
#nbpath=0
with open(entry,"r") as fi :
for oneLine in fi :
#nbpath=nbpath+1
# Split entry into full file path and latest boolean.
path, latest = map(str.strip, oneLine.split())
#print(path, latest)
# Convert path into pathlib.Path object.
path = Path(path)
# Get facets from path.
facets = drs.get_facets_from_path(path)
#debug ="1"+str(facets)+"\n"
if facets is not None:
# Update facets from filename.
facets.update(drs.get_facets_from_filename(path.stem))
#debug = debug + "2"+str(facets)+"\n"
# Build CSV entry.
entry_facets = [facets[i] for i in drs.dir_keys]
#debug = debug + "3"+str(entry_facets)+"\n"
csvEntry = [IPSL_DATA_ROOT + path.as_posix(), self.project]
csvEntry.extend(entry_facets)
csvEntry.extend([str(facets['period_start']), str(facets['period_end']), latest])
res = res + ','.join(csvEntry) + '\n'
else:
print("PB : ",oneLine)
#print("RES est remplit Normalement \n",res)
# Build catalog filename.
##### DEBUG ONLY
#global nbTot
#nbTot = nbTot + 1
#print("NB:", nbTot, " for worker:", os.getpid(), "number of paths seen for this file:", nbpath)
##### END DEBUG
if entry_facets is None:
return  # No path to process, so the loop above was never entered.
#raise Exception("entry_facets is None\n" + debug)
catdepth = entry_facets[:drs.dir_keys.index(INPUT_SOURCES_GRAN[self.project]) + 1]
catpath = os.path.join(self.outcat,
self.project,
'_'.join(catdepth))
# Lock catalog file to avoid multiprocessing concurrent access.
# lock = lockfile.LockFile(catpath + CSV_EXTENSION)
# with lock:
# Create catalog files (CSV + JSON) if not exists.
if not os.path.isfile(catpath + CSV_EXTENSION):
# Build CSV header.
header = ['path', 'project']
header.extend(drs.dir_keys)
header.extend(['period_start', 'period_end', 'latest'])
# # Ensure header and entry have same length.
# assert len(header) == len(entry)
# Write CSV header.
make_csv(catpath + CSV_EXTENSION, header)
# Write JSON catalog in the same time.
make_json(catpath + JSON_EXTENSION, self.project, header)
# Write catalog entry.
with open(catpath + CSV_EXTENSION, 'a+') as f:
f.write(res)
except Exception as e:
print("Exception pour :",entry,"\n", e,"\n")
if "csvEntry" in locals():
print(csvEntry,"\n")
# if "res" in locals():
# print(res,"\n")
pass
class Runner(object):
def __init__(self, threads):
#print("coucou")
# Initialize the pool.
self.pool = Pool(processes=threads)
def _handle_sigterm(self):
# Properly kill the pool in case of SIGTERM.
self.pool.terminate()
exit(1)
def run(self, sources, ctx):
# Instantiate signal handler.
sig_handler = signal.signal(signal.SIGTERM, self._handle_sigterm)
# Read sources (the input directories).
# These directories contain files already split by experiment_id.
for source in sources:
#print("mesource : ",source)
#FichList = [ f for f in os.listdir('.') if os.path.isfile(os.path.join('.',f)) ]
# Count the input files quickly to size the progress bar.
#total = int(subprocess.check_output(["wc", "-l", source]).split()[0])
total = len([ f for f in os.listdir(source) if os.path.isfile(os.path.join(source,f)) ])
#chunksize = int(total/10)
# Instantiate pool of processes.
# Instantiate pool iterator with progress bar.
processes = tqdm(self.pool.imap(Process(ctx), self.get_files(source), chunksize=1), desc='Catalog generation',
total=total)
print("On va lancer ", len(processes), " ")
# Run processes in a dummy variable.
_ = [x for x in processes]
# Terminate pool in case of SIGTERM signal.
signal.signal(signal.SIGTERM, sig_handler)
# Close the pool.
self.pool.close()
self.pool.join()
@staticmethod
def get_files(source):
FichList = [ f for f in os.listdir(source) if os.path.isfile(os.path.join(source,f)) ]
for filepath in FichList:
yield os.path.join(source,filepath)
@staticmethod
def get_entries(source):
# Iterate over each line of the source file.
with open(source, 'r') as f:
for entry in f:
yield entry
def get_args():
"""
Returns parsed command-line arguments.
"""
# Argument parser.
parser = ArgumentParser(
prog='mkesmcat',
description='Generates "intake-esm" catalog (JSON + CSV) from CLIMERI-France climate data archives.',
add_help=True
)
parser.add_argument(
'-v', '--version',
action='version',
version='1.0',
help='Program version.')
parser.add_argument(
'-p', '--project',
required=True,
nargs='?',
choices=ALLOWED_PROJECTS,
type=str,
help="""Suitable projects are: """.format(', '.join(ALLOWED_PROJECTS))
)
parser.add_argument(
'-o', '--outcat',