• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 16871448709

11 Aug 2025 05:11AM UTC coverage: 72.527% (-0.03%) from 72.561%
16871448709

push

github

mborsetti
Version 3.31.1rc0

1748 of 2772 branches covered (63.06%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

4572 of 5942 relevant lines covered (76.94%)

5.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.93
/webchanges/util.py
1
"""A few utilities used elsewhere."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4
from __future__ import annotations
8✔
5

6
import getpass
8✔
7
import importlib.machinery
8✔
8
import importlib.util
8✔
9
import logging
8✔
10
import os
8✔
11
import re
8✔
12
import shlex
8✔
13
import stat
8✔
14
import subprocess
8✔
15
import sys
8✔
16
import textwrap
8✔
17
from math import floor, log10
8✔
18
from os import PathLike
8✔
19
from pathlib import Path
8✔
20
from types import ModuleType
8✔
21
from typing import TYPE_CHECKING, Callable, Iterable, Match
8✔
22

23
from markdown2 import Markdown
8✔
24

25
from webchanges import __project_name__, __version__
8✔
26

27
if TYPE_CHECKING:
28
    from webchanges.jobs import JobState
29

30
try:
8✔
31
    import httpx
8✔
32
except ImportError:  # pragma: no cover
33
    httpx = None  # type: ignore[assignment]
34
if httpx is not None:
8!
35
    try:
8✔
36
        import h2
8✔
37
    except ImportError:  # pragma: no cover
38
        h2 = None  # type: ignore[assignment]
39

40
try:
8✔
41
    from packaging.version import parse as parse_version
8✔
42
except ImportError:  # pragma: no cover
43
    from webchanges._vendored.packaging_version import parse as parse_version  # type: ignore[assignment]
44

45
logger = logging.getLogger(__name__)
8✔
46

47

48
def lazy_import(fullname: str) -> ModuleType | None:
8✔
49
    """Lazily imports a module. See https://stackoverflow.com/questions/42703908.
50

51
    To identify loading time, run $ python -X importtime webchanges --help
52
    """
53
    try:
×
54
        return sys.modules[fullname]
×
55
    except KeyError:
×
56
        spec = importlib.util.find_spec(fullname)
×
57
        if spec and spec.loader:
×
58
            module = importlib.util.module_from_spec(spec)
×
59
            loader = importlib.util.LazyLoader(spec.loader)
×
60
            # Make module with proper locking and get it inserted into sys.modules.
61
            loader.exec_module(module)
×
62
            return module
×
63
    return None
×
64

65

66
class TrackSubClasses(type):
8✔
67
    """A metaclass that stores subclass name-to-class mappings in the base class."""
68

69
    # __subclasses__ gets redefined from default "Callable[[_TT], list[_TT]]
70
    __subclasses__: dict[str, TrackSubClasses]  # type: ignore[assignment]
8✔
71
    __anonymous_subclasses__: list[TrackSubClasses]
8✔
72
    __required__: tuple[str, ...] = ()
8✔
73
    __optional__: tuple[str, ...] = ()
8✔
74
    __supported_directives__: dict[str, str] = {}
8✔
75
    __supported_subfilters__: dict[str, str] = {}
8✔
76

77
    __kind__: str
8✔
78

79
    job_states: list[JobState]
8✔
80

81
    def sorted_by_kind(cls: TrackSubClasses) -> list[TrackSubClasses]:
8✔
82
        """Generates a list of all members of a class sorted by the value of their __kind__ attribute. Useful for
83
        documentation.
84

85
        :param cls: The class.
86
        :return: The sorted list of class members.
87
        """
88
        return [item for _, item in sorted((it.__kind__, it) for it in cls.__subclasses__.values() if it.__kind__)]
8✔
89

90
    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict) -> None:
8✔
91
        for base in bases:
8✔
92
            if base is object:
8!
93
                continue
×
94

95
            for attr in {'__required__', '__optional__'}:
8✔
96
                if not hasattr(base, attr):
8!
97
                    continue
×
98

99
                inherited = getattr(base, attr, ())
8✔
100
                new_value = tuple(namespace.get(attr, ())) + tuple(inherited)
8✔
101
                namespace[attr] = new_value
8✔
102
                setattr(cls, attr, new_value)
8✔
103

104
        for base in bases:
8✔
105
            if base is object:
8!
106
                continue
×
107

108
            if hasattr(cls, '__kind__'):
8!
109
                subclasses = getattr(base, '__subclasses__', None)
8✔
110
                if subclasses is not None:
8!
111
                    logger.debug(f'Registering {cls} as {cls.__kind__}')
8✔
112
                    subclasses[cls.__kind__] = cls
8✔
113
                    break
8✔
114
            else:
115
                anonymous_subclasses = getattr(base, '__anonymous_subclasses__', None)
×
116
                if anonymous_subclasses is not None:
×
117
                    logger.debug(f'Registering {cls}')
×
118
                    anonymous_subclasses.append(cls)
×
119
                    break
×
120

121
        super().__init__(name, bases, namespace)
8✔
122

123

124
def edit_file(filename: str | bytes | PathLike) -> None:
8✔
125
    """Opens the editor to edit a file.
126

127
    :param filename: The filename.
128
    """
129
    editor = os.environ.get('EDITOR')
8✔
130
    if not editor:
8✔
131
        editor = os.environ.get('VISUAL')
8✔
132
    if not editor:
8!
133
        if sys.platform == 'win32':
×
134
            editor = 'notepad.exe'
×
135
        else:
136
            print('Please set the path to the editor in the environment variable $EDITOR, e.g. "export EDITOR=nano"')
×
137
            raise SystemExit(1)
×
138

139
    subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
8✔
140
        [*shlex.split(editor), str(filename)], check=True
141
    )
142

143

144
def import_module_from_source(module_name: str, source_path: str | bytes | PathLike) -> ModuleType:
8✔
145
    """Loads a module and executes it in its own namespace.
146

147
    :param module_name: The name of the module to import.
148
    :param source_path: The path where the module is located.
149
    :return: A ModuleType object.
150
    """
151
    source_path = str(source_path)
8✔
152
    loader = importlib.machinery.SourceFileLoader(module_name, source_path)
8✔
153
    spec = importlib.util.spec_from_file_location(module_name, source_path, loader=loader)
8✔
154
    module = importlib.util.module_from_spec(spec)  # type: ignore[arg-type]
8✔
155
    sys.modules[module_name] = module
8✔
156
    loader.exec_module(module)
8✔
157
    # try:
158
    #     loader.exec_module(module)
159
    # except Exception:
160
    #     sys.tracebacklimit = 1000
161
    #     raise
162
    loader.exec_module(module)
8✔
163
    return module
8✔
164

165

166
def chunk_string(text: str, length: int, numbering: bool = False) -> list[str]:
8✔
167
    """Chunks a string.
168

169
    :param text: The text to be chunked.
170
    :param length: The length of the chunked text.
171
    :param numbering: Whether to number each chunk on the left if more than one chunk is generated.
172

173
    :returns: a list of chunked strings
174
    """
175
    if numbering and len(text) > length:
8✔
176
        try:
8✔
177
            text_length = length - 4 - 2
8✔
178
            digits_try = 1 if text_length <= 0 else floor(log10(len(text) / text_length))  # initialization floor
8✔
179
            digits_guess = digits_try + 1
8✔
180
            while digits_guess > digits_try:
8✔
181
                digits_try += 1
8✔
182
                text_length = length - 4 - 2 * digits_try
8✔
183
                if text_length <= 0:
8✔
184
                    raise ValueError('Not enough space to chunkify string with line numbering (1)')
8✔
185
                lines_guess = len(text) / text_length
8✔
186
                digits_guess = floor(log10(lines_guess)) + 1
8✔
187

188
            chunks = textwrap.wrap(text, text_length, replace_whitespace=False)
8✔
189
            actual_digits = floor(log10(len(chunks))) + 1
8✔
190
            while actual_digits > digits_try:
8!
191
                digits_try += 1
×
192
                text_length = length - 4 - 2 * digits_try
×
193
                if text_length <= 0:
×
194
                    raise ValueError('Not enough space to chunkify string with line numbering (2)')
×
195
                chunks = textwrap.wrap(text, text_length, replace_whitespace=False)
×
196
                actual_digits = floor(log10(len(chunks))) + 1
×
197

198
            length = len(chunks)
8✔
199
            return [line + ' (' + f'{{:{digits_try}d}}'.format(i + 1) + f'/{length})' for i, line in enumerate(chunks)]
8!
200

201
        except ValueError as e:
8✔
202
            logger.error(f'{e}')
8✔
203

204
    return textwrap.wrap(text, length, replace_whitespace=False)
8✔
205

206

207
def linkify(
8✔
208
    text: str,
209
    shorten: bool = False,
210
    extra_params: str | Callable[[str], str] = '',
211
    require_protocol: bool = False,
212
    permitted_protocols: tuple[str, ...] = (
213
        'http',
214
        'https',
215
        'mailto',
216
    ),
217
) -> str:
218
    """Converts plain text into HTML with links.
219

220
    For example linkify("Hello http://tornadoweb.org!") would return 'Hello
221
    <a href="http://tornadoweb.org">http://tornadoweb.org</a>!'.
222

223
    We are using a regex from tornado library https://github.com/tornadoweb/tornado/blob/master/tornado/escape.py.
224
    This regex should avoid character entities other than &amp; so that we won't pick up &quot;, etc., but it is
225
    vulnerable to Regular expression Denial of Service (ReDoS), which would divert computational resources to an
226
    expensive regex match. The risk in this application is limited.
227

228
    In the future, consider using linkify from the bleach project instead (requires importing another package).
229

230
    :parameter text: The text to linkify.
231
    :parameter shorten: Long urls will be shortened for display.
232
    :parameter extra_params: Extra text to include in the link tag, or a callable taking the link as an argument and
233
        returning the extra text, e.g. linkify(text, extra_params='rel="nofollow" class="external"').
234
    :parameter require_protocol: Only linkify urls which include a protocol; if this is False, urls such as
235
        www.facebook.com will also be linkified.
236
    :parameter permitted_protocols: Protocols which should be linkified, e.g. linkify(text,
237
        permitted_protocols=('http', 'ftp', 'mailto')); it is very unsafe to include protocols such as javascript.
238
    """
239
    # _url_re = re.compile(  # original re
240
    #     r'\b('
241
    #     r'(?:([\w-]+):(/{1,3})|www[.])'
242
    #     r'(?:('
243
    #     r'?:(?:[^\s&()]|&amp;|&quot;)*(?:[^!"#$%&'
244
    #     r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
245
    #     r")"
246
    #     r'|(?:\((?:[^\s&()]|&amp;|&quot;)*\))'
247
    #     r')+'
248
    #     r')'
249
    # )
250

251
    _url_re = re.compile(  # modified to catch all URL parameters
8✔
252
        r'\b('
253
        r'(?:([\w-]+):(/{1,3})|www[.])'
254
        r'(?:('
255
        r'?:(?:[^\s()])*(?:[^!"#$%&'
256
        r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
257
        r')'
258
        r'|(?:\((?:[^\s()])*\))'
259
        r')+'
260
        r')'
261
    )
262

263
    if extra_params and not callable(extra_params):
8✔
264
        extra_params = f' {extra_params.strip()}'
8✔
265

266
    def make_link(m: Match) -> str:
8✔
267
        """Replacement function for re.sub using re.match as input to convert plain text into HTML with links."""
268
        url: str = m.group(1)
8✔
269
        proto: str = m.group(2)
8✔
270
        if require_protocol and not proto:
8!
271
            return url  # not protocol, no linkify
×
272

273
        if proto and proto not in permitted_protocols:
8✔
274
            return url  # bad protocol, no linkify
8✔
275

276
        href: str = m.group(1)
8✔
277
        if not proto:
8✔
278
            proto = 'https'
8✔
279
            href = f'https://{href}'  # no proto specified, use https
8✔
280

281
        if callable(extra_params):
8!
282
            params = f' {extra_params(href).strip()}'
×
283
        else:
284
            params = extra_params
8✔
285

286
        # clip long urls. max_len is just an approximation
287
        max_len = 30
8✔
288
        if shorten and len(url) > max_len:
8✔
289
            before_clip = url
8✔
290
            if proto:
8!
291
                proto_len = len(proto) + 1 + len(m.group(3) or '')  # +1 for :
8✔
292
            else:
293
                proto_len = 0
×
294

295
            parts = url[proto_len:].split('/')
8✔
296
            if len(parts) > 1:
8!
297
                # Grab the whole host part plus the first bit of the path
298
                # The path is usually not that interesting once shortened
299
                # (no more slug, etc), so it really just provides a little
300
                # extra indication of shortening.
301
                url = url[:proto_len] + parts[0] + '/' + parts[1][:8].split('?')[0].split('.')[0]
8✔
302

303
            if len(url) > max_len * 1.5:  # still too long
8!
304
                url = url[:max_len]
×
305

306
            if url != before_clip:
8!
307
                amp = url.rfind('&')
8✔
308
                # avoid splitting html char entities
309
                if amp > max_len - 5:
8!
310
                    url = url[:amp]
×
311
                url += '...'
8✔
312

313
                if len(url) >= len(before_clip):
8!
314
                    url = before_clip
×
315
                else:
316
                    # full url is visible on mouse-over (for those who don't
317
                    # have a status bar, such as Safari by default)
318
                    params += f' title={href}'
8✔
319

320
        return f'<a href="{href}"{params}>{url}</a>'
8✔
321

322
    # text = html.escape(text)
323
    return _url_re.sub(make_link, text)
8✔
324

325

326
def get_new_version_number(timeout: float | None = None) -> str | bool:
8✔
327
    """Check PyPi for newer version of project.
328

329
    :parameter timeout: Timeout in seconds after which empty string is returned.
330
    :returns: The new version number if a newer version of project is found on PyPi, empty string otherwise, False if
331
      error retrieving the new version number is encountered.
332
    """
333
    if httpx is None:
8!
334
        logger.info('Cannot query PyPi for latest release: HTTPX not installed')
×
335
        return False
×
336

337
    try:
8✔
338
        r = httpx.Client(http2=h2 is not None, timeout=timeout).get(f'https://pypi.org/pypi/{__project_name__}/json')
8✔
339
    except httpx.RequestError as e:
×
340
        logger.info(f'Exception when querying PyPi for latest release: {e}')
×
341
        return False
×
342

343
    if r.is_success:
8!
344
        latest_release: str = r.json()['info']['version']
8✔
345
        if parse_version(latest_release) > parse_version(__version__):
8!
346
            return latest_release
×
347
    else:
348
        logger.info(f'HTTP error when querying PyPi for latest release: {r}')
×
349

350
    return ''
8✔
351

352

353
def dur_text(duration: float) -> str:
8✔
354
    """Returns a formatted string optimized to the number of seconds for use in footers.
355

356
    :parameter duration: The duration in seconds.
357
    :returns: The formatted string.
358
    """
359
    if duration < 60:
8!
360
        return f'{float(f"{duration:.2g}"):g} seconds'
8✔
361
    else:
UNCOV
362
        m, s = divmod(duration, 60)
×
UNCOV
363
        return f'{m:.0f}:{s:02.0f}'
×
364

365

366
def file_ownership_checks(filename: Path) -> list[str]:
8✔
367
    """Check security of file and its directory, i.e. that they belong to the current UID or root and only the owner
368
    can write to them. Return list of errors if any. Linux only.
369

370
    :returns: List of errors encountered (if any).
371
    """
372

373
    if sys.platform == 'win32':
8!
374
        return []
×
375

376
    file_ownership_errors = []
8✔
377
    current_uid = os.getuid()
8✔
378

379
    dirname = filename.parent
8✔
380
    dir_st = dirname.stat()
8✔
381
    if (dir_st.st_mode & (stat.S_IWGRP | stat.S_IWOTH)) != 0:
8✔
382
        file_ownership_errors.append(f'{dirname} is group/world-writable')
4✔
383
    if dir_st.st_uid not in {current_uid, 0}:
8!
384
        file_ownership_errors.append(f'{dirname} not owned by {getpass.getuser()} or root')
×
385

386
    file_st = filename.stat()
8✔
387
    if (file_st.st_mode & (stat.S_IWGRP | stat.S_IWOTH)) != 0:
8✔
388
        file_ownership_errors.append(f'{filename} is group/world-writable')
8✔
389
    if file_st.st_uid not in {current_uid, 0}:
8!
390
        file_ownership_errors.append(f'{filename} not owned by {getpass.getuser()} or root')
×
391

392
    return file_ownership_errors
8✔
393

394

395
def mark_to_html(text: str, markdown_padded_tables: bool | None = False, extras: Iterable[str] | None = None) -> str:
8✔
396
    """Converts a line of Markdown (e.g. as generated by html2text filter) to html.
397

398
    :param text: The text in Markdown format.
399
    :param markdown_padded_tables: If true, monospace the tables for alignment.
400
    :param extras: Additional extras for Markdown.
401
    :return: The text in html format.
402
    """
403
    markdowner_extras = set(extras) if extras else set()
8✔
404
    markdowner_extras.add('strike')  # text marked by double tildes is ~~strikethrough~~
8✔
405
    markdowner_extras.add('target-blank-links')  # <a> tags have rel="noopener" for added security
8✔
406
    markdowner = Markdown(extras=list(markdowner_extras))
8✔
407
    if text == '* * *':  # manually expand horizontal ruler since <hr> is used to separate jobs
8✔
408
        return '-' * 80
8✔
409
    pre = ''
8✔
410
    post = ''
8✔
411
    if text.lstrip()[:2] == '* ':  # item of unordered list
8✔
412
        lstripped = text.lstrip(' ')
8✔
413
        indent = len(text) - len(lstripped)
8✔
414
        pre += '&nbsp;' * indent
8✔
415
        pre += '● ' if indent == 2 else '⯀ ' if indent == 4 else '○ '
8✔
416
        text = text.split('* ', 1)[1]
8✔
417
    if text[:1] == ' ':
8✔
418
        # replace leading spaces with NBSP or converter will strip them all
419
        stripped = text.lstrip()
8✔
420
        text = '&nbsp;' * (len(text) - len(stripped)) + stripped
8✔
421
    text = text.replace('` ', '`&nbsp;')  # replace leading spaces within code blocks
8✔
422
    if markdown_padded_tables and '|' in text:
8✔
423
        # a padded row in a table; keep it monospaced for alignment
424
        pre += '<span style="font-family:monospace;white-space:pre-wrap">'
8✔
425
        post += '</span>'
8✔
426
    text = text.replace('[](', '[[_Link with no text_]](')  # Add link text where missing
8✔
427
    html_out = str(markdowner.convert(text)).rstrip('\n')  # convert markdown to html
8✔
428
    # fixes for Gmail
429
    html_out = html_out.replace('<a', '<a style="font-family:inherit"')  # fix <a> tag styling
8✔
430
    html_out = html_out.replace('<img', '<img style="max-width:100%;height:auto;max-height:100%"')
8✔
431
    html_out = html_out.replace('<code>', '<span style="font-family:monospace;white-space:pre-wrap">')
8✔
432
    html_out = html_out.replace('</code>', '</span>')
8✔
433
    if 'tables' in markdowner_extras:
8✔
434
        html_out = html_out.replace('<table>', '<table border="1" cellspacing="0">')
8✔
435
    # remove <p> tags wrapping
436
    html_out, sub = re.subn(r'^<p>|</p>$', '', html_out)  # remove paragraph tags
8✔
437
    if sub:
8✔
438
        return pre + html_out + post
8✔
439
    html_out = re.sub(r'<(/?)h\d>', r'<\g<1>strong>', html_out)  # replace heading tags with <strong>
8✔
440
    return pre + html_out + post
8✔
441

442

443
def import_optional_dependency(name: str, extra: str = '') -> ModuleType:
8✔
444
    """
445
    Import an optional dependency.
446

447
    If a dependency is missing an ImportError with a nice message will be raised.
448

449
    :param name: The module name.
450
    :param extra: Additional text to include in the ImportError message.
451

452
    :returns maybe_module: The imported module, when found and the version is correct.
453
      None is returned when the package is not found.
454
    """
455
    try:
×
456
        module = importlib.import_module(name)
×
457
    except ImportError as err:
×
458
        msg = f'`Import {name}` failed. {extra} Use pip or conda to install the {name} package.'
×
459
        raise ImportError(msg) from err
×
460

461
    return module
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc