Commit b6d25b64 authored by Anthony Schrapffer

Initialization of the interpolation tool for hydrological data

parent c67d9243
#!/bin/bash
#
#########################################
#
# Script to build and maintain an environment to run
# the python code in this directory. We verify that
# all needed packages are available.
# This is based on the anaconda python system : https://www.anaconda.com/distribution/
#
# Usage : source Environment
#
#########################################
#
# Functions
#
function hasanaconda (){
ipsl="ipsl camelot climserv ciclad"
idris="idris jean-zay"
case $(hostname -d) in
climserv*|private.ipsl.fr|ipsl.polytechnique.fr)
echo "IPSL"
eval "$1='ipsl'"
;;
idris*|jean-zay*)
echo "IDRIS"
eval "$1='idris'"
;;
*)
echo "unknown machine : $(hostname -d)"
eval "$1='unknown'"
esac
}
function checkenv (){
rm -f checkenv.py
cat <<EOF > checkenv.py
import mpi4py
import spherical_geometry
import netCDF4
import matplotlib
import mpl_toolkits
import configparser
import numba
import cartopy
print ("OK")
EOF
python checkenv.py > /dev/null
if [ $? -gt 0 ] ; then
echo "Environment not OK. Some packages must be missing."
echo "This was tested with the checkenv.py code."
else
rm -f checkenv.py
fi
}
#
# Main
#
loc=''
hasanaconda loc
if [ "$loc" != "unknown" ] ; then
#
# Make sure the right version of Python
#
case $loc in
"ipsl")
module unload python
module load python/3.6-anaconda50
#
# Test if the MPI environment is installed.
#
if [ ! -e ${HOME}/.conda/envs/MPI ] ; then
#
# Create the MPI environment as it does not exist.
#
conda create -y -n MPI mpi4py numba astropy netcdf4 matplotlib basemap ConfigParser
source activate MPI
conda install -y cartopy
conda install -y --channel astropy spherical-geometry
else
source activate MPI
fi
;;
"idris")
module load python/3.7.3
if [[ -L ${HOME}/.conda && -d ${HOME}/.conda ]] ; then
if [ ! -e ${WORK}/.conda/envs/MPI ] ; then
conda create -y -n MPI mpi4py numba astropy netcdf4 matplotlib basemap ConfigParser cartopy
conda activate MPI
if [[ -L ${HOME}/.local && -d ${HOME}/.local ]] ; then
export PYTHONUSERBASE=$WORK/.local
if [ $(find $PYTHONUSERBASE -name spherical_geometry -ls | wc -l) -le 0 ] ; then
git clone https://github.com/spacetelescope/spherical_geometry.git
cd spherical_geometry
python setup.py install --user
fi
else
echo "Make sure \$HOME/.link is a link to your \$WORK/.local"
fi
else
conda activate MPI
fi
else
echo "Make sure \$HOME/.conda is a link to your \$WORK/.conda"
fi
;;
*)
echo "Environment for $loc is not foreseen yet"
exit
;;
esac
fi
#
# Verify we can load all the needed modules.
#
checkenv
#
#
#
1) Complete the run.def file (see the sketch below)
2) Change the configuration of REMAP.pbs if necessary
/!\ the number of cores requested in REMAP.pbs (nodes x ppn) and nbcore in run.def must be consistent !
3) Launch it : $ qsub REMAP.pbs
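A minimal run.def sketch (illustrative values and placeholder paths; the [OverAll] keys are the ones read by main_multiprocess.py, and nbcore here matches the 2 x 32 cores requested in REMAP.pbs):

[OverAll]
# Earth radius in metres (fallback 6370000.0)
EarthRadius = 6370000.0
# Number of partitions created per core (fallback 100)
nloop = 100
# Number of cores, to be kept coherent with REMAP.pbs
nbcore = 64
# Input ancillary file, target routing grid and output file
dinput = /path/to/ancillary_input.nc
dhydrogrid = /path/to/routing_grid.nc
doutput = /path/to/output.nc
# Variable to interpolate and interpolation option
varname = floodplains
option = conservative
# Longitude and latitude bounds of the domain to process
WEST_EAST = -180., 180.
SOUTH_NORTH = -90., 90.
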
#!/bin/bash
#
#PBS -N remap_MERIT
#
#PBS -j oe
#PBS -l nodes=2:ppn=32
#PBS -l walltime=128:00:00
#PBS -l vmem=300gb,mem=300gb
#
cd ${PBS_O_WORKDIR}
#
# Set the right Python 3 Anaconda environment
#
source ./Environment
#
\rm *.txt
python ./main_multiprocess.py
ls -l
from netCDF4 import Dataset
import numpy as np
from numba import jit
#from spherical_geometry import polygon
from shapely.geometry import Polygon
import time
import sys
EarthRadius=6370000.0
######################################
@jit(nopython = True)
def boxit(cent, dlon, dlat, polyres) :
boxll=[]
loninc = dlon/polyres
latinc = dlat/polyres
# Side 1
for pts in range(polyres) :
boxll.append([cent[0]-dlon/2.0+loninc*pts, cent[1]+dlat/2.0])
# Side 2
for pts in range(polyres) :
boxll.append([cent[0]+dlon/2.0, cent[1]+dlat/2.0-latinc*pts])
# Side 3
for pts in range(polyres) :
boxll.append([cent[0]+dlon/2.0-loninc*pts, cent[1]-dlat/2.0])
# Side 4
for pts in range(polyres) :
boxll.append([cent[0]-dlon/2.0, cent[1]-dlat/2.0+latinc*pts])
# Close
boxll.append(boxll[0])
return boxll
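# Example (illustrative): boxit(np.array([0., 0.]), 0.5, 0.5, 2) walks the outline
# of a 0.5 x 0.5 degree cell centred on (0, 0) with 2 points per side and returns
# 9 points, the last one repeating the first point [-0.25, 0.25] to close the ring.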
@jit(nopython = True)
def maxradius(cent, lon, lat) :
radius=[]
for lx, ly in zip(lon, lat) :
radius.append(np.sqrt((cent[0]-lx)**2+(cent[1]-ly)**2))
return np.max(np.array(radius))
@jit(nopython = True)
def loladist(a,b) :
return np.sqrt((a[0]-b[0])**2+(a[1]-b[1])**2)
######################################
class GlobalGrid:
def __init__(self, fdir, lon_range, lat_range):
nc = Dataset(fdir, "r")
        self.land = nc.variables["floodplains"][:]
        self.njg, self.nig = self.land.shape
        print("Global grid start")
        # Restrict to the requested domain
lon_full = nc.variables["lon"][:]
lat_full = nc.variables["lat"][:]
        if lon_range is not None:
            i0 = np.argmin(np.abs(lon_full-np.min(lon_range)))
            i1 = np.argmin(np.abs(lon_full-np.max(lon_range)))
            imin, imax = np.sort([i0, i1])
        else:
            imin, imax = 0, lon_full.shape[0]-1
        if lat_range is not None:
            j0 = np.argmin(np.abs(lat_full-np.min(lat_range)))
            j1 = np.argmin(np.abs(lat_full-np.max(lat_range)))
            jmin, jmax = np.sort([j0, j1])
        else:
            jmin, jmax = 0, lat_full.shape[0]-1
self.nj = jmax-jmin+1
self.jgstart = jmin
self.ni = imax-imin+1
self.igstart = imin
self.nbland = int(np.sum(self.land[jmin:jmax+1,imin:imax+1]))
######################################
class AncillaryData:
def __init__(self, dire_glwd, part, varname):
istart = part.istart
ni = part.ni
jstart = part.jstart
nj = part.nj
self.nc = Dataset(dire_glwd, "r")
self.lon = self.nc.variables["lon"][istart:istart+ni]
self.lat = self.nc.variables["lat"][jstart:jstart+nj]
self.data = self.nc.variables[varname][jstart:jstart+nj,istart:istart+ni]
        # This condition may need to be changed: the data here are fractions
        # of the grid cell, so any strictly positive value is treated as land.
A = np.where(self.data > 0)
self.ind_land = [[A[0][i],A[1][i]] for i in range(len(A[0]))]
self.centers = [[self.lon[i],self.lat[j]] for j,i in self.ind_land]
self.values = [ self.data[j,i] for j,i in self.ind_land]
self.hdlon = np.mean(np.abs(np.diff(self.lon)))
self.hdlat = np.mean(np.abs(np.diff(self.lat)))
self.hd = max(np.abs(self.hdlon), np.abs(self.hdlat))
def get_extent(self):
extent = [np.min(self.lon)-self.hd, np.max(self.lon)+self.hd, np.min(self.lat)-self.hd, np.max(self.lat)+self.hd]
return extent
def corners(self, c) :
boxll = boxit(np.array(c), self.hdlon, self.hdlat, 2)
#lats = [p[1] for p in boxll]
#lons = [p[0] for p in boxll]
poly = Polygon(boxll)
if not poly.is_valid:
poly = poly.buffer(0)
#poly = polygon.SphericalPolygon.from_lonlat(lons, lats, center=cent)
return poly
#
######################################
#
class HydroGrid:
def __init__(self, dhydrogrid, extent):
nc = Dataset(dhydrogrid, "r")
self.lon = nc.variables["nav_lon"][0,:]
self.lat = nc.variables["nav_lat"][:,0]
self.hdlon = np.mean(np.abs(np.diff(self.lon)))
self.hdlat = np.mean(np.abs(np.diff(self.lat)))
        self.hd = max(abs(self.hdlon), abs(self.hdlat))
# Get the limits of the DOMAIN
i0 = np.argmin(np.abs(self.lon-(extent[0]-self.hd)))
i1 = np.argmin(np.abs(self.lon-(extent[1]+self.hd)))
i0, i1 = np.sort([i0,i1])
j0 = np.argmin(np.abs(self.lat-(extent[2]-self.hd)))
j1 = np.argmin(np.abs(self.lat-(extent[3]+self.hd)))
j0, j1 = np.sort([j0,j1])
self.limits = [j0, j1+1, i0, i1+1]
self.trip = nc.variables["trip"][j0:j1+1, i0:i1+1]
self.lon = self.lon[i0:i1+1]
self.lat = self.lat[j0:j1+1]
        # If trip has no mask (np.where on a scalar mask returns a 1-tuple),
        # keep all points; otherwise keep only the unmasked points.
        A = np.where(self.trip.mask == False)
        if len(A) == 1 :
            self.ind_land = [[j,i] for j in range(j1+1-j0) for i in range(i1+1-i0)]
        else:
            self.ind_land = [[A[0][i],A[1][i]] for i in range(len(A[0]))]
self.get_corners()
#
def get_corners(self):
self.centers = [np.array([self.lon[i], self.lat[j]]) for j,i in self.ind_land]
#
def get_poly(self, i):
boxll = boxit(self.centers[i], self.hdlon, self.hdlat, 2)
#poly = polygon.SphericalPolygon.from_lonlat( llon, llat, center=self.centers[index])
poly = Polygon(boxll)
if not poly.is_valid:
poly = poly.buffer(0)
return poly
#
def select(self, c, r) :
indices = [i for i, cm in enumerate(self.centers) if (loladist(np.array(c),cm) <= r+self.hd)]
subcenters = [self.centers[i] for i in indices]
polylist = [self.get_poly(i) for i in indices]
#polyll, polylist = self.select_corners(subcenters)
return indices, polylist, subcenters
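#
# Usage sketch (illustrative; it mirrors how interpolate() in intersection.py
# combines the two classes, with dinput, dhydrogrid, part and varname assumed
# to come from run.def and from the domain partition):
#   anc = AncillaryData(dinput, part, varname)
#   hg = HydroGrid(dhydrogrid, anc.get_extent())
#   for c, value in zip(anc.centers, anc.values):
#       hydrocell = anc.corners(c)
#       indices, polylist, subcenters = hg.select(c, anc.hd)
#       # ... intersect hydrocell with each polygon in polylist ...
#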
import numpy as np
import sys
from numba import jit
from netCDF4 import Dataset
#
def halfpartition(partin, land) :
"""
First partition of the domain
The domain with more land points is halved
"""
A = [p["nbland"] for p in partin]
i = np.argmax(A)
    partout = [partin[k] for k in range(len(partin)) if k != i]
dom = partin[i]
if dom["nbi"] > dom["nbj"] :
h = int(dom["nbi"]/2)
new_dom = {"nbi":dom["nbi"]-h,"nbj":dom["nbj"], \
"istart":dom["istart"]+h,"jstart":dom["jstart"]}
dom["nbi"] = h
else :
h = int(dom["nbj"]/2)
new_dom = {"nbi":dom["nbi"],"nbj":dom["nbj"]-h, \
"istart":dom["istart"],"jstart":dom["jstart"]+h}
dom["nbj"] = h
new_dom['nbland'] = int(np.nansum(land[new_dom["jstart"]:new_dom["jstart"]+new_dom["nbj"],\
new_dom["istart"]:new_dom["istart"]+new_dom["nbi"]]))
dom['nbland'] = dom['nbland']-new_dom['nbland']
if dom["nbland"]>0: partout.append(dom)
if new_dom["nbland"]>0: partout.append(new_dom)
return partout
#
# Shrink each domain to the tight bounding box of its land points and recompute nbland
#
def fit_partition(partin, land):
partout = []
for i, dom in enumerate(partin):
if dom["nbland"] > 0 :
l = land[dom["jstart"]:dom["jstart"]+dom["nbj"],dom["istart"]:dom["istart"]+dom["nbi"]]
l1 = np.sum(l, axis = 0)
i0 = np.where(l1>0)[0][0]
i1 = np.where(l1>0)[0][-1]
l2 = np.ma.sum(l, axis = 1)
j0 = np.where(l2>0)[0][0]
j1 = np.where(l2>0)[0][-1]
dom["jstart"] = j0 + dom["jstart"]
dom["nbj"] = j1-j0+1
dom["istart"] = i0 + dom["istart"]
dom["nbi"] = i1-i0+1
dom["nbland"] = int(np.nansum(land[dom["jstart"]:dom["jstart"]+dom["nbj"],dom["istart"]:dom["istart"]+dom["nbi"]]))
partout.append(dom)
return partout
#
#
#
def adjustpart(partin, land, nbcore) :
nbland=np.array([dom['nbland'] for dom in partin])
nbptproc=np.array([dom['nbi']*dom['nbj'] for dom in partin])
zeros=np.nonzero(nbland < 1)[0]
#
# Delete partitions without land points.
#
for i in range(len(zeros)) :
partin.pop(zeros[i]-i)
#
# Divide domains with the largest number of land points
#
    while nbcore-len(partin) > 0:
        partin = halfpartition(partin, land)
        partin = fit_partition(partin, land)
return partin
#
#
#
def partitiondom(part_in, land, nbcore) :
partout = part_in
while len(partout) < nbcore :
partout = halfpartition(partout, land)
#
partout = adjustpart(partout, land, nbcore)
return partout
#
#
#
class partition :
def __init__ (self, gg, nbcore) :
#
        # Self variables
#
part_in = [{"nbi":gg.ni,"nbj":gg.nj, "istart":gg.igstart, "jstart":gg.jgstart, "nbland":gg.nbland}]
self.part = partitiondom(part_in, gg.land, nbcore)
# Information of domain on local processor
#
def get_part(self, rank):
part = part_proc(self.part, rank)
return part
#
#############
#
class part_proc:
def __init__(self, part, rank):
self.rank = rank
self.ni = part[rank]["nbi"]
self.nj = part[rank]["nbj"]
self.istart = part[rank]["istart"]
self.jstart = part[rank]["jstart"]
self.nbland = part[rank]["nbland"]
#
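# Minimal usage sketch (hypothetical 4x4 land mask split over 2 cores):
#   land = np.array([[1,1,0,0],[1,1,0,0],[0,0,1,1],[0,0,1,1]])
#   part_in = [{"nbi":4, "nbj":4, "istart":0, "jstart":0, "nbland":8}]
#   partitiondom(part_in, land, 2)
#   -> two sub-domains of 4 land points each (rows 0-1 and rows 2-3)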
import numpy as np
import numpy.ma as ma
import time
#
EarthRadius=6370000.0
#
def interpolate(M, F, option = "conservative"):
    """Remap the ancillary data F onto the model grid M (trip grid).

    With the conservative option each model cell accumulates, from every
    overlapping input cell, the input value weighted by the overlapped
    fraction of the model cell."""
    anc_new = ma.zeros(M.trip.shape)
    anc_new.mask = M.trip.mask
for c, value in zip(F.centers, F.values):
# Polygone from floodplains
hydrocell = F.corners(c)
r = F.hd
# Corresponding MERIT grid points
indices, polylist, subcenters = M.select(c,r)
        # F.fp[jfp, ifp] = 1 if the point has been selected
OVERLAP = []
if len(indices) > 0:
for i, cell in enumerate(polylist):
                try:
                    area_in = cell.intersection(hydrocell).area
                except Exception:
                    # Invalid or degenerate geometry: treat as no overlap
                    area_in = 0
if area_in > 0:
frac_modelgrid = area_in/cell.area
frac_input = area_in/hydrocell.area
OVERLAP.append([indices[i], frac_modelgrid, frac_input])
        # Now distribute the value as a function of the chosen option.
        # The overlap information could also be used to distribute the value
        # with an order of priority.
        # Ex : the input value covers less than 100% of the cell and we want
        # to distribute it over the lowest model grid points.
if option == "conservative":
for ind, frac_modelgrid, frac_input in OVERLAP:
jm, im = M.ind_land[ind]
anc_new[jm,im] = anc_new[jm,im] + frac_modelgrid*value
return anc_new
#
from netCDF4 import Dataset
import numpy as np
import numpy.ma as ma
def create_output(dhydrogrid, P_list, output_name, varname):
    """Gather the per-partition arrays and write the interpolated field to a netCDF file.

    P_list is an iterable of (limits, array) pairs, where limits = [j0, j1, i0, i1]
    are the bounds of the partition on the routing grid (cf. HydroGrid.limits).
    """
nc = Dataset(dhydrogrid, "r")
lon = nc.variables["nav_lon"][0,:]
lat = nc.variables["nav_lat"][:,0]
nig = lon.shape[0]
njg = lat.shape[0]
# Get the data
ancnew = np.zeros((njg, nig))
    # The output could be masked here and unmasked only where the ancillary
    # data were processed (cf. the commented masked_where call below).
    #
for part, anc_arr in P_list:
j0, j1, i0, i1 = part
ancnew[j0:j1, i0:i1] = ancnew[j0:j1, i0:i1] + anc_arr
    # Round to one decimal place
ancnew = np.rint(ancnew*10)/10
#ancnew = ma.masked_where(ancnew == -1, ancnew)
# Start the netCDF
with Dataset(output_name,'w') as foo:
dlat = foo.createDimension('lat', njg)
dlon = foo.createDimension('lon', nig)
#
latitudes = foo.createVariable('lat', np.float32,('lat',))
longitudes = foo.createVariable('lon', np.float32,('lon',))
#
latitudes[:] = lat[:]
longitudes[:] = lon[:]
#
        # Output variable named after varname
anc = foo.createVariable(varname, np.float32,('lat','lon'))#, zlib = True)
anc[:,:] = ancnew[:,:]
foo.sync()
import sys
sys.path.append("./lib")
from Hydro import AncillaryData, HydroGrid, GlobalGrid
from Partition import partition
from intersection import interpolate
from output import create_output
import numpy as np
from netCDF4 import Dataset
import multiprocessing
import configparser
#
#############
config=configparser.ConfigParser()
config.read("run.def")
EarthRadius=config.getfloat("OverAll", "EarthRadius", fallback=6370000.0)
nloop=config.getint("OverAll", "nloop", fallback=100)
# Could be automated from the number of processors available
nbcore=config.getint("OverAll", "nbcore", fallback=1)
dinput=config.get("OverAll", "dinput")
dhydrogrid=config.get("OverAll", "dhydrogrid")
doutput=config.get("OverAll", "doutput")
varname=config.get("OverAll", "varname")
option=config.get("OverAll", "option", fallback="conservative")
lon_range=np.array(config.get("OverAll", "WEST_EAST", fallback="-180., 180.").split(","),dtype=float)
lat_range=np.array(config.get("OverAll", "SOUTH_NORTH", fallback="-90., 90.").split(","),dtype=float)
#############
# Distribute partitions
gg = GlobalGrid(dinput, lon_range, lat_range)
P = partition(gg, nloop*nbcore)
del gg.land
#############
# MAIN FUNCTION FOR PARALLELIZATION
def calculate(part_rank):
part = P.get_part(part_rank)
# Ancillary data
anc = AncillaryData(dinput, part, varname)
extent = anc.get_extent()
#
# Hydrogrid
#
HG = HydroGrid(dhydrogrid, extent)