• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 21856489627

10 Feb 2026 07:57AM UTC coverage: 73.228% (-0.09%) from 73.318%
21856489627

push

github

mborsetti
Version 3.34.0rc0

1424 of 2298 branches covered (61.97%)

Branch coverage included in aggregate %.

4766 of 6155 relevant lines covered (77.43%)

11.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.02
/webchanges/util.py
1
"""A few utilities used elsewhere."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4
from __future__ import annotations
15✔
5

6
import getpass
15✔
7
import importlib.machinery
15✔
8
import importlib.util
15✔
9
import logging
15✔
10
import os
15✔
11
import re
15✔
12
import shlex
15✔
13
import stat
15✔
14
import subprocess
15✔
15
import sys
15✔
16
import textwrap
15✔
17
from math import floor, log10
15✔
18
from os import PathLike
15✔
19
from typing import TYPE_CHECKING, Callable, Iterable, Match
15✔
20

21
from markdown2 import Markdown
15✔
22

23
from webchanges import __project_name__, __version__
15✔
24

25
if TYPE_CHECKING:
26
    from pathlib import Path
27
    from types import ModuleType
28

29
    from webchanges.jobs import JobState
30

31
try:
15✔
32
    import httpx
15✔
33
except ImportError:  # pragma: no cover
34
    httpx = None  # type: ignore[assignment]
35
if httpx is not None:
15!
36
    try:
15✔
37
        import h2
15✔
38
    except ImportError:  # pragma: no cover
39
        h2 = None  # type: ignore[assignment]
40

41
try:
15✔
42
    from packaging.version import parse as parse_version
15✔
43
except ImportError:  # pragma: no cover
44
    from webchanges._vendored.packaging_version import parse as parse_version
45

46
logger = logging.getLogger(__name__)
15✔
47

48

49
def lazy_import(fullname: str) -> ModuleType | None:
    """Lazily import a module, deferring execution of its code until an attribute is first accessed.

    Follows the "Implementing lazy imports" recipe of the importlib documentation; see also
    https://stackoverflow.com/questions/42703908.

    To identify loading time, run $ python -X importtime webchanges --help

    :param fullname: The absolute name of the module to import.
    :return: The module already registered in ``sys.modules`` under that name, otherwise a newly created lazy module
      (which is also registered in ``sys.modules``). None if no spec or loader can be found for the module.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    spec = importlib.util.find_spec(fullname)
    if not spec or not spec.loader:
        return None

    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module before "executing" it so that later imports (and repeated calls to this function) get this
    # very same object instead of building a second, independent copy of the module.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
65

66

67
class TrackSubClasses(type):
    """A metaclass that stores subclass name-to-class mappings in the base class."""

    # __subclasses__ gets redefined from default "Callable[[_TT], list[_TT]]
    __subclasses__: dict[str, TrackSubClasses]
    __anonymous_subclasses__: list[TrackSubClasses]
    __required__: tuple[str, ...] = ()
    __optional__: tuple[str, ...] = ()
    __supported_directives__: dict[str, str] = {}
    __supported_subfilters__: dict[str, str] = {}

    __kind__: str

    job_states: list[JobState]

    def sorted_by_kind(cls: TrackSubClasses) -> list[TrackSubClasses]:
        """Generates a list of all members of a class sorted by the value of their __kind__ attribute.

        Members with an empty ``__kind__`` are left out. Useful for documentation.

        :param cls: The class.
        :return: The sorted list of class members.
        """
        # Sorting on the key alone (rather than on (kind, class) tuples) never compares two classes with each other,
        # which would raise TypeError should two members share the same __kind__.
        members = (item for item in cls.__subclasses__.values() if item.__kind__)
        return sorted(members, key=lambda item: item.__kind__)

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict) -> None:
        """Merges the key declarations of the bases into the new class and registers it with a tracking base.

        For ``__required__`` and ``__optional__``, the values declared by the new class are followed by those of each
        base that has the attribute. The class is then recorded in the first base that can take it: under its
        ``__kind__`` in that base's ``__subclasses__`` mapping when the class has a ``__kind__`` attribute, otherwise
        in that base's ``__anonymous_subclasses__`` list.

        :param name: The name of the class being created.
        :param bases: The base classes listed in the class statement.
        :param namespace: The namespace of the class body; its ``__required__`` and ``__optional__`` entries are
          replaced with the merged tuples.
        """
        for base in bases:
            if base is object:
                continue

            for attr in ('__required__', '__optional__'):
                if not hasattr(base, attr):
                    continue

                inherited = getattr(base, attr)
                new_value = tuple(namespace.get(attr, ())) + tuple(inherited)
                # namespace is updated too, so that a later base in `bases` extends the accumulated value
                namespace[attr] = new_value
                setattr(cls, attr, new_value)

        for base in bases:
            if base is object:
                continue

            if hasattr(cls, '__kind__'):
                subclasses = getattr(base, '__subclasses__', None)
                if subclasses is not None:
                    logger.debug(
                        f'Registering Class {cls.__module__}.{cls.__qualname__}'
                        + (f' as {cls.__kind__}' if cls.__kind__ else '')
                    )
                    subclasses[cls.__kind__] = cls
                    break  # register with the first suitable base only
            else:
                anonymous_subclasses = getattr(base, '__anonymous_subclasses__', None)
                if anonymous_subclasses is not None:
                    logger.debug(f'Registering Class {cls.__module__}.{cls.__qualname__}')
                    anonymous_subclasses.append(cls)
                    break  # register with the first suitable base only

        super().__init__(name, bases, namespace)
133

134

135
def edit_file(filename: str | bytes | PathLike) -> None:
    """Opens the editor to edit a file and waits for the editor to exit.

    The editor command is taken from the environment variable $EDITOR or, if that is unset or empty, $VISUAL. On
    Windows, notepad.exe is used when neither is set.

    :param filename: The filename.
    :raises SystemExit: If no editor is configured (on platforms other than Windows).
    :raises subprocess.CalledProcessError: If the editor exits with a non-zero return code.
    """
    editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
    if not editor:
        if sys.platform == 'win32':
            editor = 'notepad.exe'
        else:
            print('Please set the path to the editor in the environment variable $EDITOR, e.g. "export EDITOR=nano"')
            raise SystemExit(1)

    # os.fsdecode accepts str, bytes and os.PathLike alike; str() would turn a bytes path into the literal "b'...'".
    subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
        [*shlex.split(editor), os.fsdecode(filename)], check=True
    )
153

154

155
def import_module_from_source(module_name: str, source_path: str | bytes | PathLike) -> ModuleType:
    """Loads a module from a source file and executes it (once) in its own namespace.

    The module is registered in ``sys.modules`` under ``module_name`` before its code is executed.

    :param module_name: The name of the module to import.
    :param source_path: The path where the module is located.
    :return: A ModuleType object.
    :raises ImportError: If no module spec can be created for the file.
    """
    # os.fsdecode accepts str, bytes and os.PathLike alike; str() would turn a bytes path into the literal "b'...'".
    source_path = os.fsdecode(source_path)
    loader = importlib.machinery.SourceFileLoader(module_name, source_path)
    spec = importlib.util.spec_from_file_location(module_name, source_path, loader=loader)
    if spec is None:
        raise ImportError(f'Cannot create a module spec for {source_path}', name=module_name, path=source_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    # Execute exactly once: running the module body a second time would repeat all of its side effects.
    loader.exec_module(module)
    return module
175

176

177
def chunk_string(text: str, length: int, numbering: bool = False) -> list[str]:
    """Splits a text into chunks no longer than a given length, optionally numbering them.

    When numbering is requested and the text does not fit in a single chunk, each chunk gets a ' (i/total)' suffix.
    The room for that suffix (4 characters plus twice the number of digits of the counter) is taken out of the chunk
    width. If there is not enough room for the suffix, the error is logged and the text is chunked without numbering.

    :param text: The text to be chunked.
    :param length: The length of the chunked text.
    :param numbering: Whether to number each chunk on the left if more than one chunk is generated.

    :returns: a list of chunked strings
    """
    if not numbering or len(text) <= length:
        return textwrap.wrap(text, length, replace_whitespace=False)

    try:
        # First estimate the number of digits from the text length alone ...
        width = length - 4 - 2
        digits = 1 if width <= 0 else floor(log10(len(text) / width))  # initialization floor
        estimated_digits = digits + 1
        while estimated_digits > digits:
            digits += 1
            width = length - 4 - 2 * digits
            if width <= 0:
                raise ValueError('Not enough space to chunkify string with line numbering (1)')
            estimated_digits = floor(log10(len(text) / width)) + 1

        # ... then wrap for real and widen the counter for as long as the actual chunk count needs more digits.
        chunks = textwrap.wrap(text, width, replace_whitespace=False)
        while floor(log10(len(chunks))) + 1 > digits:
            digits += 1
            width = length - 4 - 2 * digits
            if width <= 0:
                raise ValueError('Not enough space to chunkify string with line numbering (2)')
            chunks = textwrap.wrap(text, width, replace_whitespace=False)
    except ValueError as e:
        logger.error(f'{e}')
        return textwrap.wrap(text, length, replace_whitespace=False)

    total = len(chunks)
    return [f'{chunk} ({index:{digits}d}/{total})' for index, chunk in enumerate(chunks, start=1)]
216

217

218
def linkify(
    text: str,
    shorten: bool = False,
    extra_params: str | Callable[[str], str] = '',
    require_protocol: bool = False,
    permitted_protocols: tuple[str, ...] = (
        'http',
        'https',
        'mailto',
    ),
) -> str:
    """Converts plain text into HTML with links.

    For example linkify("Hello http://tornadoweb.org!") would return 'Hello
    <a href="http://tornadoweb.org">http://tornadoweb.org</a>!'.

    We are using a regex derived from the one in the tornado library
    https://github.com/tornadoweb/tornado/blob/master/tornado/escape.py, modified to also accept '&' inside a URL so
    that all URL parameters are caught. It is vulnerable to Regular expression Denial of Service (ReDoS), which would
    divert computational resources to an expensive regex match. The risk in this application is limited.

    The text is not HTML-escaped by this function.

    In the future, consider using linkify from the bleach project instead (requires importing another package).

    :parameter text: The text to linkify.
    :parameter shorten: Long urls will be shortened for display.
    :parameter extra_params: Extra text to include in the link tag, or a callable taking the link as an argument and
        returning the extra text, e.g. linkify(text, extra_params='rel="nofollow" class="external"').
    :parameter require_protocol: Only linkify urls which include a protocol; if this is False, urls such as
        www.facebook.com will also be linkified.
    :parameter permitted_protocols: Protocols which should be linkified, e.g. linkify(text,
        permitted_protocols=('http', 'ftp', 'mailto')); it is very unsafe to include protocols such as javascript.
    :returns: The text with the urls replaced by <a> tags.
    """
    # group 1: the whole url; group 2: the protocol (if any); group 3: the slashes following the protocol (if any)
    _url_re = re.compile(  # modified to catch all URL parameters
        r'\b('
        r'(?:([\w-]+):(/{1,3})|www[.])'
        r'(?:('
        r'?:(?:[^\s()])*(?:[^!"#$%&'
        r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
        r')'
        r'|(?:\((?:[^\s()])*\))'
        r')+'
        r')'
    )

    if extra_params and not callable(extra_params):
        extra_params = f' {extra_params.strip()}'

    def make_link(m: Match) -> str:
        """Replacement function for re.sub using re.match as input to convert plain text into HTML with links."""
        url: str = m.group(1)
        proto: str = m.group(2)
        if require_protocol and not proto:
            return url  # not protocol, no linkify

        if proto and proto not in permitted_protocols:
            return url  # bad protocol, no linkify

        # no proto specified, use https (only in the link target: the displayed text is left as written)
        href: str = url if proto else f'https://{url}'

        params = f' {extra_params(href).strip()}' if callable(extra_params) else extra_params

        # clip long urls. max_len is just an approximation
        max_len = 30
        if shorten and len(url) > max_len:
            before_clip = url
            # Length of the 'proto://' prefix actually present in the matched text (+1 for the colon); zero for
            # urls such as www.example.com, which have no such prefix to skip.
            proto_len = len(proto) + 1 + len(m.group(3) or '') if proto else 0

            parts = url[proto_len:].split('/')
            if len(parts) > 1:
                # Grab the whole host part plus the first bit of the path
                # The path is usually not that interesting once shortened
                # (no more slug, etc), so it really just provides a little
                # extra indication of shortening.
                url = url[:proto_len] + parts[0] + '/' + parts[1][:8].split('?')[0].split('.')[0]

            if len(url) > max_len * 1.5:  # still too long
                url = url[:max_len]

            if url != before_clip:
                amp = url.rfind('&')
                # avoid splitting html char entities
                if amp > max_len - 5:
                    url = url[:amp]
                url += '...'

                if len(url) >= len(before_clip):
                    url = before_clip
                else:
                    # full url is visible on mouse-over (for those who don't
                    # have a status bar, such as Safari by default)
                    # The value is quoted: an unquoted attribute value may not contain '=', which is common in urls.
                    params += f' title="{href}"'

        return f'<a href="{href}"{params}>{url}</a>'

    # text = html.escape(text)
    return _url_re.sub(make_link, text)
329

330

331
def get_new_version_number(timeout: float | None = None) -> str | bool:
    """Check PyPi for newer version of project.

    :parameter timeout: Timeout in seconds for the request to PyPi; None disables the timeout. A request that times
      out is treated like any other request error (False is returned).
    :returns: The new version number if a newer version of project is found on PyPi, empty string otherwise, False if
      error retrieving the new version number is encountered (HTTPX not installed, request error, HTTP error status or
      unusable response).
    """
    if httpx is None:
        logger.info('Cannot query PyPi for latest release: HTTPX not installed')
        return False

    try:
        with httpx.Client(http2=h2 is not None, timeout=timeout) as http_client:
            r = http_client.get(f'https://pypi.org/pypi/{__project_name__}/json')
    except httpx.RequestError as e:
        logger.info(f'Exception when querying PyPi for latest release: {e}')
        return False

    if not r.is_success:
        # An HTTP error tells nothing about the latest release: report it as an error, not as "no newer version".
        logger.info(f'HTTP error when querying PyPi for latest release: {r}')
        return False

    try:
        latest_release: str = r.json()['info']['version']
        is_newer = parse_version(latest_release) > parse_version(__version__)  # ty:ignore[unsupported-operator]
    except (KeyError, TypeError, ValueError) as e:  # body is not JSON, lacks the expected keys or a valid version
        logger.info(f'Unexpected response when querying PyPi for latest release: {e!r}')
        return False

    return latest_release if is_newer else ''
357

358

359
def dur_text(duration: float) -> str:
    """Returns a formatted string optimized to the number of seconds for use in footers.

    Durations under a minute are shown with two significant digits followed by ' seconds'; longer ones are rounded to
    the nearest second and shown as minutes:seconds.

    :parameter duration: The duration in seconds.
    :returns: The formatted string.
    """
    if duration < 60:
        return f'{float(f"{duration:.2g}"):g} seconds'
    # Round to whole seconds before splitting: rounding the remainder afterwards turns e.g. 119.7 into '1:60'.
    minutes, seconds = divmod(round(duration), 60)
    return f'{minutes}:{seconds:02d}'
369

370

371
def file_ownership_checks(filename: Path) -> list[str]:
    """Check security of file and its directory.

    Verifies, first for the directory containing the file and then for the file itself, that it is not writable by
    group or others and that it is owned by the current UID or by root (UID 0). No check is made on Windows.

    :param filename: The path of the file to check.
    :returns: List of errors encountered (if any).
    """
    if sys.platform == 'win32':
        return []

    trusted_owners = {os.getuid(), 0}
    writable_by_others = stat.S_IWGRP | stat.S_IWOTH

    problems = []
    for path in (filename.parent, filename):
        info = path.stat()
        if info.st_mode & writable_by_others:
            problems.append(f'{path} is group/world-writable')
        if info.st_uid not in trusted_owners:
            problems.append(f'{path} not owned by {getpass.getuser()} or root')

    return problems
399

400

401
def mark_to_html(text: str, markdown_padded_tables: bool | None = False, extras: Iterable[str] | None = None) -> str:
    """Converts a line of Markdown (e.g. as generated by html2text filter) to html.

    Unordered list items and leading spaces are rendered with explicit bullets and non-breaking spaces, the
    horizontal ruler '* * *' becomes a line of dashes, and the generated html is adjusted for Gmail.

    :param text: The text in Markdown format.
    :param markdown_padded_tables: If true, monospace the tables for alignment.
    :param extras: Additional extras for Markdown.
    :return: The text in html format.
    """
    active_extras = set(extras) if extras else set()
    active_extras.update(
        (
            'strike',  # text marked by double tildes is ~~strikethrough~~
            'target-blank-links',  # <a> tags have rel="noopener" for added security
        )
    )
    converter = Markdown(extras=list(active_extras))

    if text == '* * *':  # manually expand horizontal ruler since <hr> is used to separate jobs
        return '-' * 80

    prefix = ''
    suffix = ''
    if text.lstrip().startswith('* '):  # item of unordered list
        indent = len(text) - len(text.lstrip(' '))
        bullet = {2: '● ', 4: '⯀ '}.get(indent, '○ ')
        prefix += '&nbsp;' * indent + bullet
        text = text.split('* ', 1)[1]
    if text.startswith(' '):
        # replace leading spaces with NBSP or converter will strip them all
        unpadded = text.lstrip()
        text = '&nbsp;' * (len(text) - len(unpadded)) + unpadded
    text = text.replace('` ', '`&nbsp;')  # replace leading spaces within code blocks
    if markdown_padded_tables and '|' in text:
        # a padded row in a table; keep it monospaced for alignment
        prefix += '<span style="font-family:monospace;white-space:pre-wrap">'
        suffix += '</span>'
    text = text.replace('[](', '[[_Link with no text_]](')  # Add link text where missing

    html_out = str(converter.convert(text)).rstrip('\n')  # convert markdown to html

    # fixes for Gmail
    for old, new in (
        ('<a', '<a style="font-family:inherit"'),  # fix <a> tag styling
        ('<img', '<img style="max-width:100%;height:auto;max-height:100%"'),
        ('<code>', '<span style="font-family:monospace;white-space:pre-wrap">'),
        ('</code>', '</span>'),
    ):
        html_out = html_out.replace(old, new)
    if 'tables' in active_extras:
        html_out = html_out.replace('<table>', '<table border="1" cellspacing="0">')

    # remove <p> tags wrapping; heading tags are replaced with <strong> only when no <p> tag was removed
    html_out, removed = re.subn(r'^<p>|</p>$', '', html_out)
    if not removed:
        html_out = re.sub(r'<(/?)h\d>', r'<\g<1>strong>', html_out)
    return prefix + html_out + suffix
447

448

449
def import_optional_dependency(name: str, extra: str = '') -> ModuleType:
    """Import an optional dependency, raising a helpful error if it is not available.

    :param name: The module name.
    :param extra: Additional text to include in the ImportError message.

    :returns: The imported module.
    :raises ImportError: If the module cannot be imported; the message names the package to install and the original
      error is chained as its cause.
    """
    try:
        return importlib.import_module(name)
    except ImportError as err:
        raise ImportError(
            f'`Import {name}` failed. {extra} Use pip or conda to install the {name} package.'
        ) from err
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc