• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 14020917399

23 Mar 2025 04:35PM UTC coverage: 75.4% (-0.05%) from 75.448%
14020917399

push

github

mborsetti
Version 3.29.0rc2

1739 of 2632 branches covered (66.07%)

Branch coverage included in aggregate %.

4575 of 5742 relevant lines covered (79.68%)

6.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.53
/webchanges/util.py
1
"""A few utilities used elsewhere."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4
from __future__ import annotations
8✔
5

6
import getpass
8✔
7
import importlib.machinery
8✔
8
import importlib.util
8✔
9
import logging
8✔
10
import os
8✔
11
import re
8✔
12
import shlex
8✔
13
import stat
8✔
14
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
15
import sys
8✔
16
import textwrap
8✔
17
from math import floor, log10
8✔
18
from os import PathLike
8✔
19
from pathlib import Path
8✔
20
from types import ModuleType
8✔
21
from typing import Callable, Iterable, Match
8✔
22

23
from markdown2 import Markdown
8✔
24

25
from webchanges import __project_name__, __version__
8✔
26

27
try:
8✔
28
    import httpx
8✔
29
except ImportError:  # pragma: no cover
30
    httpx = None  # type: ignore[assignment]
31
if httpx is not None:
8!
32
    try:
8✔
33
        import h2
8✔
34
    except ImportError:  # pragma: no cover
35
        h2 = None  # type: ignore[assignment]
36

37
try:
8✔
38
    from packaging.version import parse as parse_version
8✔
39
except ImportError:  # pragma: no cover
40
    from webchanges._vendored.packaging_version import parse as parse_version  # type: ignore[assignment]
41

42
logger = logging.getLogger(__name__)
8✔
43

44

45
def lazy_import(fullname: str) -> ModuleType | None:
    """Lazily imports a module. See https://stackoverflow.com/questions/42703908.

    The module object is created and registered in ``sys.modules`` right away, but the module's code is executed only
    when one of its attributes is accessed for the first time (see ``importlib.util.LazyLoader``).

    To identify loading time, run $ python -X importtime webchanges --help

    :param fullname: The fully qualified name of the module to import.
    :return: The module (already imported, or lazily loaded), or None if the module cannot be found.
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    try:
        spec = importlib.util.find_spec(fullname)
    except ModuleNotFoundError:
        # For a dotted name find_spec() imports the parent package first and raises if that parent is missing.
        return None
    if spec is None or spec.loader is None:
        return None

    # Recipe from the importlib documentation: the spec must carry the lazy loader and the module must be in
    # sys.modules before exec_module() is called, otherwise every call builds a fresh, unregistered module.
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
×
61

62

63
class TrackSubClasses(type):
    """A metaclass that stores subclass name-to-class mappings in the base class.

    When a class using this metaclass is created, its ``__required__`` and ``__optional__`` tuples are extended with
    those of its bases, and the class is recorded in the first base that exposes a registry: ``__subclasses__``
    (keyed by ``__kind__``) if the new class has a ``__kind__``, ``__anonymous_subclasses__`` otherwise.
    """

    # __subclasses__ gets redefined from default "Callable[[_TT], list[_TT]]
    __subclasses__: dict[str, TrackSubClasses]  # type: ignore[assignment]
    __anonymous_subclasses__: list[TrackSubClasses]
    __required__: tuple[str, ...] = ()
    __optional__: tuple[str, ...] = ()

    __kind__: str

    @staticmethod
    def sorted_by_kind(cls: TrackSubClasses) -> list[TrackSubClasses]:
        """Lists the registered members of a class ordered by their __kind__ attribute. Useful for documentation.

        Members whose __kind__ is falsy are left out.

        :param cls: The class.
        :return: The sorted list of class members.
        """
        pairs = [(member.__kind__, member) for member in cls.__subclasses__.values() if member.__kind__]
        pairs.sort()
        return [member for _, member in pairs]

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict) -> None:
        # ``object`` carries neither directives nor registries, so it is never considered.
        parents = [parent for parent in bases if parent != object]

        # Prepend this class's own directives to the ones inherited from each base.
        for parent in parents:
            for key in ('__required__', '__optional__'):
                if hasattr(parent, key):
                    merged = tuple(namespace.get(key, ())) + tuple(getattr(parent, key, ()))
                    namespace[key] = merged
                    setattr(cls, key, merged)

        # Record the class in the first base that has the relevant registry.
        for parent in parents:
            if hasattr(cls, '__kind__'):
                registry = getattr(parent, '__subclasses__', None)
                if registry is None:
                    continue
                logger.debug(f'Registering {cls} as {cls.__kind__}')
                registry[cls.__kind__] = cls
            else:
                anonymous = getattr(parent, '__anonymous_subclasses__', None)
                if anonymous is None:
                    continue
                logger.debug(f'Registering {cls}')
                anonymous.append(cls)
            break

        super().__init__(name, bases, namespace)
8✔
116

117

118
def edit_file(filename: str | bytes | PathLike) -> None:
    """Opens the editor to edit a file.

    The editor command is read from the environment variable $EDITOR or, if that is unset or empty, from $VISUAL; on
    Windows it falls back to notepad.exe. The command may contain arguments (it is split with shlex); the filename is
    appended as the last argument.

    :param filename: The filename.
    :raises SystemExit: If no editor is configured (platforms other than Windows).
    :raises subprocess.CalledProcessError: If the editor exits with a non-zero return code.
    """
    editor = os.environ.get('EDITOR') or os.environ.get('VISUAL')
    if not editor:
        if sys.platform != 'win32':
            print('Please set the path to the editor in the environment variable $EDITOR, e.g. "export EDITOR=nano"')
            raise SystemExit(1)
        editor = 'notepad.exe'

    # os.fsdecode() accepts str, bytes and PathLike; str() would turn b'file' into the literal "b'file'".
    subprocess.run(  # noqa: S603 subprocess call - check for execution of untrusted input.
        [*shlex.split(editor), os.fsdecode(filename)],
        check=True,
    )
137

138

139
def import_module_from_source(module_name: str, source_path: str | bytes | PathLike) -> ModuleType:
    """Loads a module from a source file and executes it in its own namespace.

    The module is registered in ``sys.modules`` under ``module_name`` before it is executed; if execution fails the
    registration is removed again and the exception is re-raised.

    :param module_name: The name of the module to import.
    :param source_path: The path where the module is located.
    :return: A ModuleType object.
    """
    # os.fsdecode() accepts str, bytes and PathLike; str() would turn b'path' into the literal "b'path'".
    source_path = os.fsdecode(source_path)
    loader = importlib.machinery.SourceFileLoader(module_name, source_path)
    spec = importlib.util.spec_from_file_location(module_name, source_path, loader=loader)
    module = importlib.util.module_from_spec(spec)  # type: ignore[arg-type]
    sys.modules[module_name] = module
    try:
        loader.exec_module(module)
    except Exception:
        # Do not leave a partially initialized module behind for later imports to pick up.
        if sys.modules.get(module_name) is module:
            del sys.modules[module_name]
        # Allow a deep traceback for the re-raised error (presumably a lower limit is set elsewhere in the project).
        sys.tracebacklimit = 1000
        raise
    return module
8✔
157

158

159
def chunk_string(text: str, length: int, numbering: bool = False) -> list[str]:
    """Chunks a string.

    When numbering is requested and the text does not fit in a single chunk, every chunk gets a right-hand suffix of
    the form ' (i/n)' and the text is wrapped narrower so that chunk plus suffix stay within ``length``. If there is
    not enough room for the suffix, an error is logged and the text is chunked without numbering.

    :param text: The text to be chunked.
    :param length: The length of the chunked text.
    :param numbering: Whether to number each chunk if more than one chunk is generated.

    :returns: a list of chunked strings
    """
    if numbering and len(text) > length:
        try:
            # The suffix ' (i/n)' takes 4 fixed characters plus the digits of i and of n; start by assuming 1 digit.
            text_length = length - 4 - 2
            digits_try = 1 if text_length <= 0 else floor(log10(len(text) / text_length))  # initialization floor
            digits_guess = digits_try + 1
            # Widen the digit allowance until the estimated number of chunks fits in it.
            while digits_guess > digits_try:
                digits_try += 1
                text_length = length - 4 - 2 * digits_try
                if text_length <= 0:
                    raise ValueError('Not enough space to chunkify string with line numbering (1)')
                lines_guess = len(text) / text_length
                digits_guess = floor(log10(lines_guess)) + 1

            chunks = textwrap.wrap(text, text_length, replace_whitespace=False)
            if not chunks:
                # Whitespace-only text wraps to nothing; log10(0) below would raise and log a spurious error.
                return []

            # Wrapping can yield more chunks than estimated; re-wrap until the real count fits the allowance.
            actual_digits = floor(log10(len(chunks))) + 1
            while actual_digits > digits_try:
                digits_try += 1
                text_length = length - 4 - 2 * digits_try
                if text_length <= 0:
                    raise ValueError('Not enough space to chunkify string with line numbering (2)')
                chunks = textwrap.wrap(text, text_length, replace_whitespace=False)
                actual_digits = floor(log10(len(chunks))) + 1

            total = len(chunks)
            return [f'{line} ({i:{digits_try}d}/{total})' for i, line in enumerate(chunks, start=1)]

        except ValueError as e:
            logger.error(f'{e}')

    return textwrap.wrap(text, length, replace_whitespace=False)
8✔
198

199

200
# URL pattern derived from the tornado library (tornado/escape.py), modified to catch all URL parameters: unlike the
# tornado original, '&' is accepted anywhere in the URL and not only as part of '&amp;' / '&quot;'.
# Group 1 is the whole URL, group 2 the scheme (None for bare 'www.' matches), group 3 the slashes after the scheme.
_LINKIFY_URL_RE = re.compile(
    r'\b('
    r'(?:([\w-]+):(/{1,3})|www[.])'
    r'(?:('
    r'?:(?:[^\s()])*(?:[^!"#$%&'
    r"'()*+,.:;<=>?@\[\]^`{|}~\s])"
    r')'
    r'|(?:\((?:[^\s()])*\))'
    r')+'
    r')'
)  # noqa: DUO138 catastrophic "re" usage - denial-of-service possible.


def linkify(
    text: str,
    shorten: bool = False,
    extra_params: str | Callable[[str], str] = '',
    require_protocol: bool = False,
    permitted_protocols: tuple[str, ...] = (
        'http',
        'https',
        'mailto',
    ),
) -> str:
    """Converts plain text into HTML with links.

    For example linkify("Hello http://tornadoweb.org!") would return 'Hello
    <a href="http://tornadoweb.org">http://tornadoweb.org</a>!'.

    We are using a regex from tornado library https://github.com/tornadoweb/tornado/blob/master/tornado/escape.py.
    The regex is vulnerable to Regular expression Denial of Service (ReDoS), which would divert computational
    resources to an expensive regex match. The risk in this application is limited.

    Neither the text nor the URLs are HTML-escaped by this function.

    In the future, consider using linkify from the bleach project instead (requires importing another package).

    :parameter text: The text to linkify.
    :parameter shorten: Long urls will be shortened for display.
    :parameter extra_params: Extra text to include in the link tag, or a callable taking the link as an argument and
        returning the extra text, e.g. linkify(text, extra_params='rel="nofollow" class="external"').
    :parameter require_protocol: Only linkify urls which include a protocol; if this is False, urls such as
        www.facebook.com will also be linkified.
    :parameter permitted_protocols: Protocols which should be linkified, e.g. linkify(text,
        permitted_protocols=('http', 'ftp', 'mailto')); it is very unsafe to include protocols such as javascript.
    :returns: The text with the recognized URLs wrapped in <a> tags.
    """
    if extra_params and not callable(extra_params):
        extra_params = f' {extra_params.strip()}'

    def make_link(m: re.Match[str]) -> str:
        """Replacement function for re.sub using re.match as input to convert plain text into HTML with links."""
        url: str = m.group(1)
        scheme: str | None = m.group(2)
        if require_protocol and not scheme:
            return url  # not protocol, no linkify

        if scheme and scheme not in permitted_protocols:
            return url  # bad protocol, no linkify

        # no proto specified, use https (for the link target only; the displayed text is left as written)
        href = url if scheme else f'https://{url}'

        if callable(extra_params):
            params = f' {extra_params(href).strip()}'
        else:
            params = extra_params

        # clip long urls. max_len is just an approximation
        max_len = 30
        if shorten and len(url) > max_len:
            before_clip = url
            # Length of the 'scheme:' + slashes prefix actually present in the displayed text; a bare 'www.' match
            # has none, so the https default applied to href must not be counted here.
            proto_len = len(scheme) + 1 + len(m.group(3) or '') if scheme else 0  # +1 for :

            parts = url[proto_len:].split('/')
            if len(parts) > 1:
                # Grab the whole host part plus the first bit of the path
                # The path is usually not that interesting once shortened
                # (no more slug, etc), so it really just provides a little
                # extra indication of shortening.
                url = url[:proto_len] + parts[0] + '/' + parts[1][:8].split('?')[0].split('.')[0]

            if len(url) > max_len * 1.5:  # still too long
                url = url[:max_len]

            if url != before_clip:
                amp = url.rfind('&')
                # avoid splitting html char entities
                if amp > max_len - 5:
                    url = url[:amp]
                url += '...'

                if len(url) >= len(before_clip):
                    url = before_clip
                else:
                    # full url is visible on mouse-over (for those who don't
                    # have a status bar, such as Safari by default); the value is quoted because URLs routinely
                    # contain characters (e.g. '=') that are not allowed in unquoted attribute values
                    params += f' title="{href}"'

        return f'<a href="{href}"{params}>{url}</a>'

    return _LINKIFY_URL_RE.sub(make_link, text)
8✔
317

318

319
def get_new_version_number(timeout: float | None = None) -> str | bool:
    """Check PyPi for newer version of project.

    :parameter timeout: Timeout in seconds passed to the HTTPX client.
    :returns: The new version number if a newer version of project is found on PyPi; empty string if the version on
      PyPi is not newer or PyPi replied with an unsuccessful HTTP status; False if HTTPX is not installed, the request
      failed (httpx.RequestError) or the reply could not be interpreted.
    """
    if httpx is None:
        logger.info('Cannot query PyPi for latest release: HTTPX not installed')
        return False

    try:
        # Use the client as a context manager so that it is closed once the reply has been received.
        with httpx.Client(http2=h2 is not None, timeout=timeout) as client:
            r = client.get(f'https://pypi.org/pypi/{__project_name__}/json')
    except httpx.RequestError as e:
        logger.info(f'Exception when querying PyPi for latest release: {e}')
        return False

    if not r.is_success:
        logger.info(f'HTTP error when querying PyPi for latest release: {r}')
        return ''

    try:
        latest_release: str = r.json()['info']['version']
        is_newer = parse_version(latest_release) > parse_version(__version__)
    except (ValueError, KeyError, TypeError) as e:
        # Malformed JSON, unexpected payload structure or unparsable version string.
        logger.info(f'Cannot interpret PyPi data for latest release: {e}')
        return False

    return latest_release if is_newer else ''
8✔
344

345

346
def dur_text(duration: float) -> str:
    """Returns a formatted string optimized to the number of seconds for use in footers.

    Durations below 60 seconds are shown with two significant digits followed by ' seconds' (e.g. '1.2 seconds');
    longer durations are shown as minutes:seconds (e.g. '2:05').

    :parameter duration: The duration in seconds.
    :returns: The formatted string.
    """
    if duration < 60:
        return f'{float(f"{duration:.2g}"):g} seconds'

    # Round to whole seconds before splitting into minutes and seconds; rounding the remainder afterwards would
    # display e.g. 119.7 as '1:60' instead of '2:00'.
    minutes, seconds = divmod(round(duration), 60)
    return f'{minutes}:{seconds:02d}'
×
357

358

359
def file_ownership_checks(filename: Path) -> list[str]:
    """Check security of file and its directory, i.e. that they belong to the current UID or root and only the owner
    can write to them. Return list of errors if any. Always returns an empty list on Windows.

    :param filename: The file to check; its parent directory is checked as well.
    :returns: List of errors encountered (if any), directory errors first.
    """
    if sys.platform == 'win32':
        return []

    my_uid = os.getuid()  # type: ignore[attr-defined]  # not defined in Windows
    writable_by_others = stat.S_IWGRP | stat.S_IWOTH
    problems: list[str] = []

    # The containing directory is inspected first, then the file itself.
    for path in (filename.parent, filename):
        info = path.stat()
        if info.st_mode & writable_by_others:
            problems.append(f'{path} is group/world-writable')
        if info.st_uid != my_uid and info.st_uid != 0:
            problems.append(f'{path} not owned by {getpass.getuser()} or root')

    return problems
8✔
386

387

388
def mark_to_html(text: str, markdown_padded_tables: bool | None = False, extras: Iterable[str] | None = None) -> str:
    """Converts a line of Markdown (e.g. as generated by html2text filter) to html.

    The 'strike' and 'target-blank-links' extras are always enabled. A line consisting of '* * *' is returned as a
    row of 80 dashes instead of being converted.

    :param text: The text in Markdown format.
    :param markdown_padded_tables: If true, monospace the tables for alignment.
    :param extras: Additional extras for Markdown.
    :return: The text in html format.
    """
    enabled_extras = set(extras) if extras else set()
    enabled_extras.add('strike')  # text marked by double tildes is ~~strikethrough~~
    enabled_extras.add('target-blank-links')  # <a> tags have rel="noopener" for added security
    converter = Markdown(extras=list(enabled_extras))

    # manually expand horizontal ruler since <hr> is used to separate jobs
    if text == '* * *':
        return '-' * 80

    prefix = ''
    suffix = ''

    if text.lstrip().startswith('* '):  # item of unordered list
        indent = len(text) - len(text.lstrip(' '))
        if indent == 2:
            bullet = '● '
        elif indent == 4:
            bullet = '⯀ '
        else:
            bullet = '○ '
        prefix += '&nbsp;' * indent + bullet
        text = text.split('* ', 1)[1]

    if text.startswith(' '):
        # replace leading spaces with NBSP or converter will strip them all
        body = text.lstrip()
        text = '&nbsp;' * (len(text) - len(body)) + body

    text = text.replace('` ', '`&nbsp;')  # replace leading spaces within code blocks

    if markdown_padded_tables and '|' in text:
        # a padded row in a table; keep it monospaced for alignment
        prefix += '<span style="font-family:monospace;white-space:pre-wrap">'
        suffix += '</span>'

    text = text.replace('[](', '[[_Link with no text_]](')  # Add link text where missing
    html_out = str(converter.convert(text)).rstrip('\n')  # convert markdown to html

    # fixes for Gmail, applied in this order
    gmail_fixes = (
        ('<a', '<a style="font-family:inherit"'),  # fix <a> tag styling
        ('<img', '<img style="max-width:100%;height:auto;max-height:100%"'),
        ('<code>', '<span style="font-family:monospace;white-space:pre-wrap">'),
        ('</code>', '</span>'),
    )
    for old, new in gmail_fixes:
        html_out = html_out.replace(old, new)
    if 'tables' in enabled_extras:
        html_out = html_out.replace('<table>', '<table border="1" cellspacing="0">')

    # remove <p> tags wrapping; heading tags are replaced with <strong> only when no <p> tag was removed
    html_out, unwrapped = re.subn(r'^<p>|</p>$', '', html_out)
    if not unwrapped:
        html_out = re.sub(r'<(/?)h\d>', r'<\g<1>strong>', html_out)
    return prefix + html_out + suffix
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc