• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2272299416

pending completion
2272299416

Pull #33

github

GitHub
Merge 648cff5be into dad775e9d
Pull Request #33: Support CORDEX and CMIP5/6

34 of 260 new or added lines in 16 files covered. (13.08%)

10 existing lines in 7 files now uncovered.

661 of 3276 relevant lines covered (20.18%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.63
/miranda/ecmwf/_era5.py
1
import functools
3✔
2
import logging
3✔
3
import logging.config
3✔
4
import multiprocessing
3✔
5
import os
3✔
6
import re
3✔
7
import shutil
3✔
8
from datetime import datetime as dt
3✔
9
from pathlib import Path
3✔
10
from typing import List, Mapping, Optional, Tuple, Union
3✔
11

12
import xarray as xr
3✔
13

14
from miranda.gis.subset import subsetting_domains
3✔
15
from miranda.scripting import LOGGING_CONFIG
3✔
16
from miranda.units import get_time_frequency
3✔
17

18
# Apply the package-wide logging configuration when this module is imported.
logging.config.dictConfig(LOGGING_CONFIG)


# Public names exported by this module.
__all__ = ["request_era5", "rename_era5_files", "ERA5_PROJECT_NAMES"]
3✔
22

23

24
# ERA5 project identifiers. `rename_era5_files` looks for exactly one of these
# among the underscore-separated parts of a file name.
ERA5_PROJECT_NAMES = [
    "era5-land",
    "era5-land-monthly-means",
    "era5-pressure-levels",
    "era5-pressure-levels-preliminary-back-extension",
    "era5-single-levels",
    "era5-single-levels-preliminary-back-extension",
]
32

33

34
def request_era5(
    projects: List[str],
    *,
    variables: Optional[Mapping[str, str]] = None,
    domain: str = "AMNO",
    pressure_levels: Optional[List[int]] = None,
    output_folder: Optional[Union[str, os.PathLike]] = None,
    year_start: Optional[Union[str, int]] = None,
    year_end: Optional[Union[str, int]] = None,
    processes: int = 10,
) -> None:
    """Request ERA5/ERA5-Land from Copernicus Data Store in NetCDF4 format.

    One download task is dispatched per (year, month) pair to a pool of worker
    processes; each task is handled by `_request_direct_era`.

    Parameters
    ----------
    projects : List[{"era5-land", "era5-single-levels", "era5-single-levels-preliminary-back-extension", "era5-pressure-levels", "era5-pressure-levels-preliminary-back-extension"}]
        Projects to download. Duplicates are requested only once.
    variables : Mapping[str, str], optional
        Only the keys are consulted: they are matched against the short
        variable names known for each project (e.g. ``tp``, ``t2m``) and
        unknown keys are ignored. If None or empty, every known variable of the
        project is requested.
    domain : {"GLOBAL", "AMNO", "NAM", "CAN", "QC", "MTL"}
        Subsetting domain forwarded to the download tasks.
    pressure_levels : List[int], optional
        Pressure levels to request. Required for "pressure-levels" projects.
    output_folder : str or os.PathLike, optional
        Download folder. Defaults to ``<current directory>/downloaded``.
    year_start : str or int, optional
        First year requested. Defaults to 1950 for "era5-land" and the
        back-extension projects, and to 1979 otherwise.
    year_end : str or int, optional
        Last year requested (inclusive). Defaults to 1978 for the
        back-extension projects, and to the current year otherwise.
    processes : int
        Number of worker processes.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If a project is not supported, or if a "pressure-levels" project is
        requested without `pressure_levels`. Raised before any folder is
        created or any request is sent.

    Notes
    -----
    The working directory of the process is changed to the download folder,
    because the download tasks write their files using relative names.
    """
    # Variables of interest (short name -> CDS request name).
    land_variables = dict(
        tp="total_precipitation",
        v10="10m_v_component_of_wind",
        u10="10m_u_component_of_wind",
        d2m="2m_dewpoint_temperature",
        t2m="2m_temperature",
        pev="potential_evaporation",
        rsn="snow_density",
        sde="snow_depth",
        sd="snow_depth_water_equivalent",
        sf="snowfall",
        sp="surface_pressure",
        sshf="surface_sensible_heat_flux",
        slhf="surface_latent_heat_flux",
        ssrd="surface_solar_radiation_downwards",
        strd="surface_thermal_radiation_downwards",
        swlv1="volumetric_soil_water_layer_1",
        swlv2="volumetric_soil_water_layer_2",
        swlv3="volumetric_soil_water_layer_3",
        swlv4="volumetric_soil_water_layer_4",
    )
    single_level_variables = dict(
        tp="total_precipitation",
        v10="10m_v_component_of_wind",
        u10="10m_u_component_of_wind",
        d2m="2m_dewpoint_temperature",
        t2m="2m_temperature",
        # Was "potential evaporation"; the request name takes an underscore,
        # as it does in the era5-land table.
        pev="potential_evaporation",
        # sde= Not available for era5
        rsn="snow_density",
        sd="snow_depth",  # note difference in name vs era5-land cf_variable == snw
        sf="snowfall",
        sp="surface_pressure",
        sshf="surface_sensible_heat_flux",
        slhf="surface_latent_heat_flux",
        ssrd="surface_solar_radiation_downwards",
        strd="surface_thermal_radiation_downwards",
        swlv1="volumetric_soil_water_layer_1",
        swlv2="volumetric_soil_water_layer_2",
        swlv3="volumetric_soil_water_layer_3",
        swlv4="volumetric_soil_water_layer_4",
    )
    pressure_level_variables = dict(z="geopotential")

    # One entry per project so that look-ups are exact matches.
    variable_reference = {
        "era5-land": land_variables,
        "era5-single-levels": single_level_variables,
        "era5-single-levels-preliminary-back-extension": single_level_variables,
        "era5-pressure-levels": pressure_level_variables,
        "era5-pressure-levels-preliminary-back-extension": pressure_level_variables,
    }

    # Preserve the caller's order while dropping duplicate projects.
    unique_projects = list(dict.fromkeys(projects))

    # Validate everything up front so a bad argument cannot leave a partially
    # completed download or a changed working directory behind.
    for project_name in unique_projects:
        if project_name not in variable_reference:
            raise ValueError(
                f"Project `{project_name}` is not supported. "
                f"Supported projects: {sorted(variable_reference)}."
            )
        if "pressure-levels" in project_name and not pressure_levels:
            raise ValueError(
                f"`pressure_levels` must be provided for project `{project_name}`."
            )

    if output_folder is None:
        target = Path.cwd().joinpath("downloaded")
    else:
        target = Path(output_folder)
    target.mkdir(parents=True, exist_ok=True)
    # Download tasks write files using relative names.
    os.chdir(target)

    months = [str(m).zfill(2) for m in range(1, 13)]

    for project_name in unique_projects:
        request_code = f"reanalysis-{project_name}"
        is_back_extension = "back-extension" in project_name

        if year_start is None:
            if is_back_extension or project_name == "era5-land":
                project_year_start = 1950
            else:
                project_year_start = 1979
        else:
            project_year_start = year_start

        if year_end is None:
            project_year_end = 1978 if is_back_extension else dt.today().year
        else:
            project_year_end = year_end

        years = range(int(project_year_start), int(project_year_end) + 1)
        yearmonth = [(y, m) for y in years for m in months]

        product = request_code.split("-")[0]

        available = variable_reference[project_name]
        if variables:
            v_requested = {v: available[v] for v in variables if v in available}
        else:
            v_requested = dict(available)

        if not v_requested:
            logging.warning(
                f"None of the requested variables are available for {project_name}. "
                "Skipping."
            )
            continue

        if "pressure-levels" in project_name:
            pressure_levels_requested = [str(i) for i in pressure_levels]
        else:
            pressure_levels_requested = None

        func = functools.partial(
            _request_direct_era,
            v_requested,
            request_code,
            domain,
            pressure_levels_requested,
            product,
        )

        logging.info(
            "Requesting %s for %d year-month(s) at %s",
            request_code,
            len(yearmonth),
            dt.now().strftime("%Y-%m-%d %X"),
        )

        pool = multiprocessing.Pool(processes=processes)
        try:
            pool.map(func, yearmonth)
        finally:
            # Always release the workers, even when a download task fails.
            pool.close()
            pool.join()
×
180

181

182
def _request_direct_era(
    variables: Mapping[str, str],
    project: str,
    domain: str,
    pressure_levels: Optional[List[str]],
    product: str,
    yearmonth: Tuple[int, str],
) -> None:
    """Launch formatted requests for one month of data, one file per variable.

    Parameters
    ----------
    variables : Mapping[str, str]
        Short variable name (used in the file name) mapped to the name sent in
        the request.
    project : str
        Dataset name passed to ``Client.retrieve``, e.g.
        "reanalysis-era5-land".
    domain : str
        Name passed to `subsetting_domains` to obtain the request area.
        "AMNO" is replaced by "NAM".
    pressure_levels : List[str], optional
        Pressure levels, already converted to strings. When given, they are
        added to the request and appended to the variable name in the file
        name.
    product : str
        Product label used in the file name; also sent as ``product_type`` for
        the single-levels and pressure-levels projects.
    yearmonth : Tuple[int, str]
        Year and zero-padded month to request.

    Raises
    ------
    ModuleNotFoundError
        If `cdsapi` is not installed.
    NotImplementedError
        If `project` is a "monthly-means" project.
    """

    try:
        from cdsapi import Client  # noqa
    except ModuleNotFoundError as err:
        raise ModuleNotFoundError(
            f"{_request_direct_era.__name__} requires additional dependencies. "
            "Please install them with `pip install miranda[full]`."
        ) from err

    # Reject unsupported projects before creating a client.
    if "monthly-means" in project:
        raise NotImplementedError(project)
    timestep = "hourly"

    year, month = yearmonth
    # Days of the month run from 01 to 31; `range(32)` also produced "00".
    days = [str(d).zfill(2) for d in range(1, 32)]
    times = [f"{str(t).zfill(2)}:00" for t in range(24)]

    if domain.upper() == "AMNO":
        domain = "NAM"
    region = subsetting_domains(domain)

    c = Client()

    # File-name parts that do not depend on the variable.
    project_tag = "-".join(project.split("-")[1:])
    plev_names = "" if pressure_levels is None else "-".join(pressure_levels)
    needs_product_type = project in (
        "reanalysis-era5-single-levels",
        "reanalysis-era5-single-levels-preliminary-back-extension",
        "reanalysis-era5-pressure-levels",
        "reanalysis-era5-pressure-levels-preliminary-back-extension",
    )

    for var, request_name in variables.items():
        netcdf_name = (
            f"{var}{plev_names}_{timestep}_ecmwf_{project_tag}"
            f"_{product}_{domain.upper()}_{year}{month}.nc"
        )

        if Path(netcdf_name).exists():
            logging.info(f"Dataset {netcdf_name} already exists. Continuing...")
            continue

        request_kwargs = dict(
            variable=request_name,
            year=year,
            month=month,
            day=days,
            time=times,
            area=region,
            format="netcdf",
        )

        if needs_product_type:
            request_kwargs.update(dict(product_type=product))

        if pressure_levels:
            request_kwargs.update(dict(pressure_level=pressure_levels))

        c.retrieve(
            project,
            request_kwargs,
            netcdf_name,
        )
258

259

260
def rename_era5_files(path: Union[os.PathLike, str]) -> None:
    """Rename badly named ERA5 files.

    Files are renamed in place to
    ``<variable>_<frequency>_ecmwf_<project>_reanalysis_<YYYYMM>.nc``.

    Notes
    -----
    Requires that the proper ERA5 project name is in the filename, separated by underscores.
    The date is the first 6-digit sequence found in the file name; if there is
    none, it is built from the year and month of the first time step.
    The variable name is the first data variable of the dataset.

    Files are skipped, with a logged message, when no project or more than one
    project is found in the name, when the dataset has no data variables, or
    when the time frequency cannot be determined.

    Parameters
    ----------
    path: os.PathLike or str
      Path to a folder containing netcdf files

    Returns
    -------
    None

    """
    folder = Path(path)

    # Take a snapshot of the listing: entries are renamed while looping.
    for f in sorted(folder.glob("*.nc")):
        file_name = str(f.stem)

        # Cheap filename check first, so unusable files are never opened.
        names = file_name.split("_")
        projects = [name for name in names if name in ERA5_PROJECT_NAMES]
        if len(projects) > 1:
            logging.warning(
                f"More than one project identified for file {f.name}. Verify file naming."
            )
            continue
        if not projects:
            logging.warning(f"No project string found in filename for file {f.name}.")
            continue
        project = projects[0]

        # Close the dataset before the file is moved.
        with xr.open_dataset(f, cache=False) as ds:
            data_vars = list(ds.data_vars)
            if not data_vars:
                logging.warning(f"No data variables found in file {f.name}.")
                continue
            var_name = str(data_vars[0])

            date_match = re.search(r"\d{6}", file_name)
            if date_match is not None:
                date_found = date_match.group()
            else:
                first_step = ds.isel(time=0).time.dt
                year = int(first_step.year)
                month = int(first_step.month)
                date_found = f"{year}{str(month).zfill(2)}"

            try:
                freq_parts = get_time_frequency(ds)
            except ValueError:
                logging.error(
                    f"Unable to parse the time frequency for variable `{var_name}` "
                    f"in file `{f.name}`. Verify data integrity before retrying."
                )
                continue
            freq = f"{freq_parts[0]}{freq_parts[1]}"

        product = "reanalysis"
        institute = "ecmwf"

        new_name_parts = [
            var_name,
            freq,
            institute,
            project,
            product,
            date_found,
        ]
        new_name = f"{'_'.join(new_name_parts)}.nc"
        if new_name == f.name:
            # Already correctly named; nothing to move.
            continue

        logging.info(f"Moving {f.name} to {new_name}")
        shutil.move(f, folder.joinpath(new_name))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc