Commit 3e9ceae9 authored by TROUSSELLIER Laurent's avatar TROUSSELLIER Laurent
Browse files

One liner + bennchmark Atef Idea

parent a869c617
#!/usr/bin/env python3
#!bin/python
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 14 09:34:19 2022
......@@ -22,6 +22,7 @@ from lockfile import LockFile
from shutil import rmtree
from jinja2 import Template
import argparse
import shutil
# 1/ on recup les fichiers via glob
# 2/ on les passe à essv (anciennement fait avec pCMIP6) pour les valider et extraire les infos du path pour le catalogue
# 3/ on Cree/MAJ les cat ESM à la granularité ? choisie ?
......@@ -180,11 +181,11 @@ class DRS():
class GuillaumeIdea():
class GuillaumeIdea(): # 1 worker / fichier
def __init__(self):
pass
def DecodePath(self,project,root,gran,filepath):
def DecodePath(self,project,rootDirCat,gran,filepath):
# print("COUCOU")
# print(project)
# print(root)
......@@ -212,6 +213,9 @@ class GuillaumeIdea():
# la path vu par ciclad : ET : derniere version ou pas => flag latest
#########################
path, flagLatest = filepath.split(" ") # on sépare le path et le flag "latest"
flagLatest=flagLatest[:-1]
#print(path)
#print("latest=",flagLatest)
#TEMP pour le test
#
#
......@@ -222,9 +226,9 @@ class GuillaumeIdea():
#p = Path(filepath[1:]).relative_to(root) # le [1:] juste pour virer le "/" de départ pour que ça soit plus facile pour décoder le path en facets
p = Path(filepath[1:]) # le [1:] juste pour virer le "/" de départ pour que ça soit plus facile pour décoder le path en facets
#p = Path(path)
pathFromCiclad = os.path.join("/bdd",p)
#pathFromCiclad = os.path.join("/bdd",p)
#print(path,flagLatest,pathFromCiclad)
#ft1 = time.time()
#########################
......@@ -234,10 +238,10 @@ class GuillaumeIdea():
#print("1:",ft1-ft0)
#drs = DRS(project)
#print("DECODE_PATH")
facets = drs.getFacetsFromPath(p)
facets = drs.getFacetsFromPath(Path(path))
# Maybe check if there are good ?
# print("A PRIORI ON A DES FACETS ")
# print(facets)
#print("A PRIORI ON A DES FACETS ")
#print(facets)
#########################
# dans quel catalogue (csv) placer le path et les facets et le flag
......@@ -248,16 +252,28 @@ class GuillaumeIdea():
# son nom dépend des facets et de la granulité choisie:
# exemple pour une gran à "l'experiment_id" : CMIP6_PMIP_CAS_FGOALS-f3-L_lig127k_catalog.csv
indexOfGran = drs.get_dir_format()[0].index(str(gran))
rootDirCat = "bdd/modfs/catalog/"
#indexOfGran = drs.get_dir_format()[0].index(str(gran))
#rootDirCat = "bdd/modfs/catalog/"
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
dirCat = os.path.join(rootDirCat,project)
nameCat = ""
for i in range(indexOfGran):
nameCat+=facets[drs.dir_keys[i]]+"_"
nameCat = project+"_"+nameCat[:-1]+".csv"
pathCat = os.path.join(dirCat,nameCat)
################## TODO MIX LES 2 BOUCLES ????? ###############################################
################## Voir meme les virer ... split join du path ???? ############################
# '_'.join(path.split("/")[:gran])
## dirCat = os.path.join(rootDirCat,project)
## nameCat = ""
## for i in range(indexOfGran):
## nameCat+=facets[drs.dir_keys[i]]+"_"
## nameCat = project+"_"+nameCat[:-1]+".csv"
## pathCat = os.path.join(dirCat,nameCat)
## ### OU BIEN EN one liner =>
pathCat = os.path.join(rootDirCat,project,"_".join([facets[drs.dir_keys[i]] for i in range(drs.dir_keys.index(str(gran))+1)])+".csv")
#print(pathCat,nameCat, "path : ",pathCat)
##########################
......@@ -266,19 +282,22 @@ class GuillaumeIdea():
##########################
#ft3 = time.time()
#print("3:",ft3-ft2)
# NOTE : Python 3.7> pour 'garantir' l'ordonancement du dict : https://stackoverflow.com/questions/39980323/are-dictionaries-ordered-in-python-3-6
## data = pathFromCiclad+","+project+","
## for _,v in facets.items():
## #print(k,v)
## data = data + str(v) +","
## data = data + flagLatest
data = pathFromCiclad[:-1]+","+project+","
for _,v in facets.items():
#print(k,v)
data = data + str(v) +","
data = data + flagLatest
data = os.path.join("/bdd",Path(path))+","+project +","+ ",".join(facets.values()) + ","+ flagLatest
#print(data)
lock = LockFile(pathCat)
#print(pathCat)
with lock:
# A TESTER : si il existe ? osef .. sinon on le cree AVEC entete ...
with open(pathCat, 'a+') as f:
#print("ON APPEND")
#print("ON APPEND:", data)
f.write(data+"\n")
#print("FINI APPEND")
#ft4 = time.time()
......@@ -297,6 +316,8 @@ class GuillaumeIdea():
# print("")
pass
# creer le csv avec entete
def line_prepender(filename, line):
with open(filename, 'r+') as f:
content = f.read()
......@@ -309,7 +330,69 @@ def line_prepender(filename, line):
# MAIS ... si on connait le catalogue ... on pourrait simplement avoir un lock par catalogue
# pour l'instant on le connait pas le catalogue quand on lance les jobs avec le GuillameIdea()
class AtefIdea(): # creation de l'entete du csv par le premier worker au lieu de prepend après tous les workers
def DecodePath(self,project,rootDirCat,gran,filepath):
path, flagLatest = filepath.split(" ") # on sépare le path et le flag "latest"
flagLatest=flagLatest[:-1]
facets = drs.getFacetsFromPath(Path(path))
pathCat = os.path.join(rootDirCat,project,"_".join([facets[drs.dir_keys[i]] for i in range(drs.dir_keys.index(str(gran))+1)])+".csv")
#print(pathCat)
data = os.path.join("/bdd",Path(path))+","+project +","+ ",".join(facets.values()) + ","+ flagLatest
#print(os.path.isfile(pathCat))
lock = LockFile(pathCat)
with lock:
# A TESTER : si il existe ? osef .. sinon on le cree AVEC entete ...
if not os.path.isfile(pathCat):
#print("coucou")
header = "path,project,"+",".join(drs.dir_keys)+",period_start,period_end,latest"
with open(pathCat, 'a+') as f:
f.write(header+"\n")
SaveJSON(pathCat.split("/")[-1], rootDirCat+project, header)
with open(pathCat, 'a+') as f:
#print(data)
f.write(data+"\n")
def mainAtefIdea():
parser = argparse.ArgumentParser(description="Script which create an esm catalog (json+csv) from netcdf files in sub-directories from the 'root' dir and put the catalog into 'out_cat' directory using vocabulary as validator from 'project' CV and DRS")
parser.add_argument("project", help="could be CMIP6, CMIP5 , CORDEX", type=str)
parser.add_argument("rootDirCat", help="root path to catalog directory", type=str)
parser.add_argument("gran", help = "granularite where to start scan AND also path of output catalog", type=str)
args=parser.parse_args()
methode = AtefIdea()
global drs
drs = DRS(args.project)
nbInPool = 4
pool = Pool(nbInPool)
pathDesFichiers = "/bdd/CMIP6/C4MIP/MOHC/.paths.txt" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash" IDRIS
deb = time.time() # pour benchmarck un peu
dirCat = os.path.join(args.rootDirCat,args.project)
if os.path.exists(dirCat):
rmtree(dirCat)
os.makedirs(dirCat)
tempNb = 20000
i=0
with open(pathDesFichiers) as fp:
for path in fp:
i=i+1
#print(path)
#pool.apply_async(methode.DecodePath, args=(project,root,gran,path,), callback = methode.JobDone)
pool.apply_async(methode.DecodePath, args=(args.project,args.rootDirCat,args.gran,path,))
if i>tempNb:
break
pool.close()
pool.join()
print( "au total : ", (time.time() - deb), " s ")
drs = None #DRS(project)
def main():
###################################################"
......@@ -318,10 +401,10 @@ def main():
parser = argparse.ArgumentParser(description="Script which create an esm catalog (json+csv) from netcdf files in sub-directories from the 'root' dir and put the catalog into 'out_cat' directory using vocabulary as validator from 'project' CV and DRS")
parser.add_argument("project", help="could be CMIP6, CMIP5 , CORDEX", type=str)
parser.add_argument("root", help="root path to scan in order to get an esm cat", type=str)
parser.add_argument("rootDirCat", help="root path to catalog directory", type=str)
parser.add_argument("gran", help = "granularite where to start scan AND also path of output catalog", type=str)
args=parser.parse_args()
#print(args)
print(args)
###################################################"
## Quelle est la DRS que l'on va utiliser pour creer les catalogue => ça depend du project
......@@ -343,39 +426,56 @@ def main():
#pathDesFichiers = "FakeDir/CMIP6_paths.csv" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash"
pathDesFichiers = "bdd/CMIP6/C4MIP/MOHC/.paths.txt" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash"
pathDesFichiers = "/bdd/CMIP6/C4MIP/MOHC/.paths.txt" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash" IDRIS
#pathDesFichiers = "/bdd/CMIP6/C4MIP/IPSL/.paths.txt" #<= ici les mega fichiers seront dans modf => 1 pour le TGCC ,1 pour l'IDRISS après avoir fait tourner le "super find bash" TGCC
# TODO ... dépend du project ^^
deb = time.time() # pour benchmarck un peu
#############################################
# On Vire les anciens pour creer les nouveaux
#############################################
rootDirCat = "bdd/modfs/catalog/"
#rootDirCat = "bdd/modfs/catalog/"
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
dirCat = os.path.join(rootDirCat,args.project)
dirCat = os.path.join(args.rootDirCat,args.project)
if os.path.exists(dirCat):
rmtree(dirCat)
os.makedirs(dirCat)
file = open(pathDesFichiers, 'r')
lines = file.readlines()
tempNb = 2000
for i, path in enumerate(lines):
#print(path)
#print("Lecture du path des fichiers")
#file = open(pathDesFichiers, 'r')
#lines = file.readlines()
#print("Done")
tempNb = 20000
i=0
with open(pathDesFichiers) as fp:
for path in fp:
i=i+1
#print(path)
#pool.apply_async(methode.DecodePath, args=(project,root,gran,path,), callback = methode.JobDone)
pool.apply_async(methode.DecodePath, args=(args.project,args.rootDirCat,args.gran,path,))
if i>tempNb:
break
#for i, path in enumerate(lines):
# print(path)
#pool.apply_async(methode.DecodePath, args=(project,root,gran,path,), callback = methode.JobDone)
pool.apply_async(methode.DecodePath, args=(args.project,args.root,args.gran,path,))
# pool.apply_async(methode.DecodePath, args=(args.project,args.root,args.gran,path,))
if i>tempNb:
break
file.close()
# if i>tempNb:
# break
#file.close()
pool.close()
pool.join()
print("Prepend CSV et creation des json")
prependCSVAndjsonCreation(args.rootDirCat,args.project)
print( "au total : ", (time.time() - deb), " s ")
def prependCSV():
def prependCSVAndjsonCreation(rootDirCat,project):
#project="CMIP6"
#gran = "experiment_id"
......@@ -387,20 +487,24 @@ def prependCSV():
#print(header)
project= "CMIP6"
#project= "CMIP6" # parametre
global drs
drs = DRS(project)
header = "path,project"
for i in range(len(drs.dir_keys)):
header= header + ","+ drs.dir_keys[i]
header = header + ",period_start,period_end,latest"
# header = "path,project"
# ### VIRER la boucle avec la tech du join ...
# for i in range(len(drs.dir_keys)):
# header= header + ","+ drs.dir_keys[i]
# header = header + ",period_start,period_end,latest"
# #print(header)
header = "path,project,"+",".join(drs.dir_keys)+",period_start,period_end,latest"
#rootDirCat = "FakeDir/Fakebdd/modfs/catalog/"
rootDirCat = "bdd/modfs/catalog/"
#rootDirCat = "bdd/modfs/catalog/"
pattern = os.path.join(Path(rootDirCat,project),'*.csv')
#print(pattern)
for path in iglob(pattern):
#print(path)
line_prepender(path,header)
where = rootDirCat+project
SaveJSON(path.split("/")[-1],where,header)
......@@ -430,7 +534,8 @@ def SaveJSON(name,where,header):
dicAsset["format"]="netcdf"
dicToSave["assets"]=dicAsset
mycollectiondict={"collection":dicToSave}
#### A TESTER .. pourqouoi dic de dic là ???? .. le virer ...
mycollectiondict={"collection":dicToSave} #<==== celui là a virer peut être
ren = j2_ESMCollectionTemplate.render(mycollectiondict)
#print(where+"/"+name.split('.')[0]+".json")
fp = open(where+"/"+name.split('.')[0]+".json", 'w')
......@@ -447,7 +552,7 @@ def protoYAML():
pattern = os.path.join(Path(rootDirCat,"CMIP6"),'*.json')
#print(pattern)
for path in iglob(pattern):
print(path)
#print(path)
nameCatwoExtension = path.split("/")[-1].split(".")[0]
#def Add(self,nameSource,args,description,driver,metadata):
mCatFab.Add(nameCatwoExtension,{"esmcol_obj":"{{ CATALOG_DIR }}"+"/CMIP6/"+nameCatwoExtension+".json"},"","intake_esm.esm_datastore",{})
......@@ -456,7 +561,15 @@ def protoYAML():
if __name__ == "__main__":
# execute only if run as a script
#main(project ="CMIP6",root="FakeDir/FakeIDRIS/CMIP6",gran="experiment_id")
print("Main de Base")
main()
prependCSV()
protoYAML()
print("Main Avec creation des csv ET JSON avec le premier worker ")
mainAtefIdea()
# print("Test")
#protoYAML()
pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment