Ouranosinc / miranda — build 2110382708 (pending completion)

Coverage for Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring
GitHub: merge 80a89fc1d into bf78f91b7

230 of 1043 new or added lines in 35 files covered. (22.05%)

13 existing lines in 4 files now uncovered.

724 of 3187 relevant lines covered (22.72%)

0.68 hits per line

Source File: /miranda/convert/_utils.py (coverage: 12.16%)
import datetime
import json
import logging.config
import os
from pathlib import Path
from typing import Dict, Optional, Tuple, Union

import netCDF4
import numpy as np
import regionmask
import xarray as xr
import zarr
from clisops.core import subset
from xclim.core import calendar, units
from xclim.indices import tas

from miranda.scripting import LOGGING_CONFIG

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "get_chunks_on_disk",
    "add_ar6_regions",
    "daily_aggregation",
    "delayed_write",
    "variable_conversion",
]

LATLON_COORDINATE_PRECISION = dict()
LATLON_COORDINATE_PRECISION["era5-land"] = 4

VERSION = datetime.datetime.now().strftime("%Y.%m.%d")


def load_json_data_mappings(project: str) -> dict:
    """Load the JSON metadata definitions for the given project."""
    data_folder = Path(__file__).parent / "data"

    if project in ["era5-single-levels", "era5-land"]:
        definition_file = data_folder / "ecmwf_cf_attrs.json"
    elif project in ["agcfsr", "agmerra2"]:  # This should handle the AG versions.
        definition_file = data_folder / "nasa_cf_attrs.json"
    elif project == "wfdei-gem-capa":
        definition_file = data_folder / "usask_cf_attrs.json"
    else:  # Includes "nrcan-gridded-10km", which is not yet supported.
        raise NotImplementedError(f"Unsupported project: {project}")

    # Use a context manager so the file handle is closed after reading.
    with open(definition_file) as f:
        metadata_definition = json.load(f)

    return metadata_definition


def get_chunks_on_disk(nc_file: Union[os.PathLike, str]) -> dict:
    """Return the on-disk chunking of each variable in a NetCDF file.

    Parameters
    ----------
    nc_file: Path or str
        Path to a NetCDF file.

    Returns
    -------
    dict
        Mapping of variable names to their per-dimension chunk sizes.
    """
    # FIXME: This does not support zarr
    # TODO: This needs to be optimized for dask. See: https://github.com/Ouranosinc/miranda/pull/24/files#r840617216
    ds = netCDF4.Dataset(nc_file)
    chunks = dict()
    for v in ds.variables:
        chunks[v] = dict()
        chunking = ds[v].chunking()
        if chunking == "contiguous":
            # Contiguous (unchunked) variables have no chunk sizes on disk.
            continue
        for ii, dim in enumerate(ds[v].dimensions):
            chunks[v][dim] = chunking[ii]
    return chunks

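A minimal usage sketch for review purposes (the file name is hypothetical):

# Hypothetical usage: inspect the on-disk chunking of a NetCDF file.
chunks = get_chunks_on_disk("era5-land_t2m_1981.nc")  # hypothetical file
print(chunks["t2m"])  # e.g. {"time": 744, "latitude": 50, "longitude": 50}
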
def add_ar6_regions(ds: xr.Dataset) -> xr.Dataset:
    """Add the IPCC AR6 Regions to dataset.

    Parameters
    ----------
    ds : xarray.Dataset

    Returns
    -------
    xarray.Dataset
    """
    mask = regionmask.defined_regions.ar6.all.mask(ds.lon, ds.lat)
    ds = ds.assign_coords(region=mask)
    return ds

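A short sketch of how this helper might be applied, assuming an input dataset with lon/lat coordinates (the file and region number are hypothetical):

# Hypothetical usage: tag each grid cell with its IPCC AR6 region number.
ds = xr.open_dataset("era5-land_t2m_1981.nc")  # hypothetical file
ds = add_ar6_regions(ds)
# Cells can then be selected by region number (18 is a hypothetical value):
one_region = ds.where(ds.region == 18)
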
def variable_conversion(ds: xr.Dataset, project: str, output_format: str) -> xr.Dataset:
    """Convert variables to CF-compliant format."""

    def _correct_units_names(d: xr.Dataset, p: str, m: Dict):
        key = "_corrected_units"
        for v in d.data_vars:
            if p in m["variable_entry"][v][key].keys():
                d[v].attrs["units"] = m["variable_entry"][v][key][p]

        if "time" in m["variable_entry"].keys():
            if p in m["variable_entry"]["time"][key].keys():
                d["time"].attrs["units"] = m["variable_entry"]["time"][key][p]

        return d

    # for de-accumulation or conversion to flux
    def _transform(d: xr.Dataset, p: str, m: Dict):
        key = "_transformation"
        d_out = xr.Dataset(coords=d.coords, attrs=d.attrs)
        for vv in d.data_vars:
            if p in m["variable_entry"][vv][key].keys():
                if m["variable_entry"][vv][key][p] == "deaccumulate":
                    try:
                        freq = xr.infer_freq(d.time)
                        if freq is None:
                            raise TypeError()
                        offset = (
                            float(calendar.parse_offset(freq)[0])
                            if calendar.parse_offset(freq)[0] != ""
                            else 1.0
                        )
                    except TypeError:
                        logging.error(
                            f"Unable to parse the time frequency for variable `{vv}`. "
                            "Verify data integrity before retrying."
                        )
                        raise

                    # accumulated hourly to hourly flux (de-accumulation)
                    with xr.set_options(keep_attrs=True):
                        out = d[vv].diff(dim="time")
                        out = d[vv].where(
                            d[vv].time.dt.hour == int(offset), out.broadcast_like(d[vv])
                        )
                        out = units.amount2rate(out)
                    d_out[out.name] = out
                elif m["variable_entry"][vv][key][p] == "amount2rate":
                    out = units.amount2rate(
                        d[vv],
                        out_units=m["variable_entry"][vv]["units"],
                    )
                    d_out[out.name] = out
                else:
                    raise NotImplementedError(
                        f"Unknown transformation: {m['variable_entry'][vv][key][p]}"
                    )
            else:
                d_out[vv] = d[vv]
        return d_out

    # For converting variable units to standard workflow units
    def _units_cf_conversion(d: xr.Dataset, m: Dict) -> xr.Dataset:
        descriptions = m["variable_entry"]

        if "time" in m["variable_entry"].keys():
            d["time"].attrs["units"] = m["variable_entry"]["time"]["units"]

        for v in d.data_vars:
            d[v] = units.convert_units_to(d[v], descriptions[v]["units"])

        return d

    # Add and update existing metadata fields
    def _metadata_conversion(d: xr.Dataset, p: str, o: str, m: Dict) -> xr.Dataset:
        # Add global attributes
        d.attrs.update(m["Header"])
        d.attrs.update(dict(project=p, format=o))

        # Date-based versioning
        d.attrs.update(dict(version=VERSION))

        history = (
            f"{d.attrs['history']}\n[{datetime.datetime.now()}] Converted from original data to {o}"
            " with modified metadata for CF-like compliance."
        )
        d.attrs.update(dict(history=history))
        descriptions = m["variable_entry"]

        if "time" in m["variable_entry"].keys():
            descriptions["time"].pop("_corrected_units")

        # Add variable metadata
        for v in d.data_vars:
            descriptions[v].pop("_corrected_units")
            descriptions[v].pop("_transformation")
            d[v].attrs.update(descriptions[v])

        # Rename data variables
        for v in d.data_vars:
            try:
                cf_name = descriptions[v]["_cf_variable_name"]
                d = d.rename({v: cf_name})
                d[cf_name].attrs.update(dict(original_variable=v))
                del d[cf_name].attrs["_cf_variable_name"]
            except (KeyError, ValueError, IndexError):
                # Variable has no CF rename defined.
                pass
        return d

    # For renaming lat and lon dims
    def _dims_conversion(d: xr.Dataset):
        sort_dims = []
        for orig, new in dict(longitude="lon", latitude="lat").items():
            try:
                d = d.rename({orig: new})
                if new == "lon" and np.any(d.lon > 180):
                    lon1 = d.lon.where(d.lon <= 180.0, d.lon - 360.0)
                    d[new] = lon1
                sort_dims.append(new)
            except (KeyError, ValueError):
                # The dimension is absent or already has the target name.
                pass
            if new in d.dims and project in LATLON_COORDINATE_PRECISION.keys():
                d[new] = d[new].round(LATLON_COORDINATE_PRECISION[project])
        if sort_dims:
            d = d.sortby(sort_dims)
        return d

    metadata_definition = load_json_data_mappings(project)
    ds = _correct_units_names(ds, project, metadata_definition)
    ds = _transform(ds, project, metadata_definition)
    ds = _units_cf_conversion(ds, metadata_definition)
    ds = _metadata_conversion(ds, project, output_format, metadata_definition)
    ds = _dims_conversion(ds)

    return ds

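The shape of the JSON mappings consumed above can be inferred from the lookups in variable_conversion; the sketch below is purely illustrative and does not reproduce any actual mapping file:

# Illustrative structure of a metadata-definition mapping (hypothetical values).
example_mapping = {
    "Header": {"institution": "..."},  # merged into the global attributes
    "variable_entry": {
        "tp": {
            "_corrected_units": {"era5-land": "m"},  # per-project unit fix
            "_transformation": {"era5-land": "deaccumulate"},  # or "amount2rate"
            "_cf_variable_name": "pr",  # rename target
            "units": "kg m-2 s-1",  # target workflow units
        },
    },
}
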
def daily_aggregation(ds: xr.Dataset) -> Dict[str, xr.Dataset]:
    """Aggregate hourly climate variables to daily values."""
    logging.info("Creating daily upscaled climate variables.")

    daily_dataset = dict()
    for variable in ds.data_vars:
        if variable in ["tas", "tdps"]:
            # Some looping to deal with memory consumption issues
            for v, func in {
                f"{variable}max": "max",
                f"{variable}min": "min",
                f"{variable}": "mean",
            }.items():
                ds_out = xr.Dataset()
                ds_out.attrs = ds.attrs.copy()
                ds_out.attrs["frequency"] = "day"

                method = (
                    f"time: {func}{'imum' if func != 'mean' else ''} (interval: 1 day)"
                )
                ds_out.attrs["cell_methods"] = method

                if v == "tas" and not hasattr(ds, "tas"):
                    ds_out[v] = tas(tasmax=ds.tasmax, tasmin=ds.tasmin)
                else:
                    # Thanks for the help, xclim contributors
                    r = ds[variable].resample(time="D")
                    ds_out[v] = getattr(r, func)(dim="time", keep_attrs=True)

                daily_dataset[v] = ds_out
                del ds_out

        elif variable in ["evspsblpot", "pr", "prsn", "snd", "snw"]:
            ds_out = xr.Dataset()
            ds_out.attrs = ds.attrs.copy()
            ds_out.attrs["frequency"] = "day"
            ds_out.attrs["cell_methods"] = "time: mean (interval: 1 day)"
            logging.info(f"Converting {variable} to daily time step (daily mean).")
            ds_out[variable] = (
                ds[variable].resample(time="D").mean(dim="time", keep_attrs=True)
            )

            daily_dataset[variable] = ds_out
            del ds_out
        else:
            continue

    return daily_dataset

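A sketch of how the returned per-variable datasets might be consumed (output paths hypothetical):

# Hypothetical usage: write each daily aggregate to its own NetCDF file.
daily = daily_aggregation(ds)
for name, out_ds in daily.items():
    out_ds.to_netcdf(f"daily_{name}.nc")  # hypothetical output path
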
def threshold_land_sea_mask(
    ds: Union[xr.Dataset, str, os.PathLike],
    *,
    land_sea_mask: Dict[str, Tuple[str, Union[os.PathLike, str]]],
    land_sea_percentage: int = 50,
    output_folder: Optional[Union[str, os.PathLike]] = None,
) -> Union[Path, xr.Dataset]:
    """Land-sea mask operations.

    Parameters
    ----------
    ds: Union[xr.Dataset, str, os.PathLike]
        Dataset to be masked, or the path to one.
    land_sea_mask: dict
        Mapping from project name to a (mask variable name, mask file path) pair.
    land_sea_percentage: int
        Percentage of land above which a grid cell is kept.
    output_folder: str or os.PathLike, optional

    Returns
    -------
    Path or xarray.Dataset
    """
    file_name = ""
    if isinstance(ds, (str, os.PathLike)):
        if output_folder is not None:
            output_folder = Path(output_folder)
            file_name = f"{Path(ds).stem}_land-sea-masked.nc"
        ds = xr.open_dataset(ds)

    if output_folder is not None and file_name == "":
        logging.warning(
            "Cannot generate filenames from xarray.Dataset objects. Consider writing NetCDF manually."
        )

    try:
        project = ds.attrs["project"]
    except KeyError:
        raise ValueError("No 'project' found for given dataset.")

    if project in land_sea_mask.keys():
        logging.info(
            f"Masking variable with land-sea mask at {land_sea_percentage} % cutoff."
        )
        land_sea_mask_variable, lsm_file = land_sea_mask[project]
        lsm_raw = xr.open_dataset(lsm_file)
        try:
            lsm_raw = lsm_raw.rename({"longitude": "lon", "latitude": "lat"})
        except ValueError:
            raise

        lon_bounds = np.array([ds.lon.min(), ds.lon.max()])
        lat_bounds = np.array([ds.lat.min(), ds.lat.max()])

        lsm = subset.subset_bbox(
            lsm_raw,
            lon_bnds=lon_bounds,
            lat_bnds=lat_bounds,
        ).load()
        lsm = lsm.where(lsm[land_sea_mask_variable] > float(land_sea_percentage) / 100)
        if project == "era5":
            ds = ds.where(
                lsm[land_sea_mask_variable].isel(time=0, drop=True).notnull()
            )
            try:
                ds = ds.rename({"longitude": "lon", "latitude": "lat"})
            except ValueError:
                raise
        elif project in ["merra2", "cfsr"]:
            ds = ds.where(lsm[land_sea_mask_variable].notnull())

        ds.attrs["land_sea_cutoff"] = f"{land_sea_percentage} %"

        if len(file_name) > 0:
            out = output_folder / file_name
            ds.to_netcdf(out)
            return out
        return ds
    raise RuntimeError(f"Project '{project}' was not found in the land-sea mask mapping.")

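Since land_sea_mask maps a project name to a (mask variable, mask file) pair, a call might look like the following (all values hypothetical):

# Hypothetical usage: keep ERA5 grid cells that are at least 50 % land.
masked = threshold_land_sea_mask(
    "era5_t2m_1981.nc",  # hypothetical input; its attrs must contain project="era5"
    land_sea_mask={"era5": ("lsm", "era5_land_sea_mask.nc")},  # hypothetical mask file
    land_sea_percentage=50,
    output_folder="masked",
)
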
def delayed_write(
    ds: xr.Dataset,
    outfile: Path,
    target_chunks: dict,
    output_format: str,
    overwrite: bool,
):
    """Write a dataset to disk as NetCDF or zarr, delaying computation.

    Parameters
    ----------
    ds: xr.Dataset
        Dataset to be written.
    outfile: Path
        Output file or store path.
    target_chunks: dict
        Desired chunk sizes, keyed by dimension name.
    output_format: str
        Either "netcdf" or "zarr".
    overwrite: bool

    Returns
    -------
    dask.delayed.Delayed
        The delayed write task (``compute=False`` is set).
    """
    # Set correct chunks in encoding options
    kwargs = dict()
    kwargs["encoding"] = dict()
    for name, da in ds.data_vars.items():
        chunks = list()
        for dim in da.dims:
            if dim in target_chunks.keys():
                chunks.append(target_chunks[str(dim)])
            else:
                chunks.append(len(da[dim]))

        if output_format == "netcdf":
            kwargs["encoding"][name] = {
                "chunksizes": chunks,
                "zlib": True,
            }
            kwargs["compute"] = False
            if not overwrite:
                kwargs["mode"] = "a"
        elif output_format == "zarr":
            ds = ds.chunk(target_chunks)
            kwargs["encoding"][name] = {
                "chunks": chunks,
                "compressor": zarr.Blosc(),
            }
            kwargs["compute"] = False
            if overwrite:
                kwargs["mode"] = "w"
    if kwargs["encoding"]:
        kwargs["encoding"]["time"] = {"dtype": "int32"}

    return getattr(ds, f"to_{output_format}")(outfile, **kwargs)
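Because compute=False is passed through, the returned object is a Dask delayed task rather than a completed write; a usage sketch (chunk size hypothetical):

# Hypothetical usage: build the write task, then execute it explicitly.
job = delayed_write(ds, Path("out.zarr"), {"time": 168}, "zarr", overwrite=True)
job.compute()  # triggers the actual write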