Commit a869c617 authored by TROUSSELLIER Laurent

Creation of Jinja template + creation of Ciclad script

parent 754ee6cb
......@@ -12,7 +12,8 @@ from vocabulary import VOCAB
def main():
# TODO: From CLI args.
root = '/Users/glipsl/Documents/work/catalog/bdd/'
#root = '/Users/glipsl/Documents/work/catalog/bdd/'
root = '/home/ltrousse/Bureau/ModCatAdmin/catalog/bdd'
project = 'CMIP6'
# Get DRS collections.
......@@ -117,9 +118,11 @@ def main():
facets.update(dict(zip(file_keys[:-1], p.name.split('_')[:-1])))
facets['period_start'] = tstart
facets['period_end'] = tend
print(facets)
# Write in CSV.
#TODO
except: # silently skip undecodable paths for now (see TODO above)
pass
if __name__ == "__main__":
main()
\ No newline at end of file
{
"esmcat_version":"{{ collection.esmcat_version }}",
"id":"{{ collection.id }}",
"description": "{{ collection.description }}",
"catalog_file": "{{ collection.catalog_file }}",
"attributes": [{% for attr in collection.attributes %}
{
"column_name": "{{ attr.column_name }}",
"vocabulary": "{{ attr.vocabulary }}"
}{% if not loop.last %},{% endif %}
{% endfor %}],
"assets": {
"column_name": "{{ collection.assets.column_name }}",
"format": "{{ collection.assets.format }}"
}
}
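For reference, a minimal sketch of how this template can be rendered with jinja2 (it mirrors the SaveJSON helper in the Ciclad script below; the collection values here are illustrative):

from jinja2 import Template

# Render ESMCollectionTemplate.json with a "collection" dict whose keys
# match the placeholders above (esmcat_version, id, attributes, assets, ...).
with open("ESMCollectionTemplate.json") as f:
    tmpl = Template(f.read())
collection = {
    "esmcat_version": "0.1.0",
    "id": "CMIP6_PMIP_CAS_catalog",  # hypothetical catalog id
    "description": "",
    "catalog_file": "CMIP6_PMIP_CAS_catalog.csv",
    "attributes": [{"column_name": "activity_id", "vocabulary": ""}],
    "assets": {"column_name": "path", "format": "netcdf"},
}
print(tmpl.render(collection=collection))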
......@@ -80,6 +80,7 @@ class ESMCatFabric():
#print(json.dumps(dicToSave))
with open(self.path+"/"+self.name+".json", 'w') as fp:
json.dump(dicToSave, fp, indent=4)
def __repr__(self):
res = ""
res+=str(self.header)+"\n"
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 14 09:34:19 2022
@author: ltrousse
"""
from multiprocessing import Pool
import time # although not used in the function run by the workers .. it is required .. otherwise the callback is never called !!! (a timeout-handling thing)
import os,re
import pyessv
from pyessv import TemplateParsingError
from vocabulary import VOCAB
from glob import iglob
from pathlib import Path
from TimeRange import TimeRange
from lockfile import LockFile
from shutil import rmtree
from jinja2 import Template
import argparse
# 1/ collect the files via glob
# 2/ pass them to essv (formerly done with pCMIP6) to validate them and extract catalog info from the path
# 3/ create/update the ESM catalogs at the chosen(?) granularity
# 4/ run the ESM catalogs through the intake-esm validator
class DRS():
def __init__(self, project):
# Get DRS collections.
self.project=project
self.dir_drs = VOCAB[project]['directory_format']
self.file_drs = VOCAB[project]['filename_format']
#print(self.project,self.dir_drs,self.file_drs)
#print(self.file_drs)
# Add time range collection.
time_range = pyessv.create_collection(
pyessv.load('wcrp:{}'.format(project)),
"time_range",
description="Time Range",
term_regex=r'[0-9]+\-[0-9]+'
)
# Override version collection with "latest" pattern.
version = pyessv.create_collection(
pyessv.load('wcrp:{}'.format(project)),
"version",
description="Version",
term_regex=r'^(v[0-9]{8}|latest)$'
)
# DRS keys.
self.dir_keys = [pyessv.load(i).raw_name for i in self.dir_drs]
self.file_keys = [pyessv.load(i).raw_name for i in self.file_drs]
# Set path template for vocabulary check.
dir_template = os.path.join(project, '/'.join(['{}'] * len(self.dir_drs)))
self.dir_parser = pyessv.create_template_parser(dir_template, self.dir_drs, strictness=1, seperator='/')
# Set file template for vocabulary check for fixed frequency.
file_template = '_'.join(['{}'] * len(self.file_drs))
self.file_parser = pyessv.create_template_parser(file_template, self.file_drs, strictness=1, seperator='_')
# Set file template for vocabulary check.
file_template = '_'.join(['{}'] * (len(self.file_drs) - 1))
self.fx_file_parser = pyessv.create_template_parser(file_template, self.file_drs[:-1], strictness=1, seperator='_')
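# For illustration: for CMIP6 the directory template built above is roughly
# 'CMIP6/{}/{}/.../{}' (one '{}' per DRS collection), so dir_parser accepts paths
# such as 'CMIP6/PMIP/CAS/FGOALS-f3-L/lig127k/r1i1p1f1/day/vas/gr/v20191025',
# file_parser accepts stems like
# 'vas_day_FGOALS-f3-L_lig127k_r1i1p1f1_gr_11900101-11991231', and fx_file_parser
# the same stem without the trailing time range (fixed-frequency files).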
def getDirFromFacetsUntilGran(self,facets,gran):
print("getDirFromFacetsUntilGran")
indexOfGran = self.get_dir_format()[0].index(str(gran))
print(indexOfGran)
myDir=""
for i in range(indexOfGran):
print(i)
myDir+=facets[self.dir_keys[i]]+"/"
print("myDir:",myDir)
return myDir
def getFacetsFromPath(self,p):
#print("ON RECUP LES FACETS FROM ",p)
# Initialize final dictionary of DRS facets.
facets = dict()
# Deserialize path.
try:
# Check vocabulary.
self.dir_parser.parse(p.parent.as_posix())
# Deserialize p.parent in dict excluding project.
facets = dict(zip(self.dir_keys, p.parent.parts[1:]))
# Vocabulary error handling.
except TemplateParsingError as e:
print(e)
# Key error handling. Due to wrong number of facets in comparison with the path parts.
except KeyError as e:
print(e)
# Deserialize filename.
#try:
# Check vocabulary.
try:
self.file_parser.parse(p.stem)
# Deserialize time range in date format.
timerange = TimeRange(p.stem.split('_')[-1])
tstart, tend = timerange.start, timerange.end
# Vocabulary error handling.
except TemplateParsingError as e:
# Try checking vocabulary with fixed variable template.
try:
self.fx_file_parser.parse(p.stem)
# No timerange.
tstart, tend = None, None
# Vocabulary error handling.
except TemplateParsingError as e:
print(e)
# Key error handling. Due to wrong number of facets in comparison with the filename parts.
except KeyError as e:
print(e)
# Key error handling. Due to wrong number of facets in comparison with the filename parts.
except KeyError as e:
print(e)
# Deserialize p.name and update dict.
facets.update(dict(zip(self.file_keys[:-1], p.name.split('_')[:-1])))
facets['period_start'] = tstart
facets['period_end'] = tend
#except :
#pass
return facets
def get_dir_format(self):
"""
Load project directory structure from PYESSV Archive.
"""
# Load project vocabulary.
vocabulary = pyessv.load('wcrp:{}'.format(self.project))
# Correct the directory structure if needed.
directory_structure = vocabulary.data['directory_format'].replace('activity_drs', 'activity_id')
# Extract names of collections (excluding "root").
names = [re.search(r'\w+', part).group() for part in directory_structure.split('s/')[1:]]
# Get set of pyessv collections.
collections = set()
for name in names:
for collection in vocabulary.collections:
if name == collection.raw_name:
collections.add(collection)
return names, collections
def __repr__(self):
res=""
res+="Dir Keys :\n"
res+=str(self.dir_keys) +"\n"
res+="File Keys :\n"
res+=str(self.file_keys) +"\n"
res+="All Parser :\n"
res+=str(self.dir_parser) +"\n"
res+=str(self.file_parser) +"\n"
res+=str(self.fx_file_parser) +"\n"
return res
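# A minimal usage sketch of this class (assuming the 'wcrp:cmip6' pyessv archive
# is installed locally, as the create_collection calls above require):
#
#   drs = DRS('CMIP6')
#   p = Path('CMIP6/PMIP/CAS/FGOALS-f3-L/lig127k/r1i1p1f1/day/vas/gr/v20191025/'
#            'vas_day_FGOALS-f3-L_lig127k_r1i1p1f1_gr_11900101-11991231.nc')
#   facets = drs.getFacetsFromPath(p)  # dict of DRS facets + period_start/period_end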
class GuillaumeIdea():
def __init__(self):
pass
def DecodePath(self,project,root,gran,filepath):
# print("COUCOU")
# print(project)
# print(root)
# print(gran)
# print(filepath)
########################################
## Per-file processing
########################################
# Required info:
# * the path as seen by Ciclad
# * the facets corresponding to that path
# * latest version or not => 'latest' flag
# * which catalog (csv) the path, the facets and the flag go into
#
# In the end:
# * append ("a+") the path, the facets and the flag to the right catalog
#
# Accepted shortcoming:
# * the catalog is not valid until the header is written into the csv files and the json describing each column is created;
# these two steps are done once all the csv files have been built piecewise, in parallel, by this function
#filepath example : /gpfscmip/gpfsdata/esgf/CMIP6/PMIP/CAS/FGOALS-f3-L/lig127k/r1i1p1f1/day/vas/gr/v20191025/vas_day_FGOALS-f3-L_lig127k_r1i1p1f1_gr_11900101-11991231.nc latest
#########################
# the path as seen by Ciclad, AND: latest version or not => 'latest' flag
#########################
path, flagLatest = filepath.strip().split(" ") # split the path from the 'latest' flag (strip() drops the trailing newline)
#TEMP for testing
#
#
#print("START")
#ft0 = time.time()
#path = filepath
#flagLatest = "1"
#p = Path(path[1:]).relative_to(root) # the [1:] just strips the leading "/" so the path is easier to decode into facets
p = Path(path[1:]) # the [1:] just strips the leading "/" so the path is easier to decode into facets (filepath was split into path + flag above)
pathFromCiclad = os.path.join("/bdd",p)
#print(path,flagLatest,pathFromCiclad)
#ft1 = time.time()
#########################
# the facets corresponding to this path:
#########################
#ft1 = time.time()
#print("1:",ft1-ft0)
#drs = DRS(project)
#print("DECODE_PATH")
facets = drs.getFacetsFromPath(p)
# Maybe check whether they are valid?
# print("We should have the facets now")
# print(facets)
#########################
# which catalog (csv) the path, the facets and the flag go into
#########################
#ft2 = time.time()
#print("2:",ft2-ft1)
# the catalog lives in /modfs/catalog/"project"/
# its name depends on the facets and on the chosen granularity:
# example for a granularity at the experiment_id level: CMIP6_PMIP_CAS_FGOALS-f3-L_lig127k_catalog.csv
indexOfGran = drs.get_dir_format()[0].index(str(gran))
rootDirCat = "bdd/modfs/catalog/"
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
dirCat = os.path.join(rootDirCat,project)
nameCat = ""
for i in range(indexOfGran):
nameCat+=facets[drs.dir_keys[i]]+"_"
nameCat = project+"_"+nameCat[:-1]+".csv"
pathCat = os.path.join(dirCat,nameCat)
#print(pathCat,nameCat, "path : ",pathCat)
##########################
# We have all the info ...
# Append with a lock
##########################
#ft3 = time.time()
#print("3:",ft3-ft2)
data = pathFromCiclad+","+project+"," # pathFromCiclad is already newline-free thanks to the strip() above
for _,v in facets.items():
#print(k,v)
data = data + str(v) +","
data = data + flagLatest
#print(data)
lock = LockFile(pathCat)
with lock:
with open(pathCat, 'a+') as f:
#print("ON APPEND")
f.write(data+"\n")
#print("FINI APPEND")
#ft4 = time.time()
#print("4:",ft4-ft3)
return filepath
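# For the example filepath above (assuming the '/gpfscmip/gpfsdata/esgf' prefix is
# stripped, e.g. via the commented-out relative_to(root) call, so that the decoded
# path starts at 'CMIP6/'), the appended row would look roughly like:
# /bdd/CMIP6/PMIP/CAS/FGOALS-f3-L/lig127k/r1i1p1f1/day/vas/gr/v20191025/vas_day_FGOALS-f3-L_lig127k_r1i1p1f1_gr_11900101-11991231.nc,CMIP6,PMIP,CAS,FGOALS-f3-L,lig127k,r1i1p1f1,day,vas,gr,v20191025,11900101,11991231,latest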
def JobDone(self,ret):
if ret == "":
#print("le path n'était pas bon")
pass
else:
# print("############################")
# print("On a traité le fichier ",ret)
# print("############################")
# print("")
pass
def line_prepender(filename, line):
with open(filename, 'r+') as f:
content = f.read()
f.seek(0, 0)
f.write(line.rstrip('\r\n') + '\n' + content)
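# Usage example (hypothetical file name; see prependCSV below):
#   line_prepender("CMIP6_PMIP_CAS_catalog.csv", "path,project,activity_id,...")
# writes the header line above the existing rows.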
#lock = Lock() # A global lock here is annoying ..
# The lock is only needed (I believe) for concurrent access to the catalog
# BUT ... if we knew the catalog ... we could simply have one lock per catalog
# for now we do not know the catalog when the jobs are launched with GuillaumeIdea()
drs = None #DRS(project)
def main():
###################################################"
## La CLI pour avoir des parametres pour le script
###################################################"
parser = argparse.ArgumentParser(description="Script which creates an ESM catalog (json+csv) from netCDF files in sub-directories of the 'root' dir and puts the catalog into the 'out_cat' directory, using the 'project' CV and DRS vocabulary as validator")
parser.add_argument("project", help="one of CMIP6, CMIP5, CORDEX", type=str)
parser.add_argument("root", help="root path to scan in order to build an esm cat", type=str)
parser.add_argument("gran", help="granularity at which to start the scan AND which determines the output catalog paths", type=str)
args=parser.parse_args()
#print(args)
###################################################"
## Quelle est la DRS que l'on va utiliser pour creer les catalogue => ça depend du project
###################################################"
#drs = DRS(args.project)
###################################################"
## L'idée c'est de paralleliser la recupération des path pour les analyser => glob
###################################################"
#pattern = os.path.join(args.root, args.project, '**/' * (len(drs.dir_keys)+1))
methode = GuillaumeIdea()
#project = "CMIP6"
#root = "gpfscmip/gpfsdata/esgf"
#gran = "experiment_id"
global drs
drs = DRS(args.project)
nbInPool = 4
pool = Pool(nbInPool)
#pathDesFichiers = "FakeDir/CMIP6_paths.csv" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash"
pathDesFichiers = "bdd/CMIP6/C4MIP/MOHC/.paths.txt" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash"
# TODO ... dépend du project ^^
deb = time.time() # for a bit of benchmarking
#############################################
# Remove the old catalogs to create the new ones
#############################################
rootDirCat = "bdd/modfs/catalog/"
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
dirCat = os.path.join(rootDirCat,args.project)
if os.path.exists(dirCat):
rmtree(dirCat)
os.makedirs(dirCat)
with open(pathDesFichiers, 'r') as file:
    lines = file.readlines()
tempNb = 2000
for i, path in enumerate(lines):
#print(path)
#pool.apply_async(methode.DecodePath, args=(project,root,gran,path,), callback = methode.JobDone)
pool.apply_async(methode.DecodePath, args=(args.project,args.root,args.gran,path,))
if i>tempNb:
break
pool.close()
pool.join()
print( "au total : ", (time.time() - deb), " s ")
def prependCSV():
#project="CMIP6"
#gran = "experiment_id"
################
# Once done ... we have a list of csv files that need the header prepended
# For each file, we want to write the header into those files
################
#print(header)
project= "CMIP6"
global drs
drs = DRS(project)
header = "path,project"
for i in range(len(drs.dir_keys)):
header= header + ","+ drs.dir_keys[i]
header = header + ",period_start,period_end,latest"
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
rootDirCat = "bdd/modfs/catalog/"
pattern = os.path.join(Path(rootDirCat,project),'*.csv')
#print(pattern)
for path in iglob(pattern):
line_prepender(path,header)
where = rootDirCat+project
SaveJSON(path.split("/")[-1],where,header)
#print(path)
def SaveJSON(name,where,header):
ESMCollectionTemplate = "ESMCollectionTemplate.json"
with open(ESMCollectionTemplate) as file_:
j2_ESMCollectionTemplate = Template(file_.read())
dicToSave={}
dicToSave["esmcat_version"]="0.1.0"
dicToSave["id"]=str(name.split('.')[0])
dicToSave["description"]= ""
dicToSave["catalog_file"]=name
attributeList=[]
for key in header.split(",")[1:]:
attr={}
attr["column_name"] = key
attr["vocabulary"]=""
attributeList.append(attr)
dicToSave["attributes"]=attributeList
dicAsset={}
dicAsset["column_name"]= header.split(",")[0]
dicAsset["format"]="netcdf"
dicToSave["assets"]=dicAsset
mycollectiondict={"collection":dicToSave}
ren = j2_ESMCollectionTemplate.render(mycollectiondict)
#print(where+"/"+name.split('.')[0]+".json")
with open(where+"/"+name.split('.')[0]+".json", 'w') as fp:
    fp.write(ren)
from YALMFab import CatYALMFab
def protoYAML():
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
rootDirCat = "bdd/modfs/catalog/"
mCatFab = CatYALMFab(rootDirCat,"CMIP6_YALM")
pattern = os.path.join(Path(rootDirCat,"CMIP6"),'*.json')
#print(pattern)
for path in iglob(pattern):
print(path)
nameCatwoExtension = path.split("/")[-1].split(".")[0]
#def Add(self,nameSource,args,description,driver,metadata):
mCatFab.Add(nameCatwoExtension,{"esmcol_obj":"{{ CATALOG_DIR }}"+"/CMIP6/"+nameCatwoExtension+".json"},"","intake_esm.esm_datastore",{})
mCatFab.Update()
if __name__ == "__main__":
# execute only if run as a script
#main(project ="CMIP6",root="FakeDir/FakeIDRIS/CMIP6",gran="experiment_id")
main()
prependCSV()
protoYAML()
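# Expected invocation (a sketch; the script name is hypothetical and the positional
# arguments follow the argparse setup in main(), although pathDesFichiers and
# rootDirCat are still partly hardcoded above):
#   python3 ciclad_catalog.py CMIP6 /bdd/CMIP6 experiment_id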
......@@ -131,7 +131,7 @@ def main():
# the root path to scan
# the destination path of the catalog.
project = 'CMIP6'
root = '/bdd/CMIP6'
root = '/bdd/root CMIP6'
out_cat = '/Users/glipsl/Documents/work/catalog'
# Load project directory structure.
......