• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2229447214

pending completion
2229447214

Pull #33

github

GitHub
Merge 4abe70ba2 into d076d8475
Pull Request #33: Support CORDEX and CMIP5/6

32 of 165 new or added lines in 15 files covered. (19.39%)

6 existing lines in 5 files now uncovered.

659 of 3227 relevant lines covered (20.42%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.39
/miranda/ecmwf/_era5.py
1
import functools
3✔
2
import logging
3✔
3
import logging.config
3✔
4
import multiprocessing
3✔
5
import os
3✔
6
import re
3✔
7
import shutil
3✔
8
from datetime import datetime as dt
3✔
9
from pathlib import Path
3✔
10
from typing import List, Mapping, Optional, Tuple, Union
3✔
11

12
import xarray as xr
3✔
13

14
from miranda.gis.subset import subsetting_domains
3✔
15
from miranda.scripting import LOGGING_CONFIG
3✔
16
from miranda.units import get_time_frequency
3✔
17

18
# Configure logging from the imported LOGGING_CONFIG mapping as soon as this
# module is imported.
logging.config.dictConfig(LOGGING_CONFIG)
3✔
19

20

21
# Names exported by `from <module> import *`.
__all__ = ["request_era5", "rename_era5_files", "ERA5_PROJECT_NAMES"]
3✔
22

23

24
# ERA5 project identifiers. `rename_era5_files` compares the
# underscore-separated parts of a file name against this list to decide which
# project a file belongs to.
ERA5_PROJECT_NAMES = [
3✔
25
    "era5-land",
26
    "era5-land-monthly-means",
27
    "era5-pressure-levels",
28
    "era5-pressure-levels-preliminary-back-extension",
29
    "era5-single-levels",
30
    "era5-single-levels-preliminary-back-extension",
31
]
32

33

34
def request_era5(
    variables: Optional[Mapping[str, str]],
    projects: List[str],
    *,
    domain: str = "AMNO",
    output_folder: Optional[Union[str, os.PathLike]] = None,
    year_start: Union[str, int] = 1950,
    year_end: Optional[Union[str, int]] = None,
    processes: int = 10,
) -> None:
    """Request ERA5/ERA5-Land from Copernicus Data Store in NetCDF4 format.

    For every supported project, one call to `_request_direct_era` is made per
    (year, month) pair, distributed over a pool of worker processes.

    Parameters
    ----------
    variables : Mapping[str, str], optional
        Short variable names (e.g. "tp", "t2m") to download. Only the names
        obtained by iterating over the object are used; names unknown to a
        project are ignored for that project. If None or empty, every variable
        known for the project is requested.
    projects : List[{"era5", "era5-land", "era5-single-levels", "era5-single-levels-preliminary-back-extension"}]
        Projects to download. "era5" is an alias of "era5-single-levels".
    domain : {"GLOBAL", "AMNO", "NAM", "CAN", "QC", "MTL"}
        Name of the subsetting domain, passed on to `_request_direct_era`.
    output_folder : str or os.PathLike, optional
        Download folder. Defaults to "downloaded" under the current working directory.
    year_start : int or str
        First year to request (inclusive).
    year_end : int or str, optional
        Last year to request (inclusive). Defaults to the current year.
    processes : int
        Number of worker processes.

    Returns
    -------
    None

    Notes
    -----
    The working directory of the calling process is changed to the download
    folder and is not restored afterwards.
    """
    # Variables of interest, keyed by short project name.
    single_levels_variables = dict(
        tp="total_precipitation",
        v10="10m_v_component_of_wind",
        u10="10m_u_component_of_wind",
        d2m="2m_dewpoint_temperature",
        t2m="2m_temperature",
        pev="potential_evaporation",
        # sde= Not available for era5
        rsn="snow_density",
        sd="snow_depth",  # note difference in name vs era5-land cf_variable == snw
        sf="snowfall",
        sp="surface_pressure",
    )
    variable_reference = {
        "era5-land": dict(
            tp="total_precipitation",
            v10="10m_v_component_of_wind",
            u10="10m_u_component_of_wind",
            d2m="2m_dewpoint_temperature",
            t2m="2m_temperature",
            pev="potential_evaporation",
            rsn="snow_density",
            sde="snow_depth",
            sd="snow_depth_water_equivalent",
            sf="snowfall",
            sp="surface_pressure",
        ),
        "era5-single-levels": single_levels_variables,
        "era5-single-levels-preliminary-back-extension": single_levels_variables,
    }

    # Short project name -> dataset name sent to the data store. The ordering
    # here fixes the order in which projects are processed.
    datasets = {
        "era5-single-levels": "reanalysis-era5-single-levels",
        "era5-land": "reanalysis-era5-land",
        "era5-single-levels-preliminary-back-extension": (
            "reanalysis-era5-single-levels-preliminary-back-extension"
        ),
    }

    if year_end is None:
        year_end = dt.today().year
    years = range(int(year_start), int(year_end) + 1)
    months = [str(m).zfill(2) for m in range(1, 13)]
    yearmonth = [(y, m) for y in years for m in months]

    requested = {"era5-single-levels" if p == "era5" else p for p in projects}
    project_names = {
        key: dataset for key, dataset in datasets.items() if key in requested
    }
    unsupported = sorted(requested.difference(datasets))
    if unsupported:
        logging.warning(
            "Ignoring projects not supported by request_era5: %s",
            ", ".join(unsupported),
        )

    if output_folder is None:
        target = Path().cwd().joinpath("downloaded")
    else:
        target = output_folder
    Path(target).mkdir(parents=True, exist_ok=True)
    # The request helper writes files relative to the working directory, so
    # the switch must happen before the worker processes are created.
    os.chdir(target)

    for key, dataset in project_names.items():
        # The leading token of the dataset name ("reanalysis") is passed on as
        # the product.
        product = dataset.split("-")[0]
        available = variable_reference[key]
        if variables:
            v_requested = {v: available[v] for v in variables if v in available}
        else:
            v_requested = available
        if not v_requested:
            logging.warning(
                "None of the requested variables are available for %s. Skipping.",
                key,
            )
            continue

        func = functools.partial(
            _request_direct_era, v_requested, dataset, domain, product
        )

        logging.info([func, dt.now().strftime("%Y-%m-%d %X")])

        # `map` blocks until every (year, month) request has returned, so the
        # pool is idle by the time the context manager tears it down.
        with multiprocessing.Pool(processes=processes) as pool:
            pool.map(func, yearmonth)
138

139

140
def _request_direct_era(
    variables: Mapping[str, str],
    project: str,
    domain: str,
    product: str,
    yearmonth: Tuple[int, str],
) -> None:
    """Launch formatted request.

    Downloads one NetCDF file per variable for a single (year, month) pair
    into the current working directory. Files that already exist are skipped.

    Parameters
    ----------
    variables : Mapping[str, str]
        Short variable name (used in the file name) mapped to the variable
        name sent in the request.
    project : str
        Dataset name passed to ``Client.retrieve``.
    domain : str
        Name of the subsetting domain; "AMNO" is requested as "NAM".
    product : str
        Product label used in the file name and, for single-level datasets,
        sent as ``product_type``.
    yearmonth : Tuple[int, str]
        Year and zero-padded month to request.

    Raises
    ------
    ModuleNotFoundError
        If `cdsapi` is not installed.
    NotImplementedError
        If `project` is not one of the supported datasets.
    """
    try:
        from cdsapi import Client  # noqa
    except ModuleNotFoundError as err:
        raise ModuleNotFoundError(
            f"{_request_direct_era.__name__} requires additional dependencies. "
            "Please install them with `pip install miranda[full]`."
        ) from err

    # Datasets that also take a `product_type` entry in the request.
    # NOTE(review): the back-extension dataset is included for consistency with
    # `request_era5`, which accepts it; confirm against the data store's
    # request form.
    single_levels_projects = (
        "reanalysis-era5-single-levels",
        "reanalysis-era5-single-levels-preliminary-back-extension",
    )
    hourly_projects = single_levels_projects + ("reanalysis-era5-land",)

    # Reject unsupported datasets before constructing a client.
    if project not in hourly_projects:
        raise NotImplementedError(project)
    timestep = "hourly"

    year, month = yearmonth
    # Days of the month are numbered from 1; "00" is not a valid day.
    days = [str(d).zfill(2) for d in range(1, 32)]
    times = [f"{str(t).zfill(2)}:00" for t in range(24)]

    if domain.upper() == "AMNO":
        domain = "NAM"
    region = subsetting_domains(domain)

    c = Client()

    project_label = "-".join(project.split("-")[1:])
    for var, cds_name in variables.items():
        netcdf_name = (
            f"{var}_{timestep}_ecmwf_{project_label}"
            f"_{product}_{domain.upper()}_{year}{month}.nc"
        )

        if Path(netcdf_name).exists():
            logging.info("Dataset %s already exists. Continuing...", netcdf_name)
            continue

        request_kwargs = dict(
            variable=cds_name,
            year=year,
            month=month,
            day=days,
            time=times,
            area=region,
            format="netcdf",
        )

        if project in single_levels_projects:
            request_kwargs.update(dict(product_type=product))

        c.retrieve(
            project,
            request_kwargs,
            netcdf_name,
        )
200

201

202
def rename_era5_files(path: Union[os.PathLike, str]) -> None:
    """Rename badly named ERA5 files.

    Each ``*.nc`` file directly inside `path` is renamed to
    ``{variable}_{frequency}_ecmwf_{project}_reanalysis_{YYYYMM}.nc``.

    Notes
    -----
    Requires that the proper ERA5 project name is in the filename, separated by underscores.
    The variable name is taken from the first data variable of the dataset.
    The date is the first run of six digits in the file name or, when there is
    none, the year and month of the first time step.
    Files that cannot be handled (no data variable, unparsable time frequency,
    no project or more than one project in the name) are logged and left untouched.

    Parameters
    ----------
    path: os.PathLike or str
      Path to a folder containing netcdf files

    Returns
    -------
    None

    """
    folder = Path(path)
    # Take a snapshot of the listing before any file in the folder is renamed.
    for f in sorted(folder.glob("*.nc")):
        file_name = str(f.stem)

        # Read everything needed from the dataset, then close it so that the
        # file is no longer open when it is moved.
        with xr.open_dataset(f, cache=False) as ds:
            data_vars = list(ds.data_vars)
            if not data_vars:
                logging.warning(
                    f"No data variables found in file `{f.name}`. Skipping."
                )
                continue
            var_name = str(data_vars[0])

            date_match = re.search(r"\d{6}", file_name)
            if date_match is not None:
                date_found = date_match.group()
            else:
                year = int(ds.isel(time=0).time.dt.year)
                month = int(ds.isel(time=0).time.dt.month)
                date_found = f"{year}{str(month).zfill(2)}"

            try:
                freq_parts = get_time_frequency(ds)
            except ValueError:
                logging.error(
                    f"Unable to parse the time frequency for variable `{var_name}` "
                    f"in file `{f.name}`. Verify data integrity before retrying."
                )
                continue
            freq = f"{freq_parts[0]}{freq_parts[1]}"

        projects = [
            name for name in file_name.split("_") if name in ERA5_PROJECT_NAMES
        ]
        if len(projects) > 1:
            logging.warning(
                f"More than one project identified for file {f.name}. Verify file naming."
            )
            continue
        if not projects:
            logging.warning("No project string found in filename.")
            continue
        project = projects[0]

        product = "reanalysis"
        institute = "ecmwf"

        new_name_parts = [
            var_name,
            freq,
            institute,
            project,
            product,
            date_found,
        ]
        new_name = f"{'_'.join(new_name_parts)}.nc"
        logging.info(f"Moving {f.name} to {new_name}")

        shutil.move(f, folder.joinpath(new_name))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc