• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2163561290

pending completion
2163561290

Pull #24

github

GitHub
Merge 6afb6735a into 9ac032fc5
Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring

242 of 1114 new or added lines in 35 files covered. (21.72%)

12 existing lines in 4 files now uncovered.

735 of 3252 relevant lines covered (22.6%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.45
/miranda/utils.py
1
import logging.config
3✔
2
import os
3✔
3
import sys
3✔
4
from contextlib import contextmanager
3✔
5
from datetime import date
3✔
6
from pathlib import Path
3✔
7
from types import GeneratorType
3✔
8
from typing import Dict, Iterable, List, Optional, Sequence, Union
3✔
9

10
from . import scripting
3✔
11

12
# Configure the standard-library logging system as soon as this module is
# imported, using the LOGGING_CONFIG mapping exposed by the sibling
# `scripting` module.
logging.config.dictConfig(scripting.LOGGING_CONFIG)
3✔
13

14
# Public API of this module: the names exported by a star-import.
__all__ = [
    "chunk_iterables",
    "creation_date",
    "filefolder_iterator",
    "find_filepaths",
    "list_paths_with_elements",
    "read_privileges",
    "single_item_list",
    "working_directory",
    "yesno_prompt",
]
25

26
# Regular expression used for datetime validation. It matches timestamps of
# the form ``YYYY-MM-DDThh:mm:ss[.fraction][Z|+hh:mm|-hh:mm]``.
ISO_8601 = (
    # Date part: optionally signed year of at least four digits, month 01-12, day 01-31.
    r"^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])"
    # Time part: "T", hour 00-23, minute, second, optional fraction, optional "Z" or UTC offset.
    r"T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?(Z|[+-](?:2[0-3]|[01][0-9]):[0-5][0-9])?$"
)
31

32

33
def filefolder_iterator(
    input_files: Union[str, os.PathLike, List[Union[str, os.PathLike]], GeneratorType],
    pattern: str,
) -> Union[List[os.PathLike], GeneratorType]:
    """Normalise a file, folder, list of paths or generator into paths to process.

    Parameters
    ----------
    input_files: str or os.PathLike or List[Union[str, os.PathLike]] or GeneratorType
        A single path, a list of paths, or a generator.
    pattern: str
        Glob pattern. It is only used when `input_files` is a single path
        pointing to an existing directory.

    Returns
    -------
    list or generator
        - single directory and `pattern` ending in "zarr": sorted list of the
          non-recursive glob matches;
        - single directory otherwise: the lazy result of a recursive glob;
        - list: a sorted list of `Path` objects;
        - generator: the same generator object, untouched.

    Raises
    ------
    NotImplementedError
        If `input_files` is of any other type.

    Notes
    -----
    A single path that is not a directory is returned as a bare `Path`
    (not wrapped in a list); this historical behaviour is preserved.
    """
    # Any os.PathLike is accepted (not only `Path`), as the annotation promises.
    if isinstance(input_files, (str, os.PathLike)):
        location = Path(input_files)
        if not location.is_dir():
            return location
        if pattern.endswith("zarr"):
            # NOTE(review): presumably non-recursive because Zarr stores are
            # themselves directories - confirm with callers.
            return sorted(location.glob(pattern))
        return location.rglob(pattern)

    if isinstance(input_files, list):
        return sorted(Path(p) for p in input_files)

    if isinstance(input_files, GeneratorType):
        return input_files

    raise NotImplementedError(f"input_files: {type(input_files)}")
×
62

63

64
def chunk_iterables(iterable: Iterable, chunk_size: int) -> Iterable[List]:
    """Generate lists of at most `chunk_size` consecutive elements from `iterable`.

    Parameters
    ----------
    iterable : Iterable
        Any iterable; it is consumed lazily, one chunk at a time.
    chunk_size : int
        Maximum number of elements per chunk. Must be at least 1.

    Yields
    ------
    list
        Successive chunks. Only the last one may be shorter than `chunk_size`;
        an empty chunk is never yielded.

    Raises
    ------
    ValueError
        If `chunk_size` is smaller than 1 (raised when iteration starts, since
        this is a generator). A size of 0 previously yielded empty lists forever.

    Notes
    -----
    Adapted from eidord (2012) https://stackoverflow.com/a/12797249/7322852 (https://creativecommons.org/licenses/by-sa/4.0/)
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    iterator = iter(iterable)
    while True:
        # `range` comes first so that `zip` stops as soon as the chunk is full,
        # without pulling (and losing) an extra element from `iterator`.
        chunk = [item for _, item in zip(range(chunk_size), iterator)]
        if not chunk:
            return
        yield chunk
×
82

83

84
def creation_date(path_to_file: Union[Path, str]) -> Union[float, date]:
    """
    Try to get the date that a file was created, falling back to when it was last modified if that isn't possible.
    See https://stackoverflow.com/a/39501288/1709587 for explanation.

    Parameters
    ----------
    path_to_file: Union[Path, str]
        Path of the file to inspect.

    Returns
    -------
    Union[float, date]
        On Windows (``os.name == "nt"``) the raw ``st_ctime`` timestamp as a
        float (kept as-is for backward compatibility); on other platforms a
        `date` built from ``st_birthtime`` when the platform exposes it, else
        from ``st_mtime``.
    """
    stat = Path(path_to_file).stat()

    if os.name == "nt":
        return stat.st_ctime

    try:
        # `st_birthtime` is only present on some platforms. The previous code
        # read `st_ctime` here, which always exists, so the fallback below
        # could never run.
        return date.fromtimestamp(stat.st_birthtime)
    except AttributeError:
        # We're probably on Linux. No easy way to get creation dates here,
        # so we'll settle for when its content was last modified.
        return date.fromtimestamp(stat.st_mtime)
×
107

108

109
def read_privileges(location: Union[Path, str], strict: bool = False) -> bool:
    """
    Determine whether a user has read privileges to a specific file

    Parameters
    ----------
    location: Union[Path, str]
        Path to check.
    strict: bool
        If True, re-raise the `OSError` instead of returning False.

    Returns
    -------
    bool
      Whether the current user shell has read privileges. False is returned
      (when not `strict`) both for unreadable and for non-existent paths.

    Raises
    ------
    OSError
        Only when `strict` is True: the path does not exist, is not readable,
        or checking it failed. The exception carries the same message that is
        logged.
    """
    # Fallback message for an OSError raised by the checks themselves.
    msg = f"Unable to check read privileges for `{location}`."
    try:
        if Path(location).exists():
            if os.access(location, os.R_OK):
                msg = f"{location} is read OK!"
                logging.info(msg)
                return True
            msg = f"Ensure read privileges for `{location}`."
        else:
            msg = f"`{location}` is an invalid path."
        # Attach the explanation to the exception so that `strict` callers
        # see why the check failed, not an empty OSError.
        raise OSError(msg)

    except OSError:
        logging.exception(msg)
        if strict:
            raise
        return False
3✔
143

144

145
@contextmanager
def working_directory(directory: Union[str, Path]) -> None:
    """
    Temporarily change the current working directory.

    The process moves into `directory` for the duration of the ``with`` block
    and is moved back to the directory that was current on entry when the
    block exits, whether normally or through an exception.

    Parameters
    ----------
    directory: Union[str, Path]
        Directory to switch to.

    Yields
    ------
    Union[str, Path]
        The `directory` argument, unchanged.

    Notes
    -----
    NOTE(review): the ``-> None`` annotation is kept for interface stability;
    the undecorated function is in fact a generator yielding `directory`.
    """
    previous = os.getcwd()
    try:
        os.chdir(directory)
        yield directory
    finally:
        # Always restore the original directory, even if the block raised
        # or the initial chdir failed.
        os.chdir(previous)
3✔
170

171

172
def find_filepaths(
    source: Union[Path, str, GeneratorType, List[Union[Path, str]]],
    recursive: bool = True,
    file_suffixes: Optional[Union[str, List[str]]] = None,
    **_,
) -> List[Path]:
    """Collect the paths found under one or more locations that match glob patterns.

    Parameters
    ----------
    source : Union[Path, str, GeneratorType, List[Union[Path, str]]]
        A single location or an iterable of locations; ``~`` is expanded.
    recursive : bool
        If truthy, search each location recursively (`Path.rglob`); otherwise
        only its direct children (`Path.glob`).
    file_suffixes: List[str]
        A pattern or list of patterns. A pattern without a ``*`` wildcard is
        wrapped as ``*{pattern}*``. Defaults to ``["*", ".*"]``.
    **_
        Extra keyword arguments are accepted and ignored.

    Returns
    -------
    List[Path]
        Matching paths in first-seen order. Each path appears once, even when
        several patterns match it (e.g. ``.nc`` and ``.nc4``).
    """
    if file_suffixes is None:
        file_suffixes = ["*", ".*"]
    elif isinstance(file_suffixes, str):
        file_suffixes = [file_suffixes]

    if isinstance(source, (Path, str)):
        source = [source]

    # Normalise the patterns once instead of once per location.
    patterns = [p if "*" in p else f"*{p}*" for p in file_suffixes]

    # A dict is used as an insertion-ordered set so that overlapping patterns
    # do not report the same path more than once.
    found = {}
    for location in source:
        root = Path(location).expanduser()
        for pattern in patterns:
            matches = root.rglob(pattern) if recursive else root.glob(pattern)
            found.update(dict.fromkeys(matches))

    return list(found)
3✔
215

216

217
def single_item_list(iterable: Iterable) -> bool:
    """
    Report whether `iterable` contains exactly one truthy item.

    See: https://stackoverflow.com/a/16801605/7322852

    Parameters
    ----------
    iterable: Iterable
        The items to inspect; falsy items are ignored.

    Returns
    -------
    bool
        True if exactly one item is truthy, False otherwise.

    """
    remaining = iter(iterable)
    # The first `any` consumes items up to and including the first truthy one.
    if not any(remaining):
        return False
    # The second `any` resumes after it: a further truthy item means "more than one".
    return not any(remaining)
×
236

237

238
########################################################################################
239

240

241
def yesno_prompt(query: str) -> bool:
    """Prompt user for a yes/no answer.

    Parameters
    ----------
    query : str
        the yes/no question to ask the user.

    Returns
    -------
    out : bool
        True for "y"/"yes", False for "n"/"no". Answers are compared
        case-insensitively after stripping surrounding whitespace.

    Raises
    ------
    ValueError
        If the answer is anything else.
    """

    user_input = input(f"{query} (y/n) ")
    # Tolerate stray whitespace and the spelled-out words.
    answer = user_input.strip().lower()
    if answer in ("y", "yes"):
        return True
    if answer in ("n", "no"):
        return False
    raise ValueError(f"{user_input} not in (y, n)")
×
260

261

262
def list_paths_with_elements(
    base_paths: Union[str, List[str]], elements: List[str]
) -> List[Dict]:
    """List a given path structure.

    Parameters
    ----------
    base_paths : List[str]
      list of paths from which to start the search. A single path is also
      accepted and treated as a one-item list.
    elements : List[str]
      ordered list of the expected elements.

    Returns
    -------
    List[Dict]
      The keys are 'path' and each of the members of the given elements, the path is the absolute path
      (as a string) and each element value is the name of the entry found at that level.
      Entries are sorted by name within each directory. Branches shallower than
      ``len(elements)`` produce no entries.

    Notes
    -----
    Suppose you have the following structure: /base_path/{color}/{shape}
    The resulting list would look like::

         [{'path':/base_path/red/square, 'color':'red', 'shape':'square'},
         {'path':/base_path/red/circle, 'color':'red', 'shape':'circle'},
         {'path':/base_path/blue/triangle, 'color':'blue', 'shape':'triangle'},
         ...]

    Obviously, 'path' should not be in the input list of elements.
    """

    # A lone path must be wrapped explicitly: a `str` is itself iterable, so
    # it would otherwise be walked character by character.
    if isinstance(base_paths, (str, os.PathLike)):
        base_paths = [base_paths]

    # If elements list is empty, return empty list (end of recursion).
    if not elements:
        return []

    current, remaining = elements[0], elements[1:]
    paths_elements = []
    for base_path in map(os.path.abspath, base_paths):
        try:
            children = sorted(Path(base_path).iterdir(), key=lambda child: child.name)
        except NotADirectoryError:
            # Plain files cannot hold a further level of the structure.
            continue

        if not remaining:
            # Last expected level: every entry of the directory is a result.
            paths_elements.extend(
                {"path": str(child), current: child.name} for child in children
            )
            continue

        deeper = list_paths_with_elements([str(child) for child in children], remaining)
        for found in deeper:
            # The value for this level is the first path component below `base_path`.
            found[current] = Path(found["path"]).relative_to(base_path).parts[0]
            paths_elements.append(found)
    return paths_elements
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc