• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2117652156

pending completion
2117652156

Pull #24

github

GitHub
Merge 0bbc89f42 into bf78f91b7
Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring

242 of 1086 new or added lines in 35 files covered. (22.28%)

13 existing lines in 4 files now uncovered.

736 of 3230 relevant lines covered (22.79%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/miranda/structure/_structure.py
NEW
1
import logging.config
×
NEW
2
import multiprocessing
×
NEW
3
import os
×
NEW
4
import shutil
×
NEW
5
import sys
×
NEW
6
from functools import partial
×
NEW
7
from pathlib import Path
×
NEW
8
from types import GeneratorType
×
NEW
9
from typing import List, Mapping, Optional, Union
×
10

NEW
11
from miranda import Decoder
×
NEW
12
from miranda.decode import guess_project
×
NEW
13
from miranda.scripting import LOGGING_CONFIG
×
NEW
14
from miranda.utils import filefolder_iterator
×
NEW
15
from miranda.validators import GRIDDED_SCHEMA, SIMULATION_SCHEMA, STATION_OBS_SCHEMA
×
16

NEW
17
# Apply the LOGGING_CONFIG imported from miranda.scripting; this runs as a
# side effect of importing this module.
logging.config.dictConfig(LOGGING_CONFIG)
×
18

NEW
19
# Public names of this module; `_structure_datasets` is deliberately not exported.
__all__ = [
×
20
    "build_path_from_schema",
21
    "structure_datasets",
22
]
23

24

NEW
25
def _structure_datasets(
×
26
    in_file: Path, out_path: Path, method: str, dry_run: bool = False
27
):
NEW
28
    method_mod = ""
×
NEW
29
    if in_file.is_dir():
×
NEW
30
        method_mod = "tree"
×
NEW
31
    if method.lower() in ["move", "copy"]:
×
NEW
32
        meth = "Moved" if method.lower() == "move" else "Copied"
×
NEW
33
        output_file = out_path.joinpath(in_file.name)
×
NEW
34
        try:
×
NEW
35
            if not dry_run:
×
NEW
36
                if sys.version_info < (3, 9):
×
NEW
37
                    getattr(shutil, f"{method}{method_mod}")(
×
38
                        str(in_file), str(output_file)
39
                    )
40
                else:
NEW
41
                    getattr(shutil, f"{method}{method_mod}")(in_file, output_file)
×
NEW
42
            logging.info(f"{meth} {in_file.name} to {output_file}.")
×
NEW
43
        except FileExistsError:
×
NEW
44
            logging.warning(f"{in_file.name} already exists at location. Continuing...")
×
45

46

NEW
47
def build_path_from_schema(
    facets: dict, output_folder: Union[str, os.PathLike]
) -> Path:
    """Build a filepath based on a valid data schema.

    Parameters
    ----------
    facets: dict
      Facets for a given dataset. The "type" facet selects the schema used for
      validation and the folder layout.
    output_folder
      Parent folder on which to extend the filetree structure.

    Returns
    -------
    Path

    Raises
    ------
    ValueError
      If the "type" facet (or, for simulations, the "processing_level" facet)
      matches none of the supported layouts.
    """
    dataset_type = facets["type"]

    if dataset_type == "station-obs":
        STATION_OBS_SCHEMA.validate(facets)
        folder_tree = Path(output_folder).joinpath(
            dataset_type,
            facets["project"],
            facets["institution"],
            facets["version"],  # This suggests "date_created"
            facets["frequency"],
            facets["variable"],
        )
        # `facets` is a dict: key presence must be tested with `in`
        # (`hasattr` inspects attributes and is never True for a dict key).
        if "member" in facets:
            return folder_tree / facets["member"]
        return folder_tree

    if dataset_type in ("forecast", "gridded-obs", "reanalysis"):
        GRIDDED_SCHEMA.validate(facets)
        return Path(output_folder).joinpath(
            dataset_type,
            facets["institution"],
            facets["source"],
            facets["project"],
            facets["domain"],
            facets["frequency"],
            facets["variable"],
        )

    if dataset_type == "simulation":
        SIMULATION_SCHEMA.validate(facets)
        processing_level = facets["processing_level"]

        if processing_level == "raw":
            # NOTE(review): for non-CORDEX projects the "member" facet is used
            # both here and further down the tree -- confirm this is intended.
            if facets["project"] == "CORDEX":
                model = facets["driving_model"]
            else:
                model = facets["member"]
            return Path(output_folder).joinpath(
                dataset_type,
                processing_level,
                facets["activity"],
                facets["project"],
                facets["domain"],
                facets["source"],
                model,
                facets["experiment"],
                facets["member"],
                facets["frequency"],
                facets["variable"],
            )

        if processing_level == "bias_adjusted":
            # NOTE(review): "project" appears twice in this layout; one of the
            # two may have been meant to be another facet -- confirm.
            return Path(output_folder).joinpath(
                dataset_type,
                processing_level,
                facets["project"],
                facets["bias_adjust_institute"],
                facets["domain"],
                facets["project"],
                facets["source"],
                facets["model"],
                facets["experiment"],
                facets["member"],
                facets["frequency"],
                facets["variable"],
            )

    raise ValueError("No appropriate data schemas found.")
×
130

131

NEW
132
def structure_datasets(
    input_files: Union[str, os.PathLike, List[Union[str, os.PathLike]], GeneratorType],
    output_folder: Union[str, os.PathLike],
    *,
    project: Optional[str] = None,
    guess: bool = True,
    dry_run: bool = False,
    method: str = "copy",
    make_dirs: bool = False,
    filename_pattern: str = "*.nc",
) -> Mapping[Path, Path]:
    """Decode dataset facets and transfer files into a schema-based folder tree.

    Parameters
    ----------
    input_files: str or Path or list of str or Path or GeneratorType
      Files or folders to be structured.
    output_folder: str or Path
      Parent folder under which the folder tree is built.
    project: {"cordex", "cmip5", "cmip6", "isimip-ft", "reanalysis"}, optional
    guess: bool
      If project not supplied, suggest to decoder that project is the same for all input_files. Default: True.
    dry_run: bool
      Log the changes that would have been made without performing them
      (no folders are created and no files are transferred). Default: False.
    method: {"move", "copy"}
      Method to transfer files to intended location. Default: "copy".
    make_dirs: bool
      Make folder tree if it does not already exist. Default: False.
    filename_pattern: str
      If pattern ends with "zarr", will 'glob' with provided pattern.
      Otherwise, will perform an 'rglob' (recursive) operation.

    Returns
    -------
    dict
      Mapping of each decoded input file to the folder it is transferred into.

    Raises
    ------
    FileNotFoundError
      If the project must be guessed and no input files are found.
    """
    input_files = filefolder_iterator(input_files, filename_pattern)

    if not project and guess:
        # Examine the first file from a list or generator
        missing = object()
        first_file = next(iter(input_files), missing)
        if first_file is missing:
            raise FileNotFoundError(
                f"No files matching '{filename_pattern}' were found; cannot guess project."
            )
        project = guess_project(first_file)
        decoder = Decoder(project)
        decoder.decode(first_file)
    else:
        decoder = Decoder(project)
    decoder.decode(input_files)

    all_file_paths = {
        Path(file): build_path_from_schema(facets, output_folder)
        for file, facets in decoder.file_facets().items()
    }

    # A dry run must not touch the filesystem, so folders are only created otherwise.
    if make_dirs and not dry_run:
        for new_path in set(all_file_paths.values()):
            Path(new_path).mkdir(exist_ok=True, parents=True)

    # multiprocessing copy
    func = partial(_structure_datasets, method=method, dry_run=dry_run)
    with multiprocessing.Pool() as pool:
        pool.starmap(func, all_file_paths.items())
        pool.close()
        pool.join()

    return all_file_paths
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc