• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 10371336087

13 Aug 2024 02:07PM UTC coverage: 77.832% (-0.2%) from 78.056%
10371336087

push

github

mborsetti
Version 3.25.0rc0

1751 of 2515 branches covered (69.62%)

Branch coverage included in aggregate %.

4446 of 5447 relevant lines covered (81.62%)

6.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.44
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import logging
8✔
11
import math
8✔
12
import os
8✔
13
import re
8✔
14
import shlex
8✔
15
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
16
import tempfile
8✔
17
import traceback
8✔
18
import urllib.parse
8✔
19
import warnings
8✔
20
from base64 import b64encode
8✔
21
from datetime import datetime
8✔
22
from io import BytesIO
8✔
23
from pathlib import Path
8✔
24
from typing import Any, Iterator, Literal, Optional, TYPE_CHECKING
8✔
25
from zoneinfo import ZoneInfo
8✔
26

27
import html2text
8✔
28

29
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
30

31
try:
8✔
32
    from deepdiff import DeepDiff
8✔
33
    from deepdiff.model import DiffLevel
8✔
34
except ImportError as e:  # pragma: no cover
35
    DeepDiff = str(e)  # type: ignore[no-redef]
36

37
try:
8✔
38
    import httpx
8✔
39
except ImportError:  # pragma: no cover
40
    httpx = None  # type: ignore[assignment]
41
if httpx is not None:
8!
42
    try:
8✔
43
        import h2
8✔
44
    except ImportError:  # pragma: no cover
45
        h2 = None  # type: ignore[assignment]
46

47
try:
8✔
48
    import numpy as np
8✔
49
except ImportError as e:  # pragma: no cover
50
    np = str(e)  # type: ignore[assignment]
51

52
try:
8✔
53
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
54
except ImportError as e:  # pragma: no cover
55
    Image = str(e)  # type: ignore[assignment]
56

57
# https://stackoverflow.com/questions/712791
58
try:
8✔
59
    import simplejson as jsonlib
8✔
60
except ImportError:  # pragma: no cover
61
    import json as jsonlib  # type: ignore[no-redef]
62

63
try:
8✔
64
    import xmltodict
8✔
65
except ImportError as e:  # pragma: no cover
66
    xmltodict = str(e)  # type: ignore[no-redef]
67

68
# https://stackoverflow.com/questions/39740632
69
if TYPE_CHECKING:
70
    from webchanges.handler import JobState
71

72

73
logger = logging.getLogger(__name__)
8✔
74

75

76
class DifferBase(metaclass=TrackSubClasses):
8✔
77
    """The base class for differs."""
78

79
    __subclasses__: dict[str, type[DifferBase]] = {}
8✔
80
    __anonymous_subclasses__: list[type[DifferBase]] = []
8✔
81

82
    __kind__: str = ''
8✔
83

84
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty
8✔
85

86
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
8✔
87
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'
8✔
88

89
    def __init__(self, state: JobState) -> None:
        """Bind the differ to the job run whose data it will compare.

        :param state: The JobState; it is stored on the instance together with its ``job`` attribute.
        """
        self.job, self.state = state.job, state
96

97
    @classmethod
    def differ_documentation(cls) -> str:
        """Build the plain-text overview of differs used by the --features command line argument.

        :returns: A string with one line per differ (kind and docstring), one indented line per supported
           directive of that differ, and a closing legend line.
        """
        lines: list[str] = []
        for differ_class in TrackSubClasses.sorted_by_kind(cls):
            lines.append(f'  * {differ_class.__kind__} - {differ_class.__doc__}')
            if not hasattr(differ_class, '__supported_directives__'):
                continue
            lines.extend(
                f'      {directive} ... {description}'
                for directive, description in differ_class.__supported_directives__.items()
            )
        lines.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(lines)
112

113
    @classmethod
    def normalize_differ(
        cls,
        differ_spec: Optional[dict[str, Any]],
        job_index_number: Optional[int] = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number (only used in error messages).
        :returns: A validated differ_kind, subdirectives (where subdirectives is a dict holding every entry of
           differ_spec except 'name').
        :raises ValueError: If the differ name is missing (and the spec is not just a 'command'), if no differ
           with that name is registered, or if a sub-directive is not supported by the differ.
        """
        differ_spec = differ_spec or {'name': 'unified'}
        # Work on a copy so that popping 'name' does not modify the caller's dict.
        subdirectives = differ_spec.copy()
        differ_kind = subdirectives.pop('name', '')
        if not differ_kind:
            # Shorthand: a spec made up solely of a 'command' key selects the command differ.
            if list(subdirectives) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls = cls.__subclasses__.get(differ_kind, None)
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        if hasattr(differcls, '__supported_directives__'):
            allowed_keys = set(differcls.__supported_directives__)
            unknown_keys = set(subdirectives) - allowed_keys
            # A differ listing '<any>' among its directives accepts arbitrary sub-directives.
            if unknown_keys and '<any>' not in allowed_keys:
                # Sorted and stringified: set order is arbitrary and str.join() rejects non-string keys.
                unknown_text = ', '.join(sorted(map(str, unknown_keys)))
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f"{unknown_text} (supported: {', '.join(sorted(allowed_keys))})."
                )

        return differ_kind, subdirectives
151

152
    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: Optional[str] = None,
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :returns: The output of the differ, or an error message with traceback (for all three report kinds) if it
           fails, or an empty dict if no differ named differ_kind is registered.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: Optional[type[DifferBase]] = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            error_text = job_state.traceback.strip()
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{error_text}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{error_text}\n```\n'
                ),
                # Everything interpolated into the HTML is escaped: tracebacks contain text such as '<module>'
                # which would otherwise be parsed as markup and vanish from the rendered report.
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {html.escape(differ_kind)} with directive(s) '
                    f'{html.escape(directives_text)} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html.escape(error_text)}'
                    f'</span></span>'
                ),
            }
208

209
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Produce the diff of the data; this base implementation only raises and must be overridden.

        The same differ may be invoked by several reporters needing different report types ('text', 'markdown',
        'html'), so the result is a dict keyed by report_kind that holds whatever was generated and can be reused.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Diffs previously generated by the same filter, which can be reused to build the
           diff for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises RuntimeError: If the external diff tool returns an error.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
230

231
    @staticmethod
    def make_timestamp(
        timestamp: float,
        tz: Optional[str] = None,
    ) -> str:
        """Format a timestamp as an RFC 5322 (email) date, adding the time zone name (if available) in the
        Comments and Folding White Space (CFWS) section.

        :param timestamp: The timestamp; a falsy value (such as 0) yields the string 'NEW'.
        :param tz: The IANA timezone of the report; when not given, the conversion is done without an explicit zone.
        :returns: A datetime string in RFC 5322 (email) format, or 'NEW'.
        """
        if not timestamp:
            return 'NEW'

        zone: Optional[ZoneInfo] = ZoneInfo(tz) if tz else None
        moment = datetime.fromtimestamp(timestamp).astimezone(tz=zone)
        zone_name = moment.strftime('%Z')
        # The name is appended only when it differs from the first three characters of the numeric offset,
        # i.e. when it is an actual name rather than a repetition of the offset.
        suffix = '' if zone_name == moment.strftime('%z')[:3] else f' ({zone_name})'
        return moment.strftime('%a, %d %b %Y %H:%M:%S %z') + suffix
257

258
    @staticmethod
    def html2text(data: str) -> str:
        """Render HTML as plain text using the html2text package.

        :param data: The string in html format.
        :returns: The string in text format, with trailing whitespace removed from every line.
        """
        converter = html2text.HTML2Text()
        converter.body_width = 0
        converter.wrap_links = False
        converter.ignore_images = True
        converter.single_line_break = True
        converter.unicode_snob = True
        converted_lines = converter.handle(data).splitlines()
        return '\n'.join([text_line.rstrip() for text_line in converted_lines])
272

273
    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Report a missing optional dependency by raising ImportError.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from the original ImportError; appended on a new line.

        :raises: ImportError.
        """
        explanation = (
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})"
        )
        raise ImportError(f'{explanation}\n{error_message}')
285

286

287
class UnifiedDiffer(DifferBase):
8✔
288
    """(Default) Generates a unified diff."""
289

290
    __kind__ = 'unified'
8✔
291

292
    __supported_directives__ = {
8✔
293
        'context_lines': 'the number of context lines (default: 3)',
294
        'range_info': 'include range information lines (default: true)',
295
    }
296

297
    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator yielding the opening <table> tag, one <tr> row per line of the diff, and the
           closing </table> tag.
        :raises RuntimeError: (While iterating) if a line after the two header lines is empty or starts with a
           character other than '+', '-', ' ', '@' or '/'.
        """

        def process_line(line: str, line_num: int, is_markdown: bool, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param is_markdown: Whether the content of the line is to be converted with mark_to_html.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            """
            # Slicing instead of indexing: an empty line gives '' and ends up in the RuntimeError below
            # rather than in a bare IndexError.
            marker = line[:1]

            # The style= string (or empty string) to add to an HTML tag.
            if line_num == 0:
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:
                style = 'font-family:monospace;color:darkgreen;'
            elif marker == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif marker == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif marker == ' ':  # context line
                style = monospace_style
            elif marker == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif marker == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and marker != '@':  # don't apply to headers or range information
                if is_markdown or marker == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        is_markdown = self.state.is_markdown()
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, is_markdown, monospace_style)
        yield '</table>'
351

352
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a unified diff of the old and new data, honoring the job's additions_only and deletions_only
        settings.

        :param directives: The directives; 'context_lines' and 'range_info' are read.
        :param report_kind: The report_kind requested; for 'html' an HTML table is added to the output.
        :param _unfiltered_diff: A previously generated diff; its 'text' entry is reused (instead of diffing
           again) when an 'html' report is requested.
        :param tz: The timezone used for the timestamps in the two header lines.
        :returns: A dict with the diff keyed by report kind. When there is nothing to report, the state's verb is
           set to 'changed,no_report' and a dict with empty strings for all three kinds is returned.
        """

        def keep_changes(lines: list[str], sign: str) -> list[str]:
            """Keeps the lines starting with sign or '@', minus any '@' line that is followed by another '@'
            line or that ends the list."""
            kept = [entry for entry in lines if entry.startswith((sign, '@'))]
            kept = [
                entry
                for entry, following in zip(kept, kept[1:] + [''])
                if not (entry.startswith('@') and following.startswith('@'))
            ]
            return kept[:-1] if kept[-1].startswith('@') else kept

        result: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            nothing_to_report: dict[Literal['text', 'markdown', 'html'], str] = {
                'text': '',
                'markdown': '',
                'html': '',
            }
            context_count = directives.get('context_lines', self.job.contextlines)
            if context_count is None:
                context_count = 0 if (self.job.additions_only or self.job.deletions_only) else 3
            lines = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    fromfile='@',
                    tofile='@',
                    fromfiledate=self.make_timestamp(self.state.old_timestamp, tz),
                    tofiledate=self.make_timestamp(self.state.new_timestamp, tz),
                    n=context_count,
                    lineterm='',
                )
            )
            if not lines:
                self.state.verb = 'changed,no_report'
                return nothing_to_report
            # replace tabs in header lines
            for header_index in (0, 1):
                lines[header_index] = lines[header_index].replace('\t', ' ')

            if self.job.additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # the new data is at most a quarter of the old: show the full diff, with a notice
                    lines[2:2] = [
                        '/**Comparison type: Additions only**',
                        '/**Deletions are being shown as 75% or more of the content has been deleted**',
                    ]
                else:
                    head = '---' + lines[0][3:]
                    lines = keep_changes(lines, '+')
                    if len(lines) == 1 or sum(1 for entry in lines if entry.lstrip('+').rstrip()) == 2:
                        self.state.verb = 'changed,no_report'
                        return nothing_to_report
                    lines = [head, lines[0], '/**Comparison type: Additions only**', *lines[1:]]
            elif self.job.deletions_only:
                # NOTE(review): lines[1][3:] already begins with ' @', so this header reads '--- @ @ ...' --
                # confirm that this is intended.
                head = '--- @' + lines[1][3:]
                lines = keep_changes(lines, '-')
                if len(lines) == 1 or sum(1 for entry in lines if entry.lstrip('-').rstrip()) == 2:
                    self.state.verb = 'changed,no_report'
                    return nothing_to_report
                lines = [lines[0], head, '/**Comparison type: Deletions only**', *lines[1:]]

            # remove range info lines if needed
            range_info = directives.get('range_info')
            hide_ranges = range_info is False or (
                range_info is None and self.job.additions_only and (len(lines) < 4 or lines[3][0] != '/')
            )
            if hide_ranges:
                lines = [entry for entry in lines if not entry.startswith('@@ ')]

            diff_text = '\n'.join(lines)
            result['text'] = diff_text
            result['markdown'] = diff_text

        if report_kind == 'html':
            result['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return result
445

446

447
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a side-by-side HTML table diff with difflib.HtmlDiff and, for 'text' and 'markdown' reports,
        a plain-text rendering of that table.

        :param directives: The directives; 'tabsize' is read (default: 8).
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: A previously generated diff; its 'html' entry is reused (instead of diffing
           again) when a 'text' or 'markdown' report is requested.
        :param tz: The timezone used for the timestamps in the column headers.
        :returns: A dict with the diff keyed by report kind.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = int(directives.get('tabsize', 8))
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                context=True,
                numlines=3,
            )
            # fix table formatting: the CSS classes emitted by difflib are replaced with inline styles; the
            # replacements are applied in the order listed
            replacements = (
                ('<th ', '<th style="font-family:monospace" '),
                ('<td ', '<td style="font-family:monospace" '),
                (' nowrap="nowrap"', ''),
                ('<a ', '<a style="font-family:monospace;color:inherit" '),
                ('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"'),
                # 'lightred' is not a CSS named color (the declaration would be ignored), hence a hex value
                ('<span class="diff_sub"', '<span style="color:red;background-color:#ffcccc"'),
                ('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"'),
            )
            for old, new in replacements:
                table = table.replace(old, new)
            out_diff['html'] = table

        if report_kind in {'text', 'markdown'}:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        return out_diff
497

498

499
class CommandDiffer(DifferBase):
8✔
500
    """Runs an external command to generate the diff."""
501

502
    __kind__ = 'command'
8✔
503

504
    __supported_directives__ = {
8✔
505
        'command': 'The command to execute',
506
    }
507

508
    re_ptags = re.compile(r'^<p>|</p>$')
8✔
509
    re_htags = re.compile(r'<(/?)h\d>')
8✔
510
    re_tagend = re.compile(r'<(?!.*<).*>+$')
8✔
511

512
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Run the external command on temporary files holding the old and new data and report its output.

        :param directives: The directives; 'command' (required) is the command line, to which the paths of the
           old and the new file are appended.
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: A previously generated diff; its 'text' entry is reused for an 'html' report
           unless the command starts with 'wdiff'.
        :param tz: The timezone used for the timestamps in the header.
        :returns: A dict with the diff keyed by report kind. When the command exits with return code 0, the
           state's verb is set to 'changed,no_report' and a dict with empty strings is returned.
        :raises RuntimeError: If the command writes to stderr or exits with a return code greater than 1.
        """

        def protect_anchor(link: re.Match[str]) -> str:
            """Percent-encodes the anchor text of a markdown link."""
            return f'[{urllib.parse.quote(link.group(1))}]({link.group(2)})'

        def restore_anchor(link: re.Match[str]) -> str:
            """Decodes the anchor text encoded by protect_anchor."""
            return f'[{urllib.parse.unquote(link.group(1))}]({link.group(2)})'

        result: dict[Literal['text', 'markdown', 'html'], str] = {}
        command = directives['command']
        if (
            report_kind == 'html'
            and not command.startswith('wdiff')
            and _unfiltered_diff is not None
            and 'text' in _unfiltered_diff
        ):
            diff = _unfiltered_diff['text']
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            if self.state.is_markdown():
                # protect the link anchor from being split (won't work)
                markdown_links_re = re.compile(r'\[(.*?)][(](.*?)[)]')
                old_data = markdown_links_re.sub(protect_anchor, str(old_data))
                new_data = markdown_links_re.sub(protect_anchor, str(new_data))

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                old_file_path = Path(tmp_dir) / 'old_file'
                new_file_path = Path(tmp_dir) / 'new_file'
                for target, payload in ((old_file_path, old_data), (new_file_path, new_data)):
                    if isinstance(payload, str):
                        target.write_text(payload)
                    else:
                        target.write_bytes(payload)
                cmdline = [*shlex.split(command), str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
            if proc.stderr or proc.returncode > 1:
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}
            head = '\n'.join(
                (
                    f'Using differ "{directives}"',
                    f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}',
                    f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}',
                )
            )
            diff = proc.stdout
            if self.state.is_markdown():
                # undo the protection of the link anchor from being split
                diff = markdown_links_re.sub(restore_anchor, diff)
            if command.startswith('wdiff') and self.job.contextlines == 0:
                # remove lines that don't have any changes
                change_markers = ('{+', '+}', '[-', '-]')
                diff = ''.join(
                    output_line
                    for output_line in diff.splitlines(keepends=True)
                    if any(marker in output_line for marker in change_markers)
                )
            diff = f'{head}\n{diff}'
            result['text'] = diff
            result['markdown'] = diff

        if report_kind == 'html':
            # the output of wdiff gets colorized, anything else is only escaped
            result['html'] = self.wdiff_to_html(diff) if command.startswith('wdiff') else html.escape(diff)

        return result
598

599
    def wdiff_to_html(self, diff: str) -> str:
        """
        Turn the output of wdiff into HTML in which additions and deletions are colorized with inline styles.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output (wrapped in a monospace <span> if the job's monospace is set).
        """

        def as_added(found: re.Match[str]) -> str:
            return f'<span style="{self.css_added_style}">{found.group(1)}</span>'

        def as_deleted(found: re.Match[str]) -> str:
            return f'<span style="{self.css_deltd_style}">{found.group(1)}</span>'

        markup = html.escape(diff)
        if self.state.is_markdown():
            # detect and fix multiline additions or deletions: a marker still open at the end of a line is
            # closed there and opened again at the start of the following line
            adding = False
            deleting = False
            fixed_lines = []
            for text_line in markup.splitlines():
                if adding:
                    text_line = '{+' + text_line
                    adding = False
                elif deleting:
                    text_line = '[-' + text_line
                    deleting = False
                for marker in re.findall(r'\[-|-]|{\+|\+}', text_line):
                    if marker == '[-':
                        deleting = True
                    elif marker == '-]':
                        deleting = False
                    elif marker == '{+':
                        adding = True
                    elif marker == '+}':
                        adding = False
                if adding:
                    text_line += '+}'
                elif deleting:
                    text_line += '-]'
                fixed_lines.append(text_line)
            markup = '<br>\n'.join(fixed_lines)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        markup = re.sub(r'\{\+(.*?)\+}', as_added, markup, flags=re.DOTALL)
        markup = re.sub(r'\[-(.*?)-]', as_deleted, markup, flags=re.DOTALL)
        if not self.job.monospace:
            return markup
        return f'<span style="font-family:monospace;white-space:pre-wrap">{markup}</span>'
652

653

654
class DeepdiffDiffer(DifferBase):
    """Differ that reports the structural differences between two JSON or XML documents using ``deepdiff``."""

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Parse the old and new data, run DeepDiff on them and render one bullet line per difference.

        :param directives: The differ's directives (see ``__supported_directives__``).
        :param report_kind: The kind of report being generated; 'html' produces colorized, HTML-escaped output.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to ``self.make_timestamp`` for the header lines.
        :returns: A dict with key 'html' (for HTML reports) or keys 'text' and 'markdown' (otherwise); all three
            keys with empty strings when there is nothing to report; an error message under all three keys when the
            new data is not valid JSON.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            # The import failed at module load and DeepDiff holds the error message instead of the class.
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            is_html = report_kind == 'html'
            # NOTE(review): in both tables the 'set_item_added' / 'set_item_removed' templates appear to have
            # val_t1 and val_t2 swapped relative to the other entries; left untouched as sets are not expected from
            # JSON or XML data -- confirm against deepdiff before changing.
            if is_html:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _safe(text: str) -> str:
                """Escape text coming from the compared documents when it is going to be embedded in HTML."""
                return html.escape(text, quote=False) if is_html else text

            def _render_value(value: Any, type_name: str) -> str:
                """Render a value for the report: scalars in double quotes, dicts and lists as indented JSON."""
                # Matching on the exact type name (not isinstance) keeps e.g. bool out of the quoted branch.
                if type_name in {'str', 'int', 'float'}:
                    rendered = f'"{value}"'
                elif type_name in {'dict', 'list'}:
                    rendered = jsonlib.dumps(value, ensure_ascii=False, indent=2)
                else:
                    rendered = str(value)
                return _safe(rendered)

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                type_t1 = type(ddiff.t1).__name__
                type_t2 = type(ddiff.t2).__name__

                # Report types without a template render as a bare bullet.
                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=_safe(ddiff.path()),
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=_render_value(ddiff.t1, type_t1),
                    val_t2=_render_value(ddiff.t2, type_t2),
                )

            result = []
            for key in ddiff.tree.keys():
                for item_key in ddiff.tree[key]:
                    result.append(_pretty_print_diff(item_key))

            return '\n'.join(result)

        data_type = directives.get('data_type', 'json')
        # Any data_type other than 'json' or 'xml' leaves both sides as empty strings, i.e. no differences.
        old_data: Any = ''
        new_data: Any = ''
        if data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                # Unparseable old data is compared as an empty string rather than treated as an error.
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'Job {self.job.index_number}: New data is invalid JSON: {e} ({self.job.get_location()})')
                logger.info(f'Job {self.job.index_number}: {self.state.new_data!r}')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ignore_order = directives.get('ignore_order')
        ignore_string_case = directives.get('ignore_string_case')
        significant_digits = directives.get('significant_digits')
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=ignore_order,
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=ignore_string_case,
            significant_digits=significant_digits,
            # Maps the logger's effective level to a deepdiff verbosity clamped to the range 0..2.
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">'
                # f'Differ: {self.__kind__} for {data_type}\n'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                # diff_text has no trailing newline, so it must not be truncated (doing so would cut the last
                # character of the report); <wbr> allows long paths to wrap between subscripts.
                + diff_text.replace('][', ']<wbr>[')
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                # f'Differ: {self.__kind__} for {data_type}\n'
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
                f'{diff_text}'
            )
            return {'text': text_diff, 'markdown': text_diff}
8✔
842

843

844
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed if numpy in installed '
            '(default: 2.5)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Load the old and new images, and build an HTML report embedding the new image and a difference image.

        :param directives: The differ's directives (see ``__supported_directives__``).
        :param report_kind: Not used; all three report kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to ``self.make_timestamp`` for the header lines.
        :returns: A dict with keys 'text', 'markdown' and 'html'; all empty strings when the images are identical
            or their mean squared error is below ``mse_threshold``.
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        if isinstance(Image, str):  # pragma: no cover
            # The import failed at module load and Image holds the error message instead of the module.
            self.raise_import_error('pillow', Image)

        def load_image_from_web(url: str) -> Image.Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image.Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image.Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image.Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image.Image, img2: Image.Image) -> tuple[Image.Image, Optional[np.float64]]:
            """Compute the difference between two images.

            :returns: A tuple of the visualization image and the mean squared error (None if numpy is unavailable).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                diff_array = np.array(diff_image)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Convert the 'L' image to 'RGB' using a matrix that applies to yellow tint
            # The matrix has 12 elements: 4 for Red, 4 for Green, and 4 for Blue.
            # For yellow, we want Red and Green to copy the L values (1.0) and Blue to be zero.
            # The matrix is: [R, G, B, A] for each of the three output channels
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the grayscale value
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the grayscale value
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0% of the grayscale value
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)

            return final_img, mse_value

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)
        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')
        if data_type == 'url':
            # httpx is only needed to download; the module sets it to None when the import fails.
            if httpx is None or isinstance(httpx, str):  # pragma: no cover
                self.raise_import_error('httpx', httpx if isinstance(httpx, str) else "No module named 'httpx'")
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            # URLs are escaped as they are placed inside HTML attributes.
            old_data = f' (<a href="{html.escape(self.state.old_data)}">Old image</a>)'
            new_data = f' (<a href="{html.escape(self.state.new_data)}">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{html.escape(self.state.old_data)}">Old image</a>)'
            new_data = f' (<a href="file://{html.escape(self.state.new_data)}">New image</a>)'

        # Check formats  TODO: is it needed? under which circumstances?
        # if new_image.format != old_image.format:
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
        # else:
        #     logger.debug(f'image format is {old_image.format}')

        # If needed, shrink the larger image
        # NOTE(review): sizes are (width, height) tuples compared lexicographically, so "larger" effectively means
        # "wider" (height only breaks ties) and the aspect ratio is not preserved -- confirm this is intended.
        if new_image.size != old_image.size:
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.Resampling.LANCZOS)
                new_image.format = img_format  # the format is saved and re-applied after resizing

            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.Resampling.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # Compare against None explicitly: an MSE of exactly 0.0 is a valid (and below-threshold) value.
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')

            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

        # Both embedded images are saved in the new image's format; fall back to PNG if it is unknown, as saving to
        # an in-memory stream requires an explicit format.
        output_format = new_image.format or 'PNG'
        mime_subtype = output_format.lower()

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=output_format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=output_format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # Prepare HTML output
        htm = [
            '<span style="font-family:monospace">'
            # f'Differ: {self.__kind__} for {data_type}',
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
            '</span>',
            'New image:',
        ]
        if data_type == 'url':
            # Link to the NEW image rather than embedding it.
            htm.append(f'<img src="{html.escape(self.state.new_data)}" style="max-width: 100%; display: block;">')
        else:
            htm.append(
                f'<img src="data:image/{mime_subtype};base64,{encoded_new}" '
                f'style="max-width: 100%; display: block;">'
            )
        htm.extend(
            [
                'Differences from old (in yellow):',
                # A single list item, so that the join below does not insert a <br> inside the tag.
                f'<img src="data:image/{mime_subtype};base64,{encoded_diff}" '
                'style="max-width: 100%; display: block;">',
            ]
        )

        return {
            'text': 'The image has changed; please see an HTML report for the visualization.',
            'markdown': 'The image has changed; please see an HTML report for the visualization.',
            # Close the outer monospace span opened in the first item.
            'html': '<br>\n'.join(htm) + '</span>',
        }
1046

1047

1048
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': (
            'model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-1.5-flash-latest)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {old_data} and {new_data} will be replaced; ask for markdown',
        'system_instructions': 'Optional tone and style instructions for the model (default: Respond in Markdown)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: None, i.e. model's default",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default",
        'token_limit': (
            "the maximum number of tokens, if different from model's default (default: None, i.e. model's default)"
        ),
    }
    __default_subdirective__ = 'model'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: str | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Ask a Google Generative AI model to summarize the changes and prepend the summary to a unified diff.

        :param directives: The differ's directives (see ``__supported_directives__``); the default model name is
            written into it when 'model' is missing so that it also shows in the report's footer.
        :param report_kind: Not used; all three report kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to the 'unified' differ.
        :returns: A dict with keys 'text', 'markdown' and 'html'; all empty strings when no summary is produced.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> str:
            """Generate AI summary from unified diff, or an error message.

            Returns an empty string when the old and new data do not differ.
            """
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            # Matched by prefix in insertion order, so more specific prefixes must come first.
            _models_token_limits = {  # from https://ai.google.dev/gemini-api/docs/models/gemini
                'gemini-1.5-pro-2m': 2097152,
                'gemini-1.5': 2097152,
                'gemini-1.0': 30720,
                'gemini-pro': 30720,  # legacy naming
                'gemma-2': 8192,
            }
            if 'model' not in directives:
                directives['model'] = 'gemini-1.5-flash-latest'  # also for footer
            model = directives['model']
            token_limit = directives.get('token_limit')
            if not token_limit:
                token_limit = next(
                    (limit for prefix, limit in _models_token_limits.items() if model.startswith(prefix)),
                    None,
                )
                if not token_limit:
                    logger.error(
                        f"Job {self.job.index_number}: Differ '{self.__kind__}' does not know `model: {model}` "
                        f"(supported models starting with: {', '.join(sorted(_models_token_limits))}) "
                        f'({self.job.get_location()})'
                    )
                    return f'## ERROR in summarizing changes using {self.__kind__}:\nUnknown model {model}.\n'

            def _make_unified_diff(context_lines: int = 3) -> str:
                """Return the unified diff (without file names or timestamps) of the old vs. the new data."""
                return '\n'.join(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                    )
                )

            if '{unified_diff}' in prompt:
                unified_diff = _make_unified_diff(directives.get('prompt_ud_context_lines', 9999))
                if not unified_diff:
                    # no changes
                    return ''
            else:
                unified_diff = ''

            def _api_error_message(response: Any) -> str:
                """Extract the 'error.message' field from an API response; '' if the body is not as expected."""
                try:
                    return response.json().get('error', {}).get('message') or ''
                except (ValueError, AttributeError):  # body is not JSON, or not a JSON object of the expected shape
                    return ''

            def _send_to_model(model_prompt: str, system_instructions: str) -> str:
                """Creates the summary request to the model.

                Returns the model's text, or a message starting with 'AI summary unavailable' on failure.
                """
                if httpx is None:  # pragma: no cover
                    # The module sets httpx to None when its import fails.
                    self.raise_import_error('httpx', "No module named 'httpx'")

                api_version = '1beta'
                data = {
                    'system_instruction': {'parts': [{'text': system_instructions}]},
                    'contents': [{'parts': [{'text': model_prompt}]}],
                    'generation_config': {
                        'max_output_tokens': directives.get('max_output_tokens'),
                        'temperature': directives.get('temperature', 0.0),
                        'top_p': directives.get('top_p'),
                        'top_k': directives.get('top_k'),
                    },
                }
                logger.info(f'Job {self.job.index_number}: Making summary request to Google model {model}')
                timeout = directives.get('timeout', 300)
                try:
                    # The client is closed on exit; HTTP/2 is only requested when the h2 package was importable.
                    with httpx.Client(http2=h2 is not None) as client:
                        r = client.post(
                            f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:generateContent?'
                            f'key={GOOGLE_AI_API_KEY}',
                            json=data,
                            headers={'Content-Type': 'application/json'},
                            timeout=timeout,
                        )
                except httpx.HTTPError as e:
                    try:
                        host = e.request.url.host
                    except RuntimeError:  # the exception carries no request
                        host = 'generativelanguage.googleapis.com'
                    return f'AI summary unavailable: HTTP client error: {e} when requesting data from {host}'

                if r.is_success:
                    try:
                        candidate = r.json()['candidates'][0]
                        summary = candidate['content']['parts'][0]['text']
                    except (ValueError, KeyError, IndexError, TypeError):
                        # e.g. no candidates returned, or a candidate without any text content
                        logger.error(
                            f'Job {self.job.index_number}: Unexpected response from {r.url.host} '
                            f'({self.job.get_location()})'
                        )
                        return f'AI summary unavailable: Received an unexpected response from {r.url.host}'
                    logger.info(
                        f"Job {self.job.index_number}: AI generation finished by {candidate.get('finishReason')}"
                    )
                elif r.status_code == 400:
                    summary = f'AI summary unavailable: Received error from {r.url.host}: {_api_error_message(r)}'
                else:
                    summary = (
                        f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from '
                        f'{r.url.host}'
                    )
                    if r.content:
                        summary += f': {_api_error_message(r)}'

                return summary

            # check if data is different (for testing)
            if '{old_data}' in prompt and '{new_data}' in prompt and self.state.old_data == self.state.new_data:
                return ''

            model_prompt = prompt.format(
                unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
            )

            # The number of tokens is estimated as one per 4 characters of prompt.
            if len(model_prompt) / 4 < token_limit:
                summary = _send_to_model(model_prompt, system_instructions)
            elif '{unified_diff}' in prompt:
                logger.info(
                    f'Job {self.job.index_number}: Model prompt with full diff is too long: '
                    f'({len(model_prompt) / 4:,.0f} est. tokens exceeds limit of {token_limit:,.0f} tokens); '
                    f'recomputing with default contextlines'
                )
                unified_diff = _make_unified_diff()
                model_prompt = prompt.format(
                    unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
                )
                if len(model_prompt) / 4 < token_limit:
                    summary = _send_to_model(model_prompt, system_instructions)
                else:
                    summary = (
                        f'AI summary unavailable (model prompt with unified diff is too long: '
                        f'{len(model_prompt) / 4:,.0f} est. tokens exceeds maximum of {token_limit:,.0f})'
                    )
            else:
                # Nothing can be shortened, so the request is attempted anyway.
                logger.info(
                    f'The model prompt may be too long: {len(model_prompt) / 4:,.0f} est. tokens exceeds '
                    f'limit of {token_limit:,.0f} tokens'
                )
                summary = _send_to_model(model_prompt, system_instructions)
            return summary

        prompt = directives.get(
            'prompt',
            'Identify the changes between the old document (enclosed by an <old> tag) and the new document ('
            'enclosed by a <new> tag) and output a summary of such changes:\n\n<old>\n{old_data}\n</old>\n\n<new>\n'
            '{new_data}\n</new>',
        ).replace('\\n', '\n')  # turns literal backslash-n sequences in the prompt into newlines
        system_instructions = directives.get('system_instructions', 'Respond in Markdown')
        summary = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string compatibility
        back_n = '\\n'  # For Python < 3.12 f-string compatibility
        directives_text = (
            ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in directives.items()) or 'None'
        )
        footer = f'Summary generated by Google Generative AI (differ directive(s): {directives_text})'
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # NOTE(review): only the result of the last call is kept, yet it is indexed below by 'text', 'markdown'
        # and 'html'; presumably DifferBase.process accumulates every kind through temp_unfiltered_diff -- confirm.
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': summary + '\n\n' + unified_report['text'] + '\n------------\n' + footer,
            'markdown': summary + '\n\n' + unified_report['markdown'] + '\n* * *\n' + footer,
            'html': (
                mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>')
                + '<br>'
                + unified_report['html']
                + '-----<br>'
                + f'<i><small>{footer}</small></i>'
            ),
        }
1285

1286

1287
class WdiffDiffer(DifferBase):
    """Word-by-word differ (``wdiff``-like) built on :class:`difflib.Differ`.

    WORK IN PROGRESS: :meth:`differ` emits a ``RuntimeWarning`` on every call stating that the results are not yet
    correct.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    @staticmethod
    def _tokenize(data: str, is_markdown: bool) -> list[str]:
        """Split ``data`` on single spaces into words, representing each newline as the standalone token ``<\\n>``.

        :param data: The text to split.
        :param is_markdown: If true, spaces inside square brackets (link text) are first replaced with the
           placeholder ``</s>`` so that bracketed text is not split into separate words.
        :returns: The list of tokens.
        """
        if is_markdown:
            # Don't split spaces in link text, tokenize space as </s>
            data = re.sub(r'\[(.*?)\]', lambda m: '[' + m.group(1).replace(' ', '</s>') + ']', data)
        return data.replace('\n', ' <\\n> ').split(' ')

    @staticmethod
    def _limit_context(diff_text: str, diff_html: str, contextlines: int, range_info: bool) -> tuple[str, str]:
        """Keep only the lines containing changes plus ``contextlines`` lines around each of them.

        A line is considered changed when its text version contains an ANSI colour start sequence (``\\033[9``).
        Text and HTML versions are filtered in lockstep, line by line.

        :param diff_text: The full text diff.
        :param diff_html: The full HTML diff (expected to have the same line structure as ``diff_text``).
        :param contextlines: Number of lines to keep before and after each changed line.
        :param range_info: If true, a ``@@ first...last @@`` line is inserted in place of each run of dropped lines.
        :returns: The filtered ``(diff_text, diff_html)``; both are returned unchanged if ``contextlines`` is not
           smaller than the number of lines in ``diff_text``.
        """
        text_lines = diff_text.splitlines()
        if not contextlines < len(text_lines):
            return diff_text, diff_html
        html_lines = diff_html.splitlines()

        lines_with_changes = [idx for idx, line in enumerate(text_lines) if '\033[9' in line]
        if contextlines:
            lines_to_keep: set[int] = set()
            for idx in lines_with_changes:
                # Out-of-range indices (negative or past the end) are harmless: they never match a real line.
                lines_to_keep.update(range(idx - contextlines, idx + contextlines + 1))
        else:
            lines_to_keep = set(lines_with_changes)

        new_diff_text: list[str] = []
        new_diff_html: list[str] = []
        last_line = 0  # 1-based number of the last line kept
        skip = False  # True once at least one line has been dropped since the last range marker
        idx = 0
        for idx, (line_text, line_html) in enumerate(zip(text_lines, html_lines)):
            if idx in lines_to_keep:
                if range_info and skip:
                    marker = f'@@ {last_line + 1}...{idx} @@'
                    new_diff_text.append(marker)
                    new_diff_html.append(marker)
                    skip = False
                new_diff_text.append(line_text)
                new_diff_html.append(line_html)
                last_line = idx + 1
            else:
                skip = True
        # Trailing run of dropped lines
        if (idx + 1) != last_line and range_info and skip:
            marker = f'@@ {last_line + 1}...{idx + 1} @@'
            new_diff_text.append(marker)
            new_diff_html.append(marker)
        return '\n'.join(new_diff_text), '\n'.join(new_diff_html)

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Produce a word-level diff between ``self.state.old_data`` and ``self.state.new_data``.

        Additions and deletions are marked with ANSI colour codes in the text output and with styled ``<span>``
        elements in the HTML output.

        :param directives: Differ directives; ``context_lines`` (falls back to ``self.job.contextlines``, then 3) and
           ``range_info`` (default True) are read.
        :param report_kind: Not used by this differ; all three report kinds are always generated.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Timezone passed to ``make_timestamp`` for the header timestamps.
        :returns: A dict with keys ``text``, ``markdown`` and ``html``; ``text`` and ``markdown`` are identical.
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires old data to be a string, not '
                f'{type(self.state.old_data).__name__}.'
            )
        if not isinstance(self.state.new_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires new data to be a string, not '
                f'{type(self.state.new_data).__name__}.'
            )

        # Split the texts into words tokenizing newline
        is_markdown = self.state.is_markdown()
        words1 = self._tokenize(self.state.old_data, is_markdown)
        words2 = self._tokenize(self.state.new_data, is_markdown)

        # Generate a difference list; each entry is a 2-character code ('+ ', '- ', '  ' or '? ') followed by the word
        diff = list(difflib.Differ().compare(words1, words2))

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
        new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
        head_text = f'\033[91m--- @ {old_timestamp}\033[0m\n' f'\033[92m+++ @ {new_timestamp}\033[0m\n'
        head_html = '<br>\n'.join(
            [
                '<span style="font-family:monospace;">'
                f'<span style="color:darkred;">--- @ {old_timestamp}</span>',
                f'<span style="color:darkgreen;">+++ @ {new_timestamp}</span>' '</span>',
                '',
            ]
        )

        # Process the diff output to make it more wdiff-like.
        # Words are emitted one iteration late (prev_word_*), so that a closing marker can still be attached to a
        # word once the following word is known; next_* holds an opening marker deferred past a newline token.
        result_text = []
        result_html = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''
        next_html = ''
        add = False
        rem = False

        # The extra '  ' entry is a sentinel "unchanged" word that flushes the last real word and closes any open span
        for word_text in diff + ['  ']:
            if word_text[0] == '?':  # additional context line
                continue
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    # Defer the opening marker so that it lands after the line break
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    # Defer the opening marker so that it lands after the line break
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        if add or rem:
            # Defensive only: the sentinel entry above resets both flags whenever either one is set
            result_text[-1] += '\033[0m'
            result_html[-1] += '</span>'

        # rebuild the text from words, replacing the newline token (the first entry is the initial empty prev_word)
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        diff_text, diff_html = self._limit_context(diff_text, diff_html, contextlines, range_info)

        if self.state.is_markdown():
            # Restore the spaces that were protected inside link text during tokenization
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc