-
We do not need it. we can just have a local variable at initialization.
We do not need it. we can just have a local variable at initialization.
util_eddies.py 9.71 KiB
import math
from os import path
import sys
import os
import shapefile
import numpy as np
class SHPC_class:
"""We are assuming that eddies are indexed from 1, without jump, for
each date.
"""
def __init__(self, SHPC_dir, def_orient):
"""def_orient, for default orientation, is used to define d_init and
n_dates. Since we are going to open files, we might as
well ask the user which orientation will be useful.
"""
if not os.access(SHPC_dir, os.R_OK):
print(f"{SHPC_dir} not readable", file=sys.stderr)
raise OSError
self.dir = SHPC_dir
# <begin> Define self.n_slices
fname = path.join(SHPC_dir, "n_slices.txt")
try:
with open(fname) as f:
self.n_slices = int(f.read())
except FileNotFoundError:
print(fname, "not found. Creating it...")
self.n_slices = 0
while True:
slice_dir = path.join(SHPC_dir, f"Slice_{self.n_slices}")
if not path.isdir(slice_dir):
break
self.n_slices += 1
if self.n_slices == 0:
print("util_eddies:SHPC_class:init:No slice found")
sys.exit(1)
else:
with open(fname, "w", encoding="utf-8") as f:
f.write(f"{self.n_slices}\n")
# <end> Define self.n_slices
self._readers = [
{"Anticyclones": {}, "Cyclones": {}} for i in range(self.n_slices)
]
# <begin> Define self.d_init
self.d_init = []
for i_slice, reader in enumerate(self._readers):
my_shapefile = path.join(
SHPC_dir, f"Slice_{i_slice}", def_orient, "extremum"
)
try:
reader[def_orient]["extremum"] = shapefile.Reader(my_shapefile)
except shapefile.ShapefileException:
# Use the other orientation for this slice:
orientation = other_orient(def_orient)
my_shapefile = path.join(
SHPC_dir, f"Slice_{i_slice}", orientation, "extremum"
)
reader[orientation]["extremum"] = shapefile.Reader(my_shapefile)
else:
orientation = def_orient
if len(reader[orientation]["extremum"]) != 0:
d_init_slice = reader[orientation]["extremum"].record(0).date
else:
print(
f"__init__: Empty shapefile: {my_shapefile}",
file=sys.stderr,
)
d_init_slice = None
self.d_init.append(d_init_slice)
# <end> Define self.d_init
self._ishape_last = [
{"Anticyclones": None, "Cyclones": None}
for i in range(self.n_slices)
]
# Number of eddies at each date:
self._e_max = [
{"Anticyclones": None, "Cyclones": None}
for i in range(self.n_slices)
]
# We only need ishape_last for the last slice, for one
# orientation, in order to define n_dates:
ishape_last = self.get_ishape_last(self.n_slices - 1, def_orient)
self.n_dates = np.ediff1d(self.d_init, to_end=ishape_last.size)
self._i_slice = []
for i_slice, n_dates in enumerate(self.n_dates):
self._i_slice.extend(n_dates * [i_slice])
self._n_shapes = [
{"Anticyclones": 0, "Cyclones": 0} for i in range(self.n_slices)
]
def get_ishape_last(self, i_slice, orientation):
"""Defines and returns self._ishape_last[i_slice][orientation], which
is a numpy array. Can return None if ishape_last.txt does not
exist and cannot be created. Also defines
self._e_max[i_slice][orientation].
"""
if self._ishape_last[i_slice][orientation] is None:
fname = path.join(
self.dir, f"Slice_{i_slice}", orientation, "ishape_last.txt"
)
try:
self._ishape_last[i_slice][orientation] = np.loadtxt(
fname, dtype=int, ndmin=1
)
except OSError:
print("Could not read", fname)
print("Creating it...")
self._ishape_last[i_slice][
orientation
] = self._create_ishape_last(i_slice, orientation)
if self._ishape_last[i_slice][orientation] is not None:
assert self._ishape_last[i_slice][orientation].size != 0
if i_slice != self.n_slices - 1:
assert (
self._ishape_last[i_slice][orientation].size
== self.n_dates[i_slice]
)
self._e_max[i_slice][orientation] = np.ediff1d(
self._ishape_last[i_slice][orientation],
to_begin=self._ishape_last[i_slice][orientation][0] + 1,
)
return self._ishape_last[i_slice][orientation]
def get_reader(self, i_slice, orientation, layer):
if layer not in self._readers[i_slice][orientation]:
my_shapefile = path.join(
self.dir, f"Slice_{i_slice}", orientation, layer
)
try:
self._readers[i_slice][orientation][layer] = shapefile.Reader(
my_shapefile
)
except shapefile.ShapefileException:
print(f"get_reader: {my_shapefile} not found", file=sys.stderr)
self._readers[i_slice][orientation][layer] = None
return self._readers[i_slice][orientation][layer]
def get_n_shapes(self, i_slice, orientation):
if self._n_shapes[i_slice][orientation] == 0:
reader = self.get_reader(i_slice, orientation, layer="extremum")
self._n_shapes[i_slice][orientation] = len(reader)
return self._n_shapes[i_slice][orientation]
def _create_ishape_last(self, i_slice, orientation):
"""Creates the file ishape_last.txt and returns the corresponding
numpy array. Returns None if it cannot get a reader.
"""
last_date = self.d_init[i_slice]
date_ishape = [] # list of tuples (date, ishape)
reader = self.get_reader(i_slice, orientation, "extremum")
if reader is None:
print('Could not create "ishape_last.txt".')
ishape_last = None
else:
for ishape, rec in enumerate(reader.iterRecords(fields=["date"])):
if rec.date != last_date:
date_ishape.append((last_date, ishape - 1))
last_date = rec.date
date_ishape.append((last_date, ishape))
date_ishape = np.array(date_ishape)
# Replace date by repetition of date factor:
date_ishape[:-1, 0] = np.ediff1d(date_ishape[:, 0])
date_ishape[-1, 0] = 1
ishape_last = []
for repet_factor, ishape in date_ishape:
ishape_last.extend(repet_factor * [ishape])
ishape_last = np.array(ishape_last)
fname = path.join(
self.dir, f"Slice_{i_slice}", orientation, "ishape_last.txt"
)
np.savetxt(fname, ishape_last, fmt="%.0d")
return ishape_last
def get_slice(self, date_index):
assert date_index >= self.d_init[0]
return self._i_slice[date_index - self.d_init[0]]
def comp_ishape(self, date, eddy_index, orientation):
"""Compute the location in the shapefiles: return `(i_slice, ishape)`.
Returns None if ishape_last is None or eddy_index is greater
than the maximum value.
"""
i_slice = self.get_slice(date)
ishape_last = self.get_ishape_last(i_slice, orientation)
if ishape_last is None:
ishape = None
else:
k = date - self.d_init[i_slice]
assert k >= 0
assert 1 <= eddy_index
if eddy_index <= self._e_max[i_slice][orientation][k]:
if k == 0:
ishape = eddy_index - 1
else:
# k >= 1
ishape = ishape_last[k - 1] + eddy_index
else:
ishape = None
return i_slice, ishape
def ishape_range(self, date, orientation):
"""Returns an empty list if ishape_last was not found and could not be
created.
"""
i_slice, ishape_start = self.comp_ishape(date, 1, orientation)
if ishape_start is None:
return i_slice, []
else:
ishape_stop = (
self._ishape_last[i_slice][orientation][
date - self.d_init[i_slice]
]
+ 1
)
return i_slice, range(ishape_start, ishape_stop)
def in_window(point, window):
llcrnrlon, llcrnrlat, urcrnrlon, urcrnrlat = window
longit = point[0] + math.ceil((llcrnrlon - point[0]) / 360) * 360
# (in [llcrnrlon, llcrnrlon + 360[)
return longit <= urcrnrlon and llcrnrlat <= point[1] <= urcrnrlat
def other_orient(orientation):
if orientation == "Cyclones":
return "Anticyclones"
else:
return "Cyclones"
def node_to_date_eddy(node_index, e_overestim, only_date=False):
"""Convert from node identification in graph to eddy identification in
shapefiles. Return date_index or (date_index, eddy_index).
"""
k = (node_index - 1) // e_overestim
if only_date:
return k
else:
return k, node_index - k * e_overestim
def date_eddy_to_node(date, eddy_index, e_overestim):
"""Convert from eddy identification in shapefiles to node
identification in graph.
"""
return date * e_overestim + eddy_index