Ouranosinc / miranda / build 2429425332 (pending completion)

Pull Request #50: revise structure to fit newest database definition
GitHub: Merge 6a007608c into 6d81d9443

3 of 50 new or added lines in 7 files covered. (6.0%)

17 existing lines in 2 files now uncovered.

661 of 3301 relevant lines covered (20.02%)

0.6 hits per line

Source File: /miranda/structure/_structure.py (0.0% of lines covered)

import hashlib
import logging.config
import multiprocessing
import os
import shutil
import sys
from functools import partial
from pathlib import Path
from types import GeneratorType
from typing import List, Mapping, Optional, Union

import schema

from miranda import Decoder
from miranda.decode import guess_activity
from miranda.scripting import LOGGING_CONFIG
from miranda.utils import discover_data
from miranda.validators import GRIDDED_SCHEMA, SIMULATION_SCHEMA, STATION_OBS_SCHEMA

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "build_path_from_schema",
    "structure_datasets",
]


def generate_version_hashes(in_file: Path, out_file: Path):
    hash_sha256_writer = hashlib.sha256()
    with open(in_file, "rb") as f:
        hash_sha256_writer.update(f.read())
    sha256sum = hash_sha256_writer.hexdigest()

    print(f"Writing sha256sum (ending: {sha256sum[-6:]}) to file: {out_file.name}")
    with open(out_file, "w") as f:
        f.write(sha256sum)

    del hash_sha256_writer
    del sha256sum

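
# Usage sketch (hypothetical file names; assumes the input NetCDF file
# already exists on disk):
#
#     generate_version_hashes(
#         in_file=Path("tas_day_era5_19790101-20201231.nc"),
#         out_file=Path("tas_day_era5_19790101-20201231.v20220101"),
#     )
#
# The sidecar file receives the hex sha256 digest of the input file, so the
# dataset version can be verified after transfer.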

def _structure_datasets(
    in_file: Path, out_path: Path, method: str, dry_run: bool = False
):
    if method.lower() in ["move", "copy"]:
        meth = "Moved" if method.lower() == "move" else "Copied"
        output_file = out_path.joinpath(in_file.name)
        try:
            if not dry_run:
                method_mod = ""
                if in_file.is_dir() and method.lower() == "copy":
                    method_mod = "tree"

                # Resolve shutil.move / shutil.copy / shutil.copytree by name,
                # passing string paths on Python < 3.9 for compatibility.
                if sys.version_info < (3, 9):
                    getattr(shutil, f"{method}{method_mod}")(
                        str(in_file), str(output_file)
                    )
                else:
                    getattr(shutil, f"{method}{method_mod}")(in_file, output_file)
            print(f"{meth} {in_file.name} to {output_file}.")
        except FileExistsError:
            print(f"{in_file.name} already exists at location. Continuing...")

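
# Usage sketch (hypothetical paths; with dry_run=True the intended operation
# is printed but nothing is actually moved or copied):
#
#     _structure_datasets(
#         in_file=Path("downloads/tas_day_era5.nc"),
#         out_path=Path("/data/reconstruction/ecmwf/era5/nam/day/tas"),
#         method="copy",
#         dry_run=True,
#     )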

def build_path_from_schema(
    facets: dict, output_folder: Union[str, os.PathLike]
) -> Optional[Path]:
    """Build a filepath based on a valid data schema.

    Parameters
    ----------
    facets: dict
      Facets for a given dataset.
    output_folder
      Parent folder on which to extend the filetree structure.

    Returns
    -------
    Path or None
    """
    folder_tree_structure = None
    try:
        if facets["type"] == "station-obs":
            STATION_OBS_SCHEMA.validate(facets)
            folder_tree_structure = (
                "type",
                "institution",
                "source",
                "version",  # This suggests "date_created"
                "frequency",
                "member" if "member" in facets else None,  # This suggests station code
                "variable",
            )

        if facets["type"] in ["forecast", "gridded-obs", "reconstruction"]:
            GRIDDED_SCHEMA.validate(facets)
            folder_tree_structure = (
                "type",
                "institution",
                "source",
                "domain",
                "frequency",
                "variable",
            )

        if facets["type"] == "simulation":
            SIMULATION_SCHEMA.validate(facets)
            folder_tree_structure = (
                "type",
                "processing_level",
                "mip_era",
                "activity"
                if facets["processing_level"] == "raw"
                else "bias_adjust_project",
                "domain",
                "institution",
                "source",
                "driving_model" if facets["activity"] == "CORDEX" else None,
                "experiment",
                "member",
                "frequency",
                "variable",
            )

        if folder_tree_structure:
            facet_tree = [facets[facet] for facet in folder_tree_structure if facet]
            return Path(output_folder).joinpath("/".join(facet_tree))
        else:
            raise ValueError()

    except schema.SchemaError:
        logging.error(f"Validation issues found for file matching schema: {facets}")
    except ValueError:
        logging.error(
            f"No appropriate data schemas found for file matching schema: {facets}"
        )
    return
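
# Usage sketch (hypothetical facet values; assumes they satisfy the imported
# GRIDDED_SCHEMA, which may require additional keys):
#
#     facets = {
#         "type": "reconstruction",
#         "institution": "ecmwf",
#         "source": "era5",
#         "domain": "nam",
#         "frequency": "day",
#         "variable": "tas",
#     }
#     build_path_from_schema(facets, "/data")
#     # -> PosixPath("/data/reconstruction/ecmwf/era5/nam/day/tas")
#
# On validation failure the error is logged and None is returned rather than
# the exception propagating to the caller.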

def structure_datasets(
    input_files: Union[str, os.PathLike, List[Union[str, os.PathLike]], GeneratorType],
    output_folder: Union[str, os.PathLike],
    *,
    activity: Optional[str] = None,
    guess: bool = True,
    dry_run: bool = False,
    method: str = "copy",
    make_dirs: bool = False,
    set_version_hashes: bool = False,
    filename_pattern: str = "*.nc",
) -> Mapping[Path, Path]:
    """Structure datasets into a file tree derived from their facets.

    Parameters
    ----------
    input_files: str or Path or list of str or Path or GeneratorType
    output_folder: str or Path
    activity: {"cordex", "cmip5", "cmip6", "isimip-ft", "reanalysis", "pcic-candcs-u6"}, optional
    guess: bool
      If activity is not supplied, examine the first file and suggest to the decoder
      that the activity is the same for all input_files. Default: True.
    dry_run: bool
      Prints changes that would have been made without performing them. Default: False.
    method: {"move", "copy"}
      Method used to transfer files to their intended location. Default: "copy".
    make_dirs: bool
      Make the folder tree if it does not already exist. Default: False.
    set_version_hashes: bool
      Make an accompanying file with the version in its filename and the sha256sum
      as its contents. Default: False.
    filename_pattern: str
      If the pattern ends with "zarr", a 'glob' with the provided pattern is performed.
      Otherwise, an 'rglob' (recursive) operation is performed.

    Returns
    -------
    dict
    """
    input_files = discover_data(input_files, filename_pattern)
    if not activity and guess:
        # Examine the first file from a list or generator
        for f in input_files:
            activity = guess_activity(f)
            decoder = Decoder(activity)
            decoder.decode(f)
            break
        else:  # The loop found no files at all
            raise FileNotFoundError()
        decoder.decode(input_files)
    else:
        decoder = Decoder(activity)
        decoder.decode(input_files)

    all_file_paths = dict()
    version_hash_paths = dict()
    errored_files = list()
    for file, facets in decoder.file_facets().items():
        output_filepath = build_path_from_schema(facets, output_folder)
        if isinstance(output_filepath, Path):
            all_file_paths.update({Path(file): output_filepath})
        else:
            errored_files.append(Path(file).name)
            continue

        if set_version_hashes:
            version_hash_file = f"{Path(file).stem}.{facets['version']}"
            version_hash_paths.update(
                {Path(file): output_filepath.joinpath(version_hash_file)}
            )

    if errored_files:
        logging.warning(
            f"Some files were unable to be structured: [{', '.join(errored_files)}]"
        )

    if make_dirs:
        for new_path in set(all_file_paths.values()):
            Path(new_path).mkdir(exist_ok=True, parents=True)

    if set_version_hashes:
        with multiprocessing.Pool() as pool:
            pool.starmap(generate_version_hashes, version_hash_paths.items())
            pool.close()
            pool.join()

    # Move or copy the files in parallel
    structure_func = partial(_structure_datasets, method=method, dry_run=dry_run)
    with multiprocessing.Pool() as pool:
        pool.starmap(structure_func, all_file_paths.items())
        pool.close()
        pool.join()

    return all_file_paths
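
# Usage sketch (hypothetical paths; make_dirs=True creates the destination
# tree before the parallel copy step runs):
#
#     structured = structure_datasets(
#         "/downloads/era5",
#         "/data",
#         method="copy",
#         make_dirs=True,
#         set_version_hashes=True,
#     )
#     for source, destination in structured.items():
#         print(source, "->", destination)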