• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 15141552897

20 May 2025 03:24PM UTC coverage: 16.448% (+1.4%) from 15.049%
15141552897

Pull #241

github

web-flow
Merge 12ef5216f into 730c6f31e
Pull Request #241: Testing Data and Distributed Testing

115 of 194 new or added lines in 2 files covered. (59.28%)

20 existing lines in 2 files now uncovered.

1029 of 6256 relevant lines covered (16.45%)

1.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/miranda/preprocess/eccc.py
1
"""Specialized conversion tools for Environment and Climate Change Canada / Meteorological Service of Canada data."""
2

3
from __future__ import annotations
×
4

5
import contextlib
×
6
import logging
×
7
import tempfile
×
8
from pathlib import Path
×
9
from typing import Callable
×
10

UNCOV
11
from dask.diagnostics import ProgressBar
×
12

13
from miranda.storage import file_size, report_file_size
×
14
from miranda.utils import generic_extract_archive
×
15

16
_data_folder = Path(__file__).parent / "configs"
×
17

18

19
def _run_func_on_archive_with_optional_dask(
    file: Path,
    function: Callable,
    errored_files: list[Path],
    **dask_kwargs,
) -> None:
    r"""
    Run a function on a file archive, extracting it if necessary.

    Parameters
    ----------
    file : Path
        File archive to process. Archives with a ``.gz``, ``.tar``, ``.zip``,
        or ``.7z`` suffix are extracted to a temporary folder first; any other
        file is processed as-is.
    function : Callable
        Function to run on each (possibly extracted) file. Must accept the
        file path positionally plus ``using_dask`` and ``client`` keyword
        arguments.
    errored_files : list[Path]
        Mutable list collecting files that raised ``FileNotFoundError``
        during processing.
    \*\*dask_kwargs : Any
        Keyword arguments forwarded to the context manager used around the
        call (NOTE(review): the code hands these to
        ``dask.diagnostics.ProgressBar``, not ``dask.distributed.Client`` as
        previously documented — confirm intended target).

    Notes
    -----
    If the file is larger than 1 GiB or any ``dask_kwargs`` are passed,
    ``using_dask=True`` is signalled to ``function``; otherwise a no-op
    context is used and ``using_dask=False``.
    """
    with tempfile.TemporaryDirectory() as temp_folder:
        if file.suffix in {".gz", ".tar", ".zip", ".7z"}:
            data_files = generic_extract_archive(file, output_dir=temp_folder)
        else:
            data_files = [file]
        msg = f"Processing file: {file}."
        logging.info(msg)

        # 1 GiB threshold above which dask is preferred over pandas.
        size_limit = 2**30

        for data in data_files:
            size = file_size(data)
            if size > size_limit or dask_kwargs:
                if dask_kwargs:
                    logging.info("`dask_kwargs` provided - Using dask.dataframes.")
                elif size > size_limit:
                    msg = f"File exceeds {report_file_size(size_limit)} - Using dask.dataframes."
                    logging.info(msg)
                client = ProgressBar
                using_dask = True
            else:
                msg = f"File below {report_file_size(size_limit)} - Using pandas.dataframes."
                logging.info(msg)
                # No-op context manager so the `with` below works on both paths.
                client = contextlib.nullcontext
                using_dask = False

            with client(**dask_kwargs) as c:
                try:
                    function(data, using_dask=using_dask, client=c)
                except FileNotFoundError:
                    errored_files.append(data)

        # BUG FIX: the original guard `if Path(temp_folder).iterdir():` tested
        # a generator object, which is always truthy, so it never guarded
        # anything. Iterate directly — glob() on an empty directory simply
        # yields nothing. (TemporaryDirectory removes leftovers on exit anyway;
        # this loop only unlinks extracted files early.)
        for temporary_file in Path(temp_folder).glob("*"):
            if temporary_file in data_files:
                temporary_file.unlink()
84

85

86
# def convert_flat_files(
87
#     source_files: str | os.PathLike,
88
#     output_folder: str | os.PathLike | list[str | int],
89
#     variables: str | int | list[str | int],
90
#     project: str = "eccc-obs",
91
#     mode: str = "hourly",
92
#     **dask_kwargs,
93
# ) -> None:
94
#     """
95
#
96
#     Parameters
97
#     ----------
98
#     source_files: str or Path
99
#     output_folder: str or Path
100
#     variables: str or List[str]
101
#     project: {"eccc-obs", "eccc-obs-summary", "eccc-homogenized"}
102
#     mode: {"hourly", "daily"}
103
#
104
#     Returns
105
#     -------
106
#     None
107
#     """
108
#
109
#     if isinstance(variables, (str, int)):
110
#         variables = [variables]
111
#
112
#     for variable_code in variables:
113
#         variable_code = str(variable_code).zfill(3)
114
#         metadata = load_json_data_mappings("eccc-obs").get(variable_code)
115
#
116
#
117
#
118
#         # Loop on the files
119
#         logging.info(
120
#             f"Collecting files for variable '{metadata['standard_name']}' "
121
#             f"(filenames containing '{metadata['_table_name']}')."
122
#         )
123
#         list_files = list()
124
#         if isinstance(source_files, list) or Path(source_files).is_file():
125
#             list_files.append(source_files)
126
#         else:
127
#             glob_patterns = [g for g in metadata["_table_name"]]
128
#             for pattern in glob_patterns:
129
#                 list_files.extend(
130
#                     [f for f in Path(source_files).rglob(f"{pattern}*") if f.is_file()]
131
#                 )
132
#
133
#
134
#
135
#
136
#         manager = mp.Manager()
137
#         errored_files = manager.list()
138
#         converter_func = partial(
139
#             _convert_station_file,
140
#             output_path=rep_nc,
141
#             errored_files=errored_files,
142
#             mode=mode,
143
#             variable_code=variable_code,
144
#             column_names=column_names,
145
#             column_dtypes=column_dtypes,
146
#             **metadata,
147
#         )
148
#         with mp.Pool(processes=n_workers) as pool:
149
#             pool.map(converter_func, list_files)
150
#             pool.close()
151
#             pool.join()
152
#
153
#
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc