• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mar10 / stressor / 608876122

30 Aug 2023 05:16AM UTC coverage: 76.74% (+0.2%) from 76.543%
608876122

Pull #22

travis-ci

GitHub
Bump requests from 2.28.2 to 2.31.0
Pull Request #22: Bump requests from 2.28.2 to 2.31.0

1709 of 2227 relevant lines covered (76.74%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.73
/stressor/util.py
1
# -*- coding: utf-8 -*-
2
# (c) 2020-2023 Martin Wendt and contributors; see https://github.com/mar10/stressor
3
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
4
"""
1✔
5
"""
6
import logging
1✔
7
import os
1✔
8
import platform
1✔
9
import random
1✔
10
import re
1✔
11
import sys
1✔
12
import types
1✔
13
import warnings
1✔
14
from datetime import datetime
1✔
15
from urllib.parse import urljoin, urlparse
1✔
16

17
from dateutil.parser import isoparse
1✔
18

19
from stressor import __version__
1✔
20

21
logger = logging.getLogger("stressor")
1✔
22

23
# Use logger.important("message even in -q mode")
24
LOGLEVEL_IMPORTANT = logging.WARNING + 1
1✔
25
logging.addLevelName(LOGLEVEL_IMPORTANT, "NOTE")
1✔
26

27

28
def log_important(self, message, *args, **kws):
1✔
29
    if self.isEnabledFor(LOGLEVEL_IMPORTANT):
1✔
30
        # Yes, logger takes its '*args' as 'args'.
31
        self._log(LOGLEVEL_IMPORTANT, message, args, **kws)
1✔
32

33

34
logging.Logger.important = log_important
1✔
35

36

37
#: Check if a a string may be used as YAML dictionary key without using quotes.
38
#: NOTE: YAML evaluates `0_` as "0" and `0_1_` as "1", so we don't accept leading numbers
39
RE_YAML_KEYWORD = re.compile(r"^[a-zA-Z_]+\w*$")
1✔
40

41
PYTHON_VERSION = "{}.{}.{}".format(
1✔
42
    sys.version_info[0], sys.version_info[1], sys.version_info[2]
43
)
44
version_info = "stressor/{} Python/{}({} bit) {}".format(
1✔
45
    __version__,
46
    PYTHON_VERSION,
47
    "64" if sys.maxsize > 2**32 else "32",
48
    platform.platform(),
49
)
50

51

52
class StressorError(RuntimeError):
1✔
53
    """Base class for all exception that we deliberatly throw."""
54

55

56
class NO_DEFAULT:
1✔
57
    """Used as default parameter to distinguish from `None`."""
58

59

60
# class Singleton(type):
61
#     """
62
#     See also https://stackoverflow.com/a/6798042/19166
63

64
#     Example::
65

66
#         class Logger(metaclass=Singleton):
67
#             pass
68

69
#     """
70

71
#     _instances = {}
72

73
#     def __call__(cls, *args, **kwargs):
74
#         if cls not in cls._instances:
75
#             cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
76
#         return cls._instances[cls]
77

78

79
class _VOID_SEGMENT:
1✔
80
    """Used internally."""
81

82

83
class PathStack:
1✔
84
    """
85
    Examples::
86

87
        path = PathStack()
88
        with path.enter("level_1"):
89
            print("Current location: {}".format(path))
90
    """
91

92
    def __init__(self, root_name=None, delimiter="/"):
1✔
93
        self.stack = []
1✔
94
        self.delimiter = delimiter
1✔
95
        if root_name:
1✔
96
            self.push(root_name)
1✔
97

98
    def __str__(self):
1✔
99
        return self.get_path()
1✔
100
        # stack = (str(s) for s in self.stack if s is not _VOID_SEGMENT)
101
        # return self.delimiter + self.delimiter.join(stack)
102

103
    #: Provide nicer display for pprint(), etc.
104
    def __repr__(self):
1✔
105
        return f"PathStack<{self}>"
×
106

107
    def __enter__(self):
1✔
108
        return self
1✔
109

110
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
111
        self.pop()
1✔
112

113
    def enter(self, name, ignore=False):
1✔
114
        if ignore:
1✔
115
            self.push(_VOID_SEGMENT)
1✔
116
        else:
117
            self.push(name)
1✔
118
        return self
1✔
119

120
    def level(self):
1✔
121
        return len(self.stack)
×
122

123
    def push(self, name):
1✔
124
        self.stack.append(name)
1✔
125

126
    def pop(self):
1✔
127
        return self.stack.pop()
1✔
128

129
    def get_path(self, skip_segs=0, last_seg=None):
1✔
130
        stack = [str(s) for s in self.stack if s is not _VOID_SEGMENT]
1✔
131
        if skip_segs > 0:
1✔
132
            stack = stack[skip_segs:]
1✔
133
        if last_seg is not None:
1✔
134
            stack = stack[:-1]
1✔
135
            stack.append(last_seg)
1✔
136
        return self.delimiter + self.delimiter.join(stack)
1✔
137

138

139
def timetag(seconds=True, ms=False):
1✔
140
    """Return a time stamp string that can be used as (part of a) filename (also sorts well)."""
141
    now = datetime.now()
×
142
    if ms or seconds:
×
143
        s = now.strftime("%Y%m%d_%H%M%S")
×
144
        if ms:
×
145
            s = "{}_{}".format(s, now.microsecond)
×
146
    else:
147
        s = now.strftime("%Y%m%d_%H%M")
×
148
    return s
×
149

150

151
def check_cli_verbose(default=3):
1✔
152
    """Check for presence of `--verbose`/`--quiet` or `-v`/`-q` without using argparse."""
153
    args = sys.argv[1:]
×
154
    verbose = default + args.count("--verbose") - args.count("--quiet")
×
155

156
    for arg in args:
×
157
        if arg.startswith("-") and not arg.startswith("--"):
×
158
            verbose += arg[1:].count("v")
×
159
            verbose -= arg[1:].count("q")
×
160
    return verbose
×
161

162

163
def init_logging(verbose=3, path=None):
1✔
164
    """CLI calls this."""
165
    if verbose < 1:
×
166
        level = logging.CRITICAL
×
167
    elif verbose < 2:
×
168
        level = logging.ERROR
×
169
    elif verbose < 3:
×
170
        level = logging.WARNING
×
171
    elif verbose < 4:
×
172
        level = logging.INFO
×
173
    else:
174
        level = logging.DEBUG
×
175

176
    logging.basicConfig(
×
177
        level=level,
178
        # format="%(asctime)s.%(msecs)03d <%(thread)d> %(levelname)-7s %(message)s",
179
        format="%(asctime)-8s.%(msecs)-3d <%(thread)05d> %(levelname)-7s %(message)s",
180
        # format="%(asctime)s.%(msecs)03d <%(process)d.%(thread)d> %(levelname)-8s %(message)s",
181
        # fmt='%(asctime)s.%(msecs)03d',datefmt='%Y-%m-%d,%H:%M:%S')
182
        # format="%(asctime)s.%(msecs)d <%(process)d.%(thread)d> %(message)s",
183
        datefmt="%H:%M:%S",
184
    )
185
    # Make sure it is set (when running from tox, the above basicConfig() did not set it)
186
    logger.setLevel(level)
×
187

188
    if path:
×
189
        if os.path.isdir(path):
×
190
            fname = "stressor_{}.log".format(timetag())
×
191
            path = os.path.join(path, fname)
×
192
        logger.info("Writing log to '{}'".format(path))
×
193
        if os.path.isfile(path):
×
194
            logger.warning("Removing log file '{}'".format(path))
×
195
            os.remove(path)
×
196
        hdlr = logging.FileHandler(path)
×
197
        formatter = logging.Formatter(
×
198
            "%(asctime)s.%(msecs)-3d - %(levelname)s: %(message)s", "%H:%M:%S"
199
        )
200
        hdlr.setFormatter(formatter)
×
201
        logger.addHandler(hdlr)
×
202
        # logger.setLevel(logging.DEBUG)
203
        logger.info("Start log ({})".format(datetime.now()))
×
204
        logger.info(version_info)
×
205
        logger.info("Running {}".format(" ".join(sys.argv)))
×
206

207
        # redirect `logger` to our special log file as well:
208
        logger.addHandler(hdlr)
×
209

210
    # Silence requests `InsecureRequestWarning` messages
211
    if verbose < 3:
×
212
        warnings.filterwarnings("ignore", message="Unverified HTTPS request")
×
213
    else:
214
        warnings.filterwarnings("once", message="Unverified HTTPS request")
×
215

216
    return logger
×
217

218

219
def set_console_ctrl_handler(
1✔
220
    ctrl_handler, do_ctrl_c=True, do_ctrl_break=False, do_ctrl_close=False
221
):
222
    """
223

224
    The do_ctrl_* functions could simply be sys.exit(1), which will ensure that
225
    atexit handlers get called.
226
    See https://bugs.python.org/issue35935
227

228
    Raises:
229
        ctypes.WinError
230
    Returns:
231
        False if not on Windows or could not register the handler.
232
    """
233
    if os.name != "nt":
1✔
234
        return False
1✔
235

236
    try:
×
237
        import ctypes
×
238
        from ctypes import wintypes
×
239

240
        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
×
241
        logger.info("Loaded kernel32 DLL on Windows.")
×
242
    except Exception as e:
×
243
        logger.warning("Could not load kernel32 DLL on Windows: {}".format(e))
×
244
        return False
×
245

246
    CTRL_C_EVENT = 0
×
247
    CTRL_BREAK_EVENT = 1
×
248
    CTRL_CLOSE_EVENT = 2
×
249

250
    HANDLER_ROUTINE = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.DWORD)
×
251
    kernel32.SetConsoleCtrlHandler.argtypes = (HANDLER_ROUTINE, wintypes.BOOL)
×
252

253
    @HANDLER_ROUTINE
×
254
    def handler(ctrl):
×
255
        if ctrl == CTRL_C_EVENT and do_ctrl_c:
×
256
            handled = ctrl_handler(ctrl)
×
257
        elif ctrl == CTRL_BREAK_EVENT and do_ctrl_break:
×
258
            handled = ctrl_handler(ctrl)
×
259
        elif ctrl == CTRL_CLOSE_EVENT and do_ctrl_close:
×
260
            handled = ctrl_handler(ctrl)
×
261
        else:
262
            handled = False
×
263
        # If not handled, call the next handler.
264
        return handled
×
265

266
    if not kernel32.SetConsoleCtrlHandler(handler, True):
×
267
        raise ctypes.WinError(ctypes.get_last_error())
×
268
    return True
×
269

270

271
def assert_always(condition, msg=None):
1✔
272
    """`assert` even in production code."""
273
    try:
1✔
274
        if not condition:
1✔
275
            raise AssertionError(msg) if msg is not None else AssertionError
1✔
276
    except AssertionError as e:
1✔
277
        if sys.version_info < (3, 7):
1✔
278
            raise
×
279
        # Strip last frames, so the exception's stacktrace points to the call
280
        # Credits: https://stackoverflow.com/a/58821552/19166
281
        _exc_type, _exc_value, traceback = sys.exc_info()
1✔
282
        back_frame = traceback.tb_frame.f_back
1✔
283

284
        back_tb = types.TracebackType(
1✔
285
            tb_next=None,
286
            tb_frame=back_frame,
287
            tb_lasti=back_frame.f_lasti,
288
            tb_lineno=back_frame.f_lineno,
289
        )
290
        raise e.with_traceback(back_tb)
1✔
291

292

293
def _check_arg(argument, types, condition, accept_none):
1✔
294
    if __debug__:
295
        err_msg = "`allowed_types` must be a type or class (or a tuple thereof): got instance of {}"
1✔
296
        if isinstance(types, tuple):
1✔
297
            for t in types:
1✔
298
                assert isinstance(t, type), err_msg.format(type(t))
1✔
299
        else:
300
            assert isinstance(types, type), err_msg.format(type(types))
1✔
301

302
    if accept_none:
1✔
303
        if argument is None:
1✔
304
            return
1✔
305
        extra_msg = "`None` or "
1✔
306
    else:
307
        extra_msg = ""
1✔
308

309
    if not isinstance(argument, types):
1✔
310
        raise TypeError(
1✔
311
            "Expected {}{}, but got {}".format(extra_msg, types, type(argument))
312
        )
313
    if condition is not NO_DEFAULT and not bool(condition):
1✔
314
        raise ValueError(
1✔
315
            "Invalid argument value: {} {}".format(type(argument), argument)
316
        )
317

318

319
def check_arg(argument, allowed_types, condition=NO_DEFAULT, or_none=False):
1✔
320
    """Check if `argument` has the expected type and value.
321

322
    **Note:** the exception's traceback is manipulated, so that the back frame
323
    points to the ``check_arg()`` line, instead of the actual ``raise``.
324

325
    Args:
326
        argument (any): value of the argument to check
327
        allowed_types (type or tuple of types)
328
        condition (bool, optional): additional condition that must be true
329
        or_none (bool, optional): defaults to false
330
    Returns:
331
        None
332
    Raises:
333
        TypeError: if `argument` is of an unexpected type
334
        ValueError: if `argument` does not fulfill the optional condition
335
        AssertionError:
336
            if `check_arg` was called with a wrong syntax, i.e. `allowed_types`
337
            does not contain types, e.g. if `1` was passed instead of `int`.
338

339
    Examples::
340

341
        def foo(name, amount, options=None):
342
            check_arg(name, str)
343
            check_arg(amount, (int, float), amount > 0)
344
            check_arg(options, dict, or_none=True)
345
    """
346
    try:
1✔
347
        _check_arg(argument, allowed_types, condition, accept_none=or_none)
1✔
348
    except (TypeError, ValueError) as e:
1✔
349
        if sys.version_info < (3, 7):
1✔
350
            raise
×
351
        # Strip last frames, so the exception's stacktrace points to the call
352
        _exc_type, _exc_value, traceback = sys.exc_info()
1✔
353
        back_frame = traceback.tb_frame.f_back
1✔
354

355
        back_tb = types.TracebackType(
1✔
356
            tb_next=None,
357
            tb_frame=back_frame,
358
            tb_lasti=back_frame.f_lasti,
359
            tb_lineno=back_frame.f_lineno,
360
        )
361
        raise e.with_traceback(back_tb)
1✔
362

363

364
def get_dict_attr(d, key_path, default=NO_DEFAULT):
1✔
365
    """Return the value of a nested dict using dot-notation path.
366

367
    Args:
368
        d (dict):
369
        key_path (str):
370
    Raises:
371
        KeyError:
372
        ValueError:
373
        IndexError:
374

375
    Examples::
376

377
        ...
378

379
    Todo:
380
        * k[1] instead of k.[1]
381
        * default arg
382
    """
383
    if default is not NO_DEFAULT:
1✔
384
        try:
1✔
385
            return get_dict_attr(d, key_path)
1✔
386
        except (AttributeError, KeyError, ValueError, IndexError):
1✔
387
            return default
1✔
388

389
    check_arg(d, dict)
1✔
390

391
    seg_list = key_path.split(".")
1✔
392
    value = d[seg_list[0]]
1✔
393
    for seg in seg_list[1:]:
1✔
394
        if isinstance(value, dict):
1✔
395
            value = value[seg]
1✔
396
        elif isinstance(value, (list, tuple)):
1✔
397
            if not seg.startswith("[") or not seg.endswith("]"):
1✔
398
                raise ValueError("Use `[INT]` syntax to address list items")
1✔
399
            seg = seg[1:-1]
1✔
400
            value = value[int(seg)]
1✔
401
        else:
402
            # raise ValueError("Segment '{}' cannot be nested".format(seg))
403
            try:
1✔
404
                value = getattr(value, seg)
1✔
405
            except AttributeError:
1✔
406
                raise  # ValueError("Segment '{}' cannot be nested".format(seg))
1✔
407

408
    return value
1✔
409

410

411
def coerce_str(s):
1✔
412
    """Return `s` converted to float, int, or str."""
413
    check_arg(s, str, or_none=True)
1✔
414
    if s is None:
1✔
415
        return None
1✔
416
    for t in (int, float, str):  # order matters!
1✔
417
        try:
1✔
418
            return t(s)
1✔
419
        except ValueError:
1✔
420
            if t is str:
1✔
421
                raise
×
422

423

424
def parse_option_args(opt_list, coerce_values=True):
1✔
425
    """Evaluate `--option` args.
426

427
    Args:
428
        opt_list (list):
429
            List of option definitions of the form "OPT_NAME:OPT_VALUE"
430
        coerce_values (bool=true):
431
    Returns:
432
        Dict of {opt_name: opt_value, ...}
433
    """
434
    check_arg(opt_list, list, or_none=True)
1✔
435
    res = {}
1✔
436
    if not opt_list:
1✔
437
        return res
1✔
438
    for opt in opt_list:
1✔
439
        # print(opt_list, opt)
440
        if ":" not in opt:
1✔
441
            raise ValueError("Expected 'NAME:VALUE', got {!r}".format(opt))
×
442
        config_name, config_val = opt.split(":", 1)
1✔
443
        if coerce_values:
1✔
444
            res[config_name.strip()] = coerce_str(config_val)
1✔
445
        else:
446
            res[config_name.strip()] = config_val
×
447
    # print(opt_list, res)
448
    return res
1✔
449

450

451
def parse_args_from_str(arg_str, arg_defs):  # , context=None):
1✔
452
    """
453
    Args:
454
        args_str (str): argument string, optionally comma-separated
455
        arg_defs (tuple): list of argument definitions
456
        context (dict, optional):
457
            When passed, the arguments  are parsed for ``$(var_name)`` macros,
458
            to lookup values from that dict.
459
    Returns:
460
        (dict) keyword args
461
    Raises:
462
        TypeError: if `argument` is of an unexpected type
463
        ValueError: if `argument` does not fulfill the optional condition
464
        AssertionError:
465
            if `parse_args_from_str` was called with a wrong syntax, i.e.
466
            `arg_defs` is not well-formed.
467

468
    Examples::
469

470
        arg_defs = (
471
            ("name", str),
472
            ("amount", float),
473
            ("options", dict, {}),
474
        )
475
        def order(raw_arg_string):
476
            kwargs = parse_args_from_str(arg_defs)
477
            assert isisnstance(kwargs["name"], str)
478
            assert type(kwargs["amount"]) is float
479
            assert isisnstance(kwargs["options"], dict)
480
    """
481
    check_arg(arg_str, str)
1✔
482
    check_arg(arg_defs, (list, tuple))
1✔
483

484
    res = {}
1✔
485

486
    # Special case: '$name()' should not be interpreted as having one "" arg
487
    # if arg_defs defines a default for the first arg
488
    if arg_str.strip() == "" and len(arg_defs[0]) == 3:
1✔
489
        arg_str = str(arg_defs[0][2])
1✔
490

491
    arg_list = [a.strip() for a in arg_str.split(",")]
1✔
492
    optional_mode = False
1✔
493
    for arg_def in arg_defs:
1✔
494
        check_arg(arg_def, (list, tuple))
1✔
495
        if len(arg_def) == 2:
1✔
496
            arg_name, arg_type = arg_def
1✔
497
            arg_default = NO_DEFAULT
1✔
498
            if optional_mode:
1✔
499
                raise AssertionError(
×
500
                    "Mandatory arg definition must not follow optional args: `{}`".format(
501
                        arg_def
502
                    )
503
                )
504
        elif len(arg_def) == 3:
1✔
505
            arg_name, arg_type, arg_default = arg_def
1✔
506
            optional_mode = True
1✔
507
        else:
508
            raise AssertionError("Expected 2- or 3-tuple: {}".format(arg_def))
×
509

510
        if arg_type not in (float, int, str):
1✔
511
            raise AssertionError(
×
512
                "Unsupported argument definition type: {}".format(arg_def)
513
            )
514

515
        try:
1✔
516
            # Get next arg
517
            arg_val = arg_list.pop(0)
1✔
518
            # Allow quotes
519
            is_quoted = (arg_val.startswith('"') and arg_val.endswith('"')) or (
1✔
520
                arg_val.startswith("'") and arg_val.endswith("'")
521
            )
522
            if is_quoted:
1✔
523
                # Strip quotes and return as string (don't cast to other types)
524
                arg_val = arg_val[1:-1]
1✔
525
            elif "$(" in arg_val:
1✔
526
                # The arg seems to be a macro: don't try to cast.
527
                pass
1✔
528
            else:
529
                # Raises ValueError:
530
                arg_val = arg_type(arg_val)
1✔
531
        except IndexError:
1✔
532
            if arg_default is NO_DEFAULT:
1✔
533
                raise ValueError(
1✔
534
                    "Missing mandatory arg `{}` in '{}'.".format(arg_name, arg_str)
535
                )
536
            arg_val = arg_default
1✔
537

538
        res[arg_name] = arg_val
1✔
539

540
    if arg_list:
1✔
541
        raise ValueError("Extra args `{}`.".format(", ".join(arg_list)))
1✔
542

543
    return res
1✔
544

545

546
def is_yaml_keyword(s):
1✔
547
    """Return true if `s` is a JSON compatible key."""
548
    return bool(s and RE_YAML_KEYWORD.match(s))
1✔
549

550

551
# def is_abs_url(url):
552
#     """Return true if url is already absolute."""
553
#     return "://" in url or url.startswith("/")
554

555

556
def is_relative_url(url):
1✔
557
    """Return true if url is relative to the server."""
558
    return "://" not in url  # or url.startswith("/")
×
559

560

561
def resolve_url(root, url):
1✔
562
    """Convert relative URL to absolute, using `root` as default."""
563
    return urljoin(root, url)
1✔
564

565

566
def base_url(url):
1✔
567
    parsed_uri = urlparse(url)
×
568
    if parsed_uri.netloc:
×
569
        res = "{uri.scheme}://{uri.netloc}{uri.path}".format(uri=parsed_uri)
×
570
    else:
571
        res = "{uri.path}".format(uri=parsed_uri)
×
572
    return res
×
573

574

575
def shorten_string(long_string, max_chars, min_tail_chars=0, place_holder="[...]"):
1✔
576
    """Return string, shortened to max_chars characters.
577

578
    long_string = "This is a long string, that will be truncated."
579
    trunacated_string = truncate_string(long_string, max_chars=26, min_tail_chars=11, place_holder="[...]")
580
    print trunacated_string
581
    >> This is a [...] truncated.
582

583
    @param long_string: string to be truncated
584
    @param max_chars: max chars of returned string
585
    @param min_tail_chars: minimum of tailing chars
586
    @param place_holder: place holder for striped content
587
    @return: truncated string
588
    """
589

590
    assert max_chars > (min_tail_chars + len(place_holder))
1✔
591

592
    # Other types aren't supported.
593
    if not isinstance(long_string, str):
1✔
594
        return long_string
1✔
595

596
    # If string is shorter then max_chars, it can be returned, as is.
597
    elif len(long_string) <= max_chars:
1✔
598
        return long_string
1✔
599

600
    # Making sure we don't exceed max_chars in total.
601
    prefix_length = max_chars - min_tail_chars - len(place_holder)
1✔
602

603
    if min_tail_chars:
1✔
604
        long_string = (
1✔
605
            long_string[:prefix_length] + place_holder + long_string[-min_tail_chars:]
606
        )
607
    else:
608
        long_string = long_string[:prefix_length] + place_holder
1✔
609

610
    assert len(long_string) == max_chars
1✔
611

612
    return long_string
1✔
613

614

615
def get_random_number(num_or_tuple):
1✔
616
    check_arg(num_or_tuple, (int, float, list, tuple), or_none=True)
1✔
617

618
    if isinstance(num_or_tuple, (tuple, list)):
1✔
619
        v = random.uniform(*num_or_tuple)
1✔
620
    elif num_or_tuple:
1✔
621
        v = float(num_or_tuple)
1✔
622
    else:
623
        v = num_or_tuple
1✔
624
    return v
1✔
625

626

627
def datetime_to_iso(dt=None, microseconds=False):
1✔
628
    """Return current UTC datetime as ISO formatted string."""
629
    if dt is None:
1✔
630
        dt = datetime.now()
1✔
631
    if not microseconds:
1✔
632
        dt.replace(microsecond=0)
1✔
633
    return dt.isoformat(sep=" ")
1✔
634

635

636
def iso_to_datetime(iso):
1✔
637
    """Convert as ISO formatted datetime string to datetime."""
638
    # dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S.%fZ")
639
    dt = isoparse(iso)
1✔
640

641
    return dt
1✔
642

643

644
def iso_to_stamp(iso):
1✔
645
    """Convert as ISO formatted datetime string to datetime."""
646
    return iso_to_datetime(iso).timestamp()
1✔
647

648

649
def format_elap(seconds, count=None, unit="items", high_prec=False):
1✔
650
    """Return elapsed time as H:M:S.h string with reasonable precision."""
651
    days, seconds = divmod(seconds, 86400) if seconds else (0, 0)
1✔
652

653
    if seconds >= 3600:
1✔
654
        m, s = divmod(seconds, 60)
1✔
655
        h, m = divmod(m, 60)
1✔
656
        if high_prec:
1✔
657
            res = "{:d}:{:02d}:{:04.1f} hrs".format(int(h), int(m), s)
×
658
        else:
659
            res = "{:d}:{:02d}:{:02d} hrs".format(int(h), int(m), int(s))
1✔
660
    elif seconds >= 60:
1✔
661
        m, s = divmod(seconds, 60)
1✔
662
        if high_prec:
1✔
663
            res = "{:d}:{:05.2f} min".format(int(m), s)
1✔
664
        else:
665
            res = "{:d}:{:02d} min".format(int(m), int(s))
1✔
666
    else:
667
        if high_prec:
1✔
668
            if seconds > 0.01:
1✔
669
                res = "{:.3f} sec".format(seconds)
1✔
670
            else:
671
                res = "{:f} sec".format(seconds)
1✔
672
        elif seconds > 5:
1✔
673
            res = "{:.1f} sec".format(seconds)
1✔
674
        else:
675
            res = "{:.2f} sec".format(seconds)
1✔
676

677
    if days == 1:
1✔
678
        res = "{} day {}".format(int(days), res)
×
679
    elif days:
1✔
680
        res = "{} days {}".format(int(days), res)
×
681

682
    if count and (seconds > 0):
1✔
683
        res += ", {:,.1f} {}/sec".format(float(count) / seconds, unit)
1✔
684
    return res
1✔
685

686

687
def format_num(num):
1✔
688
    """Return num rounded to reasonable precision (promille)."""
689
    if num >= 1000.0:
1✔
690
        res = "{:,}".format(round(num))
1✔
691
    elif num >= 100.0:
1✔
692
        res = str(round(num, 1))
1✔
693
        # res = "{:,.1f}".format(num)
694
    elif num >= 10.0:
1✔
695
        res = str(round(num, 2))
1✔
696
        # res = "{:,.2f}".format(num)
697
    elif num >= 1.0:
1✔
698
        res = str(round(num, 3))
1✔
699
        # res = "{:,.3f}".format(num)
700
    else:
701
        res = str(num)
1✔
702
        # res = "{:,}".format(num)
703
    return res
1✔
704

705

706
def format_rate(count, time, unit=None, high_prec=False):
1✔
707
    """Return count / time with reasonable precision."""
708
    if not time or not count:
1✔
709
        return "0"
1✔
710

711
    rate = float(count) / float(time)
1✔
712
    if rate >= 1000:
1✔
713
        res = "{}".format(int(round(rate)))
1✔
714
    elif rate >= 100:
1✔
715
        res = "{}".format(round(rate, 1))
1✔
716
    elif rate >= 10:
1✔
717
        res = "{}".format(round(rate, 2))
1✔
718
    else:
719
        res = "{}".format(round(rate, 3))
1✔
720
    return res
1✔
721

722

723
# def format_relative_datetime(dt, as_html=False):
724
#     """Format a datetime object as relative expression (i.e. '3 minutes ago')."""
725
#     try:
726
#         diff = datetime.utcnow() - dt
727
#         res = None
728
#         s = diff.seconds
729

730
#         if diff.days > 7 or diff.days < 0:
731
#             res = dt.strftime("%d %b %y")
732
#         elif diff.days == 1:
733
#             res = "1 day ago"
734
#         elif diff.days > 1:
735
#             res = "{} days ago".format(diff.days)
736
#         elif s <= 1:
737
#             res = "just now"
738
#         elif s < 60:
739
#             res = "{} seconds ago".format(s)
740
#         elif s < 120:
741
#             res = "1 minute ago"
742
#         elif s < 3600:
743
#             res = "{} minutes ago".format(s/60)
744
#         elif s < 7200:
745
#             res = "1 hour ago"
746
#         else:
747
#             res = "{} hours ago".format(s/3600)
748

749
#         if as_html:
750
#             dts = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
751
#             res = "<span title='{dt}'>{rel}</span>".format(rel=res, dt=dts)
752

753
#     except Exception:
754
#         log_exception("invalid dt: {}".format(dt))
755
#         res = "(invalid {})".format(dt)
756
#     return res
757

758

759
# def format_relative_stamp(stamp, as_html=False):
760
#     dt = stamp_to_datetime(stamp)
761
#     return format_relative_datetime(dt, as_html)
762

763

764
# def string_to_bool(arg_, default=None):
765
#     """Return boolean for various bool string representations.
766

767
#     Raises Error, if string is not compatible, and no default was passed.
768
#     """
769
#     if not default in (None, True, False):
770
#         raise TypeError("default must be None or boolean")
771
#     if arg_ is None:
772
#         if default in (True, False):
773
#             return default
774
#         else: #default == None
775
#             raise ValueError("Argument is %r and default %r." % (arg_, default))
776
#     else:
777
#         if arg_ in (True, False):
778
#             # `bool` is a subclass of `int`, so we get here for 0 and 1 as well!
779
#             return bool(arg_)
780
#         arg = arg_.lower().strip()
781
#         if arg in ("1", "true", "on", "yes"):
782
#             return True
783
#         elif arg in ("0", "false", "off", "no"):
784
#             return False
785
#         elif default is not None:
786
#             return default
787
#     raise ValueError("Argument string is not boolean: %r default: %r." % (arg_, default))
788

789

790
# def byteNumberString(number, thousandsSep=True, partition=False,
791
#                      base1024=True, appendBytes="iec", prec=0):
792
#     """Convert bytes into human-readable representation."""
793
#     magsuffix = ""
794
#     bytesuffix = ""
795
#     assert appendBytes in (False, True, "short", "iec")
796
#     if partition:
797
#         magnitude = 0
798
#         if base1024:
799
#             while number >= 1024:
800
#                 magnitude += 1
801
# #                 number = number >> 10
802
#                 number /= 1024.0
803
#         else:
804
#             while number >= 1000:
805
#                 magnitude += 1
806
#                 number /= 1000.0
807
#         # TODO:
808
#         # Windows 7 Explorer teilt durch 1024, und verwendet trotzdem KB statt KiB.
809
#         # Außerdem wird aufgerundet: --> 123 -> "1 KB"
810
#         # http://en.wikipedia.org/wiki/Kibi-#IEC_standard_prefixes
811
#         magsuffix = ["", "K", "M", "G", "T", "P"][magnitude]
812
#         if magsuffix:
813
#             magsuffix = " " + magsuffix
814

815
#     if appendBytes:
816
#         if appendBytes == "iec" and magsuffix:
817
#             bytesuffix = "iB" if base1024 else "B"
818
#         elif appendBytes == "short" and magsuffix:
819
#             bytesuffix = "B"
820
#         elif number == 1:
821
#             bytesuffix = " Byte"
822
#         else:
823
#             bytesuffix = " Bytes"
824

825
#     if thousandsSep and (number >= 1000 or magsuffix):
826
#         locale.setlocale(locale.LC_ALL, "")
827
#         # TODO: make precision configurable
828
#         if prec > 0:
829
#             fs = "%.{}f".format(prec)
830
#             snum = locale.format_string(fs, number, thousandsSep)
831
#         else:
832
#             snum = locale.format("%d", number, thousandsSep)
833
#         # Some countries like france use non-breaking-space (hex=a0) as group-
834
#         # seperator, that's not plain-ascii, so we have to replace the hex-byte
835
#         # "a0" with hex-byte "20" (space)
836
#         snum = hexlify(snum).replace("a0", "20").decode("hex")
837
#     else:
838
#         snum = str(number)
839

840
#     return "%s%s%s" % (snum, magsuffix, bytesuffix)
841

842

843
def lstrip_string(s, prefix, ignore_case=False):
1✔
844
    """Remove leading string from s.
845

846
    Note: This is different than `s.lstrip('bar')` which would remove
847
    all leading 'a', 'b', and 'r' chars.
848
    """
849
    if ignore_case:
1✔
850
        if s.lower().startswith(prefix.lower()):
1✔
851
            return s[len(prefix) :]
1✔
852
    else:
853
        if s.startswith(prefix):
×
854
            return s[len(prefix) :]
×
855
    return s
×
856

857

858
# def rstrip_string(s, suffix, ignore_case=False):
859
#     """Remove trailing string from s.
860

861
#     Note: This is different than `s.rstrip('bar')` which would remove
862
#     all trailing 'a', 'b', and 'r' chars.
863
#     """
864
#     if ignore_case:
865
#         if s.lower().endswith(suffix.lower()):
866
#             return s[:-len(suffix)]
867
#     else:
868
#         if s.endswith(suffix):
869
#             return s[:-len(suffix)]
870
#     return s
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc