• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 1947882388

pending completion
1947882388

Pull #24

github

GitHub
Merge c4f4ce623 into 250bc4dd4
Pull Request #24: Add CMIP file structure - WIP

115 of 667 new or added lines in 29 files covered. (17.24%)

9 existing lines in 3 files now uncovered.

660 of 3027 relevant lines covered (21.8%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.97
/miranda/utils.py
1
import logging.config
3✔
2
import os
3✔
3
import sys
3✔
4
from contextlib import contextmanager
3✔
5
from datetime import date
3✔
6
from pathlib import Path
3✔
7
from types import GeneratorType
3✔
8
from typing import Dict, Iterable, List, Optional, Sequence, Union
3✔
9

10
from . import scripting
3✔
11

12
# Configure the standard logging system as soon as this module is imported,
# using the dictionary exposed by the sibling `scripting` module.
logging.config.dictConfig(scripting.LOGGING_CONFIG)

# Public names exported by a star-import of this module.
__all__ = [
    "chunk_iterables",
    "creation_date",
    "find_filepaths",
    "ingest",
    "list_paths_with_elements",
    "read_privileges",
    "single_item_list",
    "working_directory",
    "yesno_prompt",
]
25

26

27
def ingest(files: Union[GeneratorType, List]) -> List:
    """Return the given items as a sorted list.

    Parameters
    ----------
    files : Union[GeneratorType, List]
        Mutually comparable items. A list is sorted in place and returned as-is;
        any other iterable (generator, tuple, set, ...) is first copied into a new list.

    Returns
    -------
    List
        The items in ascending order.
    """
    # Only lists offer an in-place `sort`; everything else must be materialised first.
    if not isinstance(files, list):
        files = list(files)
    files.sort()
    return files
×
32

33

34
def chunk_iterables(iterable: Sequence, chunk_size: int) -> List:
    """Generate lists of `chunk_size` elements from `iterable`.

    Parameters
    ----------
    iterable : Sequence
        The items to split up. Any iterable is accepted; it is consumed lazily.
    chunk_size : int
        Maximum number of items per chunk. Must be at least 1.

    Yields
    ------
    list
        Consecutive chunks of `chunk_size` items; the last chunk may be shorter.
        Nothing is yielded for an empty `iterable`.

    Raises
    ------
    ValueError
        If `chunk_size` is smaller than 1. Because this is a generator, the error
        surfaces when the first chunk is requested, not when the function is called.

    Notes
    -----
    Adapted from eidord (2012) https://stackoverflow.com/a/12797249/7322852 (https://creativecommons.org/licenses/by-sa/4.0/)
    """
    from itertools import islice

    # A non-positive size would otherwise produce an endless stream of empty chunks.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk
×
52

53

54
def creation_date(path_to_file: Union[Path, str]) -> Union[float, date]:
    """
    Try to get the date that a file was created, falling back to when it was last modified if that isn't possible.
    See https://stackoverflow.com/a/39501288/1709587 for explanation.

    Parameters
    ----------
    path_to_file: Union[Path, str]
      The file to inspect.

    Returns
    -------
    Union[float, date]
      On Windows, the raw `st_ctime` timestamp (a float, kept for backward compatibility).
      Elsewhere, a `date` built from `st_birthtime` when the platform reports it,
      otherwise from `st_mtime`.

    Raises
    ------
    OSError
      If the file cannot be stat'ed (e.g. it does not exist).
    """
    stat = Path(path_to_file).stat()

    if os.name == "nt":
        return stat.st_ctime

    try:
        # `st_ctime` always exists, so it can never trigger the fallback below;
        # the creation time, where the platform exposes it, is `st_birthtime`.
        return date.fromtimestamp(stat.st_birthtime)
    except AttributeError:
        # We're probably on Linux. No easy way to get creation dates here,
        # so we'll settle for when its content was last modified.
        return date.fromtimestamp(stat.st_mtime)
×
77

78

79
def read_privileges(location: Union[Path, str], strict: bool = False) -> bool:
    """
    Determine whether a user has read privileges to a specific file

    Parameters
    ----------
    location: Union[Path, str]
      The path to check.
    strict: bool
      If True, re-raise the OSError instead of returning False.

    Returns
    -------
    bool
      Whether or not the current user shell has read privileges

    Raises
    ------
    OSError
      Only when `strict` is True and the path does not exist or is not readable.
      The exception message states which of the two checks failed.
    """
    try:
        if not Path(location).exists():
            raise OSError(f"`{location}` is an invalid path.")
        if not os.access(location, os.R_OK):
            raise OSError(f"Ensure read privileges for `{location}`.")
    except OSError as err:
        # Log with traceback, then let `strict` decide between raising and reporting False.
        logging.exception(str(err))
        if strict:
            raise
        return False

    msg = f"{location} is read OK!"
    logging.info(msg)
    return True
3✔
113

114

115
@contextmanager
def working_directory(directory: Union[str, Path]) -> None:
    """
    Context manager that switches the process working directory to `directory`
    for the duration of the `with` block and switches back on exit, even if the
    block raises.

    Parameters
    ----------
    directory: Union[str, Path]
      The directory to change into.

    Returns
    -------
    None
      The context manager yields the directory that was entered.

    """
    previous = os.getcwd()

    # Interpreters older than 3.6 need a plain string for `os.chdir`.
    needs_str = (2, 7) < sys.version_info < (3, 6)
    target = str(directory) if needs_str else directory

    try:
        os.chdir(target)
        yield target
    finally:
        os.chdir(previous)
3✔
140

141

142
def find_filepaths(
    source: Union[Path, str, GeneratorType, List[Union[Path, str]]],
    recursive: bool = True,
    file_suffixes: Optional[Union[str, List[str]]] = None,
    **_,
) -> List[Path]:
    """
    Collect the paths found under one or more locations that match the given suffixes.

    Parameters
    ----------
    source : Union[Path, str, GeneratorType, List[Union[Path, str]]]
      A single location or an iterable of locations to search. `~` is expanded.
    recursive : bool
      Search sub-directories as well (`rglob`) when true, otherwise only the
      top level of each location (`glob`).
    file_suffixes: List[str]
      Glob patterns or plain fragments. A value without `*` is wrapped as
      `*{value}*`. Defaults to `["*", ".*"]`.

    Returns
    -------
    List[Path]
      Every matching path exactly once, in order of first discovery. Directories
      that match a pattern are included.
    """
    if file_suffixes is None:
        file_suffixes = ["*", ".*"]
    elif isinstance(file_suffixes, str):
        file_suffixes = [file_suffixes]

    if isinstance(source, (Path, str)):
        source = [source]

    patterns = [p if "*" in p else f"*{p}*" for p in file_suffixes]

    found = []
    # Several patterns can match the same path (e.g. "*" and ".*" both match
    # hidden files), so remember what has already been collected.
    seen = set()
    for location in source:
        root = Path(location).expanduser()
        search = root.rglob if recursive else root.glob
        for pattern in patterns:
            for match in search(pattern):
                if match not in seen:
                    seen.add(match)
                    found.append(match)

    return found
3✔
185

186

187
def single_item_list(iterable: Iterable) -> bool:
    """
    Tell whether exactly one element of `iterable` is truthy.

    See: https://stackoverflow.com/a/16801605/7322852

    Parameters
    ----------
    iterable: Iterable
      The values to inspect; it is consumed by the check.

    Returns
    -------
    bool
      True when one and only one element is truthy, False otherwise.

    """
    remaining = iter(iterable)
    # The first `any` stops right after the first truthy element (or at exhaustion).
    if not any(remaining):
        return False
    # Whatever is left must contain no further truthy element.
    return not any(remaining)
×
206

207

208
########################################################################################
209

210

211
def yesno_prompt(query: str) -> bool:
    """Prompt user for a yes/no answer.

    Parameters
    ----------
    query : str
        the yes/no question to ask the user.

    Returns
    -------
    out : bool
        True for "y"/"yes", False for "n"/"no" (case-insensitive, surrounding
        whitespace ignored).

    Raises
    ------
    ValueError
        If the answer is anything else.
    """

    user_input = input(f"{query} (y/n) ")
    answer = user_input.strip().lower()
    if answer in ("y", "yes"):
        return True
    if answer in ("n", "no"):
        return False
    raise ValueError(f"{user_input} not in (y, n)")
×
230

231

232
def list_paths_with_elements(
    base_paths: Union[str, List[str]], elements: List[str]
) -> List[Dict]:
    """List a given path structure.

    Parameters
    ----------
    base_paths : List[str]
        list of paths from which to start the search. A single path (str or
        path-like) is also accepted.
    elements : List[str]
        ordered list of the expected elements, one per directory level.

    Returns
    -------
    List[Dict]
        the keys are 'path' and each of the members of the given elements,
        the path is the absolute path (as a str). Entries are ordered
        depth-first with each directory's content sorted. Branches that end
        before the last element is reached are left out.

    Raises
    ------
    OSError
        If a base path cannot be listed for a reason other than not being a
        directory (e.g. it does not exist).

    Notes
    -----
    Suppose you have the following structure:
    /base_path/{color}/{shape}
    The resulting list would look like:
    [{'path':/base_path/red/square, 'color':'red', 'shape':'square'},
     {'path':/base_path/red/circle, 'color':'red', 'shape':'circle'},
     {'path':/base_path/blue/triangle, 'color':'blue', 'shape':'triangle'},
     ...
    ]
    Obviously, 'path' should not be in the input list of elements.
    """

    # A lone path must be wrapped: a str is itself iterable and would otherwise
    # be walked character by character.
    if isinstance(base_paths, (str, os.PathLike)):
        base_paths = [base_paths]

    # If elements list is empty, return empty list (end of recursion).
    if not elements:
        return []

    current, remaining = elements[0], elements[1:]

    paths_elements = []
    for base_path in base_paths:
        root = Path(os.path.abspath(base_path))
        try:
            children = sorted(root.iterdir())
        except NotADirectoryError:
            # Plain files cannot contribute any deeper level.
            continue

        for child in children:
            if not remaining:
                # Last expected level: every entry of the directory is a result.
                paths_elements.append({"path": str(child), current: child.name})
                continue
            # Deeper levels are resolved first, then tagged with this level's name.
            for entry in list_paths_with_elements([child], remaining):
                entry[current] = child.name
                paths_elements.append(entry)

    return paths_elements
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc