Ouranosinc / miranda / build 2110382708 (pending completion)

Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring
GitHub: merge 80a89fc1d into bf78f91b7

230 of 1043 new or added lines in 35 files covered (22.05%).
13 existing lines in 4 files are now uncovered.
724 of 3187 relevant lines covered (22.72%).
0.68 hits per line.

Source file: /miranda/convert/_reanalysis.py (17.39% covered)

import logging.config
import os
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union

import dask.config
import numpy as np
import xarray as xr
from clisops.core import subset
from dask import compute
from dask.diagnostics import ProgressBar
from xclim.core import calendar

from miranda.gis.subset import subsetting_domains
from miranda.scripting import LOGGING_CONFIG
from miranda.utils import chunk_iterables

from ._data_definitions import (
    reanalysis_project_institutes,
    xarray_frequencies_to_cmip6like,
)
from ._utils import (
    daily_aggregation,
    delayed_write,
    get_chunks_on_disk,
    variable_conversion,
)

logging.config.dictConfig(LOGGING_CONFIG)


dask.config.set(local_directory=f"{Path(__file__).parent}/dask_workers/")


# Pre-processing function needed by the `open_mfdataset` calls below:
# drops `time_bnds` so that datasets concatenate cleanly.
def _drop_those_time_bnds(dataset: xr.Dataset) -> xr.Dataset:
    if "time_bnds" in dataset.variables:
        return dataset.drop_vars(["time_bnds"])
    return dataset


def reanalysis_processing(
    data: Dict[str, List[Union[str, os.PathLike]]],
    output_folder: Union[str, os.PathLike],
    variables: Sequence[str],
    aggregate: Union[str, bool] = False,
    domains: Union[str, List[str]] = "_DEFAULT",
    start: Optional[str] = None,
    end: Optional[str] = None,
    target_chunks: Optional[dict] = None,
    output_format: str = "netcdf",
    overwrite: bool = False,
    engine: str = "h5netcdf",
    n_workers: int = 4,
    **dask_kwargs,
) -> None:
    """Process reanalysis data for a given set of variables, domains, and output format.

    Parameters
    ----------
    data: Dict[str, List[Union[str, os.PathLike]]]
        Mapping from project name to the input files to process.
    output_folder: Union[str, os.PathLike]
        Folder where converted files are written (per-domain subfolders are created).
    variables: Sequence[str]
        Variables to select from the input files.
    aggregate: {"day", False}
        Temporal aggregation to perform, if any.
    domains: {"QC", "CAN", "AMNO", "GLOBAL"}
        Domain(s) to subset the data to.
    start: str, optional
        Start date of the period of interest.
    end: str, optional
        End date of the period of interest.
    target_chunks: dict, optional
        Chunking to use when writing; defaults to the chunks found on disk.
    output_format: {"netcdf", "zarr"}
        Format of the output files.
    overwrite: bool
        Whether existing output files are overwritten.
    engine: {"netcdf4", "h5netcdf"}
        Engine used to open the input files.
    n_workers: int
        Number of dask workers.
    **dask_kwargs
        Additional keyword arguments passed to `dask.config.set`.

    Returns
    -------
    None
    """
    if output_format == "netcdf":
        suffix = ".nc"
    elif output_format == "zarr":
        suffix = ".zarr"
    else:
        raise NotImplementedError(f"`output_format`: '{output_format}'")

    with ProgressBar(), dask.config.set(
        **{"array.slicing.split_large_chunks": False},
        n_workers=n_workers,
        **dask_kwargs,
    ):
        out_files = Path(output_folder)
        if isinstance(domains, str):
            domains = [domains]

        for domain in domains:
            if domain == "_DEFAULT":
                logging.warning("No domain specified. Proceeding with 'not-specified'.")
                output_folder = out_files
                domain = "not-specified"
            elif isinstance(domain, str):
                output_folder = out_files.joinpath(domain)  # noqa
            else:
                raise NotImplementedError()

            output_folder.mkdir(exist_ok=True)

            for project, in_files in data.items():
                logging.info(
                    f"Processing {project} data{f' for domain {domain}' if domain != 'not-specified' else ''}."
                )
                for var in variables:
                    # Select only the variable of interest.
                    multi_files = sorted(x for x in in_files if f"{var}_" in str(x))

                    if multi_files:
                        all_chunks = get_chunks_on_disk(multi_files[0])
                        chunks = all_chunks[var]

                        if target_chunks is None:
                            output_chunks = dict()
                            # Rename on-disk dimension names to their CF-style equivalents.
                            mappings = dict(longitude="lon", latitude="lat")
                            for k, v in chunks.items():
                                if k in mappings.keys():
                                    output_chunks[mappings[k]] = v
                                else:
                                    output_chunks[k] = v

                            logging.warning(
                                "No `target_chunks` set. "
                                f"Proceeding with the following chunks found on disk: {output_chunks}"
                            )
                        else:
                            output_chunks = target_chunks

                        logging.info(f"Resampling variable `{var}`.")

                        # Use the requested aggregation as the output frequency;
                        # otherwise infer it from the time coordinate of the first file.
                        if aggregate:
                            time_freq = aggregate
                        else:
                            parse_freq = calendar.parse_offset(
                                xr.infer_freq(xr.open_dataset(multi_files[0]).time)
                            )
                            time_freq = f"{parse_freq[0]}{xarray_frequencies_to_cmip6like[parse_freq[1]]}"
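                        # For instance, hourly data infers freq "H"; assuming
                        # `xarray_frequencies_to_cmip6like` maps "H" to "hr", the
                        # resulting CMIP6-like frequency would be "1hr".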

                        institute = reanalysis_project_institutes[project]
                        file_name = "_".join([var, time_freq, institute, project])
                        if domain != "not-specified":
                            file_name = f"{file_name}_{domain}"
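                        # Resulting pattern: "<var>_<freq>_<institute>_<project>[_<domain>]",
                        # e.g. a (hypothetical) "tas_day_ECMWF_era5_QC".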

                        xr_kwargs = dict(
                            chunks=chunks,
                            engine=engine,
                            preprocess=_drop_those_time_bnds,
                            parallel=True,
                        )

                        # Subsetting operations
                        if domain.lower() in ["global", "not-specified"]:
                            if start or end:
                                ds = subset.subset_time(
                                    xr.open_mfdataset(multi_files, **xr_kwargs),
                                    start_date=start,
                                    end_date=end,
                                )
                            else:
                                ds = xr.open_mfdataset(multi_files, **xr_kwargs)
                        else:
                            region = subsetting_domains(domain)
                            lon_values = np.array([region[1], region[3]])
                            lat_values = np.array([region[0], region[2]])
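                            # `subsetting_domains` returns bounds ordered
                            # (lat, lon, lat, lon): indices 0 and 2 are latitudes,
                            # indices 1 and 3 longitudes.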

                            ds = subset.subset_bbox(
                                xr.open_mfdataset(multi_files, **xr_kwargs),
                                lon_bnds=lon_values,
                                lat_bnds=lat_values,
                                start_date=start,
                                end_date=end,
                            )

                        ds.attrs.update(dict(frequency=time_freq, domain=domain))
                        ds = variable_conversion(
                            ds, project=project, output_format=output_format
                        )

                        if time_freq.lower() == "day":
                            dataset = daily_aggregation(ds)
                            freq = "YS"
                        else:
                            out_variable = (
                                list(ds.data_vars)[0]
                                if len(list(ds.data_vars)) == 1
                                else None
                            )
                            dataset = {out_variable: ds}
                            freq = "MS"

                        if len(dataset) == 0:
                            logging.warning(
                                f"Daily aggregation methods for variable `{var}` are not supported. "
                                "Continuing..."
                            )

                        for key in dataset.keys():
                            ds = dataset[key]

                            # TODO: What do we do about multivariable files? Are they even allowed?
                            out_variable = (
                                list(ds.data_vars)[0]
                                if len(list(ds.data_vars)) == 1
                                else None
                            )
                            file_name1 = file_name.replace(
                                f"{var}_", f"{out_variable}_"
                            )

                            logging.info(f"Writing out fixed files for {file_name1}.")
                            years, datasets = zip(*ds.resample(time=freq))
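                            # Iterating over the resample object yields
                            # (period_start, sub-dataset) pairs: one output file
                            # per year ("YS") or per month ("MS").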
                            if freq == "MS":
                                format_str = "%Y-%m"
                                iterable_chunks = 36
                            else:
                                format_str = "%Y"
                                iterable_chunks = 10

                            out_filenames = [
                                output_folder.joinpath(
                                    f"{file_name1}_{xr.DataArray(year).dt.strftime(format_str).values}{suffix}"
                                )
                                for year in years
                            ]

                            jobs = list()
                            if output_format != "zarr" and overwrite:
                                logging.warning(
                                    f"Removing existing {output_format} files for {var}."
                                )
                            for i, d in enumerate(datasets):
                                if (
                                    out_filenames[i].exists()
                                    and out_filenames[i].is_file()
                                    and overwrite
                                ):
                                    out_filenames[i].unlink()

                                if not out_filenames[i].exists() or (
                                    out_filenames[i].is_dir() and overwrite
                                ):
                                    jobs.append(
                                        delayed_write(
                                            d,
                                            out_filenames[i],
                                            output_chunks,
                                            output_format,
                                            overwrite,
                                        )
                                    )

                            if len(jobs) == 0:
                                logging.warning(
                                    f"All output files for `{var}` currently exist."
                                    " To overwrite them, set `overwrite=True`. Continuing..."
                                )
                            else:
                                chunked_jobs = chunk_iterables(jobs, iterable_chunks)
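                                # Compute the delayed writes in batches to keep the
                                # number of simultaneous dask tasks bounded.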
                                logging.info(f"Processing jobs for variable `{var}`.")
                                for iteration, chunk in enumerate(chunked_jobs, start=1):
                                    logging.info(f"Writing out job chunk {iteration}.")
                                    compute(chunk)
                    else:
                        logging.info(f"No files found for variable {var}.")
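
For reference, here is a minimal usage sketch of `reanalysis_processing` (the project key, file paths, and variable name are hypothetical; the domain, aggregation, and format values follow the options documented in the docstring):

    from miranda.convert._reanalysis import reanalysis_processing

    # Hypothetical inputs: one project key mapped to its source files.
    # Files are matched per variable on the substring "<var>_".
    era5_files = [
        "/data/era5/tas_era5_1979.nc",
        "/data/era5/tas_era5_1980.nc",
    ]

    reanalysis_processing(
        data={"era5": era5_files},
        output_folder="/data/converted",
        variables=["tas"],
        aggregate="day",        # daily aggregation, written as one file per year
        domains=["QC", "CAN"],  # subset to each domain in turn
        output_format="zarr",
        overwrite=False,
    )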