
Ouranosinc / miranda — build 1883563084 (pending completion)
GitHub Pull Request #24: Add CMIP file structure - WIP
Merge 4555deb62 into 4a2a34e15

41 of 361 new or added lines in 19 files covered. (11.36%)

3 existing lines in 2 files now uncovered.

627 of 2829 relevant lines covered (22.16%)

0.66 hits per line

Source File: /miranda/eccc/_raw.py (6.09%)
######################################################################
# S. Biner, Ouranos, May 2019
#
# Methodology
#
# 1) The netCDF files from the various ECCC stations are gathered into a
#    single netCDF file.
#
# 2) The annual source files are scanned for a given variable and the
#    results are saved to netCDF files. Flags are applied and unit
#    conversions are performed along the way.
#
# Obtained via http://climate.weather.gc.ca/index_e.html by clicking on
# 'about the data'.
#######################################################################
import itertools
import logging
import tempfile
import time
from calendar import monthrange
from datetime import datetime as dt
from logging import config
from pathlib import Path
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar

from miranda.scripting import LOGGING_CONFIG

from ._utils import cf_daily_metadata, cf_hourly_metadata

config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "aggregate_stations",
    "convert_hourly_flat_files",
    "convert_daily_flat_files",
    "merge_converted_variables",
]
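
# A minimal usage sketch of the conversion pipeline (hypothetical paths;
# `station_metadata` is assumed to point at the ECCC station inventory CSV
# mentioned in aggregate_stations):
#
#     convert_hourly_flat_files("/data/eccc/source", "/data/eccc/nc", 76)
#     merge_converted_variables("/data/eccc/nc", "/data/eccc/merged")
#     aggregate_stations(
#         source_files="/data/eccc/merged",
#         output_folder="/data/eccc/agg",
#         station_metadata="/data/eccc/station_inventory.csv",
#         time_step="h",
#         variables=76,
#     )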


def convert_hourly_flat_files(
    source_files: Union[str, Path],
    output_folder: Union[str, Path],
    variables: Union[str, int, List[Union[str, int]]],
    missing_value: int = -9999,
) -> None:
    """Convert hourly ECCC flat files to netCDF, one file per station and variable.

    Parameters
    ----------
    source_files : Union[str, Path]
    output_folder : Union[str, Path]
    variables : Union[str, int, List[Union[str, int]]]
      Variable codes (e.g. 076, 089, 123).
    missing_value : int

    Returns
    -------
    None
    """
    func_time = time.time()

    if isinstance(variables, (str, int)):
        variables = [variables]

    for variable_code in variables:
        info = cf_hourly_metadata(variable_code)
        variable_code = str(variable_code).zfill(3)
        variable_name = info["standard_name"]
        variable_file_name = info["nc_name"]

        # Prepare the data extraction
        col_names = "code year month day code_var".split()
        for i in range(1, 25):
            col_names.append(f"D{i:0n}")
            col_names.append(f"F{i:0n}")
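
        # Each fixed-width record is assumed to be laid out as:
        #   code (7) | year (4) | month (2) | day (2) | code_var (3),
        # followed by 24 pairs of value (6) and flag (1) columns, matching
        # the widths passed to pd.read_fwf below.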

        rep_nc = Path(output_folder).joinpath(variable_file_name)
        rep_nc.mkdir(parents=True, exist_ok=True)

        # Loop over the files
        list_files = list()
        if isinstance(source_files, list) or Path(source_files).is_file():
            list_files.append(source_files)
        elif 262 < int(variable_code) <= 280:
            list_files.extend(
                [f for f in Path(source_files).rglob("HLY*RCS*") if f.is_file()]
            )
        else:
            list_files.extend(
                [f for f in Path(source_files).rglob("HLY*") if f.is_file()]
            )

        errored_files = list()
        for fichier in list_files:
            logging.info(f"Processing file: {fichier}.")

            # Create a DataFrame from the file
            try:
                df = pd.read_fwf(
                    fichier,
                    widths=[7, 4, 2, 2, 3] + [6, 1] * 24,
                    names=col_names,
                    dtype={"year": int, "month": int, "day": int, "code_var": str},
                )
            except FileNotFoundError:
                logging.error(f"File {fichier} was not found.")
                errored_files.append(fichier)
                continue

            except Exception:
                # Covers UnicodeDecodeError and other read failures.
                logging.error(
                    f"File {fichier} was unable to be read. This is probably an issue with the file."
                )
                errored_files.append(fichier)
                continue

            # Loop through the station codes
            l_codes = df["code"].unique()
            for code in l_codes:
                df_code = df[df["code"] == code]

                # Skip if the variable is not found
                if variable_code not in df_code["code_var"].unique():
                    logging.info(
                        f"Variable `{variable_file_name}` not found for station code: {code}. Continuing..."
                    )
                    continue

                # Treat the data
                logging.info(
                    f"Converting `{variable_file_name}` for station code: {code}"
                )

                # Dump the data into a DataFrame
                df_var = df_code[df_code["code_var"] == variable_code].copy()

                # Mask the data according to the missing-value sentinel
                df_var = df_var.replace(missing_value, np.nan)

                # Decode the values and flags
                dfd = df_var.loc[:, [f"D{i:0n}" for i in range(1, 25)]]
                dff = df_var.loc[:, [f"F{i:0n}" for i in range(1, 25)]]

                # Remove the "NaN" flag
                dff = dff.fillna("")

                # Use the flag to mask the values
                try:
                    val = np.asfarray(dfd.values)
                except ValueError as e:
                    logging.error(f"{e} raised from {dfd}, continuing...")
                    continue
                flag = dff.values
                mask = np.isin(flag, info["missing_flags"])
                val[mask] = np.nan
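
                # For example, if info["missing_flags"] contains "M", a row of
                # values [12.0, 15.0] with flags ["", "M"] becomes [12.0, nan].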

                # Apply the unit conversion
                val = val * info["scale_factor"] + info["add_offset"]

                # Create the DataArray
                dates = dict(time=list())
                for index, row in df_var.iterrows():
                    for h in range(0, 24):
                        dates["time"].append(
                            dt(int(row.year), int(row.month), int(row.day), h)
                        )
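
                # Each source row holds one day, so 24 hourly timestamps
                # (00:00 through 23:00) are generated per row to line up with
                # the flattened D1..D24 values.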

                ds = xr.Dataset()
                da_val = xr.DataArray(val.flatten(), coords=dates, dims=["time"])
                da_val = da_val.rename(variable_file_name)
                da_val.attrs["units"] = info["nc_units"]
                da_val.attrs["id"] = code
                da_val.attrs["element_number"] = variable_code
                da_val.attrs["standard_name"] = variable_name
                da_val.attrs["long_name"] = info["long_name"]

                da_flag = xr.DataArray(flag.flatten(), coords=dates, dims=["time"])
                da_flag.attrs["long_name"] = "data flag"
                da_flag.attrs["note"] = "See ECCC technical documentation for details"

                ds[variable_file_name] = da_val
                ds["flag"] = da_flag

                # Save the file in netCDF format
                start_year = ds.time.dt.year.values[0]
                end_year = ds.time.dt.year.values[-1]

                station_folder = rep_nc.joinpath(str(code))
                station_folder.mkdir(parents=True, exist_ok=True)

                if start_year == end_year:
                    f_nc = f"{code}_{variable_code}_{variable_file_name}_{start_year}.nc"
                else:
                    f_nc = f"{code}_{variable_code}_{variable_file_name}_{start_year}_{end_year}.nc"

                ds.attrs["Conventions"] = "CF-1.7"
                ds.attrs[
                    "title"
                ] = "Environment and Climate Change Canada (ECCC) weather station data"
                ds.attrs[
                    "history"
                ] = "{}: Merged from multiple individual station files to n-dimensional array.".format(
                    dt.now().strftime("%Y-%m-%d %X")
                )
                ds.attrs["version"] = f"v{dt.now().strftime('%Y.%m')}"
                ds.attrs["institution"] = "Environment and Climate Change Canada (ECCC)"
                ds.attrs[
                    "source"
                ] = "Weather Station data <ec.services.climatiques-climate.services.ec@canada.ca>"
                ds.attrs[
                    "references"
                ] = "https://climate.weather.gc.ca/doc/Technical_Documentation.pdf"
                ds.attrs[
                    "comment"
                ] = "Acquired on demand from data specialists at ECCC Climate Services / Services Climatiques"
                ds.attrs[
                    "redistribution"
                ] = "Redistribution policy unknown. For internal use only."

                ds.to_netcdf(station_folder.joinpath(f_nc))

    logging.warning(f"Process completed in {time.time() - func_time:.2f} seconds")


def convert_daily_flat_files(
    source_files: Union[str, Path],
    output_folder: Union[str, Path],
    variables: Union[str, int, List[Union[str, int]]],
    missing_value: int = -9999,
) -> None:
    """Convert daily ECCC flat files to netCDF, one file per station and variable.

    Parameters
    ----------
    source_files : Union[str, Path]
    output_folder : Union[str, Path]
    variables : Union[str, int, List[Union[str, int]]]
      Variable codes (001, 002, 103, etc.)
    missing_value : int

    Returns
    -------
    None
    """
    func_time = time.time()

    if isinstance(variables, (str, int)):
        variables = [variables]

    for variable_code in variables:
        info = cf_daily_metadata(variable_code)
        variable_code = str(variable_code).zfill(3)
        nc_name = info["nc_name"]

        # Prepare the data extraction
        titre_colonnes = "code year month code_var".split()
        for i in range(1, 32):
            titre_colonnes.append(f"D{i:0n}")
            titre_colonnes.append(f"F{i:0n}")
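
        # Each fixed-width record is assumed to be laid out as:
        #   code (7) | year (4) | month (2) | code_var (3),
        # followed by 31 pairs of value (6) and flag (1) columns, matching
        # the widths passed to pd.read_fwf below.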

        # Create the output directory
        rep_nc = Path(output_folder).joinpath(nc_name)
        rep_nc.mkdir(parents=True, exist_ok=True)

        # Loop over the files
        list_files = list()
        if isinstance(source_files, list) or Path(source_files).is_file():
            list_files.append(source_files)
        else:
            list_files.extend(
                [f for f in Path(source_files).rglob("*DLY*") if f.is_file()]
            )

        errored_files = list()
        for fichier in list_files:
            logging.info(f"Processing file: {fichier}.")

            # Create a pandas DataFrame from the file
            try:
                df = pd.read_fwf(
                    fichier,
                    widths=[7, 4, 2, 3] + [6, 1] * 31,
                    names=titre_colonnes,
                    dtype={"year": int, "month": int, "code_var": str},
                )
            except ValueError:
                logging.error(
                    f"File {fichier} was unable to be read. This is probably an issue with the file."
                )
                errored_files.append(fichier)
                continue

            # Loop through the station codes
            l_codes = df["code"].unique()
            for code in l_codes:
                df_code = df[df["code"] == code]

                # Skip if the variable is not present
                if variable_code not in df_code["code_var"].unique():
                    logging.info(
                        f"Variable `{nc_name}` not found for station `{code}` in file {fichier}. Continuing..."
                    )
                    continue

                # Perform the data treatment
                logging.info(f"Converting {nc_name} for station code: {code}")

                # Dump the values into a DataFrame
                df_var = df_code[df_code["code_var"] == variable_code].copy()

                # Mask the data according to the missing-value sentinel
                df_var = df_var.replace(missing_value, np.nan)

                # Decode the values and flags
                dfd = df_var.loc[:, [f"D{i:0n}" for i in range(1, 32)]]
                dff = df_var.loc[:, [f"F{i:0n}" for i in range(1, 32)]]

                # Remove the "NaN" flag
                dff = dff.fillna("")

                try:
                    # Use the flag to mask the values
                    val = np.asfarray(dfd.values)
                    flag = dff.values
                    mask = np.isin(flag, info["missing_flags"])
                    val[mask] = np.nan
                except ValueError:
                    continue

                # Adjust the units
                val = val * info["scale_factor"] + info["add_offset"]

                # Build the time axis, concatenating values and flags based on
                # the day-length of each month
                date_range = dict(time=list())
                value_days = list()
                flag_days = list()
                for i, (index, row) in enumerate(df_var.iterrows()):
                    period = pd.Period(year=row.year, month=row.month, freq="M")
                    dates = pd.Series(
                        pd.date_range(
                            start=period.start_time, end=period.end_time, freq="D"
                        )
                    )
                    date_range["time"].extend(dates)

                    value_days.extend(val[i][: monthrange(row.year, row.month)[1]])
                    flag_days.extend(flag[i][: monthrange(row.year, row.month)[1]])
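
                    # Only the first monthrange(year, month)[1] of the 31
                    # day-columns are kept above, e.g. 28 for February 2019.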

                ds = xr.Dataset()
                da_val = xr.DataArray(value_days, coords=date_range, dims=["time"])
                da_val = da_val.rename(nc_name)
                da_val.attrs["units"] = info["nc_units"]
                da_val.attrs["id"] = code
                da_val.attrs["element_number"] = variable_code
                da_val.attrs["standard_name"] = info["standard_name"]
                da_val.attrs["long_name"] = info["long_name"]

                da_flag = xr.DataArray(flag_days, coords=date_range, dims=["time"])
                da_flag.attrs["long_name"] = "data flag"
                da_flag.attrs["note"] = "See ECCC technical documentation for details"

                ds[nc_name] = da_val
                ds["flag"] = da_flag

                # Save as a netCDF file
                start_year = ds.time.dt.year.values[0]
                end_year = ds.time.dt.year.values[-1]

                station_folder = rep_nc.joinpath(str(code))
                station_folder.mkdir(parents=True, exist_ok=True)

                if start_year == end_year:
                    f_nc = f"{code}_{variable_code}_{nc_name}_{start_year}.nc"
                else:
                    f_nc = f"{code}_{variable_code}_{nc_name}_{start_year}_{end_year}.nc"

                ds.attrs["Conventions"] = "CF-1.7"
                ds.attrs[
                    "title"
                ] = "Environment and Climate Change Canada (ECCC) weather station data"
                ds.attrs[
                    "history"
                ] = "{}: Merged from multiple individual station files to n-dimensional array.".format(
                    dt.now().strftime("%Y-%m-%d %X")
                )
                ds.attrs["version"] = f"v{dt.now().strftime('%Y.%m')}"
                ds.attrs["institution"] = "Environment and Climate Change Canada (ECCC)"
                ds.attrs[
                    "source"
                ] = "Weather Station data <ec.services.climatiques-climate.services.ec@canada.ca>"
                ds.attrs[
                    "references"
                ] = "https://climate.weather.gc.ca/doc/Technical_Documentation.pdf"
                ds.attrs[
                    "comment"
                ] = "Acquired on demand from data specialists at ECCC Climate Services / Services Climatiques"
                ds.attrs[
                    "redistribution"
                ] = "Redistribution policy unknown. For internal use only."

                ds.to_netcdf(station_folder.joinpath(f_nc))

    logging.warning(f"Process completed in {time.time() - func_time:.2f} seconds")


def aggregate_stations(
    source_files: Optional[Union[str, Path]] = None,
    output_folder: Optional[Union[str, Path]] = None,
    station_metadata: Optional[Union[str, Path]] = None,
    time_step: str = "h",
    variables: Optional[Union[str, int, List[Union[str, int]]]] = None,
    include_flags: bool = True,
    groups: int = 5,
    mf_dataset_freq: Optional[str] = None,
    temp_directory: Optional[Union[str, Path]] = None,
) -> None:
    """Aggregate the converted station files into multi-station netCDF datasets.

    Parameters
    ----------
    source_files : Optional[Union[str, Path]]
    output_folder : Optional[Union[str, Path]]
    station_metadata : Optional[Union[str, Path]]
    time_step : str
      Time step of the source data: `h` / `hourly` or `d` / `daily`.
    variables : Optional[Union[str, int, List[Union[str, int]]]]
    include_flags : bool
    groups : int
      The number of file groupings used for converting to multi-file Datasets.
    mf_dataset_freq : Optional[str]
      Resampling frequency for creating output multi-file Datasets. E.g. 'YS': 1 year per file, '5YS': 5 years per file.
    temp_directory : Optional[Union[str, Path]]
      Use another temporary directory location in case the default location is not spacious enough.

    Returns
    -------
    None
    """
    func_time = time.time()

    if not station_metadata:
        raise RuntimeError(
            "Download the data from ECCC's Google Drive at:\n"
            "https://drive.google.com/open?id=1egfzGgzUb0RFu_EE5AYFZtsyXPfZ11y2"
        )

    if isinstance(source_files, str):
        source_files = Path(source_files)
    if isinstance(output_folder, str):
        output_folder = Path(output_folder)

    if time_step.lower() in ["h", "hour", "hourly"]:
        hourly = True
    elif time_step.lower() in ["d", "day", "daily"]:
        hourly = False
    else:
        raise ValueError("Time step must be `h` / `hourly` or `d` / `daily`.")

    if isinstance(variables, (str, int)):
        variables = [variables]
    elif variables is None:
        if hourly:
            variables = [
                89,
                94,
                123,
            ]
            variables.extend(range(76, 81))
            variables.extend(range(262, 281))
        else:
            variables = [1, 2, 3]
            variables.extend(range(10, 26))

    for variable_code in variables:
        if hourly:
            info = cf_hourly_metadata(variable_code)
        else:
            info = cf_daily_metadata(variable_code)
        variable_name = info["nc_name"]
        logging.info(f"Merging `{variable_name}` using `{time_step}` time step.")

        # Find the ECCC stations for which metadata are available
        df_inv = pd.read_csv(str(station_metadata), header=3)
        station_inventory = list(df_inv["Climate ID"].values)

        # Only perform aggregation on available data with corresponding metadata
        logging.info("Performing glob and sort.")
        nclist = sorted(list(source_files.joinpath(variable_name).rglob("*.nc")))

        ds = None
        if nclist:
            nclists = np.array_split(nclist, groups)
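
            # np.array_split tolerates uneven splits: e.g. 10 files with
            # groups=3 yields batches of sizes 4, 3 and 3.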

            with tempfile.TemporaryDirectory(
                prefix="eccc", dir=temp_directory
            ) as temp_dir:
                combinations = [(ii, nc, temp_dir) for ii, nc in enumerate(nclists)]

                # TODO: memory use seems OK here... could try using Pool() to increase performance
                for combo in combinations:
                    ii, nc, temp_dir = combo
                    _tmp_nc(ii, nc, temp_dir, groups)

                ds = xr.open_mfdataset(
                    sorted(list(Path(temp_dir).glob("*.nc"))),
                    combine="nested",
                    concat_dim="station",
                    chunks=dict(time=365),
                )

                # dask gives warnings about exporting 'object' data types
                ds["station_id"] = ds["station_id"].astype(str)
        if ds is not None:
            station_file_codes = [x.name.split("_")[0] for x in nclist]
            rejected_stations = set(station_file_codes).difference(
                set(station_inventory)
            )

            logging.info(
                f"{len(rejected_stations)} stations rejected due to missing metadata."
            )
            r_all = np.zeros(ds.station_id.shape) == 0

            for r in rejected_stations:
                r_all[ds.station_id == r] = False
            ds = ds.isel(station=r_all)
            if not include_flags:
                drop_vars = [vv for vv in ds.data_vars if "flag" in vv]
                ds = ds.drop_vars(drop_vars)

            # Ensure the data is in order before adding metadata
            ds = ds.sortby(ds.station_id)

            attrs1 = ds.attrs
            # Filter the metadata for the station_ids present in the dataset
            logging.info("Writing metadata.")

            meta = df_inv.loc[df_inv["Climate ID"].isin(ds.station_id.values)]
            # Rearrange the column order to have lon, lat, elev first
            cols = meta.columns.tolist()
            cols1 = [
                "Latitude (Decimal Degrees)",
                "Longitude (Decimal Degrees)",
                "Elevation (m)",
            ]
            for rr in cols1:
                cols.remove(rr)
            cols1.extend(cols)
            meta = meta[cols1]
            meta.index.rename("station", inplace=True)
            meta = meta.to_xarray()
            meta = meta.sortby(meta["Climate ID"])
            meta = meta.assign({"station": ds.station.values})

            # The "Longitude" and "Latitude" columns hold projected x/y values;
            # the projection would need to be known before renaming them.
            meta = meta.drop(["Longitude", "Latitude"])
            np.testing.assert_array_equal(
                meta["Climate ID"].values, ds.station_id.values
            )
            ds = xr.merge([ds, meta])
            ds.attrs = attrs1

            rename = {
                "Latitude (Decimal Degrees)": "lat",
                "Longitude (Decimal Degrees)": "lon",
            }
            ds = ds.rename(rename)

            valid_stations = list(sorted(ds.station_id.values))
            valid_stations_count = len(valid_stations)

            logging.info(f"Processing stations for variable `{variable_name}`.")

            if len(station_file_codes) == 0:
                logging.error(
                    f"No stations were found containing variable filename `{variable_name}`. Exiting."
                )
                return

            logging.warning(
                f"Files exist for {len(station_file_codes)} ECCC stations. "
                f"Metadata found for {valid_stations_count} stations. "
                f"Rejecting {len(rejected_stations)} stations."
            )
            if rejected_stations:
                logging.warning(
                    f"Rejected station codes are the following: {', '.join(rejected_stations)}."
                )

            logging.info("Preparing the NetCDF time period.")
            # Create the time period timestamps
            year_start = ds.time.dt.year.min().values
            year_end = ds.time.dt.year.max().values

            # Calculate the time index dimensions of the output NetCDF
            time_index = pd.date_range(
                start=f"{year_start}-01-01",
                end=f"{year_end + 1}-01-01",
                freq="H" if hourly else "D",
            )[:-1]
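
            # For example, hourly data spanning 1990-1991 yields a 17,520-step
            # index (2 non-leap years x 8,760 hours); the [:-1] drops the
            # first timestamp of the following year.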

            logging.info(
                f"Number of ECCC stations: {valid_stations_count}, time steps: {time_index.size}."
            )

            ds_out = xr.Dataset(
                coords={
                    "time": time_index,
                    "station": ds.station,
                    "station_id": ds.station_id,
                },
                attrs=ds.attrs,
            )

            for vv in ds.data_vars:
                # Assign data variables to the output dataset; xarray aligns
                # them with the full time coordinate.
                ds_out[vv] = ds[vv]

            output_folder.mkdir(parents=True, exist_ok=True)

            file_out = Path(output_folder).joinpath(
                f"{variable_name}_eccc_{'hourly' if hourly else 'daily'}"
            )

            if mf_dataset_freq is not None:
                # Write the multi-file dataset using the resampling frequency
                _, datasets = zip(*ds_out.resample(time=mf_dataset_freq))
            else:
                datasets = [ds_out]
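
            # For example, mf_dataset_freq="5YS" splits a 1990-2005 dataset
            # into four files: 1990-1994, 1995-1999, 2000-2004 and 2005.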

            paths = [
                f"{file_out}_{dd.time.dt.year.min().values}-{dd.time.dt.year.max().values}_"
                f'created{dt.now().strftime("%Y%m%d")}.nc'
                for dd in datasets
            ]

            comp = dict(zlib=True, complevel=5)

            with ProgressBar():
                for dataset, path in zip(datasets, paths):
                    encoding = {var: comp for var in ds_out.data_vars}
                    dataset.to_netcdf(
                        path,
                        engine="h5netcdf",
                        format="NETCDF4",
                        encoding=encoding,
                    )
                    dataset.close()
                    del dataset
            ds.close()
            ds_out.close()

        else:
            logging.info(f"No files found for variable: `{variable_name}`.")

    logging.warning(f"Process completed in {time.time() - func_time:.2f} seconds")


def _tmp_nc(
    ii: int,
    nc: Union[str, Path],
    tempdir: Union[str, Path],
    batches: Optional[int] = None,
) -> None:
    if batches is None:
        # Placeholder shown in the log message when the batch count is unknown.
        batches = "X"
    logging.info(f"Processing batch of files {ii + 1} of {batches}")
    station_file_codes = [x.name.split("_")[0] for x in nc]

    ds = xr.open_mfdataset(nc, combine="nested", concat_dim="station")
    ds = ds.assign_coords(
        station_id=xr.DataArray(station_file_codes, dims="station").astype(str)
    )
    if "flag" in ds.data_vars:
        ds1 = ds.drop_vars("flag").copy(deep=True)
        ds1["flag"] = ds.flag.astype(str)
        ds = ds1

    comp = dict(zlib=True, complevel=5)
    encoding = {var: comp for var in ds.data_vars}

    with ProgressBar():
        ds.load().to_netcdf(
            Path(tempdir).joinpath(f"{str(ii).zfill(3)}.nc"),
            engine="h5netcdf",
            format="NETCDF4",
            encoding=encoding,
        )
        del ds
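
# Note: _tmp_nc writes each batch to a zero-padded file (000.nc, 001.nc, ...),
# so the glob-and-sort in aggregate_stations re-reads the batches in order.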


def merge_converted_variables(
    source: Union[str, Path],
    destination: Union[str, Path],
    variables: Optional[Union[str, int, List[Union[str, int]]]] = None,
) -> None:
    """Merge the per-year netCDF files written for each station and variable.

    Parameters
    ----------
    source : Union[str, Path]
    destination : Union[str, Path]
    variables : Optional[Union[str, int, List[Union[str, int]]]]

    Returns
    -------
    None
    """

    def _combine_years(args: Tuple[str, Union[str, Path], Union[str, Path]]) -> None:
        varia, input_folder, output_folder = args

        ncfiles = sorted(list(input_folder.glob("*.nc")))
        logging.info(
            f"Found {len(ncfiles)} files for station code {input_folder.name}."
        )
        logging.info(f"Opening: {ncfiles}")

        ds = xr.open_mfdataset(ncfiles, parallel=False, combine="by_coords")

        outfile = output_folder.joinpath(
            f'{ncfiles[0].name.split(f"_{varia}_")[0]}_{varia}_'
            f"{ds.time.dt.year.min().values}-{ds.time.dt.year.max().values}.nc"
        )
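
        # With the converters' file naming ({code}_{vc}_{var}_{years}.nc),
        # hypothetical inputs 7040440_076_<var>_1990.nc ... _2005.nc would
        # merge into 7040440_076_<var>_1990-2005.nc.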
        if not outfile.exists():
            logging.info(f"Merging to {outfile.name}")
            comp = dict(zlib=True, complevel=5)
            encoding = {data_var: comp for data_var in ds.data_vars}
            encoding["time"] = {"dtype": "single"}
            with ProgressBar():
                ds.to_netcdf(outfile, encoding=encoding)
        else:
            logging.info(f"Files exist for {outfile.name}. Continuing...")

    if isinstance(source, str):
        source = Path(source)
    if isinstance(destination, str):
        destination = Path(destination)

    selected_variables = list()
    if variables is not None:
        if not isinstance(variables, list):
            variables = [variables]
        for var in variables:
            try:
                selected_variables.append(cf_hourly_metadata(var))
            except KeyError:
                # Fall back to the daily lookup when the code is not hourly.
                selected_variables.append(cf_daily_metadata(var))

    variables_found = [x.name for x in source.iterdir() if x.is_dir()]
    if selected_variables:
        variables_found = [
            x
            for x in variables_found
            if x in [item["nc_name"] for item in selected_variables]
        ]

    for variable in variables_found:
        logging.info(f"Merging files found for variable: `{variable}`.")
        station_dirs = [x for x in source.joinpath(variable).iterdir() if x.is_dir()]
        logging.info(f"Number of stations found: {len(station_dirs)}.")
        outrep = destination.joinpath(variable)

        outrep.mkdir(parents=True, exist_ok=True)
        combs = list(itertools.product([variable], station_dirs, [outrep]))
        for c in combs:
            try:
                _combine_years(c)
            except ValueError as e:
                logging.error(
                    f"`{e}` encountered for station `{c[1].name}`. Continuing..."
                )