Commit 34dc40c2 authored by Guillaume

Refactoring by Guillaume

parent 0fc00d1f
@@ -85,7 +85,7 @@ class DRSParser(object):
"""
# Initialize tstart & tend
-tstart, tend, clim = 'none', 'none', 'False'
+tstart, tend, clim = '', '', 'False'
# Set clim to True and rename basename to match usual template in case of climatology file.
if basename.endswith('-clim.nc'):
@@ -105,7 +105,7 @@ class DRSParser(object):
self.fx_file_parser.parse(basename)
# Set no timerange and no climatology.
-tstart, tend, clim = 'none', 'none', 'False'
+tstart, tend, clim = '', '', 'False'
# Catch any other exception.
except Exception as e:
......
@@ -12,5 +12,47 @@
"assets": {
"column_name": "{{ assets.column_name }}",
"format": "{{ assets.format }}"
},
"aggregation_control": {
"variable_column_name": "{{ aggregation_control.variable_column_name }}",
"groupby_attrs": [
"activity_id",
"institution_id",
"source_id",
"experiment_id",
"table_id",
"grid_label"
],
"aggregations": [ [{% for agg in aggregations %}
{
"type": "{{ agg.type }}",
"attribute_name": "{{ agg.attribute_name }}",
{% if options in agg.keys() %}
"options": "{{ agg.options }}"
{% endif %}
}{% if not loop.last %},{% endif %}
{% endfor %}],
{
"type": "union",
"attribute_name": "variable_id"
},
{
"type": "join_existing",
"attribute_name": "time_range",
"options": { "dim": "time", "coords": "minimal", "compat": "override" }
},
{
"type": "join_new",
"attribute_name": "member_id",
"options": { "coords": "minimal", "compat": "override" }
},
{
"type": "join_new",
"attribute_name": "dcpp_init_year",
"options": { "coords": "minimal", "compat": "override" }
}
]
}
}
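For reference, a minimal sketch of how the aggregations loop above renders (the context values are illustrative, in the shape make_json passes):

import json
from jinja2 import Template

# Stripped-down version of the loop in the template above.
tpl = Template(
    '[{% for agg in aggregations %}'
    '{"type": "{{ agg.type }}", "attribute_name": "{{ agg.attribute_name }}"'
    "{% if 'options' in agg.keys() %}, \"options\": {{ agg.options | tojson }}{% endif %}}"
    '{% if not loop.last %}, {% endif %}{% endfor %}]'
)
rendered = tpl.render(aggregations=[
    {'type': 'union', 'attribute_name': 'variable_id'},
    {'type': 'join_new', 'attribute_name': 'member_id',
     'options': {'coords': 'minimal', 'compat': 'override'}},
])
print(json.loads(rendered))  # parses as valid JSON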
@@ -6,6 +6,10 @@
"""
CATALOG_OUTPUT_BACKUP = '/modfs/backup/catalog'
CATALOG_OUTPUT_PROD = '/modfs/catalog/catalog'
CATALOG_OUTPUT = '/modfs/scratch/catalog'
IPSL_DATA_ROOT = '/bdd/'
INPUT_PATTERNS = {'CMIP6': '/bdd/CMIP6/*/*/*/*/.paths.txt',
@@ -95,3 +99,84 @@ VOCAB = {
)
}
}
VOCAB_URLS = {
'CMIP6': {
'activity_id': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json',
'institution_id': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_institution_id.json',
'source_id': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_source_id.json',
'experiment_id': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_experiment_id.json',
'member_id': 'https://docs.google.com/document/d/1h0r8RZr_f3-8egBMMh7aqLwy3snpD6_MrDz1q8n5XUk/edit?usp=sharing',
'table_id': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_table_id.json',
'variable_id': 'https://docs.google.com/document/d/1h0r8RZr_f3-8egBMMh7aqLwy3snpD6_MrDz1q8n5XUk/edit?usp=sharing',
'grid_label': 'https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_grid_label.json',
'version': 'https://docs.google.com/document/d/1h0r8RZr_f3-8egBMMh7aqLwy3snpD6_MrDz1q8n5XUk/edit?usp=sharing'
},
'CMIP5': {
'product': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'institute': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'model': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'experiment': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'time_frequency': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'realm': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'cmor_table': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'ensemble': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'version': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72',
'variable': 'https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=72'
},
'CORDEX': {
'product': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'domain': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'institute': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'driving_model': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'experiment': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'ensemble': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'rcm_model': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'rcm_version': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'time_frequency': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'variable': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf',
'version': 'http://is-enes-data.github.io/cordex_archive_specifications.pdf'
}
}
INIT_YEAR_REGEX = '(s(?P<init_year>[0-9]{4})-)?r[0-9]+i[0-9]+p[0-9]+f[0-9]+'
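# Illustration (not part of the module): the 's<YYYY>-' prefix is optional,
# so a DCPP member like 's1960-r1i1p1f1' yields init_year='1960' while a
# plain 'r1i1p1f1' yields init_year=None.
#   >>> import re
#   >>> re.match(INIT_YEAR_REGEX, 's1960-r1i1p1f1').groupdict()['init_year']
#   '1960'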
AGGREGATION_VARIABLE = {
'CMIP6': 'variable_id',
'CMIP5': 'variable',
'CORDEX': 'variable'
}
AGGREGATION_MEMBER = {
'CMIP6': 'member_id',
'CMIP5': 'ensemble',
'CORDEX': 'ensemble'
}
AGGREGATION_GROUP = {
'CMIP6': [
'activity_id',
'institution_id',
'source_id',
'experiment_id',
'table_id',
'grid_label'
],
'CMIP5': [
'institute',
'model',
'experiment',
'time_frequency',
'realm',
'cmor_table'
],
'CORDEX': [
'domain',
'institute',
'driving_model',
'experiment',
'rcm_model',
'time_frequency'
]
}
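# Example lookups (illustrative): these mappings keep the catalog writer
# project-agnostic, e.g. AGGREGATION_VARIABLE['CMIP5'] -> 'variable' and
# AGGREGATION_GROUP['CORDEX'][0] -> 'domain'.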
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import subprocess
import tarfile
from argparse import ArgumentParser
from datetime import date
from glob import iglob
from multiprocessing import Pool
from pathlib import Path
from shutil import copy
from DRSParser import DRSParser
from constants import *
from utils import *
@@ -23,20 +26,13 @@ class Process(object):
"""
# Retrieve processing context args.
-self.outcat = ctx.outcat
+self.outcat = CATALOG_OUTPUT
self.project = ctx.project
+self.header = ctx.header
# Retrieve DRS parser from global variables forked to the worker.
global drs
-# Build CSV header.
-self.header = ['path', 'project']
-self.header.extend(drs.dir_keys)
-self.header.extend(['period_start',
-'period_end',
-'climatology',
-'latest'])
# Set CSV entries list.
self.entries = list()
@@ -51,6 +47,10 @@ class Process(object):
# Open source file.
with open(source, 'r') as f:
print(f'Processing {source}...')
# Iterate over lines.
for line in f:
# Split entry into full file path and latest boolean.
@@ -66,6 +66,14 @@ class Process(object):
# Update facets from filename.
facets.update(drs.get_facets_from_filename(path.stem))
# Deserialize member/ensemble facet to extract the optional DCPP init year.
if self.project == 'CMIP6':
    member_facet = AGGREGATION_MEMBER[self.project]
    assert member_facet in facets
    match = re.compile(INIT_YEAR_REGEX).match(facets[member_facet])
    # The 's<YYYY>-' prefix is optional: default to an empty string.
    facets['init_year'] = (match.groupdict()['init_year'] or '') if match else ''
else:
    facets['init_year'] = ''
# If facet dict is empty, go to next line/path.
if not facets:
continue
@@ -74,7 +82,8 @@ class Process(object):
entry_facets = [facets[i] for i in drs.dir_keys]
entry = [IPSL_DATA_ROOT + path.as_posix(), self.project]
entry.extend(entry_facets)
-entry.extend([facets['period_start'],
+entry.extend([facets['init_year'],
+facets['period_start'],
facets['period_end'],
facets['climatology'],
latest])
@@ -95,17 +104,24 @@ class Process(object):
except OSError:
pass
-# Write JSON catalog.
-make_json(catpath + JSON_EXTENSION, self.project, self.header)
# Write CSV catalog.
-make_csv(catpath + CSV_EXTENSION, self.header, self.entries)
+make_csv(catpath + CSV_EXTENSION, self.entries)
except Exception as e:
print(source, e)
pass
def get_sources(project):
"""
Returns an iterator over the input sources matching the project's glob pattern.
"""
pattern = INPUT_PATTERNS[project]
for source in iglob(pattern):
yield source
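# Illustrative use (the yielded paths follow INPUT_PATTERNS):
#   for source in get_sources('CMIP6'):
#       ...  # each source is one '/bdd/CMIP6/*/*/*/*/.paths.txt' index file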
def get_args():
"""
Returns parsed command-line arguments.
@@ -133,13 +149,6 @@ def get_args():
help="""Suitable projects are: """.format(', '.join(ALLOWED_PROJECTS))
)
-parser.add_argument(
-'-o', '--outcat',
-required=True,
-type=str,
-help="""Output directory for generated catalogs."""
-)
parser.add_argument(
'-c', '--processes',
metavar='1',
@@ -166,8 +175,8 @@ class Context(object):
# Set project.
self.project = args.project
-# Set output catalog directory.
-self.outcat = args.outcat
+# Set CSV header.
+self.header = args.header
def __enter__(self):
return self
@@ -190,17 +199,32 @@ def main():
# Add program name as argument.
setattr(args, 'prog', prog)
print(f'Catalog generation for {args.project}...')
# Set DRS parser as a global variable to be forked into the pool workers.
# Storing the DRS parser in the processing context raises a pickling error due to the pyessv library.
print(f'Setting up {args.project} DRS parser...')
global drs
drs = DRSParser(args.project)
# Build CSV header.
header = ['path', 'project']
header.extend(drs.dir_keys)
header.extend(['init_year',
'period_start',
'period_end',
'climatology',
'latest'])
setattr(args, 'header', header)
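# Schematically, the CSV columns are then:
#   path, project, <DRS directory facets>, init_year,
#   period_start, period_end, climatology, latest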
# Instantiate processing context
with Context(args) as ctx:
# If processes is not 1, use multiprocessing.Pool.
if args.processes != 1:
print(f'Using pool of {args.processes} workers...')
# Instantiate Pool context.
with Pool(processes=args.processes) as pool:
@@ -219,15 +243,25 @@ def main():
# Run the processes by consuming the iterator.
_ = [x for x in processes]
# Write JSON catalog next to the concatenated CSV catalog.
make_json(f'{CATALOG_OUTPUT_PROD}/{args.project}{JSON_EXTENSION}', args.project, header)
# Concatenate per-source CSV files into one CSV catalog.
print('Concatenating CSV catalogs...')
# shell=True is required: the command relies on shell globbing and redirection.
subprocess.check_call(
    f'cat {CATALOG_OUTPUT}/{args.project}/*{CSV_EXTENSION} > {CATALOG_OUTPUT_PROD}/{args.project}{CSV_EXTENSION}',
    shell=True)
# Compress CSV catalog.
print('Compressing final CSV catalog...')
with tarfile.open(f'{CATALOG_OUTPUT_PROD}/{args.project}{CSV_EXTENSION}.gz', 'w:gz') as tar:
    # Store only the file name, not its absolute path, inside the archive.
    tar.add(f'{CATALOG_OUTPUT_PROD}/{args.project}{CSV_EXTENSION}',
            arcname=f'{args.project}{CSV_EXTENSION}')
# Backup the compressed CSV catalog.
print('Backing up the compressed CSV catalog...')
copy(src=f'{CATALOG_OUTPUT_PROD}/{args.project}{CSV_EXTENSION}.gz',
     dst=f'{CATALOG_OUTPUT_BACKUP}/{args.project}_{date.today().strftime("%y%m%d")}{CSV_EXTENSION}.gz')
print(f'{args.project} intake catalogs complete.')
-def get_sources(project):
-    """
-    Returns an iterator over list of input sources following pattern of the project.
-    """
-    pattern = INPUT_PATTERNS[project]
-    for source in iglob(pattern):
-        yield source
if __name__ == "__main__":
......
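Once generated, the JSON/CSV pair forms an intake-esm datastore. A minimal sketch of how a user would open it (the catalog path below is hypothetical, and the intake-esm package must be installed):

import intake

# Open the ESM datastore and peek at the underlying pandas DataFrame.
cat = intake.open_esm_datastore('/modfs/catalog/catalog/CMIP6.json')
print(cat.df.head())
print(cat.search(variable_id='tas', experiment_id='historical').df.shape)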
#!/bin/bash
#PBS -N esm_cat_generator
#PBS -l nodes=1:ppn=16
#PBS -q day
#PBS -l mem=8gb
#PBS -l vmem=8gb
#PBS -j oe
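# One run per project; -c 16 matches the 16 cores requested via ppn above.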
python esm_cat_generator.py -p CMIP6 -c 16
python esm_cat_generator.py -p CMIP5 -c 16
python esm_cat_generator.py -p CORDEX -c 16
@@ -10,10 +10,10 @@ import os
from jinja2 import Template
-from constants import CATALOG_DESCRIPTION_TEMPLATE
+from constants import *
-def make_csv(catpath, header, entries):
+def make_csv(catpath, entries):
"""
Write ESM CSV catalog file.
@@ -21,9 +21,9 @@ def make_csv(catpath, header, entries):
# Create directory if not exists.
if not os.path.exists(os.path.dirname(catpath)):
os.makedirs(os.path.dirname(catpath))
-with open(catpath, 'w+') as f:
-f.write(','.join(header) + '\n')
+# Open file in write mode. Overwrites the file if the file exists.
+with open(catpath, 'w') as f:
for line in entries:
f.write(','.join(line) + '\n')
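# Each entry is one pre-built CSV row (a list of strings), e.g. (illustrative):
#   ['/bdd/CMIP6/CMIP/.../tas_Amon_...nc', 'CMIP6', ..., '1960',
#    '185001', '194912', 'False', 'True']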
@@ -33,7 +33,8 @@ def make_json(catpath, project, header):
ESM JSON catalog templating based on Jinja2.
"""
print(f'Writing JSON catalog...')
# Get catalog name.
catname = os.path.basename(catpath)
@@ -46,21 +47,40 @@ def make_json(catpath, project, header):
content['esmcat_version'] = '0.1.0'
content['id'] = os.path.splitext(catname)[0]
content['description'] = CATALOG_DESCRIPTION_TEMPLATE.format(project)
-content['catalog_file'] = catpath
+content['catalog_file'] = os.path.splitext(catname)[0] + CSV_EXTENSION
content['attributes'] = list()
for facet in header[1:]:
attr = dict()
attr['column_name'] = facet
-attr['vocabulary'] = '' # TODO: Use pyessv to get facet description?
+# Not every header column has a controlled vocabulary; default to ''.
+attr['vocabulary'] = VOCAB_URLS[project].get(facet, '')
content['attributes'].append(attr)
content['assets'] = dict()
content['assets']['column_name'] = header[0]
content['assets']['format'] = 'netcdf'
content['aggregation_control'] = dict()  # Initialize before filling the nested keys.
content['aggregation_control']['variable_column_name'] = AGGREGATION_VARIABLE[project]
content['aggregation_control']['groupby_attrs'] = AGGREGATION_GROUP[project]
content['aggregations'] = list()
content['aggregations'].append({'type': 'union',
'attribute_name': 'variable_id'})
content['aggregations'].append({'type': 'join_existing',
'attribute_name': 'period_start',
'options': {'dim': 'time',
'coords': 'minimal',
'compat': 'override'}})
content['aggregations'].append({'type': 'join_new',
'attribute_name': AGGREGATION_MEMBER[project],
'options': {'coords': 'minimal',
'compat': 'override'}})
if project == 'CMIP6':
content['aggregations'].append({'type': 'join_new',
'attribute_name': 'init_year',
'options': {'coords': 'minimal',
'compat': 'override'}})
# Create directory if not exists.
if not os.path.exists(os.path.dirname(catpath)):
os.makedirs(os.path.dirname(catpath))
-# Write rendered template.
-with open(catpath, 'w+') as f:
+# Write rendered template. Overwrites the file if the file exists.
+with open(catpath, 'w') as f:
f.write(esm_json_template.render(content))