Commit be87ccf7 authored by Clément Haëck

Do not use delayed calls for compute_hi

Delayed calls might cause memory problems.
parent bf4b9124
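
For context, a minimal sketch (not this project's code; the toy dataset and file names are assumptions) of the two write strategies the commit toggles between: collecting every write into one dask graph keeps all chunk data referenced until dask.compute returns, while a plain loop lets each chunk be written and freed in turn.

import dask
import numpy as np
import xarray as xr

# Toy stand-in for the real SST fields.
ds = xr.Dataset(
    {"sst": (("time", "lat"), np.random.rand(4, 3))},
    coords={"time": np.arange(4), "lat": np.arange(3)},
)
chunks = [ds.isel(time=[i]) for i in range(ds.sizes["time"])]
paths = ["chunk_%d.nc" % i for i in range(len(chunks))]

# Delayed variant: one task graph referencing every chunk, computed at once.
tasks = [dask.delayed(lambda d, p: d.to_netcdf(p))(d, p)
         for d, p in zip(chunks, paths)]
dask.compute(tasks)

# Eager variant (what delayed=False selects below): chunks are written
# one by one, so each can be released before the next is processed.
for d, p in zip(chunks, paths):
    d.to_netcdf(p)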
@@ -15,6 +15,8 @@ from lib.xarray_utils import save_chunks_by_date
 
 def main(args):
+    lgr = lib.Logger()
+    lgr.msg("Getting data")
     if args['data'] == 'ostia':
         grid = lib.data.ostia.grid
         ds = lib.data.ostia.get_data(args)
@@ -28,10 +30,16 @@ def main(args):
     hi.attrs = {}
     sst = ds['sst']
+    lgr.end()
+
+    print(ds)
+    lgr.msg("Computing window size")
     sx, sy = get_scale(hi, args['scale'], grid)
+    lgr.msg("Computing components")
     hi = compute_components(sst, hi, sx, sy)
 
+    lgr.msg("Writing to disk")
     wd = path.join(lib.data.hi.get_root(args), str(args['year']))
     filename = path.join(wd, 'HI_%Y%m%d.nc')
     encoding = {'_all': {'zlib': True,
@@ -39,8 +47,9 @@
                          'dtype': 'int16',
                          'scale_factor': 1e-3}}
     lib.setup_metadata(hi, args)
-    save_chunks_by_date(hi, filename=filename,
+    save_chunks_by_date(hi, filename=filename, delayed=False,
                         encoding=encoding)
+    lgr.end()
 
     return hi
@@ -21,4 +21,4 @@ def preprocess(ds):
 
 lib.data.create_dataset(__name__, PREGEX, ROOT, ARGS,
-                        open_mf_kw=dict(preprocess=preprocess))
\ No newline at end of file
+                        open_mf_kw=dict(preprocess=preprocess))
@@ -26,22 +26,27 @@ def split_by_chunks(dataset):
         yield dataset[selection]
 
 
-def save_chunks_by_date(ds, filename, **kwargs):
-    if 'encoding' in kwargs:
-        _all = kwargs['encoding'].pop('_all', None)
-        if _all is not None:
-            for k, z in _all.items():
-                for var in ds.data_vars:
-                    kwargs['encoding'].setdefault(var, {})
-                    kwargs['encoding'][var][k] = z
-
-    def write_dataset(ds, path, **kwargs):
-        ds.to_netcdf(path, **kwargs)
+def write_dataset(ds, path, **kwargs):
+    if 'encoding' in kwargs:
+        _all = kwargs['encoding'].pop('_all', None)
+        if _all is not None:
+            for k, z in _all.items():
+                for var in ds.data_vars:
+                    kwargs['encoding'].setdefault(var, {})
+                    kwargs['encoding'][var][k] = z
+    ds.to_netcdf(path, **kwargs)
+
+
+def save_chunks_by_date(ds, filename, delayed=True, **kwargs):
+    kwargs = kwargs.copy()
 
     datasets = list(split_by_chunks(ds))
-    filenames = [str(d.time[0].dt.strftime(filename).values)
-                 for d in datasets]
-    result = [dask.delayed(write_dataset)(ds, path, **kwargs)
-              for ds, path in zip(datasets, filenames)]
-    dask.compute(result)
+    filenames = [str(d.time[0].dt.strftime(filename).values) for d in datasets]
+    if delayed:
+        result = [dask.delayed(write_dataset)(ds, path, **kwargs)
+                  for ds, path in zip(datasets, filenames)]
+        dask.compute(result)
+    else:
+        for ds, path in zip(datasets, filenames):
+            write_dataset(ds, path, **kwargs)
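
Only the tail of split_by_chunks (its final yield) is visible in the hunk above. For reference, a common form of this recipe, which slices a dataset along its dask chunk boundaries (a sketch, not necessarily this project's exact implementation):

import itertools

def split_by_chunks(dataset):
    # For every chunked dimension, build the list of slices covering
    # its dask chunks, then yield one sub-dataset per chunk.
    chunk_slices = {}
    for dim, chunks in dataset.chunks.items():
        slices = []
        start = 0
        for chunk in chunks:
            slices.append(slice(start, start + chunk))
            start += chunk
        chunk_slices[dim] = slices
    for slices in itertools.product(*chunk_slices.values()):
        selection = dict(zip(chunk_slices.keys(), slices))
        yield dataset[selection]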
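A usage sketch of the new signature, mirroring the call in the main script above (the toy data is an assumption): the '_all' encoding entry is expanded to every data variable by write_dataset, and delayed=False takes the new sequential path.

import numpy as np
import pandas as pd
import xarray as xr
from lib.xarray_utils import save_chunks_by_date

# Toy daily dataset, chunked so split_by_chunks yields one day per file.
ds = xr.Dataset(
    {"HI": (("time", "lat"), np.random.rand(3, 4))},
    coords={"time": pd.date_range("2007-01-01", periods=3),
            "lat": np.arange(4)},
).chunk({"time": 1})

encoding = {'_all': {'zlib': True, 'dtype': 'int16', 'scale_factor': 1e-3}}

# One file per time chunk, named from its first timestamp, written
# sequentially rather than through dask.delayed.
save_chunks_by_date(ds, filename='HI_%Y%m%d.nc', delayed=False,
                    encoding=encoding)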