
Ouranosinc / miranda / 2266273356 (pending completion)

GitHub Pull Request #33: Support CORDEX and CMIP5/6
Merge d26d6be3c into dad775e9d

32 of 223 new or added lines in 16 files covered. (14.35%)
8 existing lines in 5 files now uncovered.
660 of 3254 relevant lines covered (20.28%)
0.61 hits per line

Source File: /miranda/convert/_rechunk.py (14.29% covered)
import logging.config
import os
import shutil
import time
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

import xarray as xr
import zarr

from miranda.scripting import LOGGING_CONFIG

from . import reanalysis_project_institutes
from ._data_definitions import era5_variables

logging.config.dictConfig(LOGGING_CONFIG)


__all__ = ["rechunk_reanalysis"]


def _rechunk_configurator(project, time_step):
    # ~35 MB chunks
    if project.lower() in ["era5-single-levels", "era5", "era5-land"]:
        if time_step == "1hr":
            # Chunks for monthly files, optimized for Zarr
            target_chunks = {
                "time": 24 * 7,
                "latitude": 225,
                "longitude": 252,
            }
        elif time_step == "day":
            # Chunks for annual files, optimized for Zarr
            target_chunks = {"time": 365, "latitude": 125, "longitude": 125}
        else:
            raise NotImplementedError()
    else:
        raise NotImplementedError()
    return target_chunks

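# Example: for hourly ERA5 data, _rechunk_configurator yields weekly time
# chunks over a 225 x 252 spatial tile:
#   _rechunk_configurator("era5", "1hr")
#   # -> {"time": 168, "latitude": 225, "longitude": 252}
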
def rechunk_reanalysis(
    project: str,
    input_folder: Union[str, os.PathLike],
    output_folder: Union[str, os.PathLike],
    time_step: Optional[str] = None,
    target_chunks: Optional[Dict[str, int]] = None,
    variables: Optional[Sequence[str]] = None,
    output_format: str = "zarr",
    overwrite: bool = False,
) -> None:
    """Rechunk an ERA5 dataset for better loading/reading performance.

    Warnings
    --------
    Globbing assumes that the target datasets to be rechunked have been saved in NetCDF format.
    File naming requires the following order of facets: `{variable}_{time_step}_{institute}_{project}_reanalysis_*.nc`.

    Parameters
    ----------
    project : {"era5", "era5-land", "era5-single-levels"}
      Supported reanalysis projects.
    input_folder : str or os.PathLike
      Folder to be examined. Performs globbing.
    output_folder : str or os.PathLike
      Target folder.
    time_step : {"1hr", "day"}, optional
      Time step of the input data. Parsed from the filename if not set.
    target_chunks : dict, optional
      Must include "time"; may optionally include "latitude" and "longitude".
    variables : Sequence[str], optional
      If not set, will attempt to process all supported variables based on the project name.
    output_format : {"netcdf", "zarr"}
      Default: "zarr".
    overwrite : bool
      Whether to overwrite existing files. For zarr, existing folders will be removed before writing.

    Returns
    -------
    None
    """
    input_folder = Path(input_folder)
    output_folder = Path(output_folder)

    if time_step is None:
        test_file = next(input_folder.glob("*"))
        file_parts = str(test_file.name).split("_")
        time_step = file_parts[1]
        if time_step not in ["1hr", "day"]:
            raise NotImplementedError()

    if project.startswith("era5") and variables is None:
        variables = era5_variables.copy()

    errored = list()
    start_all = time.perf_counter()
    for variable in variables:
        try:
            next(input_folder.glob(f"{variable}*"))
        except StopIteration:
            logging.warning(f"No files found for {variable}. Continuing...")
            continue

        # STEP 1 : Rewrite all years chunked on spatial dimensions, but not along the time dimension.
        start_var = time.perf_counter()

        for file in sorted(
            input_folder.glob(
                f"{variable}_{time_step}_{reanalysis_project_institutes[project]}_{project}_reanalysis_*.nc"
            )
        ):
            start = time.perf_counter()

            if output_format == "netcdf":
                output_folder.mkdir(exist_ok=True)
                out = output_folder / f"{file.stem}.nc"
            elif output_format == "zarr":
                output_path = output_folder / "temp"
                output_path.mkdir(exist_ok=True, parents=True)
                out = output_path / f"{file.stem}.zarr"
            else:
                raise NotImplementedError()

            if (out.is_dir() or out.is_file()) and not overwrite:
                logging.info(f"Already completed: {file.name}")
                continue
            if out.is_dir() and output_format == "zarr":
                logging.warning(f"Removing existing zarr files for {out.name}.")
                shutil.rmtree(out)

            ds = xr.open_dataset(file, chunks={"time": -1})

            if target_chunks is None:
                target_chunks = _rechunk_configurator(project, time_step)

            # Set correct chunks in encoding options
            encoding = dict()
            try:
                for name, da in ds.data_vars.items():
                    chunks = list()
                    for dim in da.dims:
                        if dim in target_chunks.keys():
                            chunks.append(target_chunks[str(dim)])
                        else:
                            chunks.append(len(da[dim]))

                    if output_format == "netcdf":
                        encoding[name] = {
                            "chunksizes": chunks,
                            "zlib": True,
                        }
                    elif output_format == "zarr":
                        encoding[name] = {
                            "chunks": chunks,
                            "compressor": zarr.Blosc(),
                        }
            except KeyError:
                logging.warning(f"{file} has chunking errors. Verify data manually.")
                errored.append(file)
                continue

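            # Example: for a hypothetical hourly ERA5 variable "t2m" with dims
            # ("time", "latitude", "longitude"), the encoding built above would be
            #   {"t2m": {"chunks": [168, 225, 252], "compressor": zarr.Blosc()}}
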
            # Write out rechunked files at the same output frequencies as they were read.
            getattr(ds, f"to_{output_format}")(out, encoding=encoding)
            logging.info(f"Done for {file.stem} in {time.perf_counter() - start:.2f} s")

        if output_format == "netcdf":
            logging.info(
                f"{variable} rechunked in {(time.perf_counter() - start_all) / 3600:.2f} h"
            )
            continue

        logging.info(
            f"First step done for {variable} in {(time.perf_counter() - start_all) / 3600:.2f} h"
        )

        # STEP 2 : Merge all years, chunking along the time dimension.
        start = time.perf_counter()

        files = sorted(
            (output_folder / "temp").glob(
                f"{variable}_{time_step}_{reanalysis_project_institutes[project]}_{project}_reanalysis_*.zarr"
            )
        )

        ds = xr.open_mfdataset(files, parallel=True, engine="zarr")

        if time_step == "1hr":
            # Four months of hours
            ds = ds.chunk(dict(time=2922))
        elif time_step == "day":
            # Five years of days
            ds = ds.chunk(dict(time=1825))
        else:
            raise NotImplementedError()

        # Drop the per-file chunk encoding inherited on read so that the new
        # time chunking set above is honoured on write.
        for var in ds.data_vars.values():
            del var.encoding["chunks"]

        merged_zarr = Path(
            output_folder
            / f"{variable}_{time_step}_{reanalysis_project_institutes[project]}_{project}_reanalysis.zarr"
        )
        try:
            ds.to_zarr(merged_zarr, mode="w" if overwrite else "w-")
        except zarr.errors.ContainsGroupError:
            logging.error(
                f"Files exist for variable {variable}. Consider using `overwrite=True`"
            )
            raise

        logging.info(
            f"Second step done for {variable} in {time.perf_counter() - start:.2f} s"
        )
        logging.info(
            f"Both steps done for {variable} in {time.perf_counter() - start_var:.2f} s"
        )

    logging.info(
        f"All variables in {input_folder} done in {time.perf_counter() - start_all:.2f} s"
    )
    if errored:
        errored_filenames = "\n".join(map(str, errored))
        logging.warning(f"Errored files are as follows: \n{errored_filenames}")
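
A minimal usage sketch (the paths and variable subset below are hypothetical; input files must follow the `{variable}_{time_step}_{institute}_{project}_reanalysis_*.nc` naming described in the docstring):

from miranda.convert._rechunk import rechunk_reanalysis

rechunk_reanalysis(
    project="era5",
    input_folder="/data/era5/netcdf",  # hypothetical folder of NetCDF source files
    output_folder="/data/era5/zarr",   # hypothetical target folder
    time_step="1hr",                   # optional; otherwise parsed from filenames
    variables=["tas"],                 # hypothetical subset; defaults to all supported ERA5 variables
    output_format="zarr",
    overwrite=False,
)

With output_format="zarr", per-file stores are first written under output_folder/temp (STEP 1), then merged into a single time-chunked store per variable (STEP 2).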