-
Lionel GUEZ authoredLionel GUEZ authored
inst_eddies_HDF5.py 5.41 KiB
#!/usr/bin/env python3
"""Converts an adt_YYYY-MM-DD.mat file to a triplet of shapefiles. The
mat file should be in HDF5 format. The data is the set of detected
instantaneous eddies at a given date.
"""
import shapefile
import numpy as np
from os import path
import h5py
import datetime
import f90nml
def define_fields(writers):
"""writers is a dictionary of shapefile.Writer objects."""
writers["extr"].field("ssh", "N", 13, 6)
writers["extr"].field("date_index", "N", 5)
writers["extr"].field("eddy_index", "N", 5)
writers["extr"].field("interpolat", "N", 1)
writers["extr"].field("cyclone", "N", 1)
writers["extr"].field("valid", "N", 1)
writers["extr"].field("speed", "N", 13, 6)
writers["outer"].field("r_eq_area", "N", 10, 4)
writers["outer"].field("ssh", "N", 13, 6)
writers["outer"].field("date_index", "N", 5)
writers["outer"].field("eddy_index", "N", 5)
writers["outer"].field("radius4", "N", 2)
writers["max_speed"].field("r_eq_area", "N", 10, 4)
writers["max_speed"].field("ssh", "N", 13, 6)
writers["max_speed"].field("date_index", "N", 5)
writers["max_speed"].field("eddy_index", "N", 5)
def write(writers, cell, cyclone):
"""writers is a dictionary. cell[:].transpose() is a numpy.ndarray
with shape (number of eddies, 21). cyclone is a boolean.
"""
eddy_index = 0
for eddy in cell[:].transpose():
# eddy is a numpy.ndarray with shape (21,). Note that
# amplitudes in eddy[7] and eddy[11] are positive regardless
# of eddy orientation.
eddy_index += 1
lon = matlab_data[eddy[0]][:].item()
lat = matlab_data[eddy[1]][:].item()
writers["extr"].point(lon, lat)
i = np.argwhere(X == lon).item()
j = np.argwhere(Y == lat).item()
speed = matlab_data[eddy[12]][:].item()
if np.isnan(speed): speed = 1e4
writers["extr"].record(ssh = matlab_data["ADT"][j, i],
date_index = date_index,
eddy_index = eddy_index, interpolat = 0,
cyclone = cyclone, valid = 1, speed = speed)
if cyclone:
ssh = matlab_data["ADT"][j, i] + matlab_data[eddy[7]][:].item()
else:
ssh = matlab_data["ADT"][j, i] - matlab_data[eddy[7]][:].item()
r_eq_area = matlab_data[eddy[6]][:].item()
writers["outer"].record(r_eq_area = r_eq_area, ssh = ssh,
date_index = date_index,
eddy_index = eddy_index, radius4 = - 1)
lon = matlab_data[eddy[4]][:].squeeze()
lat = matlab_data[eddy[5]][:].squeeze()
polyline = np.stack((lon, lat), axis = 1)
writers["outer"].poly([polyline])
if cyclone:
ssh = matlab_data["ADT"][j, i] + matlab_data[eddy[11]][:].item()
else:
ssh = matlab_data["ADT"][j, i] - matlab_data[eddy[11]][:].item()
r_eq_area = matlab_data[eddy[10]][:].item()
writers["max_speed"].record(r_eq_area = r_eq_area, ssh = ssh,
date_index = date_index,
eddy_index = eddy_index)
lon = matlab_data[eddy[8]][:].squeeze()
lat = matlab_data[eddy[9]][:].squeeze()
polyline = np.stack((lon, lat), axis = 1)
writers["max_speed"].poly([polyline])
n_shapes = len(writers["extr"])
writers["ishape_last"].write(str(n_shapes - 1) + "\n")
year = 1993
my_date = datetime.date(year, 1, 1)
final_date = datetime.date(year, 1, 1)
factories = [dict(writers = {}, cell_name = "Anticyclonic_Cell",
cyclone = False, SHP_triplet = "Snapshot_anti"),
dict(writers = {}, cell_name = "Cyclonic_Cell", cyclone = True,
SHP_triplet = "Snapshot_cyclo")]
for f in factories:
file = path.join(f["SHP_triplet"], "extremum")
f["writers"]["extr"] = shapefile.Writer(file, shapeType = shapefile.POINT)
file = path.join(f["SHP_triplet"], "outermost_contour")
f["writers"]["outer"] = shapefile.Writer(file,
shapeType = shapefile.POLYGON)
file = path.join(f["SHP_triplet"], "max_speed_contour")
f["writers"]["max_speed"] = shapefile.Writer(file,
shapeType = shapefile.POLYGON)
define_fields(f["writers"])
file = path.join(f["SHP_triplet"], "ishape_last.txt")
f["writers"]["ishape_last"] = open(file, "w")
# First date, get X and Y:
adt_file = my_date.strftime("adt_%Y-%m-%d.mat")
with h5py.File(adt_file, "r") as matlab_data:
X = matlab_data["X"][:].squeeze()
Y = matlab_data["Y"][:].squeeze()
while my_date <= final_date:
adt_file = my_date.strftime("adt_%Y-%m-%d.mat")
matlab_data = h5py.File(adt_file, "r")
date_index = matlab_data["date_num"][:].item()
date_index = int(date_index)
assert datetime.date.fromordinal(date_index - 366) == my_date
date_index = my_date - datetime.date(1950, 1, 1)
date_index = date_index.days
for f in factories:
write(f["writers"], matlab_data[f["cell_name"]], f["cyclone"])
matlab_data.close()
my_date += datetime.timedelta(1)
nml = f90nml.Namelist()
nml["grid_nml"] = dict(corner_deg = [X[0], Y[0]], nlon = X.size, nlat = Y.size)
for f in factories:
nml.write(path.join(f["SHP_triplet"], "grid_nml.txt"), force = True)
for k, v in f["writers"].items(): v.close()