• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / miranda / 2157378933

pending completion
2157378933

Pull #24

github

GitHub
Merge c79a04131 into 9ac032fc5
Pull Request #24: Add CMIP file structure, use pyessv controlled vocabularies, and major refactoring

241 of 1107 new or added lines in 35 files covered. (21.77%)

13 existing lines in 4 files now uncovered.

735 of 3247 relevant lines covered (22.64%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.89
/miranda/storage.py
1
"""
2
=====================
3
Disk space management
4
=====================
5

6
Classes:
7

8
 * DiskSpaceError - the exception raised on failure.
9
 * :class:`FileMeta` - file and its size.
10
 * :class:`StorageState` - storage capacity and availability of a medium.
11

12
Functions:
13

14
 * :func:`total_size` - get total size of a list of files.
15
 * :func:`size_division` - divide files based on number and size restrictions.
16

17
"""
18
import logging
3✔
19
import logging.config
3✔
20
import os
3✔
21
from functools import reduce
3✔
22
from pathlib import Path
3✔
23
from types import GeneratorType
3✔
24
from typing import List, Union
3✔
25

26
from .scripting import LOGGING_CONFIG
3✔
27

28
logging.config.dictConfig(LOGGING_CONFIG)
3✔
29

30

31
__all__ = [
3✔
32
    "DiskSpaceError",
33
    "FileMeta",
34
    "StorageState",
35
    "file_size",
36
    "report_file_size",
37
    "size_division",
38
    "size_evaluation",
39
]
40

41

42
class DiskSpaceError(Exception):
3✔
43
    pass
3✔
44

45

46
class FileMeta:
3✔
47
    """File path and size."""
48

49
    django = {
3✔
50
        "path": ["CharField", "max_length=512"],
51
        "size": ["IntegerField", "null=True", "blank=True"],
52
    }
53

54
    def __init__(self, path: str, size: int = -1):
3✔
55
        """Initialize file meta.
56

57
        Parameters
58
        ----------
59
        path : str
60
            full path of the file.
61
        size : int
62
            size of file in bytes (default: will obtain from os.path.getsize
63
            if file exists, set to 0 otherwise).
64

65
        """
66

67
        # Make sure we have the full path of the file
68
        self.path = Path(path).absolute()
×
69

70
        # Get size of file if it is not specified
71
        if (-1 == size) and self.path.exists():
×
72
            try:
×
73
                self.size = self.path.stat().st_size
×
74
            except OSError:
×
NEW
75
                raise DiskSpaceError(f"Cannot get size of {self.path.name}.")
×
76
        elif -1 == size:
×
77
            self.size = 0
×
78
        else:
79
            self.size = size
×
80

81
    def __eq__(self, other):
3✔
82
        if self.path == other.path:
×
83
            return True
×
84
        else:
85
            return False
×
86

87

88
class StorageState:
3✔
89
    """Information regarding the storage capacity of a disk."""
90

91
    def __init__(self, base_path, capacity=-1, used_space=-1, free_space=-1):
3✔
92
        """Initialize storage state.
93

94
        Parameters
95
        ----------
96
        base_path : str
97
            base path of the storage medium.
98
        capacity : int
99
            capacity of medium in bytes (default: will obtain from system
100
            call to 'df').
101
        used_space : int
102
            space currently used on the medium (default: will obtain from
103
            system call to 'df').
104
        free_space : int
105
            space available on the medium (default: will obtain from system
106
            call to 'df').
107

108
        """
109

110
        # Make sure we have the full base path
111
        self.base_path = Path(base_path).absolute()
×
112

113
        # Get attributes from 'df' function if they are not specified
114
        if (-1 == capacity) or (-1 == used_space) or (-1 == free_space):
×
115
            if not os.path.isdir(base_path):
×
116
                raise DiskSpaceError("Cannot analyse " + base_path + ".")
×
117
            elif not os.path.isfile("/bin/df"):
×
118
                raise DiskSpaceError("/bin/df does not exist.")
×
119
            df_output = os.popen("/bin/df -P " + base_path + " | tail -1").read()
×
120
            if not df_output:
×
121
                raise DiskSpaceError("/bin/df call failed.")
×
122
            df_output_split = df_output.split()
×
123
        if -1 == capacity:
×
124
            try:
×
125
                self.capacity = int(df_output_split[1]) * 1000
×
126
            except Exception as e:
×
127
                raise DiskSpaceError("/bin/df output not as expected.") from e
×
128
        else:
129
            self.capacity = capacity
×
130
        if -1 == used_space:
×
131
            try:
×
132
                self.used_space = int(df_output_split[2]) * 1000
×
133
            except Exception as e:
×
134
                raise DiskSpaceError("/bin/df output not as expected.") from e
×
135
        else:
136
            self.used_space = used_space
×
137
        if -1 == free_space:
×
138
            try:
×
139
                self.free_space = int(df_output_split[3]) * 1000
×
140
            except Exception as e:
×
141
                raise DiskSpaceError("/bin/df output not as expected.") from e
×
142
        else:
143
            self.free_space = free_space
×
144

145

146
def size_evaluation(file_list: List[Union[str, FileMeta, Path]]) -> int:
3✔
147
    """Total size of files.
148

149
    Parameters
150
    ----------
151
    file_list : Union[str, Path, FileMeta]
152

153
    Returns
154
    -------
155
    int
156
      total size of files in bytes.
157

158
    """
159

160
    if file_list:
×
161
        size = 0
×
162
        for file_to_add in file_list:
×
163
            # If file paths are given, convert to FileMeta objects first
164
            if not isinstance(file_to_add, FileMeta):
×
165
                try:
×
166
                    file_to_add = FileMeta(file_to_add)
×
167
                except DiskSpaceError:
×
168
                    raise
×
169
            size += file_to_add.size
×
170
        return size
×
171
    else:
172
        return 0
×
173

174

175
def size_division(
3✔
176
    files_to_divide: Union[List, FileMeta, Path],
177
    size_limit: int = 0,
178
    file_limit: int = 0,
179
    check_name_repetition: bool = False,
180
    preserve_order: bool = False,
181
) -> List[list]:
182
    """Divide files according to size and number limits.
183

184
    Parameters
185
    ----------
186
    files_to_divide : Union[List, FileMeta, Path]
187
    size_limit : int
188
      size limit of divisions in bytes (default: no limit).
189
    file_limit : int
190
      number of files limit of divisions (default: no limit).
191
    check_name_repetition : bool
192
      flag to prevent file name repetitions (default: off).
193
    preserve_order : bool
194
      flag to force files to be restored in the order they are given
195
      (default: off).
196

197
    Returns
198
    -------
199
    List[list]
200
      list of divisions (each division is a list of FileMeta objects).
201

202
    """
203

NEW
204
    divisions = list()
×
205
    for file_divide in files_to_divide:
×
206
        # If file paths are given, convert to FileMeta objects first
207
        if not isinstance(file_divide, FileMeta):
×
208
            try:
×
209
                file_divide = FileMeta(file_divide)
×
210
            except DiskSpaceError:
×
211
                raise
×
212
        # Loop through divisions and try to add file according to limitations
213
        for i, division in enumerate(divisions):
×
214
            size = file_divide.size
×
215
            file_count = 1
×
216
            flag_skip = 0
×
217
            for file_divided in division:
×
218
                if check_name_repetition and (
×
219
                    os.path.basename(file_divided.path)
220
                    == os.path.basename(file_divide.path)
221
                ):
222
                    flag_skip = 1
×
223
                size = size + file_divided.size
×
224
                file_count = file_count + 1
×
225
            if (
×
226
                (size > size_limit != 0)
227
                or (file_count > file_limit != 0)
228
                or flag_skip == 1
229
            ):
230
                continue
×
231
            elif preserve_order and (i != len(divisions) - 1):
×
232
                continue
×
233
            else:
234
                divisions[i].append(file_divide)
×
235
                break
×
236
        else:
237
            divisions.append([file_divide])
×
238
    return divisions
×
239

240

241
def file_size(
3✔
242
    file_path_or_bytes: Union[Path, str, int, List[Union[str, Path]], GeneratorType]
243
) -> int:
244
    """
245

246
    Parameters
247
    ----------
248
    file_path_or_bytes : Union[Path, str, int, List[Union[str, Path]], GeneratorType]
249

250
    Returns
251
    -------
252
    int
253
    """
254
    try:
×
255
        if isinstance(file_path_or_bytes, int):
×
256
            total = file_path_or_bytes
×
NEW
257
        elif isinstance(file_path_or_bytes, (list, GeneratorType)):
×
NEW
258
            total = reduce(
×
259
                (lambda x, y: x + y),
260
                map(lambda f: Path(f).stat().st_size, file_path_or_bytes),
261
            )
262
        elif Path(file_path_or_bytes).is_file():
×
263
            total = Path(file_path_or_bytes).stat().st_size
×
264
        elif Path(file_path_or_bytes).is_dir():
×
265
            total = reduce(
×
266
                (lambda x, y: x + y),
267
                [f.stat().st_size for f in Path(file_path_or_bytes).rglob("*")],
268
            )
269
        else:
270
            raise FileNotFoundError
×
271
    except FileNotFoundError:
×
272
        logging.error(
×
273
            "File Not Found: Unable to parse file size from %s." % file_path_or_bytes
274
        )
275
        raise
×
276

277
    return total
×
278

279

280
def report_file_size(
3✔
281
    file_path_or_bytes: Union[Path, str, int, List[Union[str, Path]], GeneratorType],
282
    use_binary: bool = True,
283
    significant_digits: int = 2,
284
) -> str:
285
    """
286
    This function will parse the contents of a list or generator of files and return the
287
    size in bytes of a file or a list of files in pretty formatted text.
288
    """
NEW
289
    _CONVERSIONS = ["B", "k{}B", "M{}B", "G{}B", "T{}B", "P{}B", "E{}B", "Z{}B", "Y{}B"]
×
290

291
    def _size_formatter(i: int, binary: bool = True, precision: int = 2) -> str:
×
292
        """
293
        This function will format byte size into an appropriate nomenclature for prettier printing.
294
        """
295
        import math
×
296

297
        base = 1024 if binary else 1000
×
298
        if i == 0:
×
299
            return "0 B"
×
300
        multiple = math.trunc(math.log2(i) / math.log2(base))
×
301
        value = i / math.pow(base, multiple)
×
302
        suffix = _CONVERSIONS[multiple].format("i" if binary else "")
×
NEW
303
        return f"{value:.{precision}f} {suffix}"
×
304

305
    total = file_size(file_path_or_bytes)
×
306
    return _size_formatter(total, binary=use_binary, precision=significant_digits)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc