• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 17710149774

14 Sep 2025 10:49AM UTC coverage: 71.376% (-3.1%) from 74.434%
17710149774

push

github

mborsetti
Version 3.31.1.post2

1383 of 2314 branches covered (59.77%)

Branch coverage included in aggregate %.

4614 of 6088 relevant lines covered (75.79%)

5.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.92
/webchanges/util.py
1
"""A few utilities used elsewhere."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4
from __future__ import annotations
8✔
5

6
import getpass
8✔
7
import importlib.machinery
8✔
8
import importlib.util
8✔
9
import logging
8✔
10
import os
8✔
11
import re
8✔
12
import shlex
8✔
13
import stat
8✔
14
import subprocess
8✔
15
import sys
8✔
16
import textwrap
8✔
17
from math import floor, log10
8✔
18
from os import PathLike
8✔
19
from pathlib import Path
8✔
20
from types import ModuleType
8✔
21
from typing import TYPE_CHECKING, Callable, Iterable, Match
8✔
22

23
from markdown2 import Markdown
8✔
24

25
from webchanges import __project_name__, __version__
8✔
26

27
if TYPE_CHECKING:
28
    from webchanges.jobs import JobState
29

30
try:
8✔
31
    import httpx
8✔
32
except ImportError:  # pragma: no cover
33
    httpx = None  # type: ignore[assignment]
34
if httpx is not None:
8!
35
    try:
8✔
36
        import h2
8✔
37
    except ImportError:  # pragma: no cover
38
        h2 = None  # type: ignore[assignment]
39

40
try:
8✔
41
    from packaging.version import parse as parse_version
8✔
42
except ImportError:  # pragma: no cover
43
    from webchanges._vendored.packaging_version import parse as parse_version  # type: ignore[assignment]
44

45
logger = logging.getLogger(__name__)
8✔
46

47

48
def lazy_import(fullname: str) -> ModuleType | None:
    """Lazily imports a module, deferring its execution until an attribute is first accessed.

    Follows the "Implementing lazy imports" recipe of the importlib documentation (see also
    https://stackoverflow.com/questions/42703908).

    To identify loading time, run $ python -X importtime webchanges --help

    :param fullname: The fully qualified name of the module to import.
    :returns: The module already present in ``sys.modules`` if there is one, otherwise a lazily-loaded module; None
        if no spec (or no loader) can be found for the module.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    try:
        spec = importlib.util.find_spec(fullname)
    except ModuleNotFoundError:
        # find_spec imports the parent package of a dotted name and raises if that parent is missing.
        return None
    if spec is None or spec.loader is None:
        return None

    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module before (lazily) executing it so that later imports and later calls to this function share
    # this very object instead of creating a second copy of the module.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
×
64

65

66
class TrackSubClasses(type):
    """A metaclass that records, in the base class, the subclasses created from it (by ``__kind__`` name, or in a
    list when they have no ``__kind__``)."""

    # __subclasses__ gets redefined from default "Callable[[_TT], list[_TT]]
    __subclasses__: dict[str, TrackSubClasses]  # type: ignore[assignment]
    __anonymous_subclasses__: list[TrackSubClasses]
    __required__: tuple[str, ...] = ()
    __optional__: tuple[str, ...] = ()
    __supported_directives__: dict[str, str] = {}
    __supported_subfilters__: dict[str, str] = {}

    __kind__: str

    job_states: list[JobState]

    def sorted_by_kind(cls: TrackSubClasses) -> list[TrackSubClasses]:
        """Lists the registered subclasses of a class ordered by their __kind__ attribute; subclasses with an empty
        __kind__ are left out. Useful for documentation.

        :param cls: The class.
        :return: The sorted list of class members.
        """
        keyed = [(member.__kind__, member) for member in cls.__subclasses__.values() if member.__kind__]
        keyed.sort()
        return [member for _, member in keyed]

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict) -> None:
        parents = [parent for parent in bases if parent is not object]

        # Prepend this class's own __required__ / __optional__ entries to those of each parent.
        for parent in parents:
            for attr_name in ('__required__', '__optional__'):
                if hasattr(parent, attr_name):
                    combined = tuple(namespace.get(attr_name, ())) + tuple(getattr(parent, attr_name, ()))
                    namespace[attr_name] = combined
                    setattr(cls, attr_name, combined)

        # Register the new class with the first parent that carries a registry.
        for parent in parents:
            if hasattr(cls, '__kind__'):
                registry = getattr(parent, '__subclasses__', None)
                if registry is None:
                    continue
                logger.debug(
                    f'Registering Class {cls.__module__}.{cls.__qualname__}'
                    + (f' as {cls.__kind__}' if cls.__kind__ else '')
                )
                registry[cls.__kind__] = cls
                break
            else:
                anonymous_registry = getattr(parent, '__anonymous_subclasses__', None)
                if anonymous_registry is None:
                    continue
                logger.debug(f'Registering Class {cls.__module__}.{cls.__qualname__}')
                anonymous_registry.append(cls)
                break

        super().__init__(name, bases, namespace)
8✔
125

126

127
def edit_file(filename: str | bytes | PathLike) -> None:
    """Opens the editor to edit a file.

    The editor command is read from the environment variable $EDITOR or, if that is unset or empty, $VISUAL. When
    neither is set, notepad.exe is used on Windows; on other platforms a hint is printed and the program exits.

    :param filename: The filename.
    :raises SystemExit: If no editor is configured (non-Windows platforms).
    :raises subprocess.CalledProcessError: If the editor exits with a non-zero return code.
    """
    editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
    if not editor:
        if sys.platform != 'win32':
            print('Please set the path to the editor in the environment variable $EDITOR, e.g. "export EDITOR=nano"')
            raise SystemExit(1)
        editor = 'notepad.exe'

    # os.fsdecode handles str, bytes and PathLike alike; str() would turn a bytes path into the literal "b'...'".
    subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
        [*shlex.split(editor), os.fsdecode(filename)], check=True
    )
145

146

147
def import_module_from_source(module_name: str, source_path: str | bytes | PathLike) -> ModuleType:
    """Loads a module from a source file and executes it (once) in its own namespace.

    The module is registered in ``sys.modules`` under ``module_name`` before it is executed.

    :param module_name: The name of the module to import.
    :param source_path: The path where the module is located.
    :return: A ModuleType object.
    """
    # os.fsdecode handles str, bytes and PathLike alike; str() would turn a bytes path into the literal "b'...'".
    source_path = os.fsdecode(source_path)
    loader = importlib.machinery.SourceFileLoader(module_name, source_path)
    spec = importlib.util.spec_from_file_location(module_name, source_path, loader=loader)
    module = importlib.util.module_from_spec(spec)  # type: ignore[arg-type]
    sys.modules[module_name] = module
    # Execute exactly once: running the module body a second time would repeat all of its side effects.
    loader.exec_module(module)
    return module
8✔
167

168

169
def chunk_string(text: str, length: int, numbering: bool = False) -> list[str]:
    """Chunks a string.

    When numbering is requested and the text does not fit in a single chunk, each chunk is suffixed with
    ' (i/n)' and the text width is reduced to make room for that suffix. If there is not enough room for the
    suffix, the error is logged and the text is chunked without numbering.

    :param text: The text to be chunked.
    :param length: The length of the chunked text.
    :param numbering: Whether to number each chunk on the left if more than one chunk is generated.

    :returns: a list of chunked strings
    """
    if numbering and len(text) > length:
        try:
            # The suffix ' (i/n)' takes 4 characters plus the digits of i and of n; iterate until the number of
            # digits assumed for the counters matches the number of chunks the remaining width produces.
            text_length = length - 4 - 2
            digits_try = 1 if text_length <= 0 else floor(log10(len(text) / text_length))  # initialization floor
            digits_guess = digits_try + 1
            while digits_guess > digits_try:
                digits_try += 1
                text_length = length - 4 - 2 * digits_try
                if text_length <= 0:
                    raise ValueError('Not enough space to chunkify string with line numbering (1)')
                lines_guess = len(text) / text_length
                digits_guess = floor(log10(lines_guess)) + 1

            chunks = textwrap.wrap(text, text_length, replace_whitespace=False)
            if not chunks:
                # Whitespace-only text wraps to nothing; return now rather than letting log10(0) raise and be
                # logged as a spurious "math domain error".
                return chunks
            actual_digits = floor(log10(len(chunks))) + 1
            while actual_digits > digits_try:
                # The estimate was too low (wrapping on word boundaries produced more chunks): widen the counter.
                digits_try += 1
                text_length = length - 4 - 2 * digits_try
                if text_length <= 0:
                    raise ValueError('Not enough space to chunkify string with line numbering (2)')
                chunks = textwrap.wrap(text, text_length, replace_whitespace=False)
                actual_digits = floor(log10(len(chunks))) + 1

            total = len(chunks)
            return [f'{line} ({i:{digits_try}d}/{total})' for i, line in enumerate(chunks, start=1)]

        except ValueError as e:
            logger.error(f'{e}')

    return textwrap.wrap(text, length, replace_whitespace=False)
8✔
208

209

210
def linkify(
    text: str,
    shorten: bool = False,
    extra_params: str | Callable[[str], str] = '',
    require_protocol: bool = False,
    permitted_protocols: tuple[str, ...] = (
        'http',
        'https',
        'mailto',
    ),
) -> str:
    """Converts plain text into HTML with links.

    For example linkify("Hello http://tornadoweb.org!") would return 'Hello
    <a href="http://tornadoweb.org">http://tornadoweb.org</a>!'.

    We are using a regex derived from the one in the tornado library
    https://github.com/tornadoweb/tornado/blob/master/tornado/escape.py, modified to catch all URL parameters: the
    original only allowed the character entities &amp; and &quot; inside a URL, whereas the one used here accepts any
    '&'. It is vulnerable to Regular expression Denial of Service (ReDoS), which would divert computational resources
    to an expensive regex match. The risk in this application is limited.

    In the future, consider using linkify from the bleach project instead (requires importing another package).

    :parameter text: The text to linkify.
    :parameter shorten: Long urls will be shortened for display.
    :parameter extra_params: Extra text to include in the link tag, or a callable taking the link as an argument and
        returning the extra text, e.g. linkify(text, extra_params='rel="nofollow" class="external"').
    :parameter require_protocol: Only linkify urls which include a protocol; if this is False, urls such as
        www.facebook.com will also be linkified.
    :parameter permitted_protocols: Protocols which should be linkified, e.g. linkify(text,
        permitted_protocols=('http', 'ftp', 'mailto')); it is very unsafe to include protocols such as javascript.
    :returns: The text with the URLs replaced by HTML anchor tags.
    """
    # Group 1 is the whole URL, group 2 the protocol (if any) and group 3 the slashes following the protocol.
    _url_re = re.compile(  # modified to catch all URL parameters
        r'\b('
        r'(?:([\w-]+):(/{1,3})|www[.])'
        r'(?:('
        r'?:(?:[^\s()])*(?:[^!"#$%&'
        r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
        r')'
        r'|(?:\((?:[^\s()])*\))'
        r')+'
        r')'
    )

    if extra_params and not callable(extra_params):
        extra_params = f' {extra_params.strip()}'

    def make_link(m: Match) -> str:
        """Replacement function for re.sub using re.match as input to convert plain text into HTML with links."""
        url: str = m.group(1)
        proto: str = m.group(2)
        if require_protocol and not proto:
            return url  # not protocol, no linkify

        if proto and proto not in permitted_protocols:
            return url  # bad protocol, no linkify

        href: str = m.group(1)
        if not proto:
            proto = 'https'
            href = f'https://{href}'  # no proto specified, use https

        if callable(extra_params):
            params = f' {extra_params(href).strip()}'
        else:
            params = extra_params

        # clip long urls. max_len is just an approximation
        max_len = 30
        if shorten and len(url) > max_len:
            before_clip = url
            if proto:
                proto_len = len(proto) + 1 + len(m.group(3) or '')  # +1 for :
            else:
                proto_len = 0

            parts = url[proto_len:].split('/')
            if len(parts) > 1:
                # Grab the whole host part plus the first bit of the path
                # The path is usually not that interesting once shortened
                # (no more slug, etc), so it really just provides a little
                # extra indication of shortening.
                url = url[:proto_len] + parts[0] + '/' + parts[1][:8].split('?')[0].split('.')[0]

            if len(url) > max_len * 1.5:  # still too long
                url = url[:max_len]

            if url != before_clip:
                amp = url.rfind('&')
                # avoid splitting html char entities
                if amp > max_len - 5:
                    url = url[:amp]
                url += '...'

                if len(url) >= len(before_clip):
                    url = before_clip
                else:
                    # full url is visible on mouse-over (for those who don't
                    # have a status bar, such as Safari by default).
                    # The value is quoted, like href, so that characters such as '=' or '>' in the URL cannot
                    # terminate the attribute or the tag early.
                    params += f' title="{href}"'

        return f'<a href="{href}"{params}>{url}</a>'

    # text = html.escape(text)
    return _url_re.sub(make_link, text)
8✔
327

328

329
def get_new_version_number(timeout: float | None = None) -> str | bool:
    """Check PyPi for newer version of project.

    :parameter timeout: Timeout in seconds for the HTTP request; a request that fails (including by timing out) makes
      the function return False.
    :returns: The new version number if a newer version of project is found on PyPi, empty string otherwise
      (including when PyPi answers with an HTTP error status), False if HTTPX is not installed or the request fails.
    """
    if httpx is None:
        logger.info('Cannot query PyPi for latest release: HTTPX not installed')
        return False

    try:
        # Use the client as a context manager so that its connection pool is closed once the response is read.
        with httpx.Client(http2=h2 is not None, timeout=timeout) as client:
            r = client.get(f'https://pypi.org/pypi/{__project_name__}/json')
    except httpx.RequestError as e:
        logger.info(f'Exception when querying PyPi for latest release: {e}')
        return False

    if r.is_success:
        latest_release: str = r.json()['info']['version']
        if parse_version(latest_release) > parse_version(__version__):
            return latest_release
    else:
        logger.info(f'HTTP error when querying PyPi for latest release: {r}')

    return ''
8✔
354

355

356
def dur_text(duration: float) -> str:
    """Returns a formatted string optimized to the number of seconds for use in footers.

    Durations below one minute are shown with two significant digits followed by ' seconds' (e.g. '1.2 seconds');
    longer ones are shown as minutes:seconds (e.g. '1:05').

    :parameter duration: The duration in seconds.
    :returns: The formatted string.
    """
    if duration < 60:
        return f'{float(f"{duration:.2g}"):g} seconds'

    # Round to whole seconds before splitting into minutes and seconds; rounding the seconds afterwards would turn
    # e.g. 119.7 into '1:60' instead of '2:00'.
    minutes, seconds = divmod(round(duration), 60)
    return f'{minutes}:{seconds:02d}'
×
367

368

369
def file_ownership_checks(filename: Path) -> list[str]:
    """Verify that a file and the directory containing it are owned by the current UID or by root and are writable
    by their owner only. Always passes (empty list) on Windows, where the check is not performed.

    :param filename: The file to check.
    :returns: List of errors encountered (if any); the directory's errors come before the file's.
    """
    if sys.platform == 'win32':
        return []

    trusted_uids = {os.getuid(), 0}
    writable_by_others = stat.S_IWGRP | stat.S_IWOTH

    problems: list[str] = []
    for path in (filename.parent, filename):
        info = path.stat()
        if info.st_mode & writable_by_others:
            problems.append(f'{path} is group/world-writable')
        if info.st_uid not in trusted_uids:
            problems.append(f'{path} not owned by {getpass.getuser()} or root')

    return problems
8✔
396

397

398
def mark_to_html(text: str, markdown_padded_tables: bool | None = False, extras: Iterable[str] | None = None) -> str:
    """Converts a line of Markdown (e.g. as generated by html2text filter) to html.

    :param text: The text in Markdown format.
    :param markdown_padded_tables: If true, monospace the tables for alignment.
    :param extras: Additional extras for Markdown.
    :return: The text in html format.
    """
    if text == '* * *':  # manually expand horizontal ruler since <hr> is used to separate jobs
        # Checked first so that no Markdown converter is built for a line that does not need one.
        return '-' * 80

    markdowner_extras = set(extras) if extras else set()
    markdowner_extras.add('strike')  # text marked by double tildes is ~~strikethrough~~
    markdowner_extras.add('target-blank-links')  # <a> tags have rel="noopener" for added security
    markdowner = Markdown(extras=list(markdowner_extras))

    pre = ''
    post = ''
    if text.lstrip()[:2] == '* ':  # item of unordered list
        lstripped = text.lstrip(' ')
        indent = len(text) - len(lstripped)
        pre += '&nbsp;' * indent
        # the bullet glyph depends on the nesting level (i.e. on the indentation)
        pre += '● ' if indent == 2 else '⯀ ' if indent == 4 else '○ '
        text = text.split('* ', 1)[1]
    if text[:1] == ' ':
        # replace leading spaces with NBSP or converter will strip them all
        stripped = text.lstrip()
        text = '&nbsp;' * (len(text) - len(stripped)) + stripped
    text = text.replace('` ', '`&nbsp;')  # replace leading spaces within code blocks
    if markdown_padded_tables and '|' in text:
        # a padded row in a table; keep it monospaced for alignment
        pre += '<span style="font-family:monospace;white-space:pre-wrap">'
        post += '</span>'
    text = text.replace('[](', '[[_Link with no text_]](')  # Add link text where missing
    html_out = str(markdowner.convert(text)).rstrip('\n')  # convert markdown to html
    # fixes for Gmail
    html_out = html_out.replace('<a', '<a style="font-family:inherit"')  # fix <a> tag styling
    html_out = html_out.replace('<img', '<img style="max-width:100%;height:auto;max-height:100%"')
    html_out = html_out.replace('<code>', '<span style="font-family:monospace;white-space:pre-wrap">')
    html_out = html_out.replace('</code>', '</span>')
    if 'tables' in markdowner_extras:
        html_out = html_out.replace('<table>', '<table border="1" cellspacing="0">')
    # remove <p> tags wrapping
    html_out, sub = re.subn(r'^<p>|</p>$', '', html_out)  # remove paragraph tags
    if sub:
        return pre + html_out + post
    html_out = re.sub(r'<(/?)h\d>', r'<\g<1>strong>', html_out)  # replace heading tags with <strong>
    return pre + html_out + post
8✔
444

445

446
def import_optional_dependency(name: str, extra: str = '') -> ModuleType:
    """
    Import an optional dependency.

    If a dependency is missing an ImportError with a nice message will be raised.

    :param name: The module name.
    :param extra: Additional text to include in the ImportError message.

    :returns: The imported module.
    :raises ImportError: If the module cannot be imported.
    """
    try:
        return importlib.import_module(name)
    except ImportError as err:
        # Only add the separating space when there is extra text, to avoid a double space in the message.
        hint = f'{extra} ' if extra else ''
        msg = f'`Import {name}` failed. {hint}Use pip or conda to install the {name} package.'
        raise ImportError(msg) from err
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc