
Ouranosinc / miranda, Coveralls build 2228666049 (pending completion)
Pull Request #33 (GitHub): Support CORDEX and CMIP5/6 (merge 9a1b0a09b into d076d8475)

32 of 162 new or added lines in 15 files covered (19.75%).
6 existing lines in 5 files are now uncovered.
659 of 3225 relevant lines covered (20.43%), 0.61 hits per line.

Source file: /miranda/decode/_decoder.py (15.85% of lines covered)
import logging
import multiprocessing as mp
import os
import re
import warnings
from functools import partial
from logging import config
from os import PathLike
from pathlib import Path
from types import GeneratorType
from typing import Dict, List, Optional, Union

import netCDF4 as nc  # noqa
import pandas as pd
import schema
import zarr
from pandas._libs.tslibs import NaTType  # noqa

from miranda.cv import INSTITUTIONS, PROJECT_MODELS
from miranda.scripting import LOGGING_CONFIG

from ._time import (
    TIME_UNITS_TO_FREQUENCY,
    TIME_UNITS_TO_TIMEDELTA,
    DecoderError,
    date_parser,
)

config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "Decoder",
    "guess_project",
]

def guess_project(file: Union[os.PathLike, str]) -> str:
    """Guess the project of a dataset from the model names found in its file name."""
    file_name = Path(file).stem

    potential_names = file_name.split("_")
    for project, models in PROJECT_MODELS.items():
        if any([model in potential_names for model in models]):
            return project
    raise DecoderError(f"Unable to determine project from file name: '{file_name}'.")
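# Illustrative usage sketch for `guess_project` (hypothetical file name; the set
# of recognized model names depends on miranda.cv.PROJECT_MODELS):
#
#   >>> guess_project("tas_day_CanESM5_ssp245_r1i1p1f1_gn_20150101-21001231.nc")  # doctest: +SKIP
#   'CMIP6'  (assuming CanESM5 is registered under the CMIP6 project)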

class Decoder:

    project = None
    _file_facets = dict()

    def __init__(self, project: Optional[str]):
        self.project = project
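    # `_decoder` is the per-file worker used by `decode()`: it resolves the
    # project (guessing it from the file name when `proj` is None), dispatches
    # to the matching `decode_<project>` classmethod via getattr(), and stores
    # the resulting facets in the shared mapping `d`.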

    @staticmethod
    def _decoder(
        d: dict,
        fail_early: bool,
        proj: str,
        lock,
        file: Union[str, Path],
    ) -> None:
        """Decode facets for a single file and store them in the shared dictionary.

        Notes
        -----
        lock is a threading lock object
        """
        with lock:
            if proj is None:
                try:
                    proj = guess_project(file)
                except DecoderError:
                    print(
                        f"Unable to determine 'project': Signature for 'project' must be set manually for file: {file}."
                    )
                    if fail_early:
                        raise
                    return  # Without a project, the file cannot be decoded; skip it.

            decode_function_name = f"decode_{proj.lower().replace('-','_')}"
            try:
                _deciphered = getattr(Decoder, decode_function_name)(Path(file))
                print(
                    f"Deciphered the following from {Path(file).name}: {_deciphered.items()}"
                )
                d[file] = _deciphered
            except AttributeError as e:
                print(f"Unable to read data from {Path(file).name}: {e}")
            except schema.SchemaError as e:
                print(f"Decoded facets from {Path(file).name} are not valid: {e}")
    def decode(
        self,
        files: Union[os.PathLike, str, List[Union[str, os.PathLike]], GeneratorType],
        raise_error: bool = False,
    ):
        """Decode facets from a file or list of files.

        Parameters
        ----------
        files: Union[str, Path, List[Union[str, Path]]]
            Path(s) of the file(s) to decode.
        raise_error: bool
            If True, raise the underlying error when the project cannot be determined.
        """
        if isinstance(files, (str, os.PathLike)):
            files = [files]
        if self.project is None:
            warnings.warn(
                "The decoder 'project' is not set; Decoding step will be much slower."
            )
        else:
            logging.info(f"Deciphering metadata with project = '{self.project}'")
        manager = mp.Manager()
        _file_facets = manager.dict()
        lock = manager.Lock()
        func = partial(self._decoder, _file_facets, raise_error, self.project, lock)

        with mp.Pool() as pool:
            pool.imap(func, files, chunksize=10)
            pool.close()
            pool.join()

        self._file_facets.update(_file_facets)
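    # Illustrative usage sketch (hypothetical path; decoded facets are then
    # retrieved with `file_facets()`):
    #
    #   >>> decoder = Decoder(project="CMIP6")  # doctest: +SKIP
    #   >>> decoder.decode("/data/tas_day_CanESM5_ssp245_r1i1p1f1_gn_20150101-21001231.nc")  # doctest: +SKIP
    #   >>> decoder.file_facets()  # doctest: +SKIP
    #   {...}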

    def facets_table(self):
        raise NotImplementedError()

    def file_facets(self) -> Dict[os.PathLike, Dict]:
        return self._file_facets
    @classmethod
    def _from_dataset(cls, file: Union[Path, str]) -> (str, str, Dict):
        file = Path(file)  # Accept both str and Path inputs.
        file_name = file.stem

        variable_name = cls._decode_primary_variable(file)
        variable_date = file_name.split("_")[-1]

        if file.is_file() and file.suffix in [".nc", ".nc4"]:
            ds = nc.Dataset(file)
            data = dict()
            for k in ds.ncattrs():
                data[k] = getattr(ds, k)
        elif file.is_dir() and file.suffix == ".zarr":
            ds = zarr.open(file, mode="r")
            data = ds.attrs.asdict()
        else:
            raise DecoderError(f"Unable to read dataset: `{file.name}`.")
        return variable_name, variable_date, data
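    # `_from_dataset` returns a (variable_name, date_segment, global_attributes)
    # tuple, reading attributes from either a netCDF file (.nc/.nc4) or a zarr
    # store (.zarr directory).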

    @staticmethod
    def _decode_primary_variable(file: Path) -> str:
        """Attempt to find the primary variable of a netCDF or zarr dataset.

        Parameters
        ----------
        file: Union[Path, str]

        Returns
        -------
        str
        """
        dimsvar_dict = dict()
        coords = ("time", "lat", "lon", "rlat", "rlon", "height", "lev", "rotated_pole")
        suggested_variable = file.name.split("_")[0]

        if file.is_file() and file.suffix in [".nc", ".nc4"]:
            data = nc.Dataset(file, mode="r")
            for var_name, var_attrs in data.variables.items():
                dimsvar_dict[var_name] = {
                    k: var_attrs.getncattr(k) for k in var_attrs.ncattrs()
                }
            for k in dimsvar_dict.keys():
                if not str(k).startswith(coords) and suggested_variable == k:
                    return str(k)

        elif file.is_dir() and file.suffix == ".zarr":
            data = zarr.open(str(file), mode="r")
            for k in data.array_keys():
                if not str(k).startswith(coords) and suggested_variable == k:
                    return str(k)
        else:
            raise NotImplementedError()
183
    @staticmethod
3✔
184
    def _decode_time_info(
3✔
185
        file: Optional[Union[PathLike, str, List[str]]] = None,
186
        data: Optional[Dict] = None,
187
        *,
188
        field: str,
189
    ) -> Union[str, NaTType]:
190
        """
191

192
        Parameters
193
        ----------
194
        file: Union[os.PathLike, str] or List[str], optional
195
        data: dict, optional
196
        field: {"timedelta", "frequency"}
197

198
        Returns
199
        -------
200

201
        """
202
        if not file and not data:
×
203
            raise ValueError()
×
204

205
        if field == "frequency":
×
206
            time_dictionary = TIME_UNITS_TO_FREQUENCY
×
207
        elif field == "timedelta":
×
208
            time_dictionary = TIME_UNITS_TO_TIMEDELTA
×
209
        else:
210
            raise NotImplementedError()
×
211

212
        if isinstance(file, (str, PathLike)):
×
213
            file = Path(file).name.split("_")
×
214

215
        if isinstance(file, list):
×
216
            potential_times = [segment in file for segment in time_dictionary.keys()]
×
217
            if potential_times:
×
218
                if potential_times[0] in ["fx", "fixed"]:
×
219
                    if field == "timedelta":
×
220
                        return pd.NaT
×
221
                    return "fx"
×
222
                if field == "timedelta":
×
223
                    return pd.to_timedelta(time_dictionary[potential_times[0]])
×
224
                return time_dictionary[potential_times[0]]
×
225
        elif data:
×
226
            potential_time = data["frequency"]
×
227
            if potential_time == "":
×
228
                time_units = data["time"].units
×
229
                potential_time = time_units.split()[0]
×
230
            if field == "timedelta":
×
231
                if potential_time in ["fx", "fixed"]:
×
232
                    return pd.NaT
×
233
                return pd.to_timedelta(time_dictionary[potential_time])
×
234
            return time_dictionary[potential_time]
×
235
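    # The `decode_*` methods below each map the metadata of one data collection
    # (reanalyses, ECCC/AHCCD/MELCC observations, PCIC CanDCS-U6, CMIP5/6,
    # CORDEX, ISIMIP-FT) onto miranda's common facet vocabulary.  Their names
    # follow the `decode_<project>` pattern expected by `_decoder`.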

    @classmethod
    def decode_reanalysis(cls, file: Union[PathLike, str]) -> dict:
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets.update(data)
        del facets["history"]

        facets["date"] = date
        try:  # FIXME: This should be adjusted in files.
            facets["format"] = data["output_format"]
        except KeyError:
            facets["format"] = data["format"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["variable"] = variable

        try:
            facets["date_start"] = date_parser(date)
            facets["date_end"] = date_parser(date, end_of_period=True)
        except DecoderError:
            pass

        return facets
    @staticmethod
    def decode_eccc_obs(file: Union[PathLike, str]) -> dict:
        raise NotImplementedError()

    @staticmethod
    def decode_ahccd_obs(file: Union[PathLike, str]) -> dict:
        raise NotImplementedError()

    @staticmethod
    def decode_melcc_obs(file: Union[PathLike, str]) -> dict:
        raise NotImplementedError()
    @classmethod
    def decode_pcic_candcs_u6(cls, file: Union[PathLike, str]) -> dict:
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets["activity"] = data["activity_id"]
        facets["mip_era"] = data["project_id"]
        facets["bias_adjust_institution"] = "PCIC"
        facets["date"] = date
        facets["domain"] = data["domain"]
        facets["experiment"] = str(data["GCM__experiment_id"]).replace(",", "-")
        facets["format"] = "netcdf"
        facets["frequency"] = data["frequency"]
        facets["institution"] = data["GCM__institution_id"]
        facets["member"] = (
            f"r{data['GCM__realization_index']}"
            f"i{data['GCM__initialization_index']}"
            f"p{data['GCM__physics_index']}"
            f"f{data['GCM__forcing_index']}"
        )
        facets["processing_level"] = "biasadjusted"
        facets["bias_adjust_project"] = "CanDCS-U6"
        facets["source"] = data["GCM__source_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable
        facets["version"] = data["GCM__data_specs_version"]

        try:
            facets["date_start"] = date_parser(date)
            facets["date_end"] = date_parser(date, end_of_period=True)
        except DecoderError:
            pass

        return facets
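    # In the CMIP5/6 and CORDEX decoders below, a missing "version" attribute is
    # resolved by falling back to the parent directory name when it looks like a
    # version tag (a "v" followed by digits, as in ESGF-style directory trees),
    # and to "vNotFound" otherwise.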

    @classmethod
    def decode_cmip6(cls, file: Union[PathLike, str]) -> dict:
        variable, date, data = cls._from_dataset(file=file)

        facets = dict()
        facets["activity"] = data["activity_id"]
        facets["date"] = date
        facets["domain"] = "global"
        facets["experiment"] = data["experiment_id"]
        facets["format"] = "netcdf"
        facets["frequency"] = data["frequency"]
        facets["grid_label"] = data["grid_label"]
        facets["institution"] = data["institution_id"]
        facets["member"] = data["variant_label"]
        facets["modeling_realm"] = data["realm"]
        facets["processing_level"] = "raw"
        facets["mip_era"] = data["mip_era"]
        facets["source"] = data["source_id"]
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
        facets["type"] = "simulation"
        facets["variable"] = variable

        try:
            facets["version"] = data["version"]
        except KeyError:
            possible_version = Path(file).parent.name
            if re.match(r"^[vV]\d+", possible_version):
                facets["version"] = Path(file).parent.name
            else:
                facets["version"] = "vNotFound"

        try:
            facets["date_start"] = date_parser(date)
            facets["date_end"] = date_parser(date, end_of_period=True)
        except DecoderError:
            pass

        return facets
    @classmethod
3✔
348
    def decode_cmip5(cls, file: Union[PathLike, str]) -> dict:
3✔
349
        variable, date, data = cls._from_dataset(file=file)
×
350

351
        facets = dict()
×
352
        facets["activity"] = "CMIP"
×
353
        facets["date"] = date
×
354
        facets["domain"] = "global"
×
355
        facets["experiment"] = data["experiment_id"]
×
356
        facets["format"] = "netcdf"
×
357
        facets["frequency"] = cls._decode_time_info(data=data, field="frequency")
×
358
        facets["institution"] = data["institute_id"]
×
359
        facets["member"] = data["parent_experiment_rip"]
×
360
        facets["modeling_realm"] = data["modeling_realm"]
×
361
        facets["processing_level"] = "raw"
×
362
        facets["mip_era"] = data["project_id"]
×
363
        facets["source"] = data["model_id"]
×
364
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
×
365
        facets["type"] = "simulation"
×
366
        facets["variable"] = variable
×
367

368
        try:
×
NEW
369
            facets["version"] = data["version"]
×
NEW
370
        except KeyError:
×
NEW
371
            possible_version = Path(file).parent.name
×
NEW
372
            if re.match(r"^[vV]\d+", possible_version):
×
NEW
373
                facets["version"] = Path(file).parent.name
×
374
            else:
NEW
375
                facets["version"] = "vNotFound"
×
376

377
        try:
×
NEW
378
            facets["date_start"] = date_parser(date)
×
NEW
379
            facets["date_end"] = date_parser(date, end_of_period=True)
×
380
        except DecoderError:
×
381
            pass
×
382

383
        return facets
×
384

385
    @classmethod
3✔
386
    def decode_cordex(cls, file: Union[PathLike, str]) -> dict:
3✔
387
        variable, date, data = cls._from_dataset(file=file)
×
388

389
        # FIXME: What to do about our internal data that breaks all established conventions?
390
        facets = dict()
×
391
        facets["activity"] = "CMIP"
×
392
        facets["project"] = "CORDEX"
×
393

394
        if data["project_id"] == "":
×
395
            facets["mip_era"] = "internal"
×
396
        elif data["project_id"] == "CORDEX":
×
397
            facets["mip_era"] = "CMIP5"
×
398

NEW
399
        if date == "r0i0p0":
×
NEW
400
            facets["date"] = "fx"
×
401
        else:
NEW
402
            facets["date"] = date
×
403

404
        try:
×
405
            facets["domain"] = data["CORDEX_domain"].strip()
×
406
        except KeyError:
×
407
            try:
×
408
                facets["domain"] = data["ouranos_domain_name"].strip()
×
409
            except KeyError:
×
410
                msg = f"File {Path(file).name} has a nonstandard domain name."
×
411
                logging.error(msg)
×
412
                raise NotImplementedError(msg)
×
413

414
        driving_institution_parts = str(data["driving_model_id"]).split("-")
×
415
        if driving_institution_parts[0] in INSTITUTIONS:
×
416
            driving_institution = driving_institution_parts[0]
×
417
        elif "-".join(driving_institution_parts[:2]) in INSTITUTIONS:
×
418
            driving_institution = "-".join(driving_institution_parts[:2])
×
419
        elif "-".join(driving_institution_parts[:3]) in INSTITUTIONS:
×
420
            driving_institution = "-".join(driving_institution_parts[:3])
×
421
        else:
422
            raise AttributeError("driving_institution not valid.")
×
423

424
        facets["driving_institution"] = driving_institution
×
425
        facets["driving_model"] = data["driving_model_id"]
×
426
        facets["format"] = "netcdf"
×
427
        facets["frequency"] = cls._decode_time_info(data=data, field="frequency")
×
428

429
        if data["institute_id"].strip() == "Our.":
×
430
            facets["institution"] = "Ouranos"
×
431
        else:
432
            facets["institution"] = data["institute_id"].strip()
×
433

434
        facets["processing_level"] = "raw"
×
435
        facets["source"] = data["model_id"]
×
436
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
×
437
        facets["type"] = "simulation"
×
438
        facets["variable"] = variable
×
439

NEW
440
        try:
×
NEW
441
            facets["version"] = data["version"]
×
NEW
442
        except KeyError:
×
NEW
443
            possible_version = Path(file).parent.name
×
NEW
444
            if re.match(r"^[vV]\d+", possible_version):
×
NEW
445
                facets["version"] = Path(file).parent.name
×
446
            else:
NEW
447
                facets["version"] = "vNotFound"
×
448

449
        try:
×
450
            facets["date_start"] = date_parser(date)
×
451
            facets["date_end"] = date_parser(date, end_of_period=True)
×
452
        except DecoderError:
×
453
            pass
×
454

455
        try:
×
456
            facets["experiment"] = data["experiment_id"].strip()
×
457
        except KeyError:
×
458
            facets["experiment"] = data["driving_experiment_name"].strip()
×
459

460
        try:
×
461
            facets["member"] = data["parent_experiment_rip"].strip()
×
NEW
462
            if facets["member"] == "N/A":
×
NEW
463
                raise KeyError()
×
464
        except KeyError:
×
465
            facets["member"] = data["driving_model_ensemble_member"].strip()
×
466

467
        return facets
×
468

469
    @classmethod
3✔
470
    def decode_isimip_ft(cls, file: Union[PathLike, str]) -> dict:
3✔
471
        variable, date, data = cls._from_dataset(file=file)
×
472

473
        facets = dict()
×
474
        facets["activity"] = "ISIMIP"
×
475
        facets["mip_era"] = data["project_id"]
×
476

477
        facets["date"] = date
×
478
        facets["domain"] = "global"
×
479
        facets["co2_forcing_id"] = data["co2_forcing_id"]
×
480
        facets["experiment"] = data["experiment_id"]
×
481
        facets["format"] = "netcdf"
×
482
        facets["frequency"] = cls._decode_time_info(data=data, field="frequency")
×
483
        facets["impact_model"] = data["impact_model_id"]
×
484
        facets["institution"] = data["institute_id"]
×
485
        facets["member"] = data["driving_model_ensemble_member"]
×
486
        facets["modeling_realm"] = data["modeling_realm"]
×
487
        facets["social_forcing_id"] = data["social_forcing_id"]
×
488
        facets["source"] = data["model_id"]
×
489
        facets["timedelta"] = cls._decode_time_info(data=data, field="timedelta")
×
490
        facets["type"] = "simulation"
×
491
        facets["variable"] = variable
×
492

493
        try:
×
494
            facets["date_start"] = date_parser(date)
×
495
            facets["date_end"] = date_parser(date, end_of_period=True)
×
496
        except DecoderError:
×
497
            pass
×
498

499
        return facets
×