#!/usr/bin/env python3 """Converts an adt_YYYY-MM-DD.mat file to a triplet of shapefiles. The mat file should be in HDF5 format. The data is the set of detected instantaneous eddies at a given date. """ import shapefile import numpy as np from os import path import h5py import datetime import f90nml def define_fields(writers): """writers is a dictionary of shapefile.Writer objects.""" writers["extr"].field("ssh", "N", 13, 6) writers["extr"].field("date_index", "N", 5) writers["extr"].field("eddy_index", "N", 5) writers["extr"].field("interpolat", "N", 1) writers["extr"].field("cyclone", "N", 1) writers["extr"].field("valid", "N", 1) writers["extr"].field("speed", "N", 13, 6) writers["outer"].field("r_eq_area", "N", 10, 4) writers["outer"].field("ssh", "N", 13, 6) writers["outer"].field("date_index", "N", 5) writers["outer"].field("eddy_index", "N", 5) writers["outer"].field("radius4", "N", 2) writers["max_speed"].field("r_eq_area", "N", 10, 4) writers["max_speed"].field("ssh", "N", 13, 6) writers["max_speed"].field("date_index", "N", 5) writers["max_speed"].field("eddy_index", "N", 5) def write(writers, cell, cyclone): """writers is a dictionary. cell[:].transpose() is a numpy.ndarray with shape (number of eddies, 21). cyclone is a boolean. """ eddy_index = 0 for eddy in cell[:].transpose(): # eddy is a numpy.ndarray with shape (21,). Note that # amplitudes in eddy[7] and eddy[11] are positive regardless # of eddy orientation. eddy_index += 1 lon = matlab_data[eddy[0]][:].item() lat = matlab_data[eddy[1]][:].item() writers["extr"].point(lon, lat) i = np.argwhere(X == lon).item() j = np.argwhere(Y == lat).item() speed = matlab_data[eddy[12]][:].item() if np.isnan(speed): speed = 1e4 writers["extr"].record(ssh = matlab_data["ADT"][j, i], date_index = date_index, eddy_index = eddy_index, interpolat = 0, cyclone = cyclone, valid = 1, speed = speed) if cyclone: ssh = matlab_data["ADT"][j, i] + matlab_data[eddy[7]][:].item() else: ssh = matlab_data["ADT"][j, i] - matlab_data[eddy[7]][:].item() r_eq_area = matlab_data[eddy[6]][:].item() writers["outer"].record(r_eq_area = r_eq_area, ssh = ssh, date_index = date_index, eddy_index = eddy_index, radius4 = - 1) lon = matlab_data[eddy[4]][:].squeeze() lat = matlab_data[eddy[5]][:].squeeze() polyline = np.stack((lon, lat), axis = 1) writers["outer"].poly([polyline]) if cyclone: ssh = matlab_data["ADT"][j, i] + matlab_data[eddy[11]][:].item() else: ssh = matlab_data["ADT"][j, i] - matlab_data[eddy[11]][:].item() r_eq_area = matlab_data[eddy[10]][:].item() writers["max_speed"].record(r_eq_area = r_eq_area, ssh = ssh, date_index = date_index, eddy_index = eddy_index) lon = matlab_data[eddy[8]][:].squeeze() lat = matlab_data[eddy[9]][:].squeeze() polyline = np.stack((lon, lat), axis = 1) writers["max_speed"].poly([polyline]) n_shapes = len(writers["extr"]) writers["ishape_last"].write(str(n_shapes - 1) + "\n") year = 1993 my_date = datetime.date(year, 1, 1) final_date = datetime.date(year, 1, 1) factories = [dict(writers = {}, cell_name = "Anticyclonic_Cell", cyclone = False, SHP_triplet = "Snapshot_anti"), dict(writers = {}, cell_name = "Cyclonic_Cell", cyclone = True, SHP_triplet = "Snapshot_cyclo")] for f in factories: file = path.join(f["SHP_triplet"], "extremum") f["writers"]["extr"] = shapefile.Writer(file, shapeType = shapefile.POINT) file = path.join(f["SHP_triplet"], "outermost_contour") f["writers"]["outer"] = shapefile.Writer(file, shapeType = shapefile.POLYGON) file = path.join(f["SHP_triplet"], "max_speed_contour") f["writers"]["max_speed"] = shapefile.Writer(file, shapeType = shapefile.POLYGON) define_fields(f["writers"]) file = path.join(f["SHP_triplet"], "ishape_last.txt") f["writers"]["ishape_last"] = open(file, "w") # First date, get X and Y: adt_file = my_date.strftime("adt_%Y-%m-%d.mat") with h5py.File(adt_file, "r") as matlab_data: X = matlab_data["X"][:].squeeze() Y = matlab_data["Y"][:].squeeze() while my_date <= final_date: adt_file = my_date.strftime("adt_%Y-%m-%d.mat") matlab_data = h5py.File(adt_file, "r") date_index = matlab_data["date_num"][:].item() date_index = int(date_index) assert datetime.date.fromordinal(date_index - 366) == my_date date_index = my_date - datetime.date(1950, 1, 1) date_index = date_index.days for f in factories: write(f["writers"], matlab_data[f["cell_name"]], f["cyclone"]) matlab_data.close() my_date += datetime.timedelta(1) nml = f90nml.Namelist() nml["grid_nml"] = dict(corner_deg = [X[0], Y[0]], nlon = X.size, nlat = Y.size) for f in factories: nml.write(path.join(f["SHP_triplet"], "grid_nml.txt"), force = True) for k, v in f["writers"].items(): v.close()