Ouranosinc / miranda, build 2259263120 (pending completion)

GitHub Pull Request #33: Support CORDEX and CMIP5/6
Merge ba6f81527 into d076d8475

32 of 214 new or added lines in 15 files covered. (14.95%)

7 existing lines in 5 files now uncovered.

659 of 3245 relevant lines covered (20.31%)

0.61 hits per line
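("Hits per line" is presumably the total number of recorded hits divided by the number of relevant lines: 0.61 × 3245 ≈ 1,980 hits recorded in total.)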

Source File: /miranda/structure/_structure.py

import hashlib
import logging.config
import multiprocessing
import os
import shutil
import sys
from functools import partial
from pathlib import Path
from types import GeneratorType
from typing import List, Mapping, Optional, Union

import schema

from miranda import Decoder
from miranda.decode import guess_project
from miranda.scripting import LOGGING_CONFIG
from miranda.utils import discover_data
from miranda.validators import GRIDDED_SCHEMA, SIMULATION_SCHEMA, STATION_OBS_SCHEMA

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "build_path_from_schema",
    "structure_datasets",
]


def generate_version_hashes(in_file: Path, out_file: Path):
    hash_sha256_writer = hashlib.sha256()
    with open(in_file, "rb") as f:
        hash_sha256_writer.update(f.read())
    sha256sum = hash_sha256_writer.hexdigest()

    print(f"Writing sha256sum (ending: {sha256sum[-6:]}) to file: {out_file.name}")
    with open(out_file, "w") as f:
        f.write(sha256sum)

    del hash_sha256_writer
    del sha256sum

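# Hypothetical call, mirroring how structure_datasets() names the sidecar file
# as "<stem>.<version>"; the file names below are illustrative, not taken from
# the project:
#   generate_version_hashes(Path("tas_day_era5.nc"), Path("tas_day_era5.v20220101"))

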
def _structure_datasets(
    in_file: Path, out_path: Path, method: str, dry_run: bool = False
):
    if method.lower() in ["move", "copy"]:
        meth = "Moved" if method.lower() == "move" else "Copied"
        output_file = out_path.joinpath(in_file.name)
        try:
            if not dry_run:
                # Dispatch to shutil.move, shutil.copy, or shutil.copytree.
                method_mod = ""
                if in_file.is_dir() and method.lower() == "copy":
                    method_mod = "tree"

                # Prior to Python 3.9, shutil.move requires str paths.
                if sys.version_info < (3, 9):
                    getattr(shutil, f"{method}{method_mod}")(
                        str(in_file), str(output_file)
                    )
                else:
                    getattr(shutil, f"{method}{method_mod}")(in_file, output_file)
            print(f"{meth} {in_file.name} to {output_file}.")
        except FileExistsError:
            print(f"{in_file.name} already exists at location. Continuing...")


def build_path_from_schema(
    facets: dict, output_folder: Union[str, os.PathLike]
) -> Optional[Path]:
    """Build a filepath based on a valid data schema.

    Parameters
    ----------
    facets : dict
        Facets for a given dataset.
    output_folder : str or os.PathLike
        Parent folder on which to extend the filetree structure.

    Returns
    -------
    Path or None
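
    Examples
    --------
    A hypothetical facet mapping; the key/value choices below are illustrative
    assumptions, not canonical project values:

    >>> facets = {
    ...     "type": "reanalysis",
    ...     "institution": "ECMWF",
    ...     "activity": "atmos",
    ...     "source": "ERA5",
    ...     "project": "era5",
    ...     "domain": "global",
    ...     "frequency": "day",
    ...     "variable": "tas",
    ... }
    >>> build_path_from_schema(facets, "/data")  # doctest: +SKIP
    PosixPath('/data/reanalysis/ECMWF/atmos/ERA5/era5/global/day/tas')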
    """
    try:
        if facets["type"] == "station-obs":
            STATION_OBS_SCHEMA.validate(facets)
            folder_tree = (
                Path(output_folder)
                / facets["type"]
                / facets["project"]
                / facets["institution"]
                / facets["version"]  # This suggests "date_created"
                / facets["frequency"]
                / facets["variable"]
            )
            # facets is a dict, so check for the optional "member" key with a
            # membership test; hasattr(facets, "member") is always False.
            if "member" in facets:
                return folder_tree / facets["member"]
            return folder_tree

        if facets["type"] in ["forecast", "gridded-obs", "reanalysis"]:
            GRIDDED_SCHEMA.validate(facets)
            return (
                Path(output_folder)
                / facets["type"]
                / facets["institution"]
                / facets["activity"]
                / facets["source"]
                / facets["project"]
                / facets["domain"]
                / facets["frequency"]
                / facets["variable"]
            )

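        # Raw simulation output: the CORDEX layout includes the driving model;
        # a KeyError (e.g., no "project" facet, as with CMIP5/6 data keyed by
        # "mip_era" instead) falls back to the CMIP-style layout below.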
        if facets["type"] == "simulation":
            SIMULATION_SCHEMA.validate(facets)
            if facets["processing_level"] == "raw":
                try:
                    if facets["project"] == "CORDEX":
                        return (
                            Path(output_folder)
                            / facets["type"]
                            / facets["processing_level"]
                            / facets["activity"]
                            / facets["mip_era"]
                            / facets["project"]
                            / facets["domain"]
                            / facets["source"]
                            / facets["driving_model"]
                            / facets["experiment"]
                            / facets["member"]
                            / facets["frequency"]
                            / facets["variable"]
                        )
                except KeyError:
                    return (
                        Path(output_folder)
                        / facets["type"]
                        / facets["processing_level"]
                        / facets["activity"]
                        / facets["mip_era"]
                        / facets["domain"]
                        / facets["institution"]
                        / facets["source"]
                        / facets["experiment"]
                        / facets["member"]
                        / facets["frequency"]
                        / facets["variable"]
                    )
            elif facets["processing_level"] == "biasadjusted":
                return (
                    Path(output_folder)
                    / facets["type"]
                    / facets["processing_level"]
                    / facets["activity"]
                    / facets["mip_era"]
                    / facets["bias_adjust_institution"]
                    / facets["bias_adjust_project"]
                    / facets["domain"]
                    / facets["institution"]
                    / facets["source"]
                    / facets["experiment"]
                    / facets["member"]
                    / facets["frequency"]
                    / facets["variable"]
                )

        raise ValueError()

    except schema.SchemaError:
        logging.error(f"Validation issues found for file matching schema: {facets}")
    except ValueError:
        logging.error(
            f"No appropriate data schemas found for file matching schema: {facets}"
        )
    return


def structure_datasets(
    input_files: Union[str, os.PathLike, List[Union[str, os.PathLike]], GeneratorType],
    output_folder: Union[str, os.PathLike],
    *,
    project: Optional[str] = None,
    guess: bool = True,
    dry_run: bool = False,
    method: str = "copy",
    make_dirs: bool = False,
    set_version_hashes: bool = False,
    filename_pattern: str = "*.nc",
) -> Mapping[Path, Path]:
    """Structure dataset files into a facet-based folder tree.

    Parameters
    ----------
    input_files : str or Path or list of str or Path or GeneratorType
    output_folder : str or Path
    project : {"cordex", "cmip5", "cmip6", "isimip-ft", "reanalysis", "pcic-candcs-u6"}, optional
    guess : bool
        If project is not supplied, suggest to the decoder that the project is the same for all input_files. Default: True.
    dry_run : bool
        Print the changes that would have been made without performing them. Default: False.
    method : {"move", "copy"}
        Method used to transfer files to their intended location. Default: "copy".
    make_dirs : bool
        Make the folder tree if it does not already exist. Default: False.
    set_version_hashes : bool
        Make an accompanying file with the version in its filename and the sha256sum as its contents. Default: False.
    filename_pattern : str
        If the pattern ends with "zarr", a 'glob' is performed with the provided pattern.
        Otherwise, an 'rglob' (recursive) operation is performed.

    Returns
    -------
    dict
    """
    input_files = discover_data(input_files, filename_pattern)
    if not project and guess:
        # Examine the first file from a list or generator to guess the project.
        for f in input_files:
            project = guess_project(f)
            decoder = Decoder(project)
            decoder.decode(f)
            break
        else:
            raise FileNotFoundError()
        decoder.decode(input_files)
    else:
        decoder = Decoder(project)
        decoder.decode(input_files)

    all_file_paths = dict()
    version_hash_paths = dict()
    errored_files = list()
    for file, facets in decoder.file_facets().items():
        output_filepath = build_path_from_schema(facets, output_folder)
        if isinstance(output_filepath, Path):
            all_file_paths.update({Path(file): output_filepath})
        else:
            errored_files.append(Path(file).name)
            continue

        if set_version_hashes:
            version_hash_file = f"{Path(file).stem}.{facets['version']}"
            version_hash_paths.update(
                {Path(file): output_filepath.joinpath(version_hash_file)}
            )

    if errored_files:
        logging.warning(
            f"Some files could not be structured: [{', '.join(errored_files)}]"
        )

    if make_dirs:
        for new_paths in set(all_file_paths.values()):
            Path(new_paths).mkdir(exist_ok=True, parents=True)

    if set_version_hashes:
        with multiprocessing.Pool() as pool:
            pool.starmap(generate_version_hashes, version_hash_paths.items())
            pool.close()
            pool.join()

    # Move or copy the files in parallel.
    structure_func = partial(_structure_datasets, method=method, dry_run=dry_run)
    with multiprocessing.Pool() as pool:
        pool.starmap(structure_func, all_file_paths.items())
        pool.close()
        pool.join()

    return all_file_paths
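
A minimal usage sketch. The import path, file locations, and project name are
assumptions for illustration; they are not taken from the pull request:

    from pathlib import Path

    from miranda.structure import structure_datasets

    new_paths = structure_datasets(
        Path("/raw/cmip6"),        # where the unsorted .nc files live
        Path("/data/structured"),  # root of the facet-based tree
        project="cmip6",           # skip guessing; use a known decoder
        method="copy",             # keep the originals in place
        make_dirs=True,            # create the folder tree first
        set_version_hashes=True,   # write sha256 sidecar files
    )
    for source, destination in new_paths.items():
        print(source, "->", destination)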