
Ouranosinc / miranda — build 2162068203 (pending completion)

Pull Request #24 (GitHub): Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring
Merge ae30bbd2b into 9ac032fc5

242 of 1112 new or added lines in 35 files covered. (21.76%)

12 existing lines in 4 files now uncovered.

735 of 3250 relevant lines covered (22.62%)

0.68 hits per line

Source File: /miranda/convert/_rechunk.py — 14.29% covered
import logging.config
import os
import shutil
import time
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

import xarray as xr
import zarr

from miranda.scripting import LOGGING_CONFIG

from . import reanalysis_project_institutes
from ._data_definitions import era5_variables

logging.config.dictConfig(LOGGING_CONFIG)


__all__ = ["rechunk_reanalysis"]


def _rechunk_configurator(project, time_step):
    # Aim for chunks of ~35 MB.
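    # For the "1hr" case below, one chunk holds (24 * 7) * 225 * 252
    # = 9,525,600 values, i.e. roughly 36 MiB per chunk assuming float32 data.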
    if project.lower() in ["era5-single-levels", "era5", "era5-land"]:
        if time_step == "1hr":
            # TODO: confirm new chunks.
            target_chunks = {
                "time": 24 * 7,  # "time": 24 * 30,
                "latitude": 225,  # "lat" : 50
                "longitude": 252,  # "lon" : 50
            }
        elif time_step == "day":
            # TODO: confirm new chunks: target_chunks = {"time": 365, "latitude": 50, "longitude": 50}
            target_chunks = {"time": 365, "latitude": 125, "longitude": 125}
        else:
            raise NotImplementedError()
    else:
        raise NotImplementedError()
    return target_chunks


def rechunk_reanalysis(
    project: str,
    input_folder: Union[str, os.PathLike],
    output_folder: Union[str, os.PathLike],
    time_step: Optional[str] = None,
    target_chunks: Optional[Dict[str, int]] = None,
    variables: Optional[Sequence[str]] = None,
    output_format: str = "zarr",
    overwrite: bool = False,
):
    """Rechunk an ERA5-family reanalysis dataset for better loading/reading performance.

    Parameters
    ----------
    project : {"era5", "era5-land", "era5-single-levels"}
      Supported reanalysis projects.
    input_folder : str or os.PathLike
      Folder to be examined. Performs globbing.
    output_folder : str or os.PathLike
      Target folder.
    time_step : {"1hr", "day"}, optional
      Time step of the input data. Parsed from the filename if not set.
    target_chunks : dict, optional
      Must include "time"; may also include "latitude" and "longitude".
      Computed from the project and time step if not set.
    variables : Sequence[str], optional
      If not set, will attempt to process all variables supported for the given project.
    output_format : {"netcdf", "zarr"}
      Default: "zarr".
    overwrite : bool
      Whether to overwrite existing files. For zarr, existing folders are removed before writing.

    Returns
    -------
    None
    """
    input_folder = Path(input_folder)
    output_folder = Path(output_folder)

    if time_step is None:
        test_file = next(input_folder.glob("*"))
        file_parts = str(test_file.name).split("_")
        time_step = file_parts[1]
        if time_step not in ["1hr", "day"]:
            raise NotImplementedError()
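        # Filenames are expected to follow the pattern globbed below, i.e.
        # "{variable}_{time_step}_{institute}_{project}_reanalysis_*.nc",
        # so the second underscore-separated field is the time step.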

    if project.startswith("era5") and variables is None:
        variables = era5_variables.copy()

    errored = list()
    start_all = time.perf_counter()
    for variable in variables:
        try:
            next(input_folder.glob(f"{variable}*"))
        except StopIteration:
            logging.warning(f"No files found for {variable}. Continuing...")
            continue

        # STEP 1 : Rewrite all years chunked on spatial dimensions, but not along the time dimension.
        start_var = time.perf_counter()

        for file in sorted(
            input_folder.glob(
                f"{variable}_{time_step}_{reanalysis_project_institutes[project]}_{project}_reanalysis_*.nc"
            )
        ):
            start = time.perf_counter()

            if output_format == "netcdf":
                output_folder.mkdir(exist_ok=True)
                out = output_folder / f"{file.stem}.nc"
            elif output_format == "zarr":
                output_path = output_folder / "temp"
                output_path.mkdir(exist_ok=True, parents=True)
                out = output_path / f"{file.stem}.zarr"
            else:
                raise NotImplementedError()

            if (out.is_dir() or out.is_file()) and not overwrite:
                logging.info(f"Already completed: {file.name}")
                continue
            if out.is_dir() and output_format == "zarr":
                logging.warning(f"Removing existing zarr files for {out.name}.")
                shutil.rmtree(out)

            ds = xr.open_dataset(
                file,
                chunks={"time": -1},
            )

            if target_chunks is None:
                target_chunks = _rechunk_configurator(project, time_step)

            # Set correct chunks in encoding options
            encoding = dict()
            try:
                for name, da in ds.data_vars.items():
                    chunks = list()
                    for dim in da.dims:
                        if dim in target_chunks.keys():
                            chunks.append(target_chunks[str(dim)])
                        else:
                            chunks.append(len(da[dim]))

                    if output_format == "netcdf":
                        encoding[name] = {
                            "chunksizes": chunks,
                            "zlib": True,
                        }
                    elif output_format == "zarr":
                        encoding[name] = {
                            "chunks": chunks,
                            "compressor": zarr.Blosc(),
                        }
            except KeyError:
                logging.warning(f"{file} has chunking errors. Verify data manually.")
                errored.append(file)
                continue
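            # e.g. for an ERA5 "1hr" variable with dims (time, latitude, longitude),
            # the target chunks computed above give chunks == [168, 225, 252].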

            # Write out rechunked files at the same output frequencies as they were read.
            getattr(ds, f"to_{output_format}")(out, encoding=encoding)
            logging.info(f"Done for {file.stem} in {time.perf_counter() - start:.2f} s")

        if output_format == "netcdf":
            logging.info(
                f"{variable} rechunked in {(time.perf_counter() - start_all) / 3600:.2f} h"
            )
            continue

        logging.info(
            f"First step done for {variable} in {(time.perf_counter() - start_all) / 3600:.2f} h"
        )

        # STEP 2 : Merge all years, chunking along the time dimension.
        start = time.perf_counter()

        files = sorted(
            (output_folder / "temp").glob(
                f"{variable}_{time_step}_{reanalysis_project_institutes[project]}_{project}_reanalysis_*.zarr"
            )
        )

        ds = xr.open_mfdataset(files, parallel=True, engine="zarr")

        if time_step == "1hr":
            # Four months of hours
            ds = ds.chunk(dict(time=2922))
        elif time_step == "day":
            # Five years of days
            ds = ds.chunk(dict(time=1825))
        else:
            raise NotImplementedError()
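        # (2922 hourly steps = 24 h * 121.75 days, i.e. about four months;
        # 1825 daily steps = 5 * 365 days.)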

        for var in ds.data_vars.values():
            # Drop the chunk encoding inherited from the temporary zarr stores
            # so the new dask chunking set above is what gets written.
            del var.encoding["chunks"]

        merged_zarr = Path(
            output_folder
            / f"{variable}_{time_step}_{reanalysis_project_institutes[project]}_{project}_reanalysis.zarr"
        )
        try:
            ds.to_zarr(merged_zarr, mode="w" if overwrite else "w-")
        except zarr.errors.ContainsGroupError:
            logging.error(
                f'Files exist for variable {variable}. Consider using "overwrite=True".'
            )
            raise

        logging.info(
            f"Second step done for {variable} in {time.perf_counter() - start:.2f} s"
        )
        logging.info(
            f"Both steps done for {variable} in {time.perf_counter() - start_var:.2f} s"
        )

    logging.info(
        f"All variables in {input_folder} done in {time.perf_counter() - start_all:.2f} s"
    )
    if errored:
        errored_filenames = "\n".join(map(str, errored))
        logging.warning(f"Errored files are as follows: \n{errored_filenames}")