• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 1929339745

pending completion
1929339745

Pull #24

github

GitHub
Merge 4a21595f1 into 250bc4dd4
Pull Request #24: Add CMIP file structure - WIP

49 of 402 new or added lines in 21 files covered. (12.19%)

3 existing lines in 2 files now uncovered.

631 of 2861 relevant lines covered (22.06%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.21
/miranda/utils.py
1
import logging.config
import os
import sys
from contextlib import contextmanager
from datetime import date
from itertools import islice
from pathlib import Path
from types import GeneratorType
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Union

from . import scripting
3✔
11

12
# Binary size units, expressed in bytes.
KiB = 1 << 10
MiB = 1 << 20
GiB = 1 << 30
3✔
15

16
# Install the logging configuration declared in the sibling ``scripting`` module
# as a side effect of importing this module.
logging.config.dictConfig(scripting.LOGGING_CONFIG)

# Public names exported by a star-import of this module.
__all__ = [
    "chunk_iterables",
    "creation_date",
    "find_filepaths",
    "GiB",
    "ingest",
    "KiB",
    "list_paths_with_elements",
    "MiB",
    "read_privileges",
    "single_item_list",
    "working_directory",
    "yesno_prompt",
]
32

33

34
def ingest(files: Iterable) -> List:
    """Collect `files` into a sorted list.

    Parameters
    ----------
    files : Iterable
        A list, generator, or any other iterable of mutually comparable items.

    Returns
    -------
    List
        The items in ascending order. When `files` is already a list it is
        sorted in place and that same list object is returned; any other
        iterable is consumed into a new list.
    """
    if isinstance(files, list):
        # Keep the historical in-place behaviour for callers that hold on to the list.
        files.sort()
        return files
    return sorted(files)
×
39

40

41
def chunk_iterables(iterable: Iterable, chunk_size: int) -> Iterator[List]:
    """Generate lists of at most `chunk_size` elements from `iterable`.

    Parameters
    ----------
    iterable : Iterable
        Any iterable; it is consumed lazily, one chunk at a time.
    chunk_size : int
        Maximum number of elements per chunk. Must be at least 1.

    Yields
    ------
    List
        Consecutive chunks in input order. Only the last chunk may be shorter
        than `chunk_size`; empty chunks are never yielded.

    Raises
    ------
    ValueError
        If `chunk_size` is smaller than 1. As this is a generator, the error
        surfaces when iteration starts.

    Notes
    -----
    Adapted from eidord (2012) https://stackoverflow.com/a/12797249/7322852 (https://creativecommons.org/licenses/by-sa/4.0/)
    """
    if chunk_size < 1:
        # A non-positive size would otherwise produce empty chunks forever.
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk
×
59

60

61
def creation_date(path_to_file: Union[Path, str]) -> Union[float, date]:
    """
    Try to get the date that a file was created, falling back to when it was last modified if that isn't possible.
    See https://stackoverflow.com/a/39501288/1709587 for explanation.

    Parameters
    ----------
    path_to_file: Union[Path, str]
      File whose timestamps are read with ``Path.stat()``.

    Returns
    -------
    Union[float, date]
      On Windows (``os.name == "nt"``), the raw ``st_ctime`` timestamp as a float.
      Elsewhere, a ``date`` built from ``st_birthtime`` when the platform exposes it,
      otherwise from ``st_mtime``.
    """
    stat = Path(path_to_file).stat()

    if os.name == "nt":
        return stat.st_ctime

    try:
        # `st_birthtime` only exists on platforms that record a creation time.
        # `st_ctime` always exists and is the inode-change time on POSIX, so
        # reading it here would never reach the fallback below.
        return date.fromtimestamp(stat.st_birthtime)
    except AttributeError:
        # We're probably on Linux. No easy way to get creation dates here,
        # so we'll settle for when its content was last modified.
        return date.fromtimestamp(stat.st_mtime)
×
84

85

86
def read_privileges(location: Union[Path, str], strict: bool = False) -> bool:
    """
    Determine whether a user has read privileges to a specific file

    Parameters
    ----------
    location: Union[Path, str]
      Path to check.
    strict: bool
      If True, re-raise the failure instead of returning False.

    Returns
    -------
    bool
      Whether or not the current user shell has read privileges

    Raises
    ------
    OSError
      Only when `strict` is True: ``FileNotFoundError`` if `location` does not exist,
      ``PermissionError`` if it exists but is not readable (both are ``OSError``
      subclasses), or whatever ``OSError`` the existence check itself raised.
    """
    try:
        if not Path(location).exists():
            raise FileNotFoundError(f"`{location}` is an invalid path.")
        if not os.access(location, os.R_OK):
            raise PermissionError(f"Ensure read privileges for `{location}`.")
    except OSError as err:
        # Log with traceback, then either propagate or report failure.
        logging.exception(str(err))
        if strict:
            raise
        return False

    logging.info(f"{location} is read OK!")
    return True
3✔
120

121

122
@contextmanager
def working_directory(directory: Union[str, Path]) -> Iterator[Union[str, Path]]:
    """
    This function momentarily changes the working directory within the context and reverts to the
    previous working directory when the code block it is acting upon exits

    Parameters
    ----------
    directory: Union[str, Path]
      Directory to switch into for the duration of the ``with`` block.

    Yields
    ------
    Union[str, Path]
      The `directory` argument, unchanged, so it can be bound with ``as``.

    """
    # Remember where we started so it can be restored even if the block raises.
    previous = os.getcwd()
    try:
        os.chdir(directory)
        yield directory
    finally:
        os.chdir(previous)
3✔
147

148

149
def find_filepaths(
    source: Union[Path, str, GeneratorType, List[Union[Path, str]]],
    recursive: bool = True,
    file_suffixes: Optional[Union[str, List[str]]] = None,
    **_,
) -> List[Path]:
    """Collect the paths under `source` whose names match the given patterns.

    Parameters
    ----------
    source : Union[Path, str, GeneratorType, List[Union[Path, str]]]
      One location, or an iterable of locations, to search. ``~`` is expanded.
    recursive : bool
      Search sub-directories as well (``Path.rglob``) when True, otherwise only
      the location itself (``Path.glob``).
    file_suffixes: Optional[Union[str, List[str]]]
      Glob pattern(s) or plain fragments. A fragment without ``*`` is wrapped as
      ``*fragment*``. Defaults to ``["*", ".*"]``.
    **_
      Extra keyword arguments are accepted and ignored.

    Returns
    -------
    List[Path]
      Matching paths in discovery order, each listed once even when it matches
      several patterns.
    """
    if file_suffixes is None:
        file_suffixes = ["*", ".*"]
    elif isinstance(file_suffixes, str):
        file_suffixes = [file_suffixes]

    if isinstance(source, (Path, str)):
        source = [source]

    patterns = [p if "*" in p else f"*{p}*" for p in file_suffixes]

    found = []
    seen = set()
    for location in source:
        root = Path(location).expanduser()
        search = root.rglob if recursive else root.glob
        for pattern in patterns:
            for match in search(pattern):
                # Overlapping patterns (e.g. the defaults) hit the same path twice.
                if match not in seen:
                    seen.add(match)
                    found.append(match)

    return found
3✔
192

193

194
def single_item_list(iterable: Iterable) -> bool:
    """
    Tell whether exactly one element of `iterable` is truthy.

    See: https://stackoverflow.com/a/16801605/7322852

    Parameters
    ----------
    iterable: Iterable
      Elements to inspect; consumed as far as needed to decide.

    Returns
    -------
    bool
      True when one and only one element is truthy, False otherwise.

    """
    remaining = iter(iterable)
    # First scan stops right after the first truthy element, or at exhaustion.
    found_first = any(remaining)
    # Second scan resumes on the same iterator, looking for a further truthy element.
    found_second = any(remaining)
    if not found_first:
        return False
    return not found_second
×
213

214

215
########################################################################################
216

217

218
def yesno_prompt(query: str) -> bool:
    """Prompt user for a yes/no answer.

    Parameters
    ----------
    query : str
        the yes/no question to ask the user.

    Returns
    -------
    out : bool
        True for ``y``/``yes``, False for ``n``/``no``. Case and surrounding
        whitespace are ignored.

    Raises
    ------
    ValueError
        If the answer is anything else.
    """
    user_input = input(f"{query} (y/n) ")
    answer = user_input.strip().lower()
    if answer in ("y", "yes"):
        return True
    if answer in ("n", "no"):
        return False
    raise ValueError(f"{user_input} not in (y, n)")
×
237

238

239
def list_paths_with_elements(
    base_paths: Union[str, List[str]], elements: List[str]
) -> List[Dict]:
    """List a given path structure.

    Parameters
    ----------
    base_paths : Union[str, List[str]]
        a path, or list of paths, from which to start the search.
    elements : List[str]
        ordered list of the expected elements, one per directory level.

    Returns
    -------
    List[Dict]
        the keys are 'path' and each of the members of the given elements,
        the path is the absolute path (as a string). Entries are ordered by
        sorted directory content at every level. An empty `elements` list
        yields an empty result.

    Notes
    -----
    Suppose you have the following structure:
    /base_path/{color}/{shape}
    The resulting list would look like:
    [{'path':/base_path/red/square, 'color':'red', 'shape':'square'},
     {'path':/base_path/red/circle, 'color':'red', 'shape':'circle'},
     {'path':/base_path/blue/triangle, 'color':'blue', 'shape':'triangle'},
     ...
    ]
    Obviously, 'path' should not be in the input list of elements.
    Base paths that are not directories are skipped.
    """
    # A lone path must be wrapped explicitly: a str is itself iterable, so
    # iterating it would walk over single characters.
    if isinstance(base_paths, (str, Path)):
        base_paths = [base_paths]

    # If elements list is empty, return empty list (end of recursion).
    if not elements:
        return []

    current, remaining = elements[0], elements[1:]

    paths_elements = []
    for base_path in base_paths:
        root = Path(os.path.abspath(base_path))
        try:
            children = sorted(root.iterdir())
        except NotADirectoryError:
            continue

        for child in children:
            if not remaining:
                # Deepest requested level: every entry of the directory is a result.
                paths_elements.append({"path": str(child), current: child.name})
                continue
            # Deeper levels supply 'path' and their own elements; tag each
            # result with the name found at this level.
            for entry in list_paths_with_elements([child], remaining):
                entry[current] = child.name
                paths_elements.append(entry)
    return paths_elements
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc