Ouranosinc / miranda / build 11667553268
04 Nov 2024 03:38PM UTC. Coverage: 18.382% (remained the same)
Pull Request #201: Bump tox-gh from 1.3.2 to 1.4.1 (github / web-flow)
Merge ce5e6b9a0 into f270ec6c2
920 of 5005 relevant lines covered (18.38%), 1.36 hits per line

Source file: /src/miranda/structure/_structure.py (11.69% file coverage)
from __future__ import annotations

import hashlib
import logging.config
import multiprocessing
import os
import shutil
from functools import partial
from pathlib import Path
from types import GeneratorType

import yaml
from schema import SchemaError

from miranda.cv import VALIDATION_ENABLED
from miranda.decode import Decoder, DecoderError, guess_project
from miranda.io import discover_data
from miranda.scripting import LOGGING_CONFIG

if VALIDATION_ENABLED:
    from miranda.validators import validation_schemas

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "build_path_from_schema",
    "create_version_hash_files",
    "structure_datasets",
]


def _verify(hash_value: str, hash_file: os.PathLike) -> None:
    try:
        with Path(hash_file).open("r") as f:
            found_sha256sum = f.read()

        if hash_value != found_sha256sum:
            raise ValueError()
    except ValueError:
        msg = (
            f"Found sha256sum (starting: {found_sha256sum[:6]}) "
            f"does not match current value (starting: {hash_value[:6]}) "
            f"for file `{Path(hash_file).name}`."
        )
        logging.error(msg)


def generate_hash_file(
    in_file: os.PathLike, out_file: os.PathLike, verify: bool = False
) -> None:
    if not Path(out_file).exists():
        hash_sha256_writer = hashlib.sha256()
        with Path(in_file).open("rb") as f:
            hash_sha256_writer.update(f.read())
        sha256sum = hash_sha256_writer.hexdigest()

        print(
            f"Writing sha256sum (starting: {sha256sum[:6]}) to file: {Path(out_file).name}"
        )
        try:
            with Path(out_file).open("w") as f:
                f.write(sha256sum)
        except PermissionError:
            logging.error("Unable to write file. Ensure access privileges.")

        del hash_sha256_writer
        del sha256sum
    elif verify:
        hash_sha256_writer = hashlib.sha256()
        with Path(in_file).open("rb") as f:
            hash_sha256_writer.update(f.read())
        calculated_sha256sum = hash_sha256_writer.hexdigest()

        _verify(calculated_sha256sum, out_file)

    else:
        print(f"Sha256sum file `{Path(out_file).name}` already exists. Continuing...")


def generate_hash_metadata(
    in_file: os.PathLike,
    version: str | None = None,
    hash_file: os.PathLike | None = None,
    verify: bool = False,
) -> dict[str, list[str]]:
    hashversion = dict()

    if version is None:
        version = "vNotFound"

    # Guard against the default `hash_file=None` before checking the filesystem.
    if hash_file is None or not Path(hash_file).exists():
        hash_sha256_writer = hashlib.sha256()
        with Path(in_file).open("rb") as f:
            hash_sha256_writer.update(f.read())
        sha256sum = hash_sha256_writer.hexdigest()

        print(f"Calculated sha256sum (starting: {sha256sum[:6]})")

        hashversion[Path(in_file).name] = [version, sha256sum]
        del hash_sha256_writer

    else:
        hash_sha256_writer = hashlib.sha256()
        with Path(in_file).open("rb") as f:
            hash_sha256_writer.update(f.read())
        calculated_sha256sum = hash_sha256_writer.hexdigest()

        if verify:
            _verify(calculated_sha256sum, hash_file)

        hashversion[Path(in_file).name] = [version, calculated_sha256sum]

    return hashversion


def create_version_hash_files(
    input_files: (
        str | os.PathLike | list[str | os.PathLike] | GeneratorType | None
    ) = None,
    facet_dict: dict | None = None,
    verify_hash: bool = False,
) -> None:
    """Create version hashes based on files or a facets dictionary.

    Parameters
    ----------
    input_files : str, os.PathLike, list of str or os.PathLike, or GeneratorType, optional
        Files from which facets are decoded and for which version hash files are created.
    facet_dict : dict, optional
        Mapping of file paths to their facets (each containing a "version" entry).
        Ignored if `input_files` is provided.
    verify_hash : bool
        Verify existing hash files against freshly calculated hashes. Default: False.

    Returns
    -------
    None
    """
    if not facet_dict and not input_files:
        raise ValueError("Facets dictionary or sequence of filepaths required.")

    if input_files:
        if isinstance(input_files, (str, os.PathLike)):
            input_files = [input_files]
        for f in input_files:
            project = guess_project(f)
            decoder = Decoder(project)
            decoder.decode(f)
            break
        else:
            raise FileNotFoundError()
        decoder.decode(input_files)
        facet_dict = decoder.file_facets()

    version_hash_paths = dict()
    for file, facets in facet_dict.items():
        version_hash_file = f"{Path(file).stem}.{facets['version']}"
        version_hash_paths.update(
            {Path(file): Path(file).parent.joinpath(version_hash_file)}
        )

    hash_func = partial(generate_hash_file, verify=verify_hash)
    with multiprocessing.Pool() as pool:
        pool.starmap(
            hash_func,
            zip(version_hash_paths.keys(), version_hash_paths.values()),
        )
        pool.close()
        pool.join()


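# Illustrative sketch (hypothetical, not part of the original module): driving
# `create_version_hash_files` from a facets dictionary. The file path and
# version below are placeholders; the dataset file must exist on disk.
def _example_create_version_hash_files() -> None:
    create_version_hash_files(
        facet_dict={"/data/tasmax_day_example.nc": {"version": "v20240101"}},
        verify_hash=True,
    )
    # Writes (or, if already present, verifies) `/data/tasmax_day_example.v20240101`,
    # a sidecar text file holding the dataset's sha256sum.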

def _structure_datasets(
    in_file: str | os.PathLike,
    out_path: str | os.PathLike,
    method: str,
    dry_run: bool = False,
):
    if isinstance(in_file, str):
        in_file = Path(in_file)

    if method.lower() in ["move", "copy"]:
        meth = "Moved" if method.lower() == "move" else "Copied"
        output_file = Path(out_path).joinpath(in_file.name)
        try:
            if not dry_run:
                method_mod = ""
                if in_file.is_dir() and method.lower() == "copy":
                    method_mod = "tree"

                getattr(shutil, f"{method}{method_mod}")(in_file, output_file)
            print(f"{meth} {in_file.name} to {output_file}.")
        except FileExistsError:
            print(f"{in_file.name} already exists at location. Continuing...")


def parse_schema(
    facets: dict, schema: str | os.PathLike | dict, top_folder: str = "datasets"
) -> list:
    """Parse the schema from a YAML schema configuration and construct path using a dictionary of facets.

    Parameters
    ----------
    facets : dict
        Facets for a given dataset.
    schema : str or os.PathLike or dict
        Path to a YAML schema file, or an already-loaded schema dictionary.
    top_folder : str
        Top-level key of the schema to parse. Default: "datasets".

    Returns
    -------
    list
        Ordered facet names making up the folder tree.
    """

    def _parse_top_level(schematic: dict, facet_dict: dict, top: str):
        try:
            parent = schematic[top]
        except KeyError:
            logging.error("Schema is not a valid facet-tree reference.")
            raise

        for i, options in enumerate(parent):
            if {"option", "structure", "value"}.issubset(options.keys()):
                option = options["option"]
                value = options["value"]

                if option in facet_dict.keys():
                    if facet_dict[option] == value:
                        return {"branch": value, "structure": options["structure"]}
                    continue
        raise ValueError("Couldn't parse top level.")

    def _parse_structure(branch_dict: dict, facet_dict: dict) -> list:
        structure = branch_dict.get("structure")
        folder_tree = list()

        for level in structure:
            if isinstance(level, str):
                folder_tree.append(level)
                continue
            elif isinstance(level, dict):
                if {"option", "is_true"}.issubset(level.keys()):
                    option = level["option"]

                    if option not in facet_dict and "value" in level:
                        raise ValueError(
                            f"Necessary facet not found for schema: `{option}`."
                        )

                    is_true = level.get("is_true")
                    else_value = level.get("else")
                    facet = facet_dict.get(option)

                    if "value" not in level:
                        # The absence of "value" means that "is_true / else" refer to the presence or not of "option" in the facets.
                        # We also treat falsy values (empty string, None) as the absence of "option" from the facets.
                        if not bool(facet) and else_value:
                            folder_tree.append(else_value)
                        elif bool(facet):
                            folder_tree.append(is_true)
                        else:
                            # "option" absent from the facets and no "else": skip.
                            pass
                    else:
                        value = level["value"]
                        if facet_dict[option] == value:
                            folder_tree.append(is_true)
                        elif else_value:
                            folder_tree.append(else_value)
                        else:
                            # "option" not equal to "value", but no "else": skip.
                            pass
            else:
                raise ValueError("Supplied schema is invalid.")
        return folder_tree

    if isinstance(schema, (str, os.PathLike)):
        with Path(schema).open() as f:
            schema = yaml.safe_load(f.read())

    branch = _parse_top_level(schema, facets, top_folder)
    tree = list()  # noqa
    tree.extend(_parse_structure(branch, facets))

    return tree


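# Illustrative sketch (hypothetical, not part of the original module): a toy
# schema and facet set showing what `parse_schema` returns. The facet names and
# schema layout below are invented for illustration only.
def _toy_schema_and_facets() -> tuple[dict, dict]:
    schema = {
        "datasets": [
            {
                "option": "type",
                "value": "simulation",
                "structure": ["type", "processing_level", "domain", "frequency"],
            }
        ]
    }
    facets = {
        "type": "simulation",
        "processing_level": "raw",
        "domain": "AMNO",
        "frequency": "day",
    }
    return schema, facets


def _example_parse_schema() -> list:
    schema, facets = _toy_schema_and_facets()
    # The matching branch's ordered facet names:
    # ["type", "processing_level", "domain", "frequency"]
    return parse_schema(facets, schema, top_folder="datasets")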

def build_path_from_schema(
    facets: dict,
    output_folder: str | os.PathLike,
    schema: str | os.PathLike | dict | None = None,
    top_folder: str = "datasets",
    validate: bool = True,
) -> Path | None:
    """Build a filepath based on a valid data schema.

    Parameters
    ----------
    facets : dict
        Facets for a given dataset.
    output_folder : str or os.PathLike
        Parent folder on which to extend the filetree structure.
    schema : str or os.PathLike or dict, optional
        Path to YAML schematic of database structure. If None, will use Ouranos schema.
    top_folder : str
        Top-level of supplied schema, used for validation purposes. Default: "datasets".
    validate : bool
        Run facets-validation checks over given file. Default: True.

    Returns
    -------
    Path or None
    """
    if schema is None:
        schema = Path(__file__).parent.joinpath("data").joinpath("ouranos_schema.yml")

    tree = parse_schema(facets, schema, top_folder)
    branch = tree[0]

    if validate and VALIDATION_ENABLED:
        if facets[branch] in validation_schemas.keys():
            try:
                validation_schemas[facets[branch]].validate(facets)
            except SchemaError as e:
                msg = f"Validation issues found for file matching schema: {facets}: {e}"
                logging.error(msg)
                return
        elif facets[branch] not in validation_schemas.keys():
            msg = (
                f"No appropriate data schemas found for file matching schema: {facets}"
            )
            logging.error(msg)
            return
    elif validate and not VALIDATION_ENABLED:
        logging.warning(
            "Facets validation requires pyessv-archive source files. Skipping validation checks."
        )

    # Remove spaces in folder paths
    file_location = [str(facets[facet]).replace(" ", "-") for facet in tree]
    return Path(output_folder).joinpath("/".join(file_location))


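# Illustrative sketch (hypothetical, not part of the original module), reusing
# the toy schema and facets from the sketch above: the facet values are joined,
# in schema order, under the given output folder. The output folder is a placeholder.
def _example_build_path_from_schema() -> Path | None:
    schema, facets = _toy_schema_and_facets()
    # -> Path("/tmp/datasets/simulation/raw/AMNO/day"); validation is skipped.
    return build_path_from_schema(
        facets, "/tmp/datasets", schema=schema, validate=False
    )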

def structure_datasets(
    input_files: str | os.PathLike | list[str | os.PathLike] | GeneratorType,
    output_folder: str | os.PathLike,
    *,
    project: str | None = None,
    guess: bool = True,
    dry_run: bool = False,
    method: str = "copy",
    make_dirs: bool = False,
    set_version_hashes: bool = False,
    verify_hashes: bool = False,
    suffix: str = "nc",
) -> dict[Path, Path]:
    """Structure datasets.

    Parameters
    ----------
    input_files : str, Path, list of str or Path, or GeneratorType
        Files to be sorted.
    output_folder : str or Path
        The desired location for the folder-tree.
    project : {"cordex", "cmip5", "cmip6", "isimip-ft", "pcic-candcs-u6", "converted"}, optional
        Project used to parse the facets of all supplied datasets.
        If not supplied, will attempt parsing with all available data categories for each file (slow)
        unless `guess` is True.
    guess : bool
        If project not supplied, suggest to decoder that activity is the same for all input_files. Default: True.
    dry_run : bool
        Prints changes that would have been made without performing them. Default: False.
    method : {"move", "copy"}
        Method to transfer files to intended location. Default: "copy".
    make_dirs : bool
        Make folder tree if it does not already exist. Default: False.
    set_version_hashes : bool
        Make an accompanying file with version in filename and sha256sum in contents. Default: False.
    verify_hashes : bool
        Ensure that any existing sha256sum files correspond with their companion file.
        An error is logged on mismatch. Default: False.
    suffix : {"nc", "zarr"}
        If "zarr", will perform a 'glob' with provided pattern.
        Otherwise, will perform an 'rglob' (recursive) operation.

    Returns
    -------
    dict[Path, Path]
    """
    input_files = discover_data(input_files, suffix)
    if guess and project is None:
        # Examine the first file from a list or generator
        for f in input_files:
            project = guess_project(f)
            decoder = Decoder(project)
            decoder.decode(f)
            break
        else:
            raise FileNotFoundError()
        decoder.decode(input_files)
    else:
        decoder = Decoder(project)
        decoder.decode(input_files)

    all_file_paths = dict()
    existing_hashes = dict()
    version_hash_paths = dict()
    errored_files = list()
    for file, facets in decoder.file_facets().items():
        output_filepath = build_path_from_schema(facets, output_folder)
        if isinstance(output_filepath, Path):
            all_file_paths.update({Path(file): output_filepath})
        else:
            errored_files.append(Path(file).name)
            continue

        if set_version_hashes:
            version_hash_file = f"{Path(file).stem}.{facets['version']}"
            if Path(file).parent.joinpath(version_hash_file).exists():
                existing_hashes.update(
                    {
                        Path(file).parent.joinpath(
                            version_hash_file
                        ): output_filepath.joinpath(version_hash_file)
                    }
                )
            else:
                version_hash_paths.update(
                    {Path(file): output_filepath.joinpath(version_hash_file)}
                )

    if errored_files:
        if len(errored_files) < 10:
            msg = (
                f"Some files were unable to be structured: [{', '.join(errored_files)}]"
            )
            logging.warning(msg)
        else:
            msg = f"Many files were unable to be structured (n={len(errored_files)})"
            logging.warning(msg)

    if make_dirs:
        for new_paths in set(all_file_paths.values()):
            Path(new_paths).mkdir(exist_ok=True, parents=True)

    if set_version_hashes:
        hash_func = partial(generate_hash_file, verify=verify_hashes)
        with multiprocessing.Pool() as pool:
            if existing_hashes:
                print(
                    f"Sha256sum signatures exist for {len(existing_hashes)} files. "
                    f"Transferring them via `{method}` method."
                )
                pool.starmap(
                    getattr(shutil, method),
                    zip(existing_hashes.keys(), existing_hashes.values()),
                )
            pool.starmap(
                hash_func,
                zip(version_hash_paths.keys(), version_hash_paths.values()),
            )
            pool.close()
            pool.join()

    # multiprocessing copy
    structure_func = partial(_structure_datasets, method=method, dry_run=dry_run)
    with multiprocessing.Pool() as pool:
        pool.starmap(
            structure_func, zip(all_file_paths.keys(), all_file_paths.values())
        )
        pool.close()
        pool.join()

    return all_file_paths
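# Illustrative sketch (hypothetical, not part of the original module): a dry-run
# call to `structure_datasets`. The input and output folders are placeholders;
# the project name must be one the decoder recognizes (here, "cmip6").
def _example_structure_datasets() -> dict[Path, Path]:
    # The returned mapping pairs each discovered source file with the destination
    # folder built by `build_path_from_schema`; with dry_run=True the planned
    # transfers are only printed, not performed.
    return structure_datasets(
        "/data/incoming/cmip6",
        "/data/structured",
        project="cmip6",
        method="copy",
        dry_run=True,
        suffix="nc",
    )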