• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2259086162

pending completion
2259086162

Pull #33

github

GitHub
Merge f00680dd3 into d076d8475
Pull Request #33: Support CORDEX and CMIP5/6

32 of 182 new or added lines in 15 files covered. (17.58%)

6 existing lines in 5 files now uncovered.

659 of 3232 relevant lines covered (20.39%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/miranda/structure/_structure.py
NEW
1
import hashlib
×
2
import logging.config
×
3
import multiprocessing
×
4
import os
×
5
import shutil
×
6
import sys
×
7
from functools import partial
×
8
from pathlib import Path
×
9
from types import GeneratorType
×
10
from typing import List, Mapping, Optional, Union
×
11

12
from miranda import Decoder
×
13
from miranda.decode import guess_project
×
14
from miranda.scripting import LOGGING_CONFIG
×
NEW
15
from miranda.utils import discover_data
×
16
from miranda.validators import GRIDDED_SCHEMA, SIMULATION_SCHEMA, STATION_OBS_SCHEMA
×
17

18
logging.config.dictConfig(LOGGING_CONFIG)
×
19

20
__all__ = [
×
21
    "build_path_from_schema",
22
    "structure_datasets",
23
]
24

25

NEW
26
def generate_version_hashes(in_file: Path, out_file: Path) -> str:
    """Write the sha256sum of `in_file` to `out_file` and return it.

    Parameters
    ----------
    in_file: Path
      File whose contents will be hashed.
    out_file: Path
      File that will receive the hexadecimal sha256 digest as text.

    Returns
    -------
    str
      The hexadecimal sha256 digest of `in_file`.
    """
    hash_sha256_writer = hashlib.sha256()
    # Read in fixed-size chunks so arbitrarily large files need not fit in
    # memory (the original read the whole file at once).
    with open(in_file, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hash_sha256_writer.update(chunk)
    sha256sum = hash_sha256_writer.hexdigest()

    print(f"Writing sha256sum (ending: {sha256sum[-6:]}) to file: {out_file.name}")
    with open(out_file, "w") as f:
        f.write(sha256sum)

    return sha256sum
×
38

39

UNCOV
40
def _structure_datasets(
    in_file: Path, out_path: Path, method: str, dry_run: bool = False
) -> None:
    """Move or copy a single file/folder into its destination folder.

    Parameters
    ----------
    in_file: Path
      File or folder to relocate; its name is preserved at the destination.
    out_path: Path
      Destination folder.
    method: {"move", "copy"}
      Transfer method, dispatched to the matching :mod:`shutil` function.
    dry_run: bool
      When True, only print the action that would have been performed.

    Raises
    ------
    ValueError
      If `method` is not "move" or "copy" (the original silently did nothing).
    """
    # Normalize once: the original compared `method.lower()` but passed the
    # raw `method` to getattr, so "Copy" would raise AttributeError.
    method = method.lower()
    if method not in {"move", "copy"}:
        raise ValueError(f"Method must be 'move' or 'copy', got: {method}")

    meth = "Moved" if method == "move" else "Copied"
    output_file = out_path.joinpath(in_file.name)
    try:
        if not dry_run:
            # shutil.copy cannot copy directories; use copytree for those.
            method_mod = "tree" if in_file.is_dir() and method == "copy" else ""

            # shutil only accepts os.PathLike arguments from Python 3.9 on.
            if sys.version_info < (3, 9):
                getattr(shutil, f"{method}{method_mod}")(
                    str(in_file), str(output_file)
                )
            else:
                getattr(shutil, f"{method}{method_mod}")(in_file, output_file)
        print(f"{meth} {in_file.name} to {output_file}.")
    except FileExistsError:
        print(f"{in_file.name} already exists at location. Continuing...")
×
61

62

63
def build_path_from_schema(
    facets: dict, output_folder: Union[str, os.PathLike]
) -> Path:
    """Build a filepath based on a valid data schema.

    Parameters
    ----------
    facets: dict
      Facets for a given dataset.
    output_folder: str or os.PathLike
      Parent folder on which to extend the filetree structure.

    Returns
    -------
    Path
      The folder tree for the dataset described by `facets`.

    Raises
    ------
    ValueError
      If `facets` matches no known data schema.
    """
    if facets["type"] == "station-obs":
        STATION_OBS_SCHEMA.validate(facets)
        folder_tree = (
            Path(output_folder)
            / facets["type"]
            / facets["project"]
            / facets["institution"]
            / facets["version"]  # This suggests "date_created"
            / facets["frequency"]
            / facets["variable"]
        )
        # FIXED: the original used `hasattr(facets, "member")`, which is
        # always False for a dict; dict membership must be tested with `in`.
        if "member" in facets:
            return folder_tree / facets["member"]
        return folder_tree

    if facets["type"] in ["forecast", "gridded-obs", "reanalysis"]:
        GRIDDED_SCHEMA.validate(facets)
        return (
            Path(output_folder)
            / facets["type"]
            / facets["institution"]
            / facets["activity"]
            / facets["source"]
            / facets["project"]
            / facets["domain"]
            / facets["frequency"]
            / facets["variable"]
        )

    if facets["type"] == "simulation":
        SIMULATION_SCHEMA.validate(facets)
        if facets["processing_level"] == "raw":
            try:
                # NOTE(review): a raw simulation with a "project" key other
                # than "CORDEX" falls through to the ValueError below --
                # confirm whether that is intended.
                if facets["project"] == "CORDEX":
                    return (
                        Path(output_folder)
                        / facets["type"]
                        / facets["processing_level"]
                        / facets["activity"]
                        / facets["mip_era"]
                        / facets["project"]
                        / facets["domain"]
                        / facets["source"]
                        / facets["driving_model"]
                        / facets["experiment"]
                        / facets["member"]
                        / facets["frequency"]
                        / facets["variable"]
                    )
            except KeyError:
                # No "project" facet: fall back to the generic raw layout.
                return (
                    Path(output_folder)
                    / facets["type"]
                    / facets["processing_level"]
                    / facets["activity"]
                    / facets["mip_era"]
                    / facets["domain"]
                    / facets["institution"]
                    / facets["source"]
                    / facets["experiment"]
                    / facets["member"]
                    / facets["frequency"]
                    / facets["variable"]
                )
        elif (
            facets["processing_level"] == "biasadjusted"
        ):  # FIXME: remove or keep underline?
            return (
                Path(output_folder)
                / facets["type"]
                / facets["processing_level"]
                / facets["activity"]
                / facets["mip_era"]
                / facets["bias_adjust_institution"]
                / facets["bias_adjust_project"]
                / facets["domain"]
                / facets["institution"]
                / facets["source"]
                / facets["experiment"]
                / facets["member"]
                / facets["frequency"]
                / facets["variable"]
            )

    raise ValueError("No appropriate data schemas found.")
×
164

165

166
def structure_datasets(
    input_files: Union[str, os.PathLike, List[Union[str, os.PathLike]], GeneratorType],
    output_folder: Union[str, os.PathLike],
    *,
    project: Optional[str] = None,
    guess: bool = True,
    dry_run: bool = False,
    method: str = "copy",
    make_dirs: bool = False,
    set_version_hashes: bool = False,
    filename_pattern: str = "*.nc",
) -> Mapping[Path, Path]:
    """Decode discovered datasets and restructure them into a schema-based folder tree.

    Parameters
    ----------
    input_files: str or Path or list of str or Path or GeneratorType
    output_folder: str or Path
    project: {"cordex", "cmip5", "cmip6", "isimip-ft", "reanalysis", "pcic-candcs-u6"}, optional
    guess: bool
      If project not supplied, suggest to decoder that project is the same for all input_files. Default: True.
    dry_run: bool
      Prints changes that would have been made without performing them. Default: False.
    method: {"move", "copy"}
      Method to transfer files to intended location. Default: "copy".
    make_dirs:
      Make folder tree if it does not already exist. Default: False.
    set_version_hashes:
      Make an accompanying file with version in filename and sha256sum in contents. Default: False.
    filename_pattern: str
      If pattern ends with "zarr", will 'glob' with provided pattern.
      Otherwise, will perform an 'rglob' (recursive) operation.

    Returns
    -------
    dict
      Mapping from each discovered source file to its destination folder.

    Raises
    ------
    FileNotFoundError
      If no files match `filename_pattern`.
    """
    # Materialize the discovered files: discover_data may yield a generator,
    # and the original implementation consumed its first element while
    # guessing the project, silently dropping that file from the decode pass.
    input_files = list(discover_data(input_files, filename_pattern))
    if not input_files:
        raise FileNotFoundError(
            f"No files found matching pattern: {filename_pattern}"
        )

    if not project and guess:
        # Examine the first file and assume the rest share its project.
        project = guess_project(input_files[0])
    decoder = Decoder(project)
    decoder.decode(input_files)

    all_file_paths = dict()
    version_hash_paths = dict()
    for file, facets in decoder.file_facets().items():
        output_filepath = build_path_from_schema(facets, output_folder)
        all_file_paths[Path(file)] = output_filepath

        if set_version_hashes:
            version_hash_file = f"{Path(file).stem}.{facets['version']}"
            version_hash_paths[Path(file)] = output_filepath.joinpath(
                version_hash_file
            )

    if make_dirs:
        for new_path in set(all_file_paths.values()):
            Path(new_path).mkdir(exist_ok=True, parents=True)

    if set_version_hashes:
        # Hash in parallel; starmap blocks until all workers are done.
        with multiprocessing.Pool() as pool:
            pool.starmap(generate_version_hashes, version_hash_paths.items())
            pool.close()
            pool.join()

    # Multiprocessing file transfer (move/copy according to `method`).
    structure_func = partial(_structure_datasets, method=method, dry_run=dry_run)
    with multiprocessing.Pool() as pool:
        pool.starmap(structure_func, all_file_paths.items())
        pool.close()
        pool.join()

    return all_file_paths
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc