
Ouranosinc / miranda / 2029752231

Build 2029752231: pending completion

Pull #24

GitHub
Merge 86615c2cb into 66761433b
Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring

225 of 1002 new or added lines in 35 files covered. (22.46%)

11 existing lines in 3 files now uncovered.

709 of 3095 relevant lines covered (22.91%)

0.69 hits per line

Source File

/miranda/convert/_rechunk.py · 13.59%
import logging.config
import os
import shutil
import time
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

import xarray as xr
import zarr

from miranda.scripting import LOGGING_CONFIG

from ._data import era5_variables, project_institutes

logging.config.dictConfig(LOGGING_CONFIG)


__all__ = ["rechunk_reanalysis"]


def _rechunk_configurator(project, time_step):
    # ~35 MB chunks
    if project.lower() in ["era5-single-levels", "era5", "era5-land"]:
        if time_step == "1hr":
            target_chunks = {
                "time": 24 * 7,
                "latitude": 225,
                "longitude": 252,
            }
        elif time_step == "day":
            target_chunks = {"time": 365, "latitude": 125, "longitude": 125}
        else:
            raise NotImplementedError()
    else:
        raise NotImplementedError()
    return target_chunks
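
# Rough size check for the chunk targets above, assuming float32 (4-byte) values:
#   "1hr": (24 * 7) * 225 * 252 * 4 bytes ≈ 38 MB per chunk
#   "day": 365 * 125 * 125 * 4 bytes ≈ 23 MB per chunk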


def rechunk_reanalysis(
    project: str,
    input_folder: Union[str, os.PathLike],
    output_folder: Union[str, os.PathLike],
    time_step: Optional[str] = None,
    target_chunks: Optional[Dict[str, int]] = None,
    variables: Optional[Sequence[str]] = None,
    output_format: str = "zarr",
    overwrite: bool = False,
):
    """Rechunk an ERA5 dataset for better loading/reading performance.

    Parameters
    ----------
    project : {"era5", "era5-land", "era5-single-levels"}
    input_folder : str or os.PathLike
    output_folder : str or os.PathLike
    time_step : {"1hr", "day"}, optional
      Time step of the input data. Parsed from the filename if not set.
    target_chunks : dict, optional
      Must include "time"; optionally also "latitude" and "longitude".
    variables : Sequence[str], optional
    output_format : {"netcdf", "zarr"}
      Default: "zarr".
    overwrite : bool

    Returns
    -------
    None
    """
    input_folder = Path(input_folder)
    output_folder = Path(output_folder)

    if time_step is None:
        test_file = next(input_folder.glob("*"))
        file_parts = str(test_file.name).split("_")
        time_step = file_parts[1]
        if time_step not in ["1hr", "day"]:
            raise NotImplementedError()

    if project.startswith("era5") and variables is None:
        variables = era5_variables.copy()

    errored = list()
    start_all = time.perf_counter()
    for variable in variables:
        try:
            next(input_folder.glob(f"{variable}*"))
        except StopIteration:
            logging.warning("No files found for %s. Continuing..." % variable)
            continue

        # STEP 1: Rewrite all years chunked on the spatial dimensions, but not along the time dimension.
        start_var = time.perf_counter()

        for file in sorted(
            input_folder.glob(
                f"{variable}_{time_step}_{project_institutes[project]}_{project}_reanalysis_*.nc"
            )
        ):
            start = time.perf_counter()

            if output_format == "netcdf":
                output_folder.mkdir(exist_ok=True)
                out = output_folder / f"{file.stem}.nc"
            elif output_format == "zarr":
                output_path = output_folder / "temp"
                output_path.mkdir(exist_ok=True, parents=True)
                out = output_path / f"{file.stem}.zarr"
            else:
                raise NotImplementedError()

            if (out.is_dir() or out.is_file()) and not overwrite:
                logging.info(f"Already completed: {file.name}")
                continue
            if out.is_dir() and output_format == "zarr":
                logging.warning(f"Removing existing zarr files for {out.name}.")
                shutil.rmtree(out)

            # Open each yearly file as a single chunk along time.
            ds = xr.open_dataset(
                file,
                chunks={"time": -1},
            )

            if target_chunks is None:
                target_chunks = _rechunk_configurator(project, time_step)

            # Set the correct chunks in the encoding options.
            encoding = dict()
            try:
                for name, da in ds.data_vars.items():
                    chunks = list()
                    for dim in da.dims:
                        if dim in target_chunks.keys():
                            chunks.append(target_chunks[str(dim)])
                        else:
                            chunks.append(len(da[dim]))

                    if output_format == "netcdf":
                        encoding[name] = {
                            "chunksizes": chunks,
                            "zlib": True,
                        }
                    elif output_format == "zarr":
                        encoding[name] = {
                            "chunks": chunks,
                            "compressor": zarr.Blosc(),
                        }
            except KeyError:
                logging.warning(f"{file} has chunking errors. Verify data manually.")
                errored.append(file)
                continue

            # Write the yearly file.
            getattr(ds, f"to_{output_format}")(out, encoding=encoding)

            logging.info(f"Done for {file.stem} in {time.perf_counter() - start:.2f} s")

        if output_format == "netcdf":
            continue

        logging.info(
            f"First step done for {variable} in {(time.perf_counter() - start_all) / 3600:.2f} h"
        )

        # STEP 2: Merge all years, chunking along the time dimension.
        start = time.perf_counter()

        files = sorted(
            (output_folder / "temp").glob(
                f"{variable}_{time_step}_{project_institutes[project]}_{project}_reanalysis_*.zarr"
            )
        )

        ds = xr.open_mfdataset(files, parallel=True, engine="zarr")

        if time_step == "1hr":
            # Four months of hours
            ds = ds.chunk(dict(time=2922))
        elif time_step == "day":
            # Five years of days
            ds = ds.chunk(dict(time=1825))
        else:
            raise NotImplementedError()

        # Drop the chunk encoding inherited from the temporary stores so that
        # the new time chunking is used on write.
        for var in ds.data_vars.values():
            del var.encoding["chunks"]

        merged_zarr = Path(
            output_folder
            / f"{variable}_{time_step}_{project_institutes[project]}_{project}_reanalysis.zarr"
        )
        try:
            # Mode "w-" fails if the target store already exists.
            ds.to_zarr(merged_zarr, mode="w" if overwrite else "w-")
        except zarr.errors.ContainsGroupError:
            logging.error(
                'Files exist for variable %s. Consider using "overwrite=True"'
                % variable
            )
            raise

        logging.info(
            f"Second step done for {variable} in {time.perf_counter() - start:.2f} s"
        )
        logging.info(
            f"Both steps done for {variable} in {time.perf_counter() - start_var:.2f} s"
        )

    logging.info(
        f"All variables in {input_folder} done in {time.perf_counter() - start_all:.2f} s"
    )
    if errored:
        errored_filenames = "\n".join(map(str, errored))
        logging.warning(f"Errored files are as follows: \n{errored_filenames}")
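
For reference, a minimal usage sketch of the public entry point. The import path assumes rechunk_reanalysis is re-exported from miranda.convert (it is listed in __all__ here); the folder paths and the "tas" variable name are hypothetical, and the input files are assumed to follow the {variable}_{time_step}_{institute}_{project}_reanalysis_*.nc naming pattern that the function globs for:

    from miranda.convert import rechunk_reanalysis

    # Hypothetical layout: hourly ERA5 NetCDF files in /data/era5/netcdf.
    # With output_format="zarr", per-year stores are first written to
    # /data/era5/zarr/temp, then merged into one store per variable.
    rechunk_reanalysis(
        project="era5",
        input_folder="/data/era5/netcdf",
        output_folder="/data/era5/zarr",
        time_step="1hr",          # parsed from filenames when omitted
        variables=["tas"],        # defaults to all ERA5 variables when None
        output_format="zarr",
        overwrite=False,
    )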