• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2149931730

pending completion
2149931730

Pull #24

github

GitHub
Merge 9b3c612a2 into bf78f91b7
Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring

241 of 1090 new or added lines in 35 files covered. (22.11%)

13 existing lines in 4 files now uncovered.

735 of 3234 relevant lines covered (22.73%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.63
/miranda/ecmwf/_era5.py
1
import functools
3✔
2
import logging
3✔
3
import logging.config
3✔
4
import multiprocessing
3✔
5
import os
3✔
6
import re
3✔
7
import shutil
3✔
8
from datetime import date
3✔
9
from datetime import datetime as dt
3✔
10
from pathlib import Path
3✔
11
from typing import List, Mapping, Optional, Tuple, Union
3✔
12

13
import xarray as xr
3✔
14
from cdsapi import Client
3✔
15

16
from miranda.gis.subset import subsetting_domains
3✔
17
from miranda.scripting import LOGGING_CONFIG
3✔
18

19
# Apply the package-wide logging configuration as soon as this module is imported.
logging.config.dictConfig(LOGGING_CONFIG)

# Names exported by ``from <module> import *``.
__all__ = ["request_era5", "rename_era5_files", "ERA5_PROJECT_NAMES"]
22

23

24
# Project identifiers that may appear as an underscore-separated component of
# an ERA5 file name; `rename_era5_files` matches file-name parts against this list.
ERA5_PROJECT_NAMES = [
    "era5-land",
    "era5-land-monthly-means",
    "era5-pressure-levels",
    "era5-pressure-levels-preliminary-back-extension",
    "era5-single-levels",
    "era5-single-levels-preliminary-back-extension",
]
32

33

34
def request_era5(
    variables: Optional[Mapping[str, str]],
    projects: List[str],
    *,
    domain: str = "AMNO",
    output_folder: Optional[Union[str, os.PathLike]] = None,
    year_start: Union[str, int] = 1950,
    year_end: Optional[Union[str, int]] = None,
    processes: int = 10,
) -> None:
    """Request ERA5/ERA5-Land from Copernicus Data Store in NetCDF4 format.

    One download task is created per (year, month) pair and the tasks are
    distributed over a pool of worker processes, once for each requested
    project.

    Parameters
    ----------
    variables: Mapping[str, str], optional
        Variables to request, identified by their short names (e.g. "tp",
        "t2m"). Only the keys are consulted; names that are unknown for a
        given project are ignored. If None or empty, every variable known
        for the project is requested.
    projects : List[{"era5", "era5-land", "era5-single-levels", "era5-single-levels-preliminary-back-extension"}]
        Projects to download. "era5" is treated as an alias of
        "era5-single-levels". Unrecognised entries are ignored.
    domain : {"GLOBAL", "AMNO", "NAM", "CAN", "QC", "MTL"}
        Name of the subsetting domain, forwarded to the download helper.
    output_folder : str or os.PathLike, optional
        Folder receiving the files. Defaults to "downloaded" under the
        current working directory. The folder is created if needed.
    year_start : int
        First year to request (inclusive).
    year_end : int, optional
        Last year to request (inclusive). Defaults to the current year.
    processes : int
        Number of worker processes in the pool.

    Returns
    -------
    None

    Notes
    -----
    The working directory of the calling process is changed to the output
    folder, because the download helper writes files using bare file names.
    """
    # Variables of interest (short name -> CDS request name).
    single_levels_variables = dict(
        tp="total_precipitation",
        v10="10m_v_component_of_wind",
        u10="10m_u_component_of_wind",
        d2m="2m_dewpoint_temperature",
        t2m="2m_temperature",
        # Was "potential evaporation" (with a space); CDS names use underscores.
        pev="potential_evaporation",
        # sde= Not available for era5
        rsn="snow_density",
        sd="snow_depth",  # note difference in name vs era5-land cf_variable == snw
        sf="snowfall",
    )
    # Keyed by the same short project names used in `project_names` below so
    # that a plain lookup is enough inside the request loop.
    variable_reference = {
        "era5-land": dict(
            tp="total_precipitation",
            v10="10m_v_component_of_wind",
            u10="10m_u_component_of_wind",
            d2m="2m_dewpoint_temperature",
            t2m="2m_temperature",
            pev="potential_evaporation",
            rsn="snow_density",
            sde="snow_depth",
            sd="snow_depth_water_equivalent",
            sf="snowfall",
        ),
        "era5-single-levels": single_levels_variables,
        "era5-single-levels-preliminary-back-extension": single_levels_variables,
    }

    if year_end is None:
        year_end = date.today().year
    years = range(int(year_start), int(year_end) + 1)
    months = [str(m).zfill(2) for m in range(1, 13)]
    yearmonth = [(y, m) for y in years for m in months]

    # Short project name -> CDS dataset identifier.
    project_names = dict()
    if "era5" in projects or "era5-single-levels" in projects:
        project_names["era5-single-levels"] = "reanalysis-era5-single-levels"
    if "era5-land" in projects:
        project_names["era5-land"] = "reanalysis-era5-land"
    if "era5-single-levels-preliminary-back-extension" in projects:
        project_names[
            "era5-single-levels-preliminary-back-extension"
        ] = "reanalysis-era5-single-levels-preliminary-back-extension"

    if output_folder is None:
        target = Path().cwd().joinpath("downloaded")
    else:
        target = output_folder
    Path(target).mkdir(parents=True, exist_ok=True)
    # The download helper writes to (and checks for) bare file names, so the
    # process must run from inside the target folder.
    os.chdir(target)

    for key, p in project_names.items():
        # First component of the CDS dataset id, e.g. "reanalysis".
        product = p.split("-")[0]
        available = variable_reference[key]
        if variables:
            v_requested = {v: available[v] for v in variables if v in available}
        else:
            v_requested = available

        func = functools.partial(_request_direct_era, v_requested, p, domain, product)

        logging.info([func, dt.now().strftime("%Y-%m-%d %X")])

        pool = multiprocessing.Pool(processes=processes)
        try:
            pool.map(func, yearmonth)
        finally:
            # Always release the worker processes, even if a request failed.
            pool.close()
            pool.join()
136

137

138
def _request_direct_era(
    variables: Mapping[str, str],
    project: str,
    domain: str,
    product: str,
    yearmonth: Tuple[int, str],
):
    """Launch formatted request.

    Downloads one NetCDF file per variable for a single year/month, skipping
    any file that already exists in the current working directory.

    Parameters
    ----------
    variables : Mapping[str, str]
        Mapping of short variable name (used in the output file name) to the
        variable name sent in the request.
    project : str
        Dataset identifier passed to the client; only
        "reanalysis-era5-single-levels" and "reanalysis-era5-land" are
        handled.
    domain : str
        Subsetting domain name; "AMNO" is replaced by "NAM".
    product : str
        Product label used in the output file name and, for the
        single-levels dataset, sent as `product_type`.
    yearmonth : Tuple[int, str]
        Year and zero-padded month to request.

    Raises
    ------
    NotImplementedError
        If `project` is not one of the handled datasets.
    """
    # Validate before doing any other work so unsupported projects fail fast.
    # NOTE(review): request_era5 can also pass the
    # "...-preliminary-back-extension" dataset, which is rejected here.
    if project in ["reanalysis-era5-single-levels", "reanalysis-era5-land"]:
        timestep = "hourly"
    else:
        raise NotImplementedError(project)

    year, month = yearmonth
    # Days "01".."31"; starting the range at 0 would add an invalid day "00".
    days = [str(d).zfill(2) for d in range(1, 32)]
    times = [f"{str(t).zfill(2)}:00" for t in range(24)]

    if domain.upper() == "AMNO":
        domain = "NAM"
    region = subsetting_domains(domain)

    c = Client()

    # Dataset id without its leading component, e.g. "era5-single-levels".
    project_label = "-".join(project.split("-")[1:])

    for var, request_name in variables.items():
        netcdf_name = (
            f"{var}_{timestep}_ecmwf_{project_label}"
            f"_{product}_{domain.upper()}_{year}{month}.nc"
        )

        if Path(netcdf_name).exists():
            logging.info("Dataset %s already exists. Continuing...", netcdf_name)
            continue

        request_kwargs = dict(
            variable=request_name,
            year=year,
            month=month,
            day=days,
            time=times,
            area=region,
            format="netcdf",
        )

        # Only the single-levels request carries a product type.
        if project == "reanalysis-era5-single-levels":
            request_kwargs.update(dict(product_type=product))

        c.retrieve(
            project,
            request_kwargs,
            netcdf_name,
        )
189

190

191
def rename_era5_files(path: Union[os.PathLike, str]) -> None:
    """Rename the ERA5 NetCDF files found directly inside a folder.

    Each ``*.nc`` file whose underscore-separated name contains exactly one
    entry of `ERA5_PROJECT_NAMES` is moved, within the same folder, to
    ``<variable>_1hr_ecmwf_<project>_reanalysis_NAM_<YYYYMM>.nc``. The
    variable is the first data variable of the dataset; the date is the first
    six-digit run in the file name or, failing that, the year and month of
    the first time step. Files with no project or several projects in their
    name are left untouched.

    Parameters
    ----------
    path : os.PathLike or str
        Folder containing the files to rename.

    Returns
    -------
    None

    Notes
    -----
    The product, frequency, domain and institute parts of the new name are
    fixed values, not read from the file.
    """
    folder = Path(path)
    date_pattern = re.compile(r"\d{6}")

    # Collect the files up front: the folder is modified while looping.
    for f in list(folder.glob("*.nc")):
        file_name = str(f.stem)

        # Identify the project first so that files which will be skipped
        # anyway are never opened.
        projects = [name for name in file_name.split("_") if name in ERA5_PROJECT_NAMES]
        if len(projects) > 1:
            logging.warning(
                f"More than one project identified for file {f.name}. Verify file naming."
            )
            continue
        if not projects:
            continue
        project = projects[0]

        # Close the dataset before moving the file so no handle is left open.
        with xr.open_dataset(f, cache=False) as ds:
            data_vars = list(ds.data_vars)
            if not data_vars:
                logging.warning(f"No data variables found in file {f.name}. Skipping.")
                continue
            var_name = str(data_vars[0])

            match = date_pattern.search(file_name)
            if match is not None:
                date_found = match.group()
            else:
                # Fall back on the first time step of the dataset.
                year = int(ds.isel(time=0).time.dt.year)
                month = int(ds.isel(time=0).time.dt.month)
                date_found = f"{year}{str(month).zfill(2)}"

        product = "reanalysis"
        freq = "1hr"
        domain = "NAM"
        institute = "ecmwf"

        new_name_parts = [
            var_name,
            freq,
            institute,
            project,
            product,
            domain,
            date_found,
        ]
        new_name = f"{'_'.join(new_name_parts)}.nc"
        logging.info(f"Moving {f.name} to {new_name}")

        shutil.move(f, folder.joinpath(new_name))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc