• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 1981943808

pending completion
1981943808

Pull #24

github

GitHub
Merge 7a4fc75d0 into 250bc4dd4
Pull Request #24: Add CMIP file structure - WIP

209 of 954 new or added lines in 36 files covered. (21.91%)

10 existing lines in 3 files now uncovered.

705 of 3061 relevant lines covered (23.03%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.19
/miranda/ecmwf/_era5.py
1
import functools
3✔
2
import logging
3✔
3
import logging.config
3✔
4
import multiprocessing
3✔
5
import os
3✔
6
import re
3✔
7
import shutil
3✔
8
from datetime import date
3✔
9
from datetime import datetime as dt
3✔
10
from pathlib import Path
3✔
11
from typing import List, Mapping, Optional, Tuple, Union
3✔
12

13
import xarray as xr
3✔
14
from cdsapi import Client
3✔
15

16
from miranda.gis.subset import subsetting_domains
3✔
17
from miranda.scripting import LOGGING_CONFIG
3✔
18

19
logging.config.dictConfig(LOGGING_CONFIG)
3✔
20

21
__all__ = ["request_era5", "rename_era5_files", "ERA5_PROJECT_NAMES"]
3✔
22

23

24
# Dataset (project) identifiers that `rename_era5_files` looks for among the
# underscore-separated parts of a file name.
ERA5_PROJECT_NAMES = [
    "era5-land",
    "era5-land-monthly-means",
    "era5-pressure-levels",
    "era5-pressure-levels-preliminary-back-extension",
    "era5-single-levels",
    "era5-single-levels-preliminary-back-extension",
]
32

33

34
def request_era5(
    variables: Optional[Mapping[str, str]],
    projects: List[str],
    *,
    domain: str = "AMNO",
    output_folder: Optional[Union[str, os.PathLike]] = None,
    year_start: Union[str, int] = 1950,
    year_end: Optional[Union[str, int]] = None,
    processes: int = 10,
) -> None:
    """Request ERA5/ERA5-Land from Copernicus Data Store in NetCDF4 format.

    One request per (year, month) pair is dispatched to a pool of worker
    processes, each of which calls `_request_direct_era`.

    Parameters
    ----------
    variables : Mapping[str, str], optional
        Short variable names to download (only the keys are read). Names
        that are not known for a given project are ignored. If empty or
        None, every known variable of each project is requested.
    projects : List[{"era5", "era5-land"}]
        Projects to download. Entries other than these two are ignored.
    domain : {"GLOBAL", "AMNO", "NAM", "CAN", "QC", "MTL"}
        Subsetting domain, forwarded unchanged to the worker.
    output_folder : str or os.PathLike, optional
        Download directory. Defaults to ``<current directory>/downloaded``.
        Created (including missing parents) if it does not exist.
    year_start : int
        First year requested (inclusive).
    year_end : int, optional
        Last year requested (inclusive). Defaults to the current year.
    processes : int
        Number of worker processes in the pool.

    Returns
    -------
    None

    Notes
    -----
    The process working directory is changed to the download directory and
    is not restored: the workers write their files using relative names.
    """
    # Short name -> request variable name, per project.
    variable_reference = dict()
    variable_reference["era5-land"] = dict(
        tp="total_precipitation",
        v10="10m_v_component_of_wind",
        u10="10m_u_component_of_wind",
        d2m="2m_dewpoint_temperature",
        t2m="2m_temperature",
        pev="potential_evaporation",
        sde="snow_depth",
        sd="snow_depth_water_equivalent",
        sf="snowfall",
    )
    variable_reference["era5"] = dict(
        tp="total_precipitation",
        v10="10m_v_component_of_wind",
        u10="10m_u_component_of_wind",
        d2m="2m_dewpoint_temperature",
        t2m="2m_temperature",
        pev="potential_evaporation",
        # sde= Not available for era5
        sd="snow_depth",  # note difference in name vs era5-land cf_variable == snw
        sf="snowfall",
    )

    if year_end is None:
        year_end = date.today().year
    years = range(int(year_start), int(year_end) + 1)
    months = [str(m).zfill(2) for m in range(1, 13)]
    yearmonth = [(y, m) for y in years for m in months]

    # Project key -> dataset name used in the request.
    project_names = dict()
    if "era5" in projects:
        project_names["era5"] = "reanalysis-era5-single-levels"
    if "era5-land" in projects:
        project_names["era5-land"] = "reanalysis-era5-land"
    if not project_names:
        logging.warning(
            "No supported project found in %s. Nothing will be requested.", projects
        )

    if output_folder is None:
        target = Path().cwd().joinpath("downloaded")
    else:
        target = output_folder
    Path(target).mkdir(parents=True, exist_ok=True)
    # Workers save files under relative names, so they must run from `target`.
    os.chdir(target)

    for key, p in project_names.items():
        product = p.split("-")[0]
        reference = variable_reference[key]
        if variables:
            v_requested = {v: reference[v] for v in variables if v in reference}
        else:
            v_requested = reference

        func = functools.partial(_request_direct_era, v_requested, p, domain, product)
        logging.info([func, dt.now().strftime("%Y-%m-%d %X")])

        # The context manager releases the pool even if a worker raises.
        with multiprocessing.Pool(processes=processes) as pool:
            pool.map(func, yearmonth)
×
127

128

129
def _request_direct_era(
3✔
130
    variables: Mapping[str, str],
131
    project: str,
132
    domain: str,
133
    product: str,
134
    yearmonth: Tuple[int, str],
135
):
136
    """Launch formatted request."""
137
    year, month = yearmonth
×
138
    days = [str(d).zfill(2) for d in range(32)]
×
NEW
139
    times = [f"{str(t).zfill(2)}:00" for t in range(24)]
×
140

141
    if domain.upper() == "AMNO":
×
142
        domain = "NAM"
×
143
    region = subsetting_domains(domain)
×
144

145
    c = Client()
×
146

147
    if project in ["reanalysis-era5-single-levels", "reanalysis-era5-land"]:
×
148
        timestep = "hourly"
×
149
    else:
150
        raise NotImplementedError(project)
×
151

152
    for var in variables.keys():
×
153
        netcdf_name = (
×
154
            f"{var}_{timestep}_ecmwf_{'-'.join(project.split('-')[1:])}"
155
            f"_{product}_{domain.upper()}_{year}{month}.nc"
156
        )
157

158
        if Path(netcdf_name).exists():
×
159
            logging.info("Dataset %s already exists. Continuing..." % netcdf_name)
×
160
            continue
×
161

162
        request_kwargs = dict(
×
163
            variable=variables[var],
164
            year=year,
165
            month=month,
166
            day=days,
167
            time=times,
168
            area=region,
169
            format="netcdf",
170
        )
171

172
        if project == "reanalysis-era5-single-levels":
×
173
            request_kwargs.update(dict(product_type=product))
×
174

175
        c.retrieve(
×
176
            project,
177
            request_kwargs,
178
            netcdf_name,
179
        )
180

181

182
def rename_era5_files(path: Union[os.PathLike, str]) -> None:
    """Rename the NetCDF files found directly in `path` to a common pattern.

    The new name is
    ``<variable>_1hr_ecmwf_<project>_reanalysis_NAM_<YYYYMM>.nc`` where:

    - ``<variable>`` is the first data variable of the dataset;
    - ``<project>`` is the single underscore-separated part of the file name
      that appears in `ERA5_PROJECT_NAMES`;
    - ``<YYYYMM>`` is the first run of six digits in the file name or, when
      there is none, the year and month of the dataset's first time step.

    Files with no recognised project are skipped silently. Files with more
    than one recognised project, or with no data variables, are skipped with
    a warning.

    Parameters
    ----------
    path : str or os.PathLike
        Folder holding the ``*.nc`` files to rename.

    Returns
    -------
    None
    """
    folder = Path(path)
    date_pattern = re.compile(r"\d{6}")

    # Snapshot the listing first: files are renamed inside this same folder.
    for f in list(folder.glob("*.nc")):
        file_name = str(f.stem)

        # Identify the project before opening the file, so files that will be
        # skipped are never opened.
        projects = [part for part in file_name.split("_") if part in ERA5_PROJECT_NAMES]
        if len(projects) > 1:
            logging.warning(
                f"More than one project identified for file {f.name}. Verify file naming."
            )
            continue
        if not projects:
            continue
        project = projects[0]

        # Close the dataset before moving the file on disk.
        with xr.open_dataset(f, cache=False) as ds:
            data_vars = list(ds.data_vars)
            if not data_vars:
                logging.warning("No data variables found in file %s. Skipping.", f.name)
                continue
            var_name = str(data_vars[0])

            found = date_pattern.search(file_name)
            if found is not None:
                date_found = found.group()
            else:
                first_step = ds.isel(time=0).time
                year = int(first_step.dt.year)
                month = int(first_step.dt.month)
                date_found = f"{year}{str(month).zfill(2)}"

        product = "reanalysis"
        freq = "1hr"
        domain = "NAM"
        institute = "ecmwf"

        new_name_parts = [
            var_name,
            freq,
            institute,
            project,
            product,
            domain,
            date_found,
        ]
        new_name = f"{'_'.join(new_name_parts)}.nc"
        if new_name == f.name:
            # Already follows the pattern; nothing to move.
            continue
        logging.info(f"Moving {f.name} to {new_name}")

        shutil.move(f, folder.joinpath(new_name))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc