Ouranosinc / miranda · build 2197509238 (pending completion)

GitHub · Pull Request #33: Support CORDEX and CMIP5/6 (merge ddfe0e667 into d55f76503)

2 of 26 new or added lines in 5 files covered (7.69%).
571 existing lines in 15 files are now uncovered.
737 of 3286 relevant lines covered (22.43%), at 0.67 hits per line.

Source File
/miranda/convert/_utils.py
import datetime
import json
import logging.config
import os
from pathlib import Path
from typing import Dict, Optional, Tuple, Union

import netCDF4
import numpy as np
import regionmask
import xarray as xr
import zarr
from clisops.core import subset
from xclim.core import calendar, units
from xclim.indices import tas

from miranda.scripting import LOGGING_CONFIG

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "get_chunks_on_disk",
    "add_ar6_regions",
    "daily_aggregation",
    "delayed_write",
    "variable_conversion",
]

LATLON_COORDINATE_PRECISION = dict()
LATLON_COORDINATE_PRECISION["era5-land"] = 4

VERSION = datetime.datetime.now().strftime("%Y.%m.%d")

def load_json_data_mappings(project: str) -> dict:
    data_folder = Path(__file__).parent / "data"

    if project.startswith("era5"):
        metadata_definition = json.load(open(data_folder / "ecmwf_cf_attrs.json"))
    elif project in ["agcfsr", "agmerra2"]:  # This should handle the AG versions
        metadata_definition = json.load(open(data_folder / "nasa_cf_attrs.json"))
    elif project == "nrcan-gridded-10km":
        raise NotImplementedError()
    elif project == "wfdei-gem-capa":
        metadata_definition = json.load(open(data_folder / "usask_cf_attrs.json"))
    else:
        raise NotImplementedError()

    return metadata_definition

def get_chunks_on_disk(nc_file: Union[os.PathLike, str]) -> dict:
    """Determine the on-disk chunking of each variable in a NetCDF file.

    Parameters
    ----------
    nc_file : Path or str
        Path to a NetCDF file.

    Returns
    -------
    dict
        Mapping of variable name to a dict of dimension name and chunk length.
    """
    # FIXME: This does not support zarr
    # TODO: This needs to be optimized for dask. See: https://github.com/Ouranosinc/miranda/pull/24/files#r840617216
    ds = netCDF4.Dataset(nc_file)
    chunks = dict()
    for v in ds.variables:
        chunks[v] = dict()
        for ii, dim in enumerate(ds[v].dimensions):
            chunks[v][dim] = ds[v].chunking()[ii]
    return chunks

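# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of how get_chunks_on_disk might be called; the file name is
# hypothetical, and the result depends entirely on how the file was written.
def _example_get_chunks_on_disk():  # pragma: no cover
    chunks = get_chunks_on_disk("era5-land_t2m_1990.nc")
    # e.g. chunks["t2m"] -> {"time": ..., "latitude": ..., "longitude": ...}
    return chunks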

def add_ar6_regions(ds: xr.Dataset) -> xr.Dataset:
    """Add the IPCC AR6 Regions to dataset.

    Parameters
    ----------
    ds : xarray.Dataset

    Returns
    -------
    xarray.Dataset
    """
    mask = regionmask.defined_regions.ar6.all.mask(ds.lon, ds.lat)
    ds = ds.assign_coords(region=mask)
    return ds

def variable_conversion(ds: xr.Dataset, project: str, output_format: str) -> xr.Dataset:
    """Convert variables to CF-compliant format."""

    def _get_time_frequency(d: xr.Dataset):
        freq = xr.infer_freq(d.time)
        if freq is None:
            raise TypeError()
        offset = (
            [int(calendar.parse_offset(freq)[0]), calendar.parse_offset(freq)[1]]
            if calendar.parse_offset(freq)[0] != ""
            else [1, "h"]
        )

        time_units = {
            "s": "second",
            "m": "minute",
            "h": "hour",
            "D": "day",
            "W": "week",
            "Y": "year",
        }
        if offset[1] in ["S", "M", "H"]:
            offset[1] = offset[1].lower()
        offset_meaning = time_units[offset[1]]
        return offset, offset_meaning

    def _correct_units_names(d: xr.Dataset, p: str, m: Dict):
        key = "_corrected_units"
        for v in d.data_vars:
            if p in m["variable_entry"][v][key].keys():
                d[v].attrs["units"] = m["variable_entry"][v][key][p]

        if "time" in m["variable_entry"].keys():
            if p in m["variable_entry"]["time"][key].keys():
                d["time"].attrs["units"] = m["variable_entry"]["time"][key][p]

        return d

    # For de-accumulation or conversion to flux
    def _transform(d: xr.Dataset, p: str, m: Dict):
        key = "_transformation"
        d_out = xr.Dataset(coords=d.coords, attrs=d.attrs)
        for vv in d.data_vars:
            if p in m["variable_entry"][vv][key].keys():
                try:
                    offset, offset_meaning = _get_time_frequency(d)
                except TypeError:
                    logging.error(
                        f"Unable to parse the time frequency for variable `{vv}`. "
                        "Verify data integrity before retrying."
                    )
                    raise

                if m["variable_entry"][vv][key][p] == "deaccumulate":
                    # Time-step accumulated total to time-based flux (de-accumulation)
                    logging.info(f"De-accumulating units for variable `{vv}`.")
                    with xr.set_options(keep_attrs=True):
                        out = d[vv].diff(dim="time")
                        out = d[vv].where(
                            getattr(d[vv].time.dt, offset_meaning) == offset[0],
                            out.broadcast_like(d[vv]),
                        )
                        out = units.amount2rate(out)
                    d_out[out.name] = out
                elif m["variable_entry"][vv][key][p] == "amount2rate":
                    # Frequency-based totals to time-based flux
                    logging.info(
                        f"Performing amount-to-rate units conversion for variable `{vv}`."
                    )
                    out = units.amount2rate(
                        d[vv],
                        out_units=m["variable_entry"][vv]["units"],
                    )
                    d_out[out.name] = out
                else:
                    raise NotImplementedError(
                        f"Unknown transformation: {m['variable_entry'][vv][key][p]}"
                    )
            else:
                d_out[vv] = d[vv]
        return d_out

    def _offset_time(d: xr.Dataset, p: str, m: Dict) -> xr.Dataset:
        key = "_offset_time"
        d_out = xr.Dataset(coords=d.coords, attrs=d.attrs)
        for vv in d.data_vars:
            if p in m["variable_entry"][vv][key].keys():
                try:
                    offset, offset_meaning = _get_time_frequency(d)
                except TypeError:
                    logging.error(
                        f"Unable to parse the time frequency for variable `{vv}`. "
                        "Verify data integrity before retrying."
                    )
                    raise

                if m["variable_entry"][vv][key][p]:
                    # Offset time by the value of one time-step
                    logging.info(
                        f"Offsetting data for `{vv}` by `{offset[0]} {offset_meaning}(s)`."
                    )
                    with xr.set_options(keep_attrs=True):
                        out = d[vv]
                        out["time"] = out.time - np.timedelta64(offset[0], offset[1])
                        d_out[out.name] = out
                else:
                    logging.info(
                        f"No time offsetting needed for `{vv}` in `{p}` (explicitly set to False)."
                    )
                    d_out = d
            else:
                logging.info(f"No time offsetting needed for `{vv}` in project `{p}`.")
                d_out = d
        return d_out

    # For converting variable units to standard workflow units
    def _units_cf_conversion(d: xr.Dataset, m: Dict) -> xr.Dataset:
        descriptions = m["variable_entry"]

        if "time" in m["variable_entry"].keys():
            d["time"].attrs["units"] = m["variable_entry"]["time"]["units"]

        for v in d.data_vars:
            d[v] = units.convert_units_to(d[v], descriptions[v]["units"])

        return d

    # Add and update existing metadata fields
    def _metadata_conversion(d: xr.Dataset, p: str, o: str, m: Dict) -> xr.Dataset:
        logging.info("Converting metadata to CF-like conventions.")

        # Add global attributes
        d.attrs.update(m["Header"])
        d.attrs.update(dict(project=p, format=o))

        # Date-based versioning
        d.attrs.update(dict(version=VERSION))

        history = (
            f"{d.attrs['history']}\n[{datetime.datetime.now()}] Converted from original data to {o}"
            " with modified metadata for CF-like compliance."
        )
        d.attrs.update(dict(history=history))
        descriptions = m["variable_entry"]

        if "time" in m["variable_entry"].keys():
            descriptions["time"].pop("_corrected_units")

        # Add variable metadata
        for v in d.data_vars:
            descriptions[v].pop("_corrected_units")
            descriptions[v].pop("_offset_time")
            descriptions[v].pop("_transformation")
            d[v].attrs.update(descriptions[v])

        # Rename data variables
        for v in d.data_vars:
            try:
                cf_name = descriptions[v]["_cf_variable_name"]
                d = d.rename({v: cf_name})
                d[cf_name].attrs.update(dict(original_variable=v))
                del d[cf_name].attrs["_cf_variable_name"]
            except (ValueError, IndexError):
                pass
        return d

    # For renaming lat and lon dims
    def _dims_conversion(d: xr.Dataset):
        sort_dims = []
        for orig, new in dict(longitude="lon", latitude="lat").items():
            try:
                d = d.rename({orig: new})
                if new == "lon" and np.any(d.lon > 180):
                    lon1 = d.lon.where(d.lon <= 180.0, d.lon - 360.0)
                    d[new] = lon1
                sort_dims.append(new)
            except KeyError:
                pass
            if project in LATLON_COORDINATE_PRECISION.keys():
                d[new] = d[new].round(LATLON_COORDINATE_PRECISION[project])
        if sort_dims:
            d = d.sortby(sort_dims)
        return d

    metadata_definition = load_json_data_mappings(project)
    ds = _correct_units_names(ds, project, metadata_definition)
    ds = _transform(ds, project, metadata_definition)
    ds = _offset_time(ds, project, metadata_definition)
    ds = _units_cf_conversion(ds, metadata_definition)
    ds = _metadata_conversion(ds, project, output_format, metadata_definition)
    ds = _dims_conversion(ds)

    return ds

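# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of the intended call pattern for variable_conversion: open a
# raw dataset, convert it for a known project, then hand it off for writing.
# The file name and project below are hypothetical examples.
def _example_variable_conversion():  # pragma: no cover
    raw = xr.open_dataset("era5-land_raw_1990.nc")
    return variable_conversion(raw, project="era5-land", output_format="netcdf")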

def daily_aggregation(ds: xr.Dataset) -> Dict[str, xr.Dataset]:
    """Aggregate sub-daily variables to daily values."""
    logging.info("Creating daily upscaled climate variables.")

    daily_dataset = dict()
    for variable in ds.data_vars:
        if variable in ["tas", "tdps"]:
            # Some looping to deal with memory consumption issues
            for v, func in {
                f"{variable}max": "max",
                f"{variable}min": "min",
                f"{variable}": "mean",
            }.items():
                ds_out = xr.Dataset()
                ds_out.attrs = ds.attrs.copy()
                ds_out.attrs["frequency"] = "day"

                method = (
                    f"time: {func}{'imum' if func != 'mean' else ''} (interval: 1 day)"
                )
                ds_out.attrs["cell_methods"] = method

                if v == "tas" and not hasattr(ds, "tas"):
                    ds_out[v] = tas(tasmax=ds.tasmax, tasmin=ds.tasmin)
                else:
                    # Thanks for the help, xclim contributors
                    r = ds[variable].resample(time="D")
                    ds_out[v] = getattr(r, func)(dim="time", keep_attrs=True)

                daily_dataset[v] = ds_out
                del ds_out

        elif variable in ["evspsblpot", "pr", "prsn", "snd", "snr", "snw"]:
            ds_out = xr.Dataset()
            ds_out.attrs = ds.attrs.copy()
            ds_out.attrs["frequency"] = "day"
            ds_out.attrs["cell_methods"] = "time: mean (interval: 1 day)"
            logging.info(f"Converting {variable} to daily time step (daily mean).")
            ds_out[variable] = (
                ds[variable].resample(time="D").mean(dim="time", keep_attrs=True)
            )

            daily_dataset[variable] = ds_out
            del ds_out
        else:
            continue

    return daily_dataset

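# --- Illustrative usage (not part of the original module) ---
# daily_aggregation returns a dict keyed by output variable name (e.g. "tas",
# "tasmax", "tasmin", "pr"), each holding a single-variable daily dataset.
# A minimal sketch, assuming an hourly dataset `ds_hourly` already in memory.
def _example_daily_aggregation(ds_hourly: xr.Dataset):  # pragma: no cover
    daily = daily_aggregation(ds_hourly)
    for name, dataset in daily.items():
        logging.info(f"Daily output `{name}` spans {dataset.time.size} days.")
    return daily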

def threshold_land_sea_mask(
    ds: Union[xr.Dataset, str, os.PathLike],
    *,
    land_sea_mask: Dict[str, Tuple[str, Union[os.PathLike, str]]],
    land_sea_percentage: int = 50,
    output_folder: Optional[Union[str, os.PathLike]] = None,
) -> Optional[Path]:
    """Land-sea mask operations.

    Parameters
    ----------
    ds : Union[xr.Dataset, str, os.PathLike]
        Dataset to be masked, or the path of a NetCDF file to open.
    land_sea_mask : dict
        Mapping of project name to a (mask variable name, mask file path) pair.
    land_sea_percentage : int
        Cutoff percentage; grid cells with land fraction at or below this value are masked out.
    output_folder : str or os.PathLike, optional
        If given and `ds` is a file path, the masked dataset is written there as NetCDF.

    Returns
    -------
    Path or xarray.Dataset
    """
    file_name = ""
    if isinstance(ds, (str, os.PathLike)):
        if output_folder is not None:
            output_folder = Path(output_folder)
            file_name = f"{Path(ds).stem}_land-sea-masked.nc"
        ds = xr.open_dataset(ds)

    if output_folder is not None and file_name == "":
        logging.warning(
            "Cannot generate filenames from xarray.Dataset objects. Consider writing NetCDF manually."
        )

    try:
        project = ds.attrs["project"]
    except KeyError:
        raise ValueError("No 'project' field found for given dataset.")

    if project in land_sea_mask.keys():
        logging.info(
            f"Masking variable with land-sea mask at {land_sea_percentage} % cutoff."
        )
        land_sea_mask_variable, lsm_file = land_sea_mask[project]
        lsm_raw = xr.open_dataset(lsm_file)
        try:
            lsm_raw = lsm_raw.rename({"longitude": "lon", "latitude": "lat"})
        except ValueError:
            raise

        lon_bounds = np.array([ds.lon.min(), ds.lon.max()])
        lat_bounds = np.array([ds.lat.min(), ds.lat.max()])

        lsm = subset.subset_bbox(
            lsm_raw,
            lon_bnds=lon_bounds,
            lat_bnds=lat_bounds,
        ).load()
        lsm = lsm.where(lsm[land_sea_mask_variable] > float(land_sea_percentage) / 100)
        if project == "era5":
            ds = ds.where(lsm[land_sea_mask_variable].isel(time=0, drop=True).notnull())
            try:
                ds = ds.rename({"longitude": "lon", "latitude": "lat"})
            except ValueError:
                raise
        elif project in ["merra2", "cfsr"]:
            ds = ds.where(lsm[land_sea_mask_variable].notnull())

        ds.attrs["land_sea_cutoff"] = f"{land_sea_percentage} %"

        if len(file_name) > 0:
            out = output_folder / file_name
            ds.to_netcdf(out)
            return out
        return ds
    raise RuntimeError(f"Project `{project}` was not found in land-sea masks.")

def delayed_write(
    ds: xr.Dataset,
    outfile: Path,
    target_chunks: dict,
    output_format: str,
    overwrite: bool,
):
    """Stage a chunked write of a dataset to NetCDF or zarr without computing it.

    Parameters
    ----------
    ds : xr.Dataset
    outfile : Path
    target_chunks : dict
    output_format : {"netcdf", "zarr"}
    overwrite : bool

    Returns
    -------
    dask.delayed.Delayed
        The delayed write job.
    """
    # Set correct chunks in encoding options
    kwargs = dict()
    kwargs["encoding"] = dict()
    for name, da in ds.data_vars.items():
        chunks = list()
        for dim in da.dims:
            if dim in target_chunks.keys():
                chunks.append(target_chunks[str(dim)])
            else:
                chunks.append(len(da[dim]))

        if output_format == "netcdf":
            kwargs["encoding"][name] = {
                "chunksizes": chunks,
                "zlib": True,
            }
            kwargs["compute"] = False
            if not overwrite:
                kwargs["mode"] = "a"
        elif output_format == "zarr":
            ds = ds.chunk(target_chunks)
            kwargs["encoding"][name] = {
                "chunks": chunks,
                "compressor": zarr.Blosc(),
            }
            kwargs["compute"] = False
            if overwrite:
                kwargs["mode"] = "w"
    if kwargs["encoding"]:
        kwargs["encoding"]["time"] = {"dtype": "int32"}

    return getattr(ds, f"to_{output_format}")(outfile, **kwargs)
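# --- Illustrative usage (not part of the original module) ---
# Because compute=False is set internally, delayed_write returns a dask delayed
# object; the actual write happens only when .compute() is called. The path and
# chunk sizes below are hypothetical.
def _example_delayed_write(ds: xr.Dataset):  # pragma: no cover
    job = delayed_write(
        ds,
        outfile=Path("converted/era5-land_1990.zarr"),
        target_chunks={"time": 168, "lat": 50, "lon": 50},
        output_format="zarr",
        overwrite=True,
    )
    job.compute()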