• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 18733844090

23 Oct 2025 12:33AM UTC coverage: 73.65% (-0.04%) from 73.687%
18733844090

push

github

mborsetti
v3.31.4

1396 of 2236 branches covered (62.43%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

4675 of 6007 relevant lines covered (77.83%)

7.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.39
/webchanges/util.py
1
"""A few utilities used elsewhere."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4
from __future__ import annotations
10✔
5

6
import getpass
10✔
7
import importlib.machinery
10✔
8
import importlib.util
10✔
9
import logging
10✔
10
import os
10✔
11
import re
10✔
12
import shlex
10✔
13
import stat
10✔
14
import subprocess
10✔
15
import sys
10✔
16
import textwrap
10✔
17
from math import floor, log10
10✔
18
from os import PathLike
10✔
19
from typing import TYPE_CHECKING, Callable, Iterable, Match
10✔
20

21
from markdown2 import Markdown
10✔
22

23
from webchanges import __project_name__, __version__
10✔
24

25
if TYPE_CHECKING:
26
    from pathlib import Path
27
    from types import ModuleType
28

29
    from webchanges.jobs import JobState
30

31
try:
10✔
32
    import httpx
10✔
33
except ImportError:  # pragma: no cover
34
    httpx = None  # type: ignore[assignment]
35
if httpx is not None:
10!
36
    try:
10✔
37
        import h2
10✔
38
    except ImportError:  # pragma: no cover
39
        h2 = None  # type: ignore[assignment]
40

41
try:
10✔
42
    from packaging.version import parse as parse_version
10✔
43
except ImportError:  # pragma: no cover
44
    from webchanges._vendored.packaging_version import parse as parse_version  # type: ignore[assignment]
45

46
logger = logging.getLogger(__name__)
10✔
47

48

49
def lazy_import(fullname: str) -> ModuleType | None:
    """Lazily imports a module. See https://stackoverflow.com/questions/42703908.

    The module's code is only executed the first time one of its attributes is accessed. Following the recipe in the
    ``importlib`` documentation, the lazy module is registered in ``sys.modules`` so that repeated calls (and regular
    ``import`` statements executed elsewhere) share one module object instead of loading the module again.

    To identify loading time, run $ python -X importtime webchanges --help

    :param fullname: The fully qualified name of the module to import.
    :returns: The module (possibly not yet executed), or None if no loadable module with that name is found.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    spec = importlib.util.find_spec(fullname)
    if spec is None or spec.loader is None:
        return None

    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module before "executing" it: the lazy loader only defers the real execution, and without this
    # entry every call would build a new module object and a later plain import would run the module a second time.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
×
65

66

67
class TrackSubClasses(type):
    """A metaclass that stores subclass name-to-class mappings in the base class."""

    # __subclasses__ gets redefined from default "Callable[[_TT], list[_TT]]
    __subclasses__: dict[str, TrackSubClasses]  # type: ignore[assignment]
    __anonymous_subclasses__: list[TrackSubClasses]
    __required__: tuple[str, ...] = ()
    __optional__: tuple[str, ...] = ()
    __supported_directives__: dict[str, str] = {}
    __supported_subfilters__: dict[str, str] = {}

    __kind__: str

    job_states: list[JobState]

    def sorted_by_kind(cls: TrackSubClasses) -> list[TrackSubClasses]:
        """Lists the registered subclasses that have a non-empty __kind__, ordered by that __kind__. Useful for
        documentation.

        :param cls: The class whose ``__subclasses__`` registry is read.
        :return: The registered classes, sorted by their __kind__ value.
        """
        keyed = [(member.__kind__, member) for member in cls.__subclasses__.values() if member.__kind__]
        keyed.sort()
        return [member for _, member in keyed]

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict) -> None:
        """Merges the inherited ``__required__``/``__optional__`` tuples into the new class and registers the class
        with the first base that offers a suitable registry.

        :param name: The name of the class being created.
        :param bases: The base classes of the class being created.
        :param namespace: The class namespace; the merged tuples are written back into it.
        """
        parents = [parent for parent in bases if parent is not object]

        # The class' own entries come first, followed by those of each base that defines the attribute.
        for parent in parents:
            for attr in ('__required__', '__optional__'):
                if hasattr(parent, attr):
                    inherited = getattr(parent, attr, ())
                    merged = tuple(namespace.get(attr, ())) + tuple(inherited)
                    namespace[attr] = merged
                    setattr(cls, attr, merged)

        # Register with the first base that has a registry: by __kind__ when the class has one, anonymously otherwise.
        for parent in parents:
            if hasattr(cls, '__kind__'):
                registry = getattr(parent, '__subclasses__', None)
                if registry is None:
                    continue
                logger.debug(
                    f'Registering Class {cls.__module__}.{cls.__qualname__}'
                    + (f' as {cls.__kind__}' if cls.__kind__ else '')
                )
                registry[cls.__kind__] = cls
                break

            anonymous_registry = getattr(parent, '__anonymous_subclasses__', None)
            if anonymous_registry is None:
                continue
            logger.debug(f'Registering Class {cls.__module__}.{cls.__qualname__}')
            anonymous_registry.append(cls)
            break

        super().__init__(name, bases, namespace)
10✔
126

127

128
def edit_file(filename: str | bytes | PathLike) -> None:
    """Opens the editor to edit a file.

    The editor command is read from the environment variable $EDITOR or, if that is unset or empty, from $VISUAL. If
    neither is set, 'notepad.exe' is used on Windows; on other platforms a hint is printed and the program exits.

    :param filename: The filename.
    :raises SystemExit: If no editor is configured (non-Windows platforms).
    :raises subprocess.CalledProcessError: If the editor exits with a non-zero return code.
    """
    editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
    if not editor:
        if sys.platform != 'win32':
            print('Please set the path to the editor in the environment variable $EDITOR, e.g. "export EDITOR=nano"')
            raise SystemExit(1)
        editor = 'notepad.exe'

    # os.fsdecode handles str, bytes and PathLike alike; str() would turn a bytes path into the literal "b'...'".
    subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
        [*shlex.split(editor), os.fsdecode(filename)], check=True
    )
146

147

148
def import_module_from_source(module_name: str, source_path: str | bytes | PathLike) -> ModuleType:
    """Loads a module from a source file and executes it, once, in its own namespace.

    The module is registered in ``sys.modules`` under ``module_name`` before its code is executed.

    :param module_name: The name of the module to import.
    :param source_path: The path where the module is located.
    :return: A ModuleType object.
    """
    # os.fsdecode handles str, bytes and PathLike alike; str() would turn a bytes path into the literal "b'...'".
    source_path = os.fsdecode(source_path)
    loader = importlib.machinery.SourceFileLoader(module_name, source_path)
    spec = importlib.util.spec_from_file_location(module_name, source_path, loader=loader)
    module = importlib.util.module_from_spec(spec)  # type: ignore[arg-type]
    sys.modules[module_name] = module
    # Execute exactly once: running the module twice would repeat all of its import-time side effects.
    loader.exec_module(module)
    return module
10✔
168

169

170
def chunk_string(text: str, length: int, numbering: bool = False) -> list[str]:
    """Splits a string into chunks no longer than a given length, optionally numbering them.

    :param text: The text to be chunked.
    :param length: The length of the chunked text.
    :param numbering: Whether to append a ' (i/n)' counter to each chunk if more than one chunk is generated; the
        width available for text is reduced to make room for it.

    :returns: a list of chunked strings
    """
    if not (numbering and len(text) > length):
        return textwrap.wrap(text, length, replace_whitespace=False)

    try:
        # The counter takes 4 fixed characters plus two numbers of 'digits' characters each; since the number of
        # digits depends on the number of chunks (which depends on the width), iterate until the estimate is stable.
        width = length - 4 - 2
        digits = 1 if width <= 0 else floor(log10(len(text) / width))  # initialization floor
        estimated_digits = digits + 1
        while estimated_digits > digits:
            digits += 1
            width = length - 4 - 2 * digits
            if width <= 0:
                raise ValueError('Not enough space to chunkify string with line numbering (1)')
            estimated_digits = floor(log10(len(text) / width)) + 1

        # Wrapping on word boundaries can yield more chunks than estimated: widen the counter until it fits.
        pieces = textwrap.wrap(text, width, replace_whitespace=False)
        while floor(log10(len(pieces))) + 1 > digits:
            digits += 1
            width = length - 4 - 2 * digits
            if width <= 0:
                raise ValueError('Not enough space to chunkify string with line numbering (2)')
            pieces = textwrap.wrap(text, width, replace_whitespace=False)

        total = len(pieces)
        return [f'{piece} ({position:{digits}d}/{total})' for position, piece in enumerate(pieces, 1)]

    except ValueError as e:
        logger.error(f'{e}')

    # Numbering was not possible: fall back to plain chunks.
    return textwrap.wrap(text, length, replace_whitespace=False)
10✔
209

210

211
def linkify(
    text: str,
    shorten: bool = False,
    extra_params: str | Callable[[str], str] = '',
    require_protocol: bool = False,
    permitted_protocols: tuple[str, ...] = (
        'http',
        'https',
        'mailto',
    ),
) -> str:
    """Converts plain text into HTML with links.

    For example linkify("Hello http://tornadoweb.org!") would return 'Hello
    <a href="http://tornadoweb.org">http://tornadoweb.org</a>!'.

    We are using a regex derived from the one in the tornado library
    https://github.com/tornadoweb/tornado/blob/master/tornado/escape.py, modified to catch all URL parameters (the
    original stops at character entities other than &amp; and &quot;). It is vulnerable to Regular expression Denial
    of Service (ReDoS), which would divert computational resources to an expensive regex match. The risk in this
    application is limited.

    In the future, consider using linkify from the bleach project instead (requires importing another package).

    :parameter text: The text to linkify.
    :parameter shorten: Long urls will be shortened for display.
    :parameter extra_params: Extra text to include in the link tag, or a callable taking the link as an argument and
        returning the extra text, e.g. linkify(text, extra_params='rel="nofollow" class="external"').
    :parameter require_protocol: Only linkify urls which include a protocol; if this is False, urls such as
        www.facebook.com will also be linkified.
    :parameter permitted_protocols: Protocols which should be linkified, e.g. linkify(text,
        permitted_protocols=('http', 'ftp', 'mailto')); it is very unsafe to include protocols such as javascript.
    :returns: The text with the URLs replaced by <a> tags.
    """
    _url_re = re.compile(  # modified to catch all URL parameters
        r'\b('
        r'(?:([\w-]+):(/{1,3})|www[.])'
        r'(?:('
        r'?:(?:[^\s()])*(?:[^!"#$%&'
        r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
        r')'
        r'|(?:\((?:[^\s()])*\))'
        r')+'
        r')'
    )

    if extra_params and not callable(extra_params):
        extra_params = f' {extra_params.strip()}'

    def make_link(m: Match) -> str:
        """Replacement function for re.sub using re.match as input to convert plain text into HTML with links."""
        url: str = m.group(1)
        proto: str = m.group(2)
        if require_protocol and not proto:
            return url  # not protocol, no linkify

        if proto and proto not in permitted_protocols:
            return url  # bad protocol, no linkify

        # No proto specified: link to https. 'proto' itself is left untouched as 'url' (the displayed text) still
        # has no scheme, and the shortening below needs to know that.
        href: str = url if proto else f'https://{url}'

        params = f' {extra_params(href).strip()}' if callable(extra_params) else extra_params

        # clip long urls. max_len is just an approximation
        max_len = 30
        if shorten and len(url) > max_len:
            before_clip = url
            # length of the 'scheme:' prefix plus its slashes, as it appears in the displayed url
            proto_len = len(proto) + 1 + len(m.group(3) or '') if proto else 0

            parts = url[proto_len:].split('/')
            if len(parts) > 1:
                # Grab the whole host part plus the first bit of the path
                # The path is usually not that interesting once shortened
                # (no more slug, etc), so it really just provides a little
                # extra indication of shortening.
                url = url[:proto_len] + parts[0] + '/' + parts[1][:8].split('?')[0].split('.')[0]

            if len(url) > max_len * 1.5:  # still too long
                url = url[:max_len]

            if url != before_clip:
                amp = url.rfind('&')
                # avoid splitting html char entities
                if amp > max_len - 5:
                    url = url[:amp]
                url += '...'

                if len(url) >= len(before_clip):
                    url = before_clip
                else:
                    # full url is visible on mouse-over (for those who don't
                    # have a status bar, such as Safari by default); the value is quoted as URLs routinely contain
                    # characters (e.g. '=') that are not allowed in unquoted attribute values
                    params += f' title="{href}"'

        return f'<a href="{href}"{params}>{url}</a>'

    # text = html.escape(text)
    return _url_re.sub(make_link, text)
10✔
322

323

324
def get_new_version_number(timeout: float | None = None) -> str | bool:
    """Check PyPi for newer version of project.

    :parameter timeout: Timeout in seconds passed to the HTTP client used for the query.
    :returns: The new version number if a newer version of project is found on PyPi; empty string if the installed
      version is current or PyPi replies with an HTTP error status; False if HTTPX is not installed or the request
      itself fails (``httpx.RequestError``).
    """
    if httpx is None:
        logger.info('Cannot query PyPi for latest release: HTTPX not installed')
        return False

    try:
        # The context manager closes the client's connection pool, which would otherwise be leaked.
        with httpx.Client(http2=h2 is not None, timeout=timeout) as client:
            r = client.get(f'https://pypi.org/pypi/{__project_name__}/json')
    except httpx.RequestError as e:
        logger.info(f'Exception when querying PyPi for latest release: {e}')
        return False

    if not r.is_success:
        logger.info(f'HTTP error when querying PyPi for latest release: {r}')
        return ''

    latest_release: str = r.json()['info']['version']
    if parse_version(latest_release) > parse_version(__version__):
        return latest_release

    return ''
10✔
349

350

351
def dur_text(duration: float) -> str:
    """Returns a formatted string optimized to the number of seconds for use in footers.

    Durations under one minute are shown with two significant digits followed by ' seconds'; longer ones as
    minutes:seconds, rounded to the nearest second.

    :parameter duration: The duration in seconds.
    :returns: The formatted string.
    """
    if duration < 60:
        return f'{float(f"{duration:.2g}"):g} seconds'
    # Round the total before splitting it: rounding minutes and seconds independently turns e.g. 119.6 into '1:60'.
    whole_seconds = float(f'{duration:.0f}')
    m, s = divmod(whole_seconds, 60)
    return f'{m:.0f}:{s:02.0f}'
×
361

362

363
def file_ownership_checks(filename: Path) -> list[str]:
    """Verify that a file and the directory containing it are owned by the current user or by root and are writable
    by their owner only. Not performed on Windows, where an empty list is always returned.

    :param filename: The path of the file to check.
    :returns: List of errors encountered (if any); directory errors are listed before file errors.
    """
    if sys.platform == 'win32':
        return []

    trusted_owners = {os.getuid(), 0}
    writable_by_others = stat.S_IWGRP | stat.S_IWOTH

    problems: list[str] = []
    for path in (filename.parent, filename):
        info = path.stat()
        if info.st_mode & writable_by_others:
            problems.append(f'{path} is group/world-writable')
        if info.st_uid not in trusted_owners:
            problems.append(f'{path} not owned by {getpass.getuser()} or root')

    return problems
10✔
389

390

391
def mark_to_html(text: str, markdown_padded_tables: bool | None = False, extras: Iterable[str] | None = None) -> str:
    """Converts a line of Markdown (e.g. as generated by html2text filter) to html.

    :param text: The text in Markdown format.
    :param markdown_padded_tables: If true, monospace the tables for alignment.
    :param extras: Additional extras for Markdown.
    :return: The text in html format.
    """
    if text == '* * *':  # manually expand horizontal ruler since <hr> is used to separate jobs
        return '-' * 80  # checked first so that no converter is built for nothing

    markdowner_extras = set(extras) if extras else set()
    markdowner_extras.add('strike')  # text marked by double tildes is ~~strikethrough~~
    markdowner_extras.add('target-blank-links')  # <a> tags have rel="noopener" for added security
    markdowner = Markdown(extras=list(markdowner_extras))
    pre = ''
    post = ''
    if text.lstrip()[:2] == '* ':  # item of unordered list
        lstripped = text.lstrip(' ')
        indent = len(text) - len(lstripped)
        pre += '&nbsp;' * indent
        pre += '● ' if indent == 2 else '⯀ ' if indent == 4 else '○ '
        text = text.split('* ', 1)[1]
    if text[:1] == ' ':
        # replace leading spaces with NBSP or converter will strip them all
        stripped = text.lstrip()
        text = '&nbsp;' * (len(text) - len(stripped)) + stripped
    text = text.replace('` ', '`&nbsp;')  # replace leading spaces within code blocks
    if markdown_padded_tables and '|' in text:
        # a padded row in a table; keep it monospaced for alignment
        pre += '<span style="font-family:monospace;white-space:pre-wrap">'
        post += '</span>'
    text = text.replace('[](', '[[_Link with no text_]](')  # Add link text where missing
    html_out = str(markdowner.convert(text)).rstrip('\n')  # convert markdown to html
    # fixes for Gmail
    # fix <a> tag styling; the lookahead keeps other tags starting with "a" (<abbr>, <address>, ...) untouched
    html_out = re.sub(r'<a(?=[\s>])', '<a style="font-family:inherit"', html_out)
    html_out = html_out.replace('<img', '<img style="max-width:100%;height:auto;max-height:100%"')
    html_out = html_out.replace('<code>', '<span style="font-family:monospace;white-space:pre-wrap">')
    html_out = html_out.replace('</code>', '</span>')
    if 'tables' in markdowner_extras:
        html_out = html_out.replace('<table>', '<table border="1" cellspacing="0">')
    # remove <p> tags wrapping
    html_out, sub = re.subn(r'^<p>|</p>$', '', html_out)  # remove paragraph tags
    if sub:
        return pre + html_out + post
    html_out = re.sub(r'<(/?)h\d>', r'<\g<1>strong>', html_out)  # replace heading tags with <strong>
    return pre + html_out + post
10✔
437

438

439
def import_optional_dependency(name: str, extra: str = '') -> ModuleType:
    """Import an optional dependency, raising an ImportError with a helpful message if it is missing.

    :param name: The module name.
    :param extra: Additional text to include in the ImportError message.

    :returns: The imported module.
    :raises ImportError: If the module cannot be imported; the original error is chained as the cause.
    """
    try:
        return importlib.import_module(name)
    except ImportError as err:
        raise ImportError(
            f'`Import {name}` failed. {extra} Use pip or conda to install the {name} package.'
        ) from err
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc