• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2276398114

pending completion
2276398114

Pull #34

github

GitHub
Merge 7c221cd23 into 1d7757a9e
Pull Request #34: ECCC summary conversion refactor and RVT support

1 of 17 new or added lines in 1 file covered. (5.88%)

245 existing lines in 8 files now uncovered.

661 of 3274 relevant lines covered (20.19%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.64
/miranda/decode/_decoder.py
1
import logging
3✔
2
import multiprocessing as mp
3✔
3
import os
3✔
4
import re
3✔
5
import warnings
3✔
6
from functools import partial
3✔
7
from logging import config
3✔
8
from os import PathLike
3✔
9
from pathlib import Path
3✔
10
from types import GeneratorType
3✔
11
from typing import Dict, List, Optional, Union
3✔
12

13
import netCDF4 as nc  # noqa
3✔
14
import pandas as pd
3✔
15
import schema
3✔
16
import zarr
3✔
17
from pandas._libs.tslibs import NaTType  # noqa
3✔
18

19
from miranda.cv import INSTITUTIONS, PROJECT_MODELS
3✔
20
from miranda.scripting import LOGGING_CONFIG
3✔
21
from miranda.validators import FACETS_SCHEMA
3✔
22

23
from ._time import (
3✔
24
    TIME_UNITS_TO_FREQUENCY,
25
    TIME_UNITS_TO_TIMEDELTA,
26
    DecoderError,
27
    date_parser,
28
)
29

30
# Apply the logging configuration from ``miranda.scripting`` when this module is imported.
config.dictConfig(LOGGING_CONFIG)

# Names exported by ``from miranda.decode._decoder import *``.
__all__ = ["Decoder", "guess_project"]
36

37

38
def guess_project(file: Union[os.PathLike, str]) -> str:
    """Guess the project of a file from the underscore-separated segments of its name.

    Parameters
    ----------
    file : Union[os.PathLike, str]
        Path of the file; only its stem (name without the final suffix) is inspected.

    Returns
    -------
    str
        The first project of ``PROJECT_MODELS`` for which one of its models appears
        as an underscore-separated segment of the file stem.

    Raises
    ------
    DecoderError
        If no known model name is found in the file name.
    """
    file_name = Path(file).stem

    potential_names = file_name.split("_")
    for project, models in PROJECT_MODELS.items():
        # Generator (not a list) so that ``any`` stops at the first matching model.
        if any(model in potential_names for model in models):
            return project
    raise DecoderError(f"Unable to determine project from file name: '{file_name}'.")
46

47

48
class Decoder:
    """Decode metadata facets from netCDF files or Zarr stores.

    Decoding is dispatched on the project name: project ``"X-Y"`` is handled by the
    method ``decode_x_y`` (see :py:meth:`_decoder`).
    """

    project = None

    def __init__(self, project: Optional[str] = None):
        """
        Parameters
        ----------
        project : str, optional
            Project used to select the ``decode_<project>`` method. If None, the
            project is guessed from each file name.
        """
        self.project = project
        # Per-instance store: a class-level dict would be shared and mutated by
        # every Decoder instance.
        self._file_facets = dict()

    @staticmethod
    def _decoder(
        d: dict,
        fail_early: bool,
        proj: str,
        lock,
        file: Union[str, Path],
    ) -> None:
        """Decode the facets of one file and store them in `d` under the key `file`.

        Parameters
        ----------
        d : dict
            Mapping receiving the decoded facets (a manager dict when called from
            :py:meth:`decode`).
        fail_early : bool
            If True, re-raise project-guessing errors and validate the decoded facets
            against ``FACETS_SCHEMA``.
        proj : str or None
            Project name; guessed from the file name when None.
        lock
            Lock object held for the whole decoding step.
        file : Union[str, Path]
            File to decode.

        Notes
        -----
        When the project cannot be determined and `fail_early` is False, the file is
        skipped and nothing is written to `d`.
        """
        with lock:
            if proj is None:
                try:
                    proj = guess_project(file)
                except DecoderError:
                    print(
                        f"Unable to determine 'project': Signature for 'project' must be set manually for file: {file}."
                    )
                    if fail_early:
                        raise
                    # Without a project there is no decoder to dispatch to.
                    return

            decode_function_name = f"decode_{proj.lower().replace('-','_')}"
            try:
                _deciphered = getattr(Decoder, decode_function_name)(Path(file))
                if fail_early:
                    FACETS_SCHEMA.validate(_deciphered)
                print(
                    f"Deciphered the following from {Path(file).name}: {_deciphered.items()}"
                )
                d[file] = _deciphered
            except AttributeError as e:
                print(f"Unable to read data from {Path(file).name}: {e}")
            except schema.SchemaError as e:
                print(f"Decoded facets from {Path(file).name} are not valid: {e}")
                raise

    def decode(
        self,
        files: Union[os.PathLike, str, List[Union[str, os.PathLike]], GeneratorType],
        raise_error: bool = False,
    ) -> None:
        """Decode facets from file or list of files.

        Decoded facets are merged into the mapping returned by :py:meth:`file_facets`.

        Parameters
        ----------
        files : Union[str, Path, List[Union[str, Path]], GeneratorType]
            A single file or an iterable of files.
        raise_error : bool
            If True, exceptions raised while decoding a file are propagated and decoded
            facets are validated. If False, files that fail are left out of the results.
        """
        if isinstance(files, (str, os.PathLike)):
            files = [files]
        if self.project is None:
            warnings.warn(
                "The decoder 'project' is not set; Decoding step will be much slower."
            )
        else:
            logging.info(f"Deciphering metadata with project = '{self.project}'")

        # The manager runs a server process; the context manager shuts it down.
        with mp.Manager() as manager:
            _file_facets = manager.dict()
            lock = manager.Lock()
            func = partial(self._decoder, _file_facets, raise_error, self.project, lock)

            with mp.Pool() as pool:
                results = pool.imap(func, files, chunksize=10)
                if raise_error:
                    # Worker exceptions are only re-raised when results are retrieved.
                    for _ in results:
                        pass
                pool.close()
                pool.join()

            # Copy out of the proxy while the manager process is still alive.
            self._file_facets.update(_file_facets.copy())

    def facets_table(self):
        raise NotImplementedError()

    def file_facets(self) -> Dict[os.PathLike, Dict]:
        """Return the facets decoded so far by this instance, keyed by file."""
        return self._file_facets

    @classmethod
    def _from_dataset(cls, file: Union[Path, str]) -> (str, str, Dict):
        """Read the primary variable, date segment and global attributes of a dataset.

        Parameters
        ----------
        file : Union[Path, str]
            A netCDF file (``.nc``/``.nc4``) or a Zarr directory (``.zarr``).

        Returns
        -------
        (str, str, dict)
            The primary variable name (see :py:meth:`_decode_primary_variable`), the last
            underscore-separated segment of the file stem, and the global attributes.

        Raises
        ------
        NotImplementedError
            From :py:meth:`_decode_primary_variable` when `file` is neither a netCDF
            file nor a Zarr directory.
        DecoderError
            If the dataset cannot be read as netCDF or Zarr.
        """
        file = Path(file)
        file_name = file.stem

        variable_name = cls._decode_primary_variable(file)
        variable_date = file_name.split("_")[-1]

        if file.is_file() and file.suffix in [".nc", ".nc4"]:
            # Context manager so the netCDF file handle is released.
            with nc.Dataset(file) as ds:
                data = {k: ds.getncattr(k) for k in ds.ncattrs()}
        elif file.is_dir() and file.suffix == ".zarr":
            ds = zarr.open(file, mode="r")
            data = ds.attrs.asdict()
        else:
            raise DecoderError(f"Unable to read dataset: `{file.name}`.")
        return variable_name, variable_date, data

    @staticmethod
    def _decode_primary_variable(file: Path) -> Optional[str]:
        """Attempts to find the primary variable of a netCDF file or Zarr store.

        The candidate is the first underscore-separated segment of the file name. It is
        returned only when the dataset has a variable of that name that does not start
        with a coordinate-like prefix (``time``, ``lat``, ``lon``, ...).

        Parameters
        ----------
        file : Union[Path, str]

        Returns
        -------
        str or None
            The primary variable name, or None when no variable matches.

        Raises
        ------
        NotImplementedError
            If `file` is neither a netCDF file (``.nc``/``.nc4``) nor a Zarr directory.
        """
        file = Path(file)
        coords = ("time", "lat", "lon", "rlat", "rlon", "height", "lev", "rotated_pole")
        suggested_variable = file.name.split("_")[0]

        if file.is_file() and file.suffix in [".nc", ".nc4"]:
            # Only the variable names are needed; close the file once they are read.
            with nc.Dataset(file, mode="r") as ds:
                variable_names = [str(k) for k in ds.variables.keys()]
        elif file.is_dir() and file.suffix == ".zarr":
            store = zarr.open(str(file), mode="r")
            variable_names = [str(k) for k in store.array_keys()]
        else:
            raise NotImplementedError()

        if suggested_variable in variable_names and not suggested_variable.startswith(
            coords
        ):
            return suggested_variable
        return None

    @staticmethod
    def _decode_time_info(
        file: Optional[Union[PathLike, str, List[str]]] = None,
        data: Optional[Dict] = None,
        *,
        field: str,
    ) -> Union[str, NaTType]:
        """Decode the time frequency or time step of a dataset.

        Parameters
        ----------
        file : Union[os.PathLike, str] or List[str], optional
            File (or the underscore-separated segments of its name) searched for a
            known time unit. When a unit is found there, it takes precedence over `data`.
        data : dict, optional
            Global attributes; the ``frequency`` attribute is used.
        field : {"timedelta", "frequency"}
            Which representation to return.

        Returns
        -------
        str or pd.Timedelta or NaTType
            The frequency string, or the time step as returned by ``pd.to_timedelta``
            (``pd.NaT`` for ``fx``/``fixed``). None when `file` matches no known time
            unit and `data` is not given.

        Raises
        ------
        ValueError
            If neither `file` nor `data` is provided.
        NotImplementedError
            If `field` is not a supported value.
        """
        if not file and not data:
            raise ValueError("Either `file` or `data` must be provided.")

        if field == "frequency":
            time_dictionary = TIME_UNITS_TO_FREQUENCY
        elif field == "timedelta":
            time_dictionary = TIME_UNITS_TO_TIMEDELTA
        else:
            raise NotImplementedError()

        if isinstance(file, (str, PathLike)):
            file = Path(file).name.split("_")

        if isinstance(file, list):
            # Known time units (dictionary keys) that appear as a file-name segment.
            potential_times = [unit for unit in time_dictionary.keys() if unit in file]
            if potential_times:
                time_unit = potential_times[0]
                if time_unit in ["fx", "fixed"]:
                    if field == "timedelta":
                        return pd.NaT
                    return "fx"
                if field == "timedelta":
                    return pd.to_timedelta(time_dictionary[time_unit])
                return time_dictionary[time_unit]

        if data:
            potential_time = data["frequency"]
            if potential_time == "":
                # NOTE(review): `data` holds global attributes here, so a "time" entry
                # with a `.units` attribute is assumed — confirm this path is reachable.
                time_units = data["time"].units
                potential_time = time_units.split()[0]
            if field == "timedelta":
                if potential_time in ["fx", "fixed"]:
                    return pd.NaT
                return pd.to_timedelta(time_dictionary[potential_time])
            return time_dictionary[potential_time]
        return None

    @staticmethod
    def _decode_version(file: Union[PathLike, str], data: Dict) -> str:
        """Return the ``version`` attribute, else a ``v<digits>`` parent directory name, else ``"vNotFound"``."""
        try:
            return data["version"]
        except KeyError:
            possible_version = Path(file).parent.name
            if re.match(r"^[vV]\d+", possible_version):
                return possible_version
            return "vNotFound"

    @staticmethod
    def _set_date_bounds(facets: Dict, date: str) -> None:
        """Add ``date_start``/``date_end`` parsed from `date`; leave them out if parsing fails."""
        try:
            facets["date_start"] = date_parser(date)
            facets["date_end"] = date_parser(date, end_of_period=True)
        except DecoderError:
            pass

    @classmethod
    def decode_reanalysis(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of a reanalysis file.

        All global attributes except ``history`` are kept as facets. ``date``,
        ``format``, ``timedelta``, ``variable`` and the date bounds are then set.
        """
        variable, date, data = cls._from_dataset(file=file)

        facets = dict(data)
        facets.pop("history", None)

        facets["date"] = date

        file_format = data.get("output_format")
        if file_format:
            facets["format"] = file_format
        else:
            facets["format"] = data.get("format")

        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["variable"] = variable

        cls._set_date_bounds(facets, date)

        return facets

    @classmethod
    def decode_eccc_obs(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of an ECCC observation file (not implemented)."""
        raise NotImplementedError()

    @classmethod
    def decode_ahccd_obs(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of an AHCCD observation file (not implemented)."""
        raise NotImplementedError()

    @classmethod
    def decode_melcc_obs(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of a MELCC observation file (not implemented)."""
        raise NotImplementedError()

    @classmethod
    def decode_pcic_candcs_u6(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of a PCIC CanDCS-U6 bias-adjusted file from its ``GCM__*`` attributes."""
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets["activity"] = data["activity_id"]
        facets["mip_era"] = data["project_id"]
        facets["bias_adjust_institution"] = "PCIC"
        facets["date"] = date
        facets["domain"] = data["domain"]
        facets["experiment"] = str(data["GCM__experiment_id"]).replace(",", "-")
        facets["format"] = "netcdf"
        facets["frequency"] = data["frequency"]
        facets["institution"] = data["GCM__institution_id"]
        facets["member"] = (
            f"r{data['GCM__realization_index']}"
            f"i{data['GCM__initialization_index']}"
            f"p{data['GCM__physics_index']}"
            f"f{data['GCM__forcing_index']}"
        )
        facets["processing_level"] = "biasadjusted"
        facets["bias_adjust_project"] = "CanDCS-U6"
        facets["source"] = data["GCM__source_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable
        facets["version"] = data["GCM__data_specs_version"]

        cls._set_date_bounds(facets, date)

        return facets

    @classmethod
    def decode_cmip6(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of a CMIP6 file."""
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets["activity"] = data["activity_id"]
        facets["date"] = date
        facets["domain"] = "global"
        facets["experiment"] = data["experiment_id"]
        facets["format"] = "netcdf"
        facets["frequency"] = data["frequency"]
        facets["grid_label"] = data["grid_label"]
        facets["institution"] = data["institution_id"]
        facets["member"] = data["variant_label"]
        facets["modeling_realm"] = data["realm"]
        facets["processing_level"] = "raw"
        facets["mip_era"] = data["mip_era"]
        facets["source"] = data["source_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable
        facets["version"] = cls._decode_version(file, data)

        cls._set_date_bounds(facets, date)

        return facets

    @classmethod
    def decode_cmip5(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of a CMIP5 file."""
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets["activity"] = "CMIP"
        facets["date"] = date
        facets["domain"] = "global"
        facets["experiment"] = data["experiment_id"]
        facets["format"] = "netcdf"
        facets["frequency"] = cls._decode_time_info(data=data, field="frequency")
        facets["institution"] = data["institute_id"]
        facets["member"] = data["parent_experiment_rip"]
        facets["modeling_realm"] = data["modeling_realm"]
        facets["processing_level"] = "raw"
        facets["mip_era"] = data["project_id"]
        facets["source"] = data["model_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable
        facets["version"] = cls._decode_version(file, data)

        cls._set_date_bounds(facets, date)

        return facets

    @classmethod
    def decode_cordex(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of a CORDEX file, including Ouranos-internal variants.

        Raises
        ------
        NotImplementedError
            If neither ``CORDEX_domain`` nor ``ouranos_domain_name`` gives a domain.
        AttributeError
            If no prefix of ``driving_model_id`` is a known institution.
        """
        variable, date, data = cls._from_dataset(file=file)

        # FIXME: What to do about our internal data that breaks all established conventions?
        facets = dict()
        facets["activity"] = "CMIP"
        facets["project"] = "CORDEX"

        if data.get("project_id") == "" or data.get("project_id") is None:
            facets["mip_era"] = "internal"
        elif data.get("project_id") == "CORDEX":
            facets["mip_era"] = "CMIP5"

        if date == "r0i0p0":
            facets["date"] = "fx"
        else:
            facets["date"] = date

        domain = data.get("CORDEX_domain").strip()
        if domain:
            facets["domain"] = domain
        else:
            domain = data.get("ouranos_domain_name").strip()
            if domain:
                facets["domain"] = domain
            else:
                msg = f"File {Path(file).name} has a nonstandard domain name."
                logging.error(msg)
                raise NotImplementedError(msg)

        # CORDEX-NAM on AWS mis-attributes the domain (22/44 should be 22i/44i)
        aws_keys = data.get("intake_esm_dataset_key")
        if aws_keys:
            facets["domain"] = aws_keys.split(".")[3]

        # The driving institution is the shortest 1-3 part "-"-joined prefix of the
        # driving model id found in INSTITUTIONS.
        driving_institution_parts = str(data["driving_model_id"]).split("-")
        for n_parts in (1, 2, 3):
            candidate = "-".join(driving_institution_parts[:n_parts])
            if candidate in INSTITUTIONS:
                driving_institution = candidate
                break
        else:
            raise AttributeError("driving_institution not valid.")

        facets["driving_institution"] = driving_institution
        facets["driving_model"] = data["driving_model_id"]
        facets["format"] = "netcdf"
        facets["frequency"] = cls._decode_time_info(data=data, field="frequency")

        institute = data["institute_id"].strip()
        facets["institution"] = "Ouranos" if institute == "Our." else institute

        facets["processing_level"] = "raw"
        facets["source"] = data["model_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable
        facets["version"] = cls._decode_version(file, data)

        cls._set_date_bounds(facets, date)

        try:
            facets["experiment"] = data["experiment_id"].strip()
        except KeyError:
            facets["experiment"] = data["driving_experiment_name"].strip()

        # Use "parent_experiment_rip", falling back to the driving model ensemble
        # member when it is absent or "N/A".
        # NOTE(review): "parent_experiment" is not consulted — confirm whether it
        # should be tried as a second candidate.
        member = data.get("parent_experiment_rip")
        if member is None or member == "N/A":
            member = data["driving_model_ensemble_member"].strip()
        facets["member"] = member

        return facets

    @classmethod
    def decode_isimip_ft(cls, file: Union[PathLike, str]) -> dict:
        """Decode facets of an ISIMIP file."""
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets["activity"] = "ISIMIP"
        facets["mip_era"] = data["project_id"]

        facets["date"] = date
        facets["domain"] = "global"
        facets["co2_forcing_id"] = data["co2_forcing_id"]
        facets["experiment"] = data["experiment_id"]
        facets["format"] = "netcdf"
        facets["frequency"] = cls._decode_time_info(data=data, field="frequency")
        facets["impact_model"] = data["impact_model_id"]
        facets["institution"] = data["institute_id"]
        facets["member"] = data["driving_model_ensemble_member"]
        facets["modeling_realm"] = data["modeling_realm"]
        facets["social_forcing_id"] = data["social_forcing_id"]
        facets["source"] = data["model_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable

        cls._set_date_bounds(facets, date)

        return facets
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc