Commit 9d2083fe authored by Guillaume's avatar Guillaume

Refactoring by Guillaume

parent 5d5df3f1
# -*- coding: utf-8 -*-
"""
:platform: Unix
:synopsis: DRS parser used in this module.
"""
import os
import pyessv
from pyessv import TemplateParsingError
from TimeRange import TimeRange
from constants import VOCAB
class DRSParser(object):
"""
Class handling DRS parsing of paths and filenames depending on the project.
"""
def __init__(self, project):
# Add time range collection.
pyessv.create_collection(
pyessv.load('wcrp:{}'.format(project)),
"time_range",
description="Time Range",
term_regex=r'[0-9]+\-[0-9]+'
)
# Get DRS collections.
self.dir_drs = VOCAB[project]['directory_format']
self.file_drs = VOCAB[project]['filename_format']
# DRS keys.
self.dir_keys = [pyessv.load(i).raw_name for i in self.dir_drs]
self.file_keys = [pyessv.load(i).raw_name for i in self.file_drs]
# Set path template for vocabulary check.
dir_template = os.path.join(project, '/'.join(['{}'] * len(self.dir_drs)))
self.dir_parser = pyessv.create_template_parser(dir_template, self.dir_drs, strictness=1, seperator='/')
# Set file template for vocabulary check.
file_template = '_'.join(['{}'] * len(self.file_drs))
self.file_parser = pyessv.create_template_parser(file_template, self.file_drs, strictness=1, seperator='_')
# Set file template for vocabulary check for fixed-frequency files (no time range).
file_template = '_'.join(['{}'] * (len(self.file_drs) - 1))
self.fx_file_parser = pyessv.create_template_parser(file_template, self.file_drs[:-1], strictness=1,
seperator='_')
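# Illustrative sketch of what the constructor builds, assuming project='CMIP6'
# and the VOCAB entries defined in constants.py:
#   dir_template  -> 'CMIP6/{}/{}/{}/{}/{}/{}/{}/{}/{}'  (nine directory facets)
#   file_template -> '{}_{}_{}_{}_{}_{}_{}'              (seven filename facets, time range last)
# fx_file_parser uses the same filename facets minus the trailing time range,
# for fixed-frequency files that carry no time range.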
def get_facets_from_path(self, path):
"""
Deserialize a pathlib.Path object against the project DRS.
"""
# Check vocabulary.
try:
self.dir_parser.parse(path.parent.as_posix())
# Deserialize path.parent into a dict, excluding the project.
facets = dict(zip(self.dir_keys, path.parent.parts[1:]))
return facets
# Vocabulary error handling.
except TemplateParsingError as e:
print(e)
# Key error handling, due to a mismatch between the number of facets and the path parts.
except KeyError as e:
print(e)
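# Illustrative example (facet values are assumptions): for a relative CMIP6 path such as
#   CMIP6/CMIP/IPSL/IPSL-CM6A-LR/historical/r1i1p1f1/Amon/tas/gr/v20180803/tas_Amon_IPSL-CM6A-LR_historical_r1i1p1f1_gr_185001-201412.nc
# the parent directory is checked against the DRS and the returned facets would look like
#   {'activity_id': 'CMIP', 'institution_id': 'IPSL', ..., 'grid_label': 'gr', 'version': 'v20180803'},
# assuming the pyessv raw key names match the wcrp:cmip6 collections listed in constants.py.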
def get_facets_from_filename(self, basename):
"""
Deserialize a filename string against a DRS.
"""
# Initialize tstart & tend
tstart, tend = None, None
# Check vocabulary.
try:
self.file_parser.parse(basename)
# Deserialize time range in date format.
timerange = TimeRange(basename.split('_')[-1])
tstart, tend = timerange.start, timerange.end
# Vocabulary error handling.
except TemplateParsingError:
# Try checking vocabulary with fixed variable template.
try:
self.fx_file_parser.parse(basename)
# No timerange.
tstart, tend = "", ""
# Vocabulary error handling.
except TemplateParsingError as e:
print(e)
# Key error handling, due to a mismatch between the number of facets and the filename parts.
except KeyError as e:
print(e)
# Key error handling, due to a mismatch between the number of facets and the filename parts.
except KeyError as e:
print(e)
# Deserialize filename and add time range facets.
facets = dict(zip(self.file_keys[:-1], basename.split('_')[:-1]))
facets['period_start'] = tstart
facets['period_end'] = tend
return facets
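# Illustrative example (values are assumptions): a basename such as
#   tas_Amon_IPSL-CM6A-LR_historical_r1i1p1f1_gr_185001-201412
# would yield facets like {'variable_id': 'tas', 'table_id': 'Amon', ...} plus
# 'period_start'/'period_end' taken from TimeRange('185001-201412');
# a fixed-frequency basename without the trailing time range gets empty strings for both.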
{
"esmcat_version": "{{ esmcat_version }}",
"id": "{{ id }}",
"description": "{{ description }}",
"catalog_file": "{{ catalog_file }}",
"attributes": [{% for attr in attributes %}
{
"column_name": "{{ attr.column_name }}",
"vocabulary": "{{ attr.vocabulary }}"
}{% if not loop.last %},{% endif %}
{% endfor %}],
"assets": {
"column_name": "{{ assets.column_name }}",
"format": "{{ assets.format }}"
}
}
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import time # although it is not used in the worker function, this import is required, otherwise the callback is never called (timeout handling issue)
import TimeRange # although it is not used in the worker function, this import is required, otherwise the callback is never called (timeout handling issue)
import os,re
import pyessv
from glob import iglob
@@ -104,7 +104,7 @@ def createCMIP6ESMCat(path, where=None):
name = path.replace("/", "_").split("bdd_")[1]
mCatFab = ESMCatFabric(name,mHeader,mData)
mCatFab.CreateCat(where if (where!=None) else path+"/.catalog" ,"dernier update : "+str(time.time()))
mCatFab.CreateCat(where if (where!=None) else path+"/.catalog","dernier update : " + str(TimeRange.time()))
def ASyncCreateESM(path,where): # this is the function run by each job in the pool; once it is done, the name is returned to indicate that the job has finished
......
# -*- coding: utf-8 -*-
"""
:platform: Unix
:synopsis: Constants used in this module.
"""
IPSL_DATA_ROOT = '/bdd'
#INPUT_SOURCES = {'CMIP6': ['/bdd/CMIP6/C4MIP/MOHC/.paths.txt', # CMIP6 replicas (IDRIS)
# '/bdd/CMIP6/C4MIP/IPSL/.paths.txt' # CMIP6 IPSL datasets (TGCC)
# ],
INPUT_SOURCES = {'CMIP6': ['/Users/glipsl/Desktop/.paths.txt'],
'CMIP5': ['/bdd/CMIP5/output/MOHC/.paths.txt', # CMIP5 replicas (IDRIS)
'/bdd/CMIP5/output/IPSL/.paths.txt' # CMIP5 IPSL datasets (TGCC)
],
'CORDEX': ['/bdd/CORDEX/output/EUR-11/MOHC/.paths.txt', # CORDEX replicas (IDRIS)
'/bdd/CORDEX/output/EUR-11/IPSL/.paths.txt' # CORDEX IPSL datasets (TGCC)
]}
ALLOWED_PROJECTS = ['CMIP6', 'CMIP5', 'CORDEX']
CSV_EXTENSION = ".csv"
JSON_EXTENSION = ".json"
CATALOG_DESCRIPTION_TEMPLATE = "CLIMERI-France {} data catalog."
VOCAB = {
'CMIP6': {
'directory_format': (
'wcrp:cmip6:activity_id',
'wcrp:cmip6:institution_id',
'wcrp:cmip6:source_id',
'wcrp:cmip6:experiment_id',
'wcrp:cmip6:member_id',
'wcrp:cmip6:table_id',
'wcrp:cmip6:variable_id',
'wcrp:cmip6:grid_label',
'wcrp:cmip6:version'
),
'filename_format': (
'wcrp:cmip6:variable_id',
'wcrp:cmip6:table_id',
'wcrp:cmip6:source_id',
'wcrp:cmip6:experiment_id',
'wcrp:cmip6:member_id',
'wcrp:cmip6:grid_label',
'wcrp:cmip6:time_range'
)
},
'CMIP5': {
'directory_format': (
'wcrp:cmip5:product',
'wcrp:cmip5:institute',
'wcrp:cmip5:model',
'wcrp:cmip5:experiment',
'wcrp:cmip5:time_frequency',
'wcrp:cmip5:realm',
'wcrp:cmip5:cmor_table',
'wcrp:cmip5:ensemble',
'wcrp:cmip5:version',
'wcrp:cmip5:variable'
),
'filename_format': (
'wcrp:cmip5:variable',
'wcrp:cmip5:cmor_table',
'wcrp:cmip5:model',
'wcrp:cmip5:experiment',
'wcrp:cmip5:ensemble',
'wcrp:cmip5:time_range'
)
},
'CORDEX': {
'directory_format': (
'wcrp:cordex:product',
'wcrp:cordex:domain',
'wcrp:cordex:institute',
'wcrp:cordex:driving_model',
'wcrp:cordex:experiment',
'wcrp:cordex:ensemble',
'wcrp:cordex:rcm_model',
'wcrp:cordex:rcm_version',
'wcrp:cordex:time_frequency',
'wcrp:cordex:variable',
'wcrp:cordex:version'
),
'filename_format': (
'wcrp:cordex:variable',
'wcrp:cordex:domain',
'wcrp:cordex:driving_model',
'wcrp:cordex:experiment',
'wcrp:cordex:ensemble',
'wcrp:cordex:rcm_model',
'wcrp:cordex:rcm_version',
'wcrp:cordex:time_frequency',
'wcrp:cordex:time_range'
)
}
}
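# Illustrative mapping (facet values are assumptions): under the CMIP5 directory_format
# above, a dataset directory would look like
#   CMIP5/output1/IPSL/IPSL-CM5A-LR/historical/mon/atmos/Amon/r1i1p1/v20110406/tas
# and a matching filename like
#   tas_Amon_IPSL-CM5A-LR_historical_r1i1p1_185001-200512.nc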
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import signal
import subprocess
from argparse import ArgumentParser
from multiprocessing.dummy import Pool
from pathlib import Path
import lockfile
from tqdm import tqdm
from DRSParser import DRSParser
from constants import *
from utils import *
class Process(object):
"""
Child process.
"""
def __init__(self, ctx):
"""
Processing context passed to each process.
"""
self.drs = ctx.drs
self.outcat = ctx.outcat
self.depth = ctx.depth
self.project = ctx.project
def __call__(self, entry):
"""
Any error skips the current entry and moves on to the next one.
It does not stop the main process at all.
"""
# Escape in case of error.
try:
# Split entry into full file path and latest boolean.
path, latest = map(str.strip, entry.split())
# Convert path into pathlib.Path object.
path = Path(path)
# Get facets from path.
facets = self.drs.get_facets_from_path(path)
# Update facets from filename.
facets.update(self.drs.get_facets_from_filename(path.stem))
# Build CSV entry.
entry_facets = [facets[i] for i in self.drs.dir_keys]
entry = [IPSL_DATA_ROOT + path.as_posix(), self.project]
entry.extend(entry_facets)
entry.extend([facets['period_start'], facets['period_end'], latest])
# Build catalog filename.
if self.depth == 'project':
catpath = os.path.join(self.outcat,
self.project,
self.project)
else:
catdepth = entry_facets[:self.drs.dir_keys.index(self.depth) + 1]
catpath = os.path.join(self.outcat,
self.project,
'_'.join(catdepth))
# Lock catalog file to avoid multiprocessing concurrent access.
lock = lockfile.LockFile(catpath + CSV_EXTENSION)
with lock:
# Create catalog files (CSV + JSON) if not exists.
if not os.path.isfile(catpath + CSV_EXTENSION):
# Build CSV header.
header = ['path', 'project']
header.extend(self.drs.dir_keys)
header.extend(['period_start', 'period_end', 'latest'])
# Ensure header and entry have same length.
assert len(header) == len(entry)
# Write CSV header.
make_csv(catpath + CSV_EXTENSION, header)
# Write the JSON catalog at the same time.
make_json(catpath + JSON_EXTENSION, self.project, header)
# Write catalog entry.
with open(catpath + CSV_EXTENSION, 'a+') as f:
f.write(','.join(entry) + '\n')
except Exception as e:
print(e)
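# Illustrative outcome (values are assumptions): for depth='project' the entry above is
# appended to <outcat>/CMIP6/CMIP6.csv (with CMIP6.json created alongside on first use);
# for a facet depth such as 'institution_id' the catalog name is built from the leading
# facets, e.g. <outcat>/CMIP6/CMIP_IPSL.csv. Each CSV row holds the file path, the project,
# the DRS facets, the period bounds and the latest flag, in the header order asserted above.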
class Runner(object):
def __init__(self, threads):
# Initialize the pool.
self.pool = None
if threads != 1:
self.pool = Pool(processes=threads)
def _handle_sigterm(self, signum, frame):
# Properly kill the pool in case of SIGTERM.
if self.pool:
self.pool.terminate()
exit(1)
def run(self, sources, ctx):
# Register the SIGTERM handler, keeping the previous handler.
sig_handler = signal.signal(signal.SIGTERM, self._handle_sigterm)
# Read sources.
for source in sources:
# Get the total number of entries quickly, for a nicer progress bar.
total = int(subprocess.check_output(["wc", "-l", source]).split()[0])
# Instantiate pool of processes.
if self.pool:
# Instantiate pool iterator with progress bar.
processes = tqdm(self.pool.imap(Process(ctx), self.get_entries(source)), desc='Catalog generation',
total=total)
# Sequential processing uses the basic map function.
else:
# Instantiate processes iterator with progress bar.
processes = tqdm(map(Process(ctx), self.get_entries(source)), desc='Catalog generation', total=total)
# Consume the iterator to run the processes.
_ = [x for x in processes]
# Restore the previous SIGTERM handler.
signal.signal(signal.SIGTERM, sig_handler)
# Close the pool.
if self.pool:
self.pool.close()
self.pool.join()
@staticmethod
def get_entries(source):
# Iterate over each line of the source file.
with open(source, 'r+') as f:
for entry in f:
yield entry
def get_args():
"""
Returns parsed command-line arguments.
"""
# Argument parser.
parser = ArgumentParser(
prog='mkesmcat',
description='Generates "intake-esm" catalog (JSON + CSV) from CLIMERI-France climate data archives.',
add_help=True
)
parser.add_argument(
'-v', '--version',
action='version',
version='1.0',
help='Program version.')
parser.add_argument(
'-p', '--project',
required=True,
nargs='?',
choices=ALLOWED_PROJECTS,
type=str,
help="""Suitable projects are: """.format(', '.join(ALLOWED_PROJECTS))
)
parser.add_argument(
'-o', '--outcat',
required=True,
type=str,
help="""Output directory for generated catalogs."""
)
parser.add_argument(
'-t', '--threads',
metavar='1',
type=int,
default=1,
help="""
Number of threads.
Set to "1" seems pure sequential processing (default).
Set to "-1" seems all available threads as returned by "multiprocessing.cpu_count()".
"""
)
parser.add_argument(
'-d', '--depth',
default='project',
type=str,
help="""Optional DRS depth allowing the generation of catalogues at a certain granularity."""
)
# Return the program name & parsed arguments.
return parser.prog, parser.parse_args()
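# Illustrative invocation (paths and depth value are placeholders):
#   mkesmcat -p CMIP6 -o /path/to/catalogs -t 4 -d institution_id
# which generates one CSV/JSON catalog pair per activity/institution under /path/to/catalogs/CMIP6/.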
class Context(object):
"""
Base class for processing context manager.
"""
def __init__(self, args):
# Set project.
self.project = args.project
# Set DRS parser.
self.drs = DRSParser(args.project)
# Set output catalog directory.
self.outcat = args.outcat
# Set max pool processes.
self.threads = args.threads
# Set catalog depth.
self.depth = args.depth
# Set sources
self.sources = INPUT_SOURCES[args.project]
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def main():
"""
Run the main program.
"""
# Get command-line arguments.
prog, args = get_args()
# Add program name as argument.
setattr(args, 'prog', prog)
# Instantiate processing context
with Context(args) as ctx:
# Instantiate the runner.
r = Runner(ctx.threads)
# Run the pool.
r.run(ctx.sources, ctx)
if __name__ == "__main__":
main()
pyessv
netcdftime
\ No newline at end of file
netcdftime
jinja2
lockfile
tqdm
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
:platform: Unix
:synopsis: Miscellaneous functions used in this module.
"""
import os
from jinja2 import Template
from constants import CATALOG_DESCRIPTION_TEMPLATE
def make_csv(catpath, header):
"""
Write header into CSV catalog file.
"""
# Create directory if not exists.
if not os.path.exists(os.path.dirname(catpath)):
os.makedirs(os.path.dirname(catpath))
with open(catpath, 'a+') as f:
f.write(','.join(header) + '\n')
def make_json(catpath, project, header):
"""
ESM JSON catalog templating based on Jinja2.
"""
# Get catalog name.
catname = os.path.basename(catpath)
# Load Jinja2 template for ESM catalogs.
with open('ESMCollectionTemplate.json') as file:
esm_json_template = Template(file.read())
# Set template content.
content = dict()
content['esmcat_version'] = '0.1.0'
content['id'] = os.path.splitext(catname)[0]
content['description'] = CATALOG_DESCRIPTION_TEMPLATE.format(project)
content['catalog_file'] = catpath
content['attributes'] = list()
for facet in header[1:]:
attr = dict()
attr['column_name'] = facet
attr['vocabulary'] = '' # TODO: Use pyessv to get facet description?
content['attributes'].append(attr)
content['assets'] = dict()
content['assets']['column_name'] = header[0]
content['assets']['format'] = 'netcdf'
# Write rendered template.
with open(catpath, 'w+') as f:
f.write(esm_json_template.render(content))
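# Illustrative sketch (values are assumptions): for project='CMIP6' and a header starting
# with 'path', the rendered catalog would resemble
#   {"esmcat_version": "0.1.0",
#    "id": "CMIP6",
#    "description": "CLIMERI-France CMIP6 data catalog.",
#    "catalog_file": "<catpath>",
#    "attributes": [{"column_name": "project", "vocabulary": ""}, ...],
#    "assets": {"column_name": "path", "format": "netcdf"}}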
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pyessv
# Add time range collection.
pyessv.create_collection(
pyessv.load('wcrp:{}'.format(project)),
"time_range",
description="Time Range",
term_regex=r'[0-9]+\-[0-9]+'
)
VOCAB = {
'CMIP6': {
'directory_format': (
......