• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2266807674

pending completion
2266807674

Pull #33

github

GitHub
Merge 58a4f4d99 into dad775e9d
Pull Request #33: Support CORDEX and CMIP5/6

34 of 242 new or added lines in 16 files covered. (14.05%)

10 existing lines in 7 files now uncovered.

661 of 3268 relevant lines covered (20.23%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.7
/miranda/storage.py
1
"""
2
=====================
3
Disk space management
4
=====================
5

6
Classes:
7

8
 * DiskSpaceError - the exception raised on failure.
9
 * :class:`FileMeta` - file and its size.
10
 * :class:`StorageState` - storage capacity and availability of a medium.
11

12
Functions:
13

14
 * :func:`total_size` - get total size of a list of files.
15
 * :func:`size_division` - divide files based on number and size restrictions.
16

17
"""
18
import logging
3✔
19
import logging.config
3✔
20
import os
3✔
21
import subprocess
3✔
22
from functools import reduce
3✔
23
from pathlib import Path
3✔
24
from types import GeneratorType
3✔
25
from typing import List, Union
3✔
26

27
from .scripting import LOGGING_CONFIG
3✔
28

29
logging.config.dictConfig(LOGGING_CONFIG)
3✔
30

31

32
__all__ = [
3✔
33
    "DiskSpaceError",
34
    "FileMeta",
35
    "StorageState",
36
    "file_size",
37
    "report_file_size",
38
    "size_division",
39
    "size_evaluation",
40
]
41

42

43
class DiskSpaceError(Exception):
3✔
44
    pass
3✔
45

46

47
class FileMeta:
3✔
48
    """File path and size."""
49

50
    django = {
3✔
51
        "path": ["CharField", "max_length=512"],
52
        "size": ["IntegerField", "null=True", "blank=True"],
53
    }
54

55
    def __init__(self, path: str, size: int = -1):
3✔
56
        """Initialize file meta.
57

58
        Parameters
59
        ----------
60
        path : str
61
            full path of the file.
62
        size : int
63
            size of file in bytes (default: will obtain from os.path.getsize
64
            if file exists, set to 0 otherwise).
65

66
        """
67

68
        # Make sure we have the full path of the file
69
        self.path = Path(path).absolute()
×
70

71
        # Get size of file if it is not specified
72
        if (-1 == size) and self.path.exists():
×
73
            try:
×
74
                self.size = self.path.stat().st_size
×
75
            except OSError:
×
76
                raise DiskSpaceError(f"Cannot get size of {self.path.name}.")
×
77
        elif -1 == size:
×
78
            self.size = 0
×
79
        else:
80
            self.size = size
×
81

82
    def __eq__(self, other):
3✔
83
        if self.path == other.path:
×
84
            return True
×
85
        else:
86
            return False
×
87

88

89
class StorageState:
3✔
90
    """Information regarding the storage capacity of a disk."""
91

92
    def __init__(self, base_path, capacity=-1, used_space=-1, free_space=-1):
3✔
93
        """Initialize storage state.
94

95
        Parameters
96
        ----------
97
        base_path : str
98
            base path of the storage medium.
99
        capacity : int
100
            capacity of medium in bytes (default: will obtain from system
101
            call to 'df').
102
        used_space : int
103
            space currently used on the medium (default: will obtain from
104
            system call to 'df').
105
        free_space : int
106
            space available on the medium (default: will obtain from system
107
            call to 'df').
108

109
        """
110

111
        # Make sure we have the full base path
112
        self.base_path = Path(base_path).absolute()
×
113

114
        # Get attributes from 'df' function if they are not specified
NEW
115
        if not self.base_path.is_dir():
×
NEW
116
            raise DiskSpaceError(f"Cannot analyze {self.base_path}.")
×
NEW
117
        elif not Path("/bin/df").exists():
×
NEW
118
            raise DiskSpaceError("/bin/df does not exist.")
×
NEW
119
        df_output = subprocess.run(
×
120
            ["/bin/df", "-P", base_path, "|", "tail", "-1"], capture_output=True
121
        )
NEW
122
        if not df_output.stdout:
×
NEW
123
            raise DiskSpaceError("/bin/df call failed.")
×
NEW
124
        df_output_split = df_output.stdout.split()
×
125
        if -1 == capacity:
×
126
            try:
×
127
                self.capacity = int(df_output_split[1]) * 1000
×
128
            except Exception as e:
×
129
                raise DiskSpaceError("/bin/df output not as expected.") from e
×
130
        else:
131
            self.capacity = capacity
×
132
        if -1 == used_space:
×
133
            try:
×
134
                self.used_space = int(df_output_split[2]) * 1000
×
135
            except Exception as e:
×
136
                raise DiskSpaceError("/bin/df output not as expected.") from e
×
137
        else:
138
            self.used_space = used_space
×
139
        if -1 == free_space:
×
140
            try:
×
141
                self.free_space = int(df_output_split[3]) * 1000
×
142
            except Exception as e:
×
143
                raise DiskSpaceError("/bin/df output not as expected.") from e
×
144
        else:
145
            self.free_space = free_space
×
146

147

148
def size_evaluation(file_list: List[Union[str, FileMeta, Path]]) -> int:
3✔
149
    """Total size of files.
150

151
    Parameters
152
    ----------
153
    file_list : Union[str, Path, FileMeta]
154

155
    Returns
156
    -------
157
    int
158
      total size of files in bytes.
159

160
    """
161

162
    if file_list:
×
163
        size = 0
×
164
        for file_to_add in file_list:
×
165
            # If file paths are given, convert to FileMeta objects first
166
            if not isinstance(file_to_add, FileMeta):
×
167
                try:
×
168
                    file_to_add = FileMeta(file_to_add)
×
169
                except DiskSpaceError:
×
170
                    raise
×
171
            size += file_to_add.size
×
172
        return size
×
173
    else:
174
        return 0
×
175

176

177
def size_division(
3✔
178
    files_to_divide: Union[List, FileMeta, Path],
179
    size_limit: int = 0,
180
    file_limit: int = 0,
181
    check_name_repetition: bool = False,
182
    preserve_order: bool = False,
183
) -> List[list]:
184
    """Divide files according to size and number limits.
185

186
    Parameters
187
    ----------
188
    files_to_divide : Union[List, FileMeta, Path]
189
    size_limit : int
190
      Size limit of divisions in bytes. Default: 0 (no limit).
191
    file_limit : int
192
      Number of files limit of divisions. Default: 0 (no limit).
193
    check_name_repetition : bool
194
      Flag to prevent file name repetitions. Default: False.
195
    preserve_order : bool
196
      Flag to force files to be restored in the order they are given. Default: False.
197

198
    Returns
199
    -------
200
    List[list]
201
      list of divisions (each division is a list of FileMeta objects).
202

203
    """
204

205
    divisions = list()
×
206
    for file_divide in files_to_divide:
×
207
        # If file paths are given, convert to FileMeta objects first
208
        if not isinstance(file_divide, FileMeta):
×
209
            try:
×
210
                file_divide = FileMeta(file_divide)
×
211
            except DiskSpaceError:
×
212
                raise
×
213
        # Loop through divisions and try to add file according to limitations
214
        for i, division in enumerate(divisions):
×
215
            size = file_divide.size
×
216
            file_count = 1
×
217
            flag_skip = 0
×
218
            for file_divided in division:
×
219
                if check_name_repetition and (
×
220
                    os.path.basename(file_divided.path)
221
                    == os.path.basename(file_divide.path)
222
                ):
223
                    flag_skip = 1
×
224
                size = size + file_divided.size
×
225
                file_count = file_count + 1
×
226
            if (
×
227
                (size > size_limit != 0)
228
                or (file_count > file_limit != 0)
229
                or flag_skip == 1
230
            ):
231
                continue
×
232
            elif preserve_order and (i != len(divisions) - 1):
×
233
                continue
×
234
            else:
235
                divisions[i].append(file_divide)
×
236
                break
×
237
        else:
238
            divisions.append([file_divide])
×
239
    return divisions
×
240

241

242
def file_size(
3✔
243
    file_path_or_bytes: Union[Path, str, int, List[Union[str, Path]], GeneratorType]
244
) -> int:
245
    """
246

247
    Parameters
248
    ----------
249
    file_path_or_bytes : Union[Path, str, int, List[Union[str, Path]], GeneratorType]
250

251
    Returns
252
    -------
253
    int
254
    """
255
    try:
×
256
        if isinstance(file_path_or_bytes, int):
×
257
            total = file_path_or_bytes
×
258
        elif isinstance(file_path_or_bytes, (list, GeneratorType)):
×
259
            total = reduce(
×
260
                (lambda x, y: x + y),
261
                map(lambda f: Path(f).stat().st_size, file_path_or_bytes),
262
            )
263
        elif Path(file_path_or_bytes).is_file():
×
264
            total = Path(file_path_or_bytes).stat().st_size
×
265
        elif Path(file_path_or_bytes).is_dir():
×
266
            total = reduce(
×
267
                (lambda x, y: x + y),
268
                [f.stat().st_size for f in Path(file_path_or_bytes).rglob("*")],
269
            )
270
        else:
271
            raise FileNotFoundError
×
272
    except FileNotFoundError:
×
273
        logging.error(
×
274
            f"File Not Found: Unable to parse file size from {file_path_or_bytes}"
275
        )
276
        raise
×
277

278
    return total
×
279

280

281
def report_file_size(
3✔
282
    file_path_or_bytes: Union[Path, str, int, List[Union[str, Path]], GeneratorType],
283
    use_binary: bool = True,
284
    significant_digits: int = 2,
285
) -> str:
286
    """
287
    This function will parse the contents of a list or generator of files and return the
288
    size in bytes of a file or a list of files in pretty formatted text.
289
    """
290
    _CONVERSIONS = ["B", "k{}B", "M{}B", "G{}B", "T{}B", "P{}B", "E{}B", "Z{}B", "Y{}B"]
×
291

292
    def _size_formatter(i: int, binary: bool = True, precision: int = 2) -> str:
×
293
        """
294
        This function will format byte size into an appropriate nomenclature for prettier printing.
295
        """
296
        import math
×
297

298
        base = 1024 if binary else 1000
×
299
        if i == 0:
×
300
            return "0 B"
×
301
        multiple = math.trunc(math.log2(i) / math.log2(base))
×
302
        value = i / math.pow(base, multiple)
×
303
        suffix = _CONVERSIONS[multiple].format("i" if binary else "")
×
304
        return f"{value:.{precision}f} {suffix}"
×
305

306
    total = file_size(file_path_or_bytes)
×
307
    return _size_formatter(total, binary=use_binary, precision=significant_digits)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc