• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 16548352850

27 Jul 2025 06:53AM UTC coverage: 74.68% (-0.4%) from 75.068%
16548352850

push

github

mborsetti
Version 3.31.0rc0

1799 of 2750 branches covered (65.42%)

Branch coverage included in aggregate %.

4 of 4 new or added lines in 1 file covered. (100.0%)

799 existing lines in 8 files now uncovered.

4669 of 5911 relevant lines covered (78.99%)

6.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.45
/webchanges/util.py
1
"""A few utilities used elsewhere."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4
from __future__ import annotations
8✔
5

6
import getpass
8✔
7
import importlib.machinery
8✔
8
import importlib.util
8✔
9
import logging
8✔
10
import os
8✔
11
import re
8✔
12
import shlex
8✔
13
import stat
8✔
14
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
15
import sys
8✔
16
import textwrap
8✔
17
from math import floor, log10
8✔
18
from os import PathLike
8✔
19
from pathlib import Path
8✔
20
from types import ModuleType
8✔
21
from typing import Callable, Iterable, Match
8✔
22

23
from markdown2 import Markdown
8✔
24

25
from webchanges import __project_name__, __version__
8✔
26

27
try:
8✔
28
    import httpx
8✔
29
except ImportError:  # pragma: no cover
30
    httpx = None  # type: ignore[assignment]
31
if httpx is not None:
8!
32
    try:
8✔
33
        import h2
8✔
34
    except ImportError:  # pragma: no cover
35
        h2 = None  # type: ignore[assignment]
36

37
try:
8✔
38
    from packaging.version import parse as parse_version
8✔
39
except ImportError:  # pragma: no cover
40
    from webchanges._vendored.packaging_version import parse as parse_version  # type: ignore[assignment]
41

42
logger = logging.getLogger(__name__)
8✔
43

44

45
def lazy_import(fullname: str) -> ModuleType | None:
    """Lazily imports a module. See https://stackoverflow.com/questions/42703908.

    The module object is returned right away, but its code is only executed the first time one of its attributes is
    accessed. Follows the "Implementing lazy imports" recipe of the importlib documentation.

    To identify loading time, run $ python -X importtime webchanges --help

    :param fullname: The fully qualified name of the module to import.
    :return: The module (already imported, or lazily loaded), or None if it cannot be found.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    try:
        spec = importlib.util.find_spec(fullname)
    except ImportError:
        # find_spec() imports the parent packages of a dotted name and raises if one of them is missing.
        return None
    if spec is None or spec.loader is None:
        return None

    # Wrap the loader before creating the module so that the module's spec refers to the lazy loader.
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module ourselves (exec_module() does not): later imports must get this very object, otherwise
    # the lazy module fails on first attribute access because a different module sits in sys.modules.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
61

62

63
class TrackSubClasses(type):
    """A metaclass that stores subclass name-to-class mappings in the base class."""

    # __subclasses__ gets redefined from default "Callable[[_TT], list[_TT]]
    __subclasses__: dict[str, TrackSubClasses]  # type: ignore[assignment]
    __anonymous_subclasses__: list[TrackSubClasses]
    __required__: tuple[str, ...] = ()
    __optional__: tuple[str, ...] = ()
    __supported_directives__: dict[str, str] = {}
    __supported_subfilters__: dict[str, str] = {}

    __kind__: str

    def sorted_by_kind(cls: TrackSubClasses) -> list[TrackSubClasses]:
        """Lists the registered subclasses ordered by their __kind__ attribute; subclasses with an empty __kind__ are
        left out. Useful for documentation.

        :param cls: The class holding the __subclasses__ registry.
        :return: The registered subclasses, sorted by __kind__.
        """
        keyed = [(member.__kind__, member) for member in cls.__subclasses__.values() if member.__kind__]
        keyed.sort()
        return [member for _, member in keyed]

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict) -> None:
        """Merges the inherited __required__/__optional__ tuples into the new class and registers the class with the
        first base that exposes a suitable registry.

        :param name: The name of the class being created.
        :param bases: The base classes of the class being created.
        :param namespace: The namespace of the class being created (updated with the merged tuples).
        """
        parents = [base for base in bases if not (base == object)]

        # The class' own entries come first, followed by those of each base in turn.
        for parent in parents:
            for attr in ('__required__', '__optional__'):
                if hasattr(parent, attr):
                    merged = tuple(namespace.get(attr, ())) + tuple(getattr(parent, attr, ()))
                    namespace[attr] = merged
                    setattr(cls, attr, merged)

        for parent in parents:
            if hasattr(cls, '__kind__'):
                # Named class: goes in the first base's __subclasses__ mapping, keyed by its __kind__.
                registry = getattr(parent, '__subclasses__', None)
                if registry is not None:
                    logger.debug(f'Registering {cls} as {cls.__kind__}')
                    registry[cls.__kind__] = cls
                    break
            else:
                # Class without a __kind__: goes in the first base that keeps an __anonymous_subclasses__ list.
                bucket = getattr(parent, '__anonymous_subclasses__', None)
                if bucket is not None:
                    logger.debug(f'Registering {cls}')
                    bucket.append(cls)
                    break

        super().__init__(name, bases, namespace)
117

118

119
def edit_file(filename: str | bytes | PathLike) -> None:
    """Opens the editor to edit a file.

    The editor command is read from the environment variable $EDITOR or, if that is unset or empty, from $VISUAL. If
    neither is set, notepad.exe is used on Windows; on other platforms a hint is printed and the program exits.

    :param filename: The filename.
    :raises SystemExit: If no editor is configured (platforms other than Windows).
    :raises subprocess.CalledProcessError: If the editor exits with a non-zero return code.
    """
    editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
    if not editor:
        if sys.platform != 'win32':
            print('Please set the path to the editor in the environment variable $EDITOR, e.g. "export EDITOR=nano"')
            raise SystemExit(1)
        editor = 'notepad.exe'

    subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
        # os.fsdecode() and not str(): str() of a bytes path is the literal "b'...'" and not the path itself.
        [*shlex.split(editor), os.fsdecode(filename)],
        check=True,
    )
138

139

140
def import_module_from_source(module_name: str, source_path: str | bytes | PathLike) -> ModuleType:
    """Loads a module and executes it in its own namespace.

    The module is registered in sys.modules under module_name before its code runs; if running the code raises, the
    registration is removed again and the exception is propagated.

    :param module_name: The name of the module to import.
    :param source_path: The path where the module is located.
    :return: A ModuleType object.
    """
    # os.fsdecode() and not str(): str() of a bytes path is the literal "b'...'" and not the path itself.
    source_path = os.fsdecode(source_path)
    loader = importlib.machinery.SourceFileLoader(module_name, source_path)
    spec = importlib.util.spec_from_file_location(module_name, source_path, loader=loader)
    module = importlib.util.module_from_spec(spec)  # type: ignore[arg-type]
    sys.modules[module_name] = module
    try:
        loader.exec_module(module)
    except Exception:
        # Do not leave a half-initialised module behind for later imports to pick up.
        sys.modules.pop(module_name, None)
        # NOTE(review): presumably the traceback limit is lowered elsewhere in the application; raise it so that the
        # error inside the loaded file is reported with its full traceback.
        sys.tracebacklimit = 1000
        raise
    return module
158

159

160
def chunk_string(text: str, length: int, numbering: bool = False) -> list[str]:
    """Splits a text into chunks no longer than a given length, breaking on whitespace.

    When numbering is requested and the text does not fit in a single chunk, each chunk is suffixed with
    ' (i/N)', where i is right-aligned to a common width; the chunk text is shortened so that text plus suffix fit
    within length. If there is not enough room for the suffix, the error is logged and the text is chunked without
    numbering.

    :param text: The text to be chunked.
    :param length: The length of the chunked text.
    :param numbering: Whether to number each chunk on the left if more than one chunk is generated.

    :returns: a list of chunked strings
    """
    if not (numbering and len(text) > length):
        return textwrap.wrap(text, length, replace_whitespace=False)

    try:
        # The suffix takes 4 fixed characters (' (', '/' and ')') plus two numbers of `digits` characters each.
        width = length - 4 - 2
        digits = 1 if width <= 0 else floor(log10(len(text) / width))  # initialization floor
        estimated_digits = digits + 1
        # Widen the numbers until the estimated number of chunks fits in `digits` digits.
        while estimated_digits > digits:
            digits += 1
            width = length - 4 - 2 * digits
            if width <= 0:
                raise ValueError('Not enough space to chunkify string with line numbering (1)')
            estimated_digits = floor(log10(len(text) / width)) + 1

        # Wrapping on whitespace can produce more chunks than estimated: widen again if that is the case.
        chunks = textwrap.wrap(text, width, replace_whitespace=False)
        while floor(log10(len(chunks))) + 1 > digits:
            digits += 1
            width = length - 4 - 2 * digits
            if width <= 0:
                raise ValueError('Not enough space to chunkify string with line numbering (2)')
            chunks = textwrap.wrap(text, width, replace_whitespace=False)

        total = len(chunks)
        return [f'{chunk} ({index:{digits}d}/{total})' for index, chunk in enumerate(chunks, start=1)]

    except ValueError as e:
        logger.error(f'{e}')

    return textwrap.wrap(text, length, replace_whitespace=False)
199

200

201
def linkify(
    text: str,
    shorten: bool = False,
    extra_params: str | Callable[[str], str] = '',
    require_protocol: bool = False,
    permitted_protocols: tuple[str, ...] = (
        'http',
        'https',
        'mailto',
    ),
) -> str:
    """Converts plain text into HTML with links.

    For example linkify("Hello http://tornadoweb.org!") would return 'Hello
    <a href="http://tornadoweb.org">http://tornadoweb.org</a>!'.

    We are using a regex from tornado library https://github.com/tornadoweb/tornado/blob/master/tornado/escape.py,
    modified to accept any character other than whitespace and parentheses inside a URL (the original accepted '&'
    only as part of &amp; or &quot;) so that all URL parameters are caught. It is vulnerable to Regular expression
    Denial of Service (ReDoS), which would divert computational resources to an expensive regex match. The risk in
    this application is limited.

    In the future, consider using linkify from the bleach project instead (requires importing another package).

    Neither the text nor the URLs are HTML-escaped by this function.

    :parameter text: The text to linkify.
    :parameter shorten: Long urls will be shortened for display.
    :parameter extra_params: Extra text to include in the link tag, or a callable taking the link as an argument and
        returning the extra text, e.g. linkify(text, extra_params='rel="nofollow" class="external"').
    :parameter require_protocol: Only linkify urls which include a protocol; if this is False, urls such as
        www.facebook.com will also be linkified (using https).
    :parameter permitted_protocols: Protocols which should be linkified, e.g. linkify(text,
        permitted_protocols=('http', 'ftp', 'mailto')); it is very unsafe to include protocols such as javascript.
    :returns: The text with the URLs replaced by <a> tags.
    """
    _URL_RE = re.compile(  # modified to catch all URL parameters
        r'\b('
        r'(?:([\w-]+):(/{1,3})|www[.])'
        r'(?:('
        r'?:(?:[^\s()])*(?:[^!"#$%&'
        r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
        r')'
        r'|(?:\((?:[^\s()])*\))'
        r')+'
        r')'
    )  # noqa: DUO138 catastrophic "re" usage - denial-of-service possible.

    if extra_params and not callable(extra_params):
        extra_params = f' {extra_params.strip()}'

    def make_link(m: Match) -> str:
        """Replacement function for re.sub using re.match as input to convert plain text into HTML with links."""
        url: str = m.group(1)  # the text displayed for the link
        proto: str = m.group(2)  # None when the URL was matched through its 'www.' prefix
        if require_protocol and not proto:
            return url  # not protocol, no linkify

        if proto and proto not in permitted_protocols:
            return url  # bad protocol, no linkify

        href: str = m.group(1)
        if not proto:
            # No proto specified, use https. Only href changes: the displayed url still has no protocol, so proto
            # must stay empty for the shortening below to measure the right prefix.
            href = f'https://{href}'

        if callable(extra_params):
            params = f' {extra_params(href).strip()}'
        else:
            params = extra_params

        # clip long urls. max_len is just an approximation
        max_len = 30
        if shorten and len(url) > max_len:
            before_clip = url
            if proto:
                proto_len = len(proto) + 1 + len(m.group(3) or '')  # +1 for :
            else:
                proto_len = 0

            parts = url[proto_len:].split('/')
            if len(parts) > 1:
                # Grab the whole host part plus the first bit of the path
                # The path is usually not that interesting once shortened
                # (no more slug, etc), so it really just provides a little
                # extra indication of shortening.
                url = url[:proto_len] + parts[0] + '/' + parts[1][:8].split('?')[0].split('.')[0]

            if len(url) > max_len * 1.5:  # still too long
                url = url[:max_len]

            if url != before_clip:
                amp = url.rfind('&')
                # avoid splitting html char entities
                if amp > max_len - 5:
                    url = url[:amp]
                url += '...'

                if len(url) >= len(before_clip):
                    url = before_clip
                else:
                    # full url is visible on mouse-over (for those who don't
                    # have a status bar, such as Safari by default)
                    params += f' title={href}'

        return f'<a href="{href}"{params}>{url}</a>'

    # text = html.escape(text)
    return _URL_RE.sub(make_link, text)
318

319

320
def get_new_version_number(timeout: float | None = None) -> str | bool:
    """Check PyPi for newer version of project.

    :parameter timeout: Timeout in seconds, passed to the HTTPX client.
    :returns: The new version number if a newer version of project is found on PyPi; empty string if the version on
      PyPi is not newer or if PyPi replies with an unsuccessful HTTP status (which is logged); False if HTTPX is not
      installed or the request fails with an httpx.RequestError.
    """
    if httpx is None:
        logger.info('Cannot query PyPi for latest release: HTTPX not installed')
        return False

    try:
        # The context manager closes the client (and its connection pool) instead of leaking it.
        with httpx.Client(http2=h2 is not None, timeout=timeout) as client:
            r = client.get(f'https://pypi.org/pypi/{__project_name__}/json')
    except httpx.RequestError as e:
        logger.info(f'Exception when querying PyPi for latest release: {e}')
        return False

    if r.is_success:
        latest_release: str = r.json()['info']['version']
        if parse_version(latest_release) > parse_version(__version__):  # pyright: ignore[reportOperatorIssue]
            return latest_release
    else:
        logger.info(f'HTTP error when querying PyPi for latest release: {r}')

    return ''
345

346

347
def dur_text(duration: float) -> str:
    """Returns a formatted string optimized to the number of seconds for use in footers.

    Durations under one minute are shown with two significant digits followed by ' seconds' (e.g. '0.12 seconds');
    longer ones are shown as minutes:seconds (e.g. '1:15').

    :parameter duration: The duration in seconds.
    :returns: The formatted string.
    """
    if duration < 60:
        return f'{float(f"{duration:.2g}"):g} seconds'

    # Round to whole seconds before splitting: rounding the remainder afterwards turns e.g. 119.6 into '1:60'.
    minutes, seconds = divmod(round(duration), 60)
    return f'{minutes:.0f}:{seconds:02.0f}'
358

359

360
def file_ownership_checks(filename: Path) -> list[str]:
    """Verifies that a file and the directory containing it are owned by the current user or by root (UID 0) and are
    not writable by group or others. No checks are performed on Windows.

    :param filename: The path of the file to check.
    :returns: List of errors encountered (if any); directory errors come before file errors.
    """
    if sys.platform == 'win32':
        return []

    writable_by_others = stat.S_IWGRP | stat.S_IWOTH
    trusted_owners = {os.getuid(), 0}  # type: ignore[attr-defined]  # not defined in Windows

    problems: list[str] = []
    for path in (filename.parent, filename):
        info = path.stat()
        if info.st_mode & writable_by_others:
            problems.append(f'{path} is group/world-writable')
        if info.st_uid not in trusted_owners:
            problems.append(f'{path} not owned by {getpass.getuser()} or root')

    return problems
387

388

389
def mark_to_html(text: str, markdown_padded_tables: bool | None = False, extras: Iterable[str] | None = None) -> str:
    """Converts a line of Markdown (e.g. as generated by html2text filter) to html.

    Unordered list items are rendered with a bullet character and non-breaking spaces for the indentation, leading
    spaces are preserved, a wrapping <p> is removed and, for lines that were not wrapped in <p>, headings are turned
    into <strong>.

    :param text: The text in Markdown format.
    :param markdown_padded_tables: If true, monospace the tables for alignment.
    :param extras: Additional extras for Markdown.
    :return: The text in html format.
    """
    if text == '* * *':  # manually expand horizontal ruler since <hr> is used to separate jobs
        return '-' * 80  # checked first: no converter is needed for this case

    markdowner_extras = set(extras) if extras else set()
    markdowner_extras.add('strike')  # text marked by double tildes is ~~strikethrough~~
    markdowner_extras.add('target-blank-links')  # <a> tags have rel="noopener" for added security
    markdowner = Markdown(extras=list(markdowner_extras))
    pre = ''
    post = ''
    if text.lstrip()[:2] == '* ':  # item of unordered list
        lstripped = text.lstrip(' ')
        indent = len(text) - len(lstripped)
        pre += '&nbsp;' * indent
        pre += '● ' if indent == 2 else '⯀ ' if indent == 4 else '○ '
        text = text.split('* ', 1)[1]
    if text[:1] == ' ':
        # replace leading spaces with NBSP or converter will strip them all
        stripped = text.lstrip()
        text = '&nbsp;' * (len(text) - len(stripped)) + stripped
    text = text.replace('` ', '`&nbsp;')  # replace leading spaces within code blocks
    if markdown_padded_tables and '|' in text:
        # a padded row in a table; keep it monospaced for alignment
        pre += '<span style="font-family:monospace;white-space:pre-wrap">'
        post += '</span>'
    text = text.replace('[](', '[[_Link with no text_]](')  # Add link text where missing
    html_out = str(markdowner.convert(text)).rstrip('\n')  # convert markdown to html
    # fixes for Gmail
    # fix <a> tag styling; the lookahead keeps other tags starting with "a" (e.g. <abbr>) intact
    html_out = re.sub(r'<a(?=[\s>])', '<a style="font-family:inherit"', html_out)
    html_out = html_out.replace('<img', '<img style="max-width:100%;height:auto;max-height:100%"')
    html_out = html_out.replace('<code>', '<span style="font-family:monospace;white-space:pre-wrap">')
    html_out = html_out.replace('</code>', '</span>')
    if 'tables' in markdowner_extras:
        html_out = html_out.replace('<table>', '<table border="1" cellspacing="0">')
    # remove <p> tags wrapping
    html_out, sub = re.subn(r'^<p>|</p>$', '', html_out)  # remove paragraph tags
    if sub:
        return pre + html_out + post
    html_out = re.sub(r'<(/?)h\d>', r'<\g<1>strong>', html_out)  # replace heading tags with <strong>
    return pre + html_out + post
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc