• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 9392392187

05 Jun 2024 11:04PM UTC coverage: 77.899% (+0.08%) from 77.815%
9392392187

push

github

mborsetti
Version 3.24.0rc0

1736 of 2491 branches covered (69.69%)

Branch coverage included in aggregate %.

4397 of 5382 relevant lines covered (81.7%)

6.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.12
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import logging
8✔
11
import math
8✔
12
import os
8✔
13
import re
8✔
14
import shlex
8✔
15
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
16
import tempfile
8✔
17
import traceback
8✔
18
import urllib.parse
8✔
19
import warnings
8✔
20
from base64 import b64encode
8✔
21
from datetime import datetime
8✔
22
from io import BytesIO
8✔
23
from pathlib import Path
8✔
24
from typing import Any, Iterator, Literal, Optional, TYPE_CHECKING
8✔
25
from zoneinfo import ZoneInfo
8✔
26

27
import html2text
8✔
28

29
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
30

31
try:
8✔
32
    from deepdiff import DeepDiff
8✔
33
    from deepdiff.model import DiffLevel
8✔
34
except ImportError as e:  # pragma: no cover
35
    DeepDiff = e.msg  # type: ignore[no-redef]
36

37
try:
8✔
38
    import httpx
8✔
39
except ImportError:  # pragma: no cover
40
    httpx = None  # type: ignore[assignment]
41
if httpx is not None:
8!
42
    try:
8✔
43
        import h2
8✔
44
    except ImportError:  # pragma: no cover
45
        h2 = None  # type: ignore[assignment]
46

47
try:
8✔
48
    import numpy as np
8✔
49
except ImportError as e:  # pragma: no cover
50
    np = e.msg  # type: ignore[assignment]
51

52
try:
8✔
53
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
54
except ImportError as e:  # pragma: no cover
55
    Image = e.msg  # type: ignore[no-redef]
56

57
# https://stackoverflow.com/questions/712791
58
try:
8✔
59
    import simplejson as jsonlib
8✔
60
except ImportError:  # pragma: no cover
61
    import json as jsonlib  # type: ignore[no-redef]
62

63
try:
8✔
64
    import xmltodict
8✔
65
except ImportError as e:  # pragma: no cover
66
    xmltodict = e.msg  # type: ignore[no-redef]
67

68
# https://stackoverflow.com/questions/39740632
69
if TYPE_CHECKING:
70
    from webchanges.handler import JobState
71

72

73
logger = logging.getLogger(__name__)
8✔
74

75

76
class DifferBase(metaclass=TrackSubClasses):
8✔
77
    """The base class for differs."""
78

79
    __subclasses__: dict[str, type[DifferBase]] = {}
8✔
80
    __anonymous_subclasses__: list[type[DifferBase]] = []
8✔
81

82
    __kind__: str = ''
8✔
83

84
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty
8✔
85

86
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
8✔
87
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'
8✔
88

89
    def __init__(self, state: JobState) -> None:
8✔
90
        """
91

92
        :param state: the JobState.
93
        """
94
        self.job = state.job
8✔
95
        self.state = state
8✔
96

97
    @classmethod
    def differ_documentation(cls) -> str:
        """Build the plain-text list of differs displayed by the --features command line argument.

        Each differ contributes one bullet line with its ``__kind__`` and class docstring, followed by one indented
        line per entry of its ``__supported_directives__`` (if the attribute exists).

        :returns: A string to display.
        """
        lines: list[str] = []
        for differ_cls in TrackSubClasses.sorted_by_kind(cls):
            lines.append(f'  * {differ_cls.__kind__} - {differ_cls.__doc__}')
            # A differ lacking the attribute simply lists no directives.
            supported = getattr(differ_cls, '__supported_directives__', {})
            lines.extend(f'      {key} ... {doc}' for key, doc in supported.items())
        lines.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(lines)
8✔
112

113
    @classmethod
8✔
114
    def normalize_differ(
8✔
115
        cls,
116
        differ_spec: Optional[dict[str, Any]],
117
        job_index_number: Optional[int] = None,
118
    ) -> tuple[str, dict[str, Any]]:
119
        """Checks the differ_spec for its validity and applies default values.
120

121
        :param differ_spec: The differ as entered by the user; use "unified" if empty.
122
        :param job_index_number: The job index number.
123
        :returns: A validated differ_kind, subdirectives (where subdirectives is a dict).
124
        """
125
        differ_spec = differ_spec or {'name': 'unified'}
8✔
126
        subdirectives = differ_spec.copy()
8✔
127
        differ_kind = subdirectives.pop('name', '')
8✔
128
        if not differ_kind:
8✔
129
            if list(subdirectives.keys()) == ['command']:
8!
130
                differ_kind = 'command'
8✔
131
            else:
132
                raise ValueError(
×
133
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
134
                )
135

136
        differcls = cls.__subclasses__.get(differ_kind, None)
8✔
137
        if not differcls:
8✔
138
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')
8✔
139

140
        if hasattr(differcls, '__supported_directives__'):
8!
141
            provided_keys = set(subdirectives.keys())
8✔
142
            allowed_keys = set(differcls.__supported_directives__.keys())
8✔
143
            unknown_keys = provided_keys.difference(allowed_keys)
8✔
144
            if unknown_keys and '<any>' not in allowed_keys:
8✔
145
                raise ValueError(
8✔
146
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
147
                    f"{', '.join(unknown_keys)} (supported: {', '.join(sorted(allowed_keys))})."
148
                )
149

150
        return differ_kind, subdirectives
8✔
151

152
    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: Optional[str] = None,
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same filter, which can be used to generate a diff
           for a different report_kind.
        :returns: The output of the differ, or an error message with traceback (for all three report kinds) if it
           fails. An empty dict if no differ is registered under differ_kind.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: Optional[type[DifferBase]] = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            error_text = job_state.traceback.strip()
            # Escape for the HTML report: tracebacks contain text such as "<module>" that would otherwise be
            # parsed as markup, and directive values are user-supplied.
            html_directives = html.escape(directives_text)
            html_error = html.escape(error_text)
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{error_text}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{error_text}\n```\n'
                ),
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {differ_kind} with directive(s) '
                    f'{html_directives} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html_error}'
                    f'</span></span>'
                ),
            }
8✔
208

209
    def differ(
8✔
210
        self,
211
        directives: dict[str, Any],
212
        report_kind: Literal['text', 'markdown', 'html'],
213
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
214
        tz: Optional[str] = None,
215
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
216
        """Create a diff from the data. Since this function could be called by different reporters of multiple report
217
        types ('text', 'markdown', 'html'), the differ outputs a dict with data for the report_kind it generated so
218
        that it can be reused.
219

220
        :param directives: The directives.
221
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
222
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
223
           for a different report_kind.
224
        :param tz: The timezone of the report.
225
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
226
           (as a minimum for the report_kind requested).
227
        :raises RuntimeError: If the external diff tool returns an error.
228
        """
229
        raise NotImplementedError()
230

231
    @staticmethod
8✔
232
    def make_timestamp(
8✔
233
        timestamp: float,
234
        tz: Optional[str] = None,
235
    ) -> str:
236
        """Creates a datetime string in RFC 5322 (email) format with the time zone name (if available) in the
237
        Comments and Folding White Space (CFWS) section.
238

239
        :param timestamp: The timestamp.
240
        :param tz: The IANA timezone of the report.
241
        :returns: A datetime string in RFC 5322 (email) format.
242
        """
243
        if timestamp:
8✔
244
            if tz:
8✔
245
                tz_info: Optional[ZoneInfo] = ZoneInfo(tz)
8✔
246
            else:
247
                tz_info = None
8✔
248
            dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz_info)
8✔
249
            # add timezone name if known
250
            if dt.strftime('%Z') != dt.strftime('%z')[:3]:
8✔
251
                cfws = f" ({dt.strftime('%Z')})"
8✔
252
            else:
253
                cfws = ''
8✔
254
            return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws
8✔
255
        else:
256
            return 'NEW'
8✔
257

258
    @staticmethod
    def html2text(data: str) -> str:
        """Convert HTML to text with the html2text package and strip trailing whitespace from every line.

        :param data: the string in html format.
        :returns: the string in text format.
        """
        converter = html2text.HTML2Text()
        # Options set on the html2text parser before converting.
        options = (
            ('unicode_snob', True),
            ('body_width', 0),
            ('ignore_images', True),
            ('single_line_break', True),
            ('wrap_links', False),
        )
        for option, value in options:
            setattr(converter, option, value)
        converted = converter.handle(data)
        return '\n'.join(map(str.rstrip, converted.splitlines()))
8✔
272

273
    def raise_import_error(self, package_name: str, error_message: str) -> None:
8✔
274
        """Raise ImportError for missing package.
275

276
        :param package_name: The name of the module/package that could not be imported.
277
        :param error_message: The error message from ImportError.
278

279
        :raises: ImportError.
280
        """
281
        raise ImportError(
8✔
282
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
283
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
284
        )
285

286

287
class UnifiedDiffer(DifferBase):
8✔
288
    """(Default) Generates a unified diff."""
289

290
    __kind__ = 'unified'
8✔
291

292
    __supported_directives__ = {
8✔
293
        'context_lines': 'the number of context lines (default: 3)',
294
        'range_info': 'include range information lines (default: true)',
295
    }
296

297
    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator yielding the opening ``<table>`` tag, one ``<tr>`` row per line of the diff and the
           closing ``</table>`` tag.
        :raises RuntimeError: If a line after the two header lines starts with an unrecognized character.
        """

        def process_line(line: str, line_num: int, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            """
            # Slice rather than index so that an empty line yields '' instead of raising IndexError.
            marker = line[:1]

            # The style= string (or empty string) to add to an HTML tag.
            if line_num == 0:
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:
                style = 'font-family:monospace;color:darkgreen;'
            elif marker == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif marker == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif marker in {' ', ''}:  # context line (a completely empty line is treated as one too)
                style = monospace_style
            elif marker == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif marker == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style_attr = f' style="{style}"' if style else ''

            if line_num > 1 and marker != '@':  # don't apply to headers or range information
                # Drop the one-character diff marker before rendering the content.
                if self.job.is_markdown or marker == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style_attr}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, monospace_style)
        yield '</table>'
8✔
350

351
    def differ(
8✔
352
        self,
353
        directives: dict[str, Any],
354
        report_kind: Literal['text', 'markdown', 'html'],
355
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
356
        tz: Optional[str] = None,
357
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
358
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
8✔
359
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
8✔
360
            diff_text = _unfiltered_diff['text']
8✔
361
        else:
362
            empty_return: dict[Literal['text', 'markdown', 'html'], str] = {'text': '', 'markdown': '', 'html': ''}
8✔
363
            contextlines = directives.get('context_lines', self.job.contextlines)
8✔
364
            if contextlines is None:
8✔
365
                if self.job.additions_only or self.job.deletions_only:
8✔
366
                    contextlines = 0
8✔
367
                else:
368
                    contextlines = 3
8✔
369
            diff = list(
8✔
370
                difflib.unified_diff(
371
                    str(self.state.old_data).splitlines(),
372
                    str(self.state.new_data).splitlines(),
373
                    '@',
374
                    '@',
375
                    self.make_timestamp(self.state.old_timestamp, tz),
376
                    self.make_timestamp(self.state.new_timestamp, tz),
377
                    contextlines,
378
                    lineterm='',
379
                )
380
            )
381
            if not diff:
8✔
382
                self.state.verb = 'changed,no_report'
8✔
383
                return empty_return
8✔
384
            # replace tabs in header lines
385
            diff[0] = diff[0].replace('\t', ' ')
8✔
386
            diff[1] = diff[1].replace('\t', ' ')
8✔
387

388
            if self.job.additions_only:
8✔
389
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
8✔
390
                    diff = (
8✔
391
                        diff[:2]
392
                        + ['/**Comparison type: Additions only**']
393
                        + ['/**Deletions are being shown as 75% or more of the content has been deleted**']
394
                        + diff[2:]
395
                    )
396
                else:
397
                    head = '---' + diff[0][3:]
8✔
398
                    diff = [line for line in diff if line.startswith('+') or line.startswith('@')]
8✔
399
                    diff = [
8✔
400
                        line1
401
                        for line1, line2 in zip([''] + diff, diff + [''])
402
                        if not (line1.startswith('@') and line2.startswith('@'))
403
                    ][1:]
404
                    diff = diff[:-1] if diff[-1].startswith('@') else diff
8✔
405
                    if len(diff) == 1 or len([line for line in diff if line.lstrip('+').rstrip()]) == 2:
8✔
406
                        self.state.verb = 'changed,no_report'
8✔
407
                        return empty_return
8✔
408
                    diff = [head, diff[0], '/**Comparison type: Additions only**'] + diff[1:]
8✔
409
            elif self.job.deletions_only:
8✔
410
                head = '--- @' + diff[1][3:]
8✔
411
                diff = [line for line in diff if line.startswith('-') or line.startswith('@')]
8✔
412
                diff = [
8✔
413
                    line1
414
                    for line1, line2 in zip([''] + diff, diff + [''])
415
                    if not (line1.startswith('@') and line2.startswith('@'))
416
                ][1:]
417
                diff = diff[:-1] if diff[-1].startswith('@') else diff
8✔
418
                if len(diff) == 1 or len([line for line in diff if line.lstrip('-').rstrip()]) == 2:
8✔
419
                    self.state.verb = 'changed,no_report'
8✔
420
                    return empty_return
8✔
421
                diff = [diff[0], head, '/**Comparison type: Deletions only**'] + diff[1:]
8✔
422

423
            # remove range info lines if needed
424
            if directives.get('range_info') is False or (
8✔
425
                directives.get('range_info') is None
426
                and self.job.additions_only
427
                and (len(diff) < 4 or diff[3][0] != '/')
428
            ):
429
                diff = [line for line in diff if not line.startswith('@@ ')]
8✔
430

431
            diff_text = '\n'.join(diff)
8✔
432

433
            out_diff.update(
8✔
434
                {
435
                    'text': diff_text,
436
                    'markdown': diff_text,
437
                }
438
            )
439

440
        if report_kind == 'html':
8✔
441
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))
8✔
442

443
        return out_diff
8✔
444

445

446
class TableDiffer(DifferBase):
8✔
447
    """Generates a Python HTML table diff."""
448

449
    __kind__ = 'table'
8✔
450

451
    __supported_directives__ = {
8✔
452
        'tabsize': 'tab stop spacing (default: 8)',
453
    }
454

455
    def differ(
8✔
456
        self,
457
        directives: dict[str, Any],
458
        report_kind: Literal['text', 'markdown', 'html'],
459
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
460
        tz: Optional[str] = None,
461
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
462
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
8✔
463
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
8✔
464
            table = _unfiltered_diff['html']
8✔
465
        else:
466
            tabsize = int(directives.get('tabsize', 8))
8✔
467
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
8✔
468
            table = html_diff.make_table(
8✔
469
                str(self.state.old_data).splitlines(keepends=True),
470
                str(self.state.new_data).splitlines(keepends=True),
471
                self.make_timestamp(self.state.old_timestamp, tz),
472
                self.make_timestamp(self.state.new_timestamp, tz),
473
                True,
474
                3,
475
            )
476
            # fix table formatting
477
            table = table.replace('<th ', '<th style="font-family:monospace" ')
8✔
478
            table = table.replace('<td ', '<td style="font-family:monospace" ')
8✔
479
            table = table.replace(' nowrap="nowrap"', '')
8✔
480
            table = table.replace('<a ', '<a style="font-family:monospace;color:inherit" ')
8✔
481
            table = table.replace('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"')
8✔
482
            table = table.replace('<span class="diff_sub"', '<span style="color:red;background-color:lightred"')
8✔
483
            table = table.replace('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"')
8✔
484
            out_diff['html'] = table
8✔
485

486
        if report_kind in {'text', 'markdown'}:
8✔
487
            diff_text = self.html2text(table)
8✔
488
            out_diff.update(
8✔
489
                {
490
                    'text': diff_text,
491
                    'markdown': diff_text,
492
                }
493
            )
494

495
        return out_diff
8✔
496

497

498
class CommandDiffer(DifferBase):
8✔
499
    """Runs an external command to generate the diff."""
500

501
    __kind__ = 'command'
8✔
502

503
    __supported_directives__ = {
8✔
504
        'command': 'The command to execute',
505
    }
506

507
    re_ptags = re.compile(r'^<p>|</p>$')
8✔
508
    re_htags = re.compile(r'<(/?)h\d>')
8✔
509
    re_tagend = re.compile(r'<(?!.*<).*>+$')
8✔
510

511
    def differ(
8✔
512
        self,
513
        directives: dict[str, Any],
514
        report_kind: Literal['text', 'markdown', 'html'],
515
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
516
        tz: Optional[str] = None,
517
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
518
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
8✔
519
        command = directives['command']
8✔
520
        if (
8✔
521
            report_kind == 'html'
522
            and not command.startswith('wdiff')
523
            and _unfiltered_diff is not None
524
            and 'text' in _unfiltered_diff
525
        ):
526
            diff = _unfiltered_diff['text']
8✔
527
        else:
528
            old_data = self.state.old_data
8✔
529
            new_data = self.state.new_data
8✔
530
            if self.job.is_markdown:
8✔
531
                # protect the link anchor from being split (won't work)
532
                markdown_links_re = re.compile(r'\[(.*?)][(](.*?)[)]')
8✔
533
                old_data = markdown_links_re.sub(
8!
534
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(old_data)
535
                )
536
                new_data = markdown_links_re.sub(
8!
537
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(new_data)
538
                )
539

540
            # External diff tool
541
            with tempfile.TemporaryDirectory() as tmp_dir:
8✔
542
                tmp_path = Path(tmp_dir)
8✔
543
                old_file_path = tmp_path.joinpath('old_file')
8✔
544
                new_file_path = tmp_path.joinpath('new_file')
8✔
545
                if isinstance(old_data, str):
8!
546
                    old_file_path.write_text(old_data)
8✔
547
                else:
548
                    old_file_path.write_bytes(old_data)
×
549
                if isinstance(new_data, str):
8!
550
                    new_file_path.write_text(new_data)
8✔
551
                else:
552
                    new_file_path.write_bytes(new_data)
×
553
                cmdline = shlex.split(command) + [str(old_file_path), str(new_file_path)]
8✔
554
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
8✔
555
            if proc.stderr or proc.returncode > 1:
8✔
556
                raise RuntimeError(
8✔
557
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
558
                    f'({self.job.get_location()})'
559
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
560
            if proc.returncode == 0:
8✔
561
                self.state.verb = 'changed,no_report'
8✔
562
                return {'text': '', 'markdown': '', 'html': ''}
8✔
563

564
            old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
8✔
565
            sep = '—' * (len(old_timestamp) + 6)
8✔
566
            new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
8✔
567
            head = '\n'.join(
8✔
568
                [
569
                    f'Using differ "{directives}"',
570
                    f'--- @ {old_timestamp}',
571
                    f'+++ @ {new_timestamp}',
572
                    f'{sep}',
573
                ]
574
            )
575
            diff = proc.stdout
8✔
576
            if self.job.is_markdown:
8!
577
                # undo the protection of the link anchor from being split
578
                diff = markdown_links_re.sub(lambda x: f'[{urllib.parse.unquote(x.group(1))}]({x.group(2)})', diff)
8!
579
            if command.startswith('wdiff') and self.job.contextlines == 0:
8!
580
                # remove lines that don't have any changes
581
                keeplines = []
×
582
                for line in diff.splitlines(keepends=True):
×
583
                    if any(x in line for x in {'{+', '+}', '[-', '-]'}):
×
584
                        keeplines.append(line)
×
585
                diff = ''.join(keeplines)
×
586
            diff = head + diff
8✔
587
            out_diff.update(
8✔
588
                {
589
                    'text': diff,
590
                    'markdown': diff,
591
                }
592
            )
593

594
        if report_kind == 'html':
8✔
595
            if command.startswith('wdiff'):
8!
596
                # colorize output of wdiff
597
                out_diff['html'] = self.wdiff_to_html(diff)
×
598
            else:
599
                out_diff['html'] = html.escape(diff)
8✔
600

601
        return out_diff
8✔
602

603
    def wdiff_to_html(self, diff: str) -> str:
8✔
604
        """
605
        Colorize output of wdiff.
606

607
        :param diff: The output of the wdiff command.
608
        :returns: The colorized HTML output.
609
        """
610
        html_diff = html.escape(diff)
8✔
611
        if self.job.is_markdown:
8✔
612
            # detect and fix multiline additions or deletions
613
            is_add = False
8✔
614
            is_del = False
8✔
615
            new_diff = []
8✔
616
            for line in html_diff.splitlines():
8✔
617
                if is_add:
8✔
618
                    line = '{+' + line
8✔
619
                    is_add = False
8✔
620
                elif is_del:
8✔
621
                    line = '[-' + line
8✔
622
                    is_del = False
8✔
623
                for match in re.findall(r'\[-|-]|{\+|\+}', line):
8✔
624
                    if match == '[-':
8✔
625
                        is_del = True
8✔
626
                    if match == '-]':
8✔
627
                        is_del = False
8✔
628
                    if match == '{+':
8✔
629
                        is_add = True
8✔
630
                    if match == '+}':
8✔
631
                        is_add = False
8✔
632
                if is_add:
8✔
633
                    line += '+}'
8✔
634
                elif is_del:
8✔
635
                    line += '-]'
8✔
636
                new_diff.append(line)
8✔
637
            html_diff = '<br>\n'.join(new_diff)
8✔
638

639
        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
640
        html_diff = re.sub(
8✔
641
            r'\{\+(.*?)\+}',
642
            lambda x: f'<span style="{self.css_added_style}">{x.group(1)}</span>',
643
            html_diff,
644
            flags=re.DOTALL,
645
        )
646
        html_diff = re.sub(
8✔
647
            r'\[-(.*?)-]',
648
            lambda x: f'<span style="{self.css_deltd_style}">{x.group(1)}</span>',
649
            html_diff,
650
            flags=re.DOTALL,
651
        )
652
        if self.job.monospace:
8✔
653
            return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
8✔
654
        else:
655
            return html_diff
8✔
656

657

658
class DeepdiffDiffer(DifferBase):
    """Reports the structural differences between two JSON or XML documents as found by the deepdiff library."""

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: ' 'no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a deepdiff report comparing ``self.state.old_data`` with ``self.state.new_data``.

        :param directives: The differ's directives; ``data_type``, ``ignore_order``, ``ignore_string_case`` and
            ``significant_digits`` are read here.
        :param report_kind: The kind of report requested. For ``'html'`` only the ``'html'`` key is returned;
            otherwise the same string is returned under both ``'text'`` and ``'markdown'``.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when formatting the old and new timestamps.
        :returns: The report keyed by report kind. All three kinds are empty strings (and ``self.state.verb`` is set
            to ``'changed,no_report'``) when DeepDiff reports no difference. When the new data is not valid JSON an
            error message is returned for all three kinds and the exception is stored on ``self.state``.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            is_html = report_kind == 'html'
            # NOTE(review): the two 'set_item_*' templates use val_t1/val_t2 the other way round from the
            # 'dictionary_item_*' and 'iterable_item_*' ones; confirm against deepdiff which side holds the item.
            if is_html:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _for_report(text: str) -> str:
                """Escape text taken from the compared documents so that it cannot inject markup in HTML reports."""
                return html.escape(text, quote=False) if is_html else text

            def _format_value(value: Any, type_name: str) -> str:
                """Render one side of a change: scalars in double quotes, dicts as indented JSON, others via str()."""
                if type_name in {'str', 'int', 'float'}:
                    rendered = f'"{value}"'
                elif type_name == 'dict':
                    rendered = jsonlib.dumps(value, ensure_ascii=False, indent=2)
                else:
                    rendered = str(value)
                return _for_report(rendered)

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                type_t1 = type(ddiff.t1).__name__
                type_t2 = type(ddiff.t2).__name__

                # An unknown report type falls back to an empty template, i.e. a line with only the bullet.
                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=_for_report(ddiff.path(root='')),
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=_format_value(ddiff.t1, type_t1),
                    val_t2=_format_value(ddiff.t2, type_t2),
                )

            return '\n'.join(_pretty_print_diff(level) for key in ddiff.tree for level in ddiff.tree[key])

        data_type = directives.get('data_type', 'json')
        old_data = ''
        new_data = ''
        if data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                # Invalid old data is tolerated and compared as an empty string.
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'{self.job.index_number}: Invalid JSON data: {e.msg} ({self.job.get_location()})')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e.msg}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e.msg}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e.msg}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ignore_order = directives.get('ignore_order')
        ignore_string_case = directives.get('ignore_string_case')
        significant_digits = directives.get('significant_digits')
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=ignore_order,
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=ignore_string_case,
            significant_digits=significant_digits,
            # Derived from the logger's effective level and clamped to the range 0..2.
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()

        old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
        new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
        sep = '—' * (len(old_timestamp) + 6)
        if report_kind == 'html':
            # diff_text has no trailing newline ('\n'.join), so it is emitted whole; slicing off the last character
            # would drop the final character of the report.
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">\n'
                f'Differ: {self.__kind__} for {data_type}\n'
                f'<span style="color:darkred;">--- @ {old_timestamp}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {new_timestamp}</span>\n'
                + sep
                + '\n'
                + diff_text
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                f'Differ: {self.__kind__} for {data_type}\n'
                f'--- @ {old_timestamp}\n'
                f'+++ @ {new_timestamp}\n' + sep + '\n' + diff_text
            )
            return {'text': text_diff, 'markdown': text_diff}
842

843

844
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed if numpy in installed '
            '(default: 2.5)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report showing the new image and an image highlighting its differences from the old one.

        Emits a ``RuntimeWarning`` on every call, as this differ is flagged as BETA.

        :param directives: The differ's directives; ``data_type`` and ``mse_threshold`` are read here.
        :param report_kind: Not used; all three report kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when formatting the old and new timestamps.
        :returns: The report keyed by report kind; the text and markdown kinds only point to the HTML report. All
            kinds are empty strings when the images compare equal (``self.state.verb`` set to ``'unchanged'``) or
            when the MSE is below the threshold (``self.state.verb`` set to ``'changed,no_report'``).
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)

        # A failed import leaves httpx set to None (not to an error string), and httpx is only needed to download.
        if data_type == 'url' and httpx is None:  # pragma: no cover
            self.raise_import_error('httpx', "No module named 'httpx'")

        def load_image_from_web(url: str) -> Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image, img2: Image) -> tuple[Image, Optional[np.float64]]:
            """Compute the difference between two images.

            :returns: The image visualizing the differences and the mean squared error between the two images (None
                if numpy is not available).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                # Square in floating point: squaring the 8-bit integer array wraps around modulo 256 (e.g. a
                # difference of 16 squares to 0), which corrupts the MSE.
                diff_array = np.array(diff_image).astype(np.float64)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Convert the 'L' image to 'RGB' using a matrix that applies to yellow tint
            # The matrix has 12 elements: 4 for Red, 4 for Green, and 4 for Blue.
            # For yellow, we want Red and Green to copy the L values (1.0) and Blue to be zero.
            # The matrix is: [R, G, B, A] for each of the three output channels
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the grayscale value
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the grayscale value
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0% of the grayscale value
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)

            return final_img, mse_value

        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')
        if data_type == 'url':
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            # The locations end up inside an HTML attribute, so they are escaped.
            old_data = f' (<a href="{html.escape(self.state.old_data)}">Old image</a>)'
            new_data = f' (<a href="{html.escape(self.state.new_data)}">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{html.escape(self.state.old_data)}">Old image</a>)'
            new_data = f' (<a href="file://{html.escape(self.state.new_data)}">New image</a>)'

        # Check formats  TODO: is it needed? under which circumstances?
        # if new_image.format != old_image.format:
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
        # else:
        #     logger.debug(f'image format is {old_image.format}')

        # If needed, shrink the larger image
        if new_image.size != old_image.size:
            # NOTE(review): sizes are (width, height) tuples, so '>' compares widths first and heights only on a
            # tie; an image that is wider but shorter is treated as the larger one.
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.LANCZOS)
                new_image.format = img_format  # restore the format saved before resizing

            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.LANCZOS)
                old_image.format = img_format  # restore the format saved before resizing

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # Test against None rather than truthiness: an MSE of exactly 0.0 is a valid (below-threshold) value.
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')

            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=new_image.format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=new_image.format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # Both images above were encoded in the new image's format, so both data URIs must declare that format.
        image_subtype = new_image.format.lower()

        # Prepare HTML output
        new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
        old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
        sep = '—' * (len(old_timestamp) + 6)
        htm = [
            f'<span style="font-family:monospace">Differ: {self.__kind__} for {data_type}',
            f'<span style="color:darkred;">--- @ {old_timestamp}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {new_timestamp}{new_data}</span>',
            f'{sep}</span>',
            'New image:',
            f'<img src="data:image/{image_subtype};base64,{encoded_new}">',
            'Differences from old (in yellow):',
            f'<img src="data:image/{image_subtype};base64,{encoded_diff}">',
            '',
        ]

        return {
            'text': 'The image has changed; please see an HTML report for the visualization.',
            'markdown': 'The image has changed; please see an HTML report for the visualization.',
            'html': '<br>\n'.join(htm),
        }
1038

1039

1040
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': (
            'model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-1.5-flash-latest)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {old_data} and {new_data} will be replaced; ask for markdown',
        'system_instructions': 'Optional tone and style instructions for the model (default: Respond in Markdown)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: None, i.e. model's default",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default",
        'token_limit': (
            "the maximum number of tokens, if different from model's default (default: None, i.e. model's default)"
        ),
    }
    __default_subdirective__ = 'model'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: str | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report made of an AI-generated summary of the changes followed by a unified diff.

        Emits a ``RuntimeWarning`` on every call, as this differ is flagged as BETA. The API key is read from the
        environment variable ``GOOGLE_AI_API_KEY``.

        :param directives: The differ's directives (see ``__supported_directives__``). If ``model`` is missing it
            is added to this dict with the default value so that it shows in the report's footer.
        :param report_kind: Not used; all three report kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to the 'unified' differ that produces the diff appended to the summary.
        :returns: The report keyed by report kind. All kinds are empty strings (and ``self.state.verb`` is set to
            ``'changed,no_report'``) when no summary is produced. Failures to obtain a summary from the API are
            not raised but reported as the summary text.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> str:
            """Generate AI summary from unified diff, or an error message"""
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing the changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            api_version = '1beta'
            _models_token_limits = {  # from https://ai.google.dev/gemini-api/docs/models/gemini
                'gemini-1.5': 1048576,
                'gemini-1.0': 30720,
                'gemini-pro': 30720,  # legacy
            }

            if 'model' not in directives:
                directives['model'] = 'gemini-1.5-flash-latest'  # also for footer
            model = directives['model']
            token_limit = directives.get('token_limit')
            if not token_limit:
                # Fall back to the limit of the first known model family whose name prefixes the model's name.
                for _model, _token_limit in _models_token_limits.items():
                    if model.startswith(_model):
                        token_limit = _token_limit
                        break
                if not token_limit:
                    logger.error(
                        f"Job {self.job.index_number}: Differ '{self.__kind__}' does not know `model: {model}` "
                        f"(supported models starting with: {', '.join(sorted(_models_token_limits))}) "
                        f'({self.job.get_location()})'
                    )
                    return f'## ERROR in summarizing the changes using {self.__kind__}:\n' f'Unknown model {model}.\n'

            if '{unified_diff}' in prompt:
                context_lines = directives.get('prompt_ud_context_lines', 9999)
                unified_diff = '\n'.join(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        # '@',
                        # '@',
                        # self.make_timestamp(self.state.old_timestamp, tz),
                        # self.make_timestamp(self.state.new_timestamp, tz),
                        n=context_lines,
                    )
                )
                if not unified_diff:
                    # no changes
                    return ''
            else:
                unified_diff = ''

            def _api_error_message(response: httpx.Response) -> str:
                """Return the 'error.message' field of an API error response, or '' if there is none to be read."""
                try:
                    return response.json().get('error', {}).get('message') or ''
                except (ValueError, AttributeError):
                    # The body is not JSON or does not have the expected structure.
                    return ''

            def _send_to_model(model_prompt: str, system_instructions: str) -> str:
                """Creates the summary request to the model"""
                # A failed import leaves httpx set to None.
                if httpx is None:  # pragma: no cover
                    return "AI summary unavailable: Python package 'httpx' is not installed"

                max_output_tokens = directives.get('max_output_tokens')
                temperature = directives.get('temperature', 0.0)
                top_p = directives.get('top_p')
                top_k = directives.get('top_k')
                data = {
                    'system_instruction': {'parts': [{'text': system_instructions}]},
                    'contents': [{'parts': [{'text': model_prompt}]}],
                    'generation_config': {
                        'max_output_tokens': max_output_tokens,
                        'temperature': temperature,
                        'top_p': top_p,
                        'top_k': top_k,
                    },
                }
                logger.info(f'Job {self.job.index_number}: Making summary request to Google model {model}')
                timeout = directives.get('timeout', 300)
                try:
                    # The context manager closes the client's connections once the response has been received.
                    # HTTP/2 is only requested when the optional h2 package could be imported, as httpx refuses
                    # http2=True without it.
                    with httpx.Client(http2=h2 is not None) as client:
                        r = client.post(
                            f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:'
                            f'generateContent?key={GOOGLE_AI_API_KEY}',
                            json=data,
                            headers={'Content-Type': 'application/json'},
                            timeout=timeout,
                        )
                except httpx.HTTPError as e:
                    return (
                        f'AI summary unavailable: HTTP client error: {e.args[0]} when requesting data from '
                        f'{e.request.url.host}'
                    )

                if r.is_success:
                    try:
                        candidate = r.json()['candidates'][0]
                        summary = candidate['content']['parts'][0]['text']
                    except (KeyError, IndexError, TypeError, ValueError) as e:
                        # E.g. a response that carries no candidate or no generated content.
                        logger.error(
                            f'Job {self.job.index_number}: Response from {r.url.host} has no generated text '
                            f'({e.__class__.__name__}: {e}) ({self.job.get_location()})'
                        )
                        return f'AI summary unavailable: Response from {r.url.host} has no generated text'
                    logger.info(
                        f"Job {self.job.index_number}: AI generation finished by {candidate.get('finishReason')}"
                    )
                    return summary

                if r.status_code == 400:
                    return f'AI summary unavailable: Received error from {r.url.host}: {_api_error_message(r)}'

                summary = (
                    f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from '
                    f'{r.url.host}'
                )
                if r.content:
                    summary += f': {_api_error_message(r)}'
                return summary

            # check if data is different (for testing)
            if '{old_data}' in prompt and '{new_data}' in prompt and self.state.old_data == self.state.new_data:
                return ''

            model_prompt = prompt.format(
                unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
            )

            # The number of tokens is estimated as a quarter of the number of characters.
            if len(model_prompt) / 4 < token_limit:
                summary = _send_to_model(model_prompt, system_instructions)
            elif '{unified_diff}' in prompt:
                logger.info(
                    f'Job {self.job.index_number}: Model prompt with full diff is too long: '
                    f'({len(model_prompt) / 4:,.0f} est. tokens exceeds limit of {token_limit:,.0f} tokens); '
                    f'recomputing with default contextlines'
                )
                unified_diff = '\n'.join(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        # '@',
                        # '@',
                        # self.make_timestamp(self.state.old_timestamp, tz),
                        # self.make_timestamp(self.state.new_timestamp, tz),
                    )
                )
                model_prompt = prompt.format(
                    unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
                )
                if len(model_prompt) / 4 < token_limit:
                    summary = _send_to_model(model_prompt, system_instructions)
                else:
                    summary = (
                        f'AI summary unavailable (model prompt with unified diff is too long: '
                        f'{len(model_prompt) / 4:,.0f} est. tokens exceeds maximum of {token_limit:,.0f})'
                    )
            else:
                # Without a unified diff in the prompt there is nothing to shorten, so the request is sent anyway.
                logger.info(
                    f'The model prompt may be too long: {len(model_prompt) / 4:,.0f} est. tokens exceeds '
                    f'limit of {token_limit:,.0f} tokens'
                )
                summary = _send_to_model(model_prompt, system_instructions)
            return summary

        prompt = directives.get(
            'prompt',
            'Identify the changes between the old document (enclosed by an <old> tag) and the new document ('
            'enclosed by a <new> tag) and output a summary of such changes:\n\n<old>\n{old_data}\n</old>\n\n<new>\n'
            '{new_data}\n</new>',
        ).replace('\\n', '\n')
        system_instructions = directives.get('system_instructions', 'Respond in Markdown')
        summary = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string compatibility
        back_n = '\\n'  # For Python < 3.12 f-string compatibility
        directives_text = (
            ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in directives.items()) or 'None'
        )
        footer = f'Summary generated by Google Generative AI (differ directive(s): {directives_text})'
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # NOTE(review): only the value returned by the last iteration is kept, yet its 'text', 'markdown' and
        # 'html' keys are all read below; presumably process() accumulates its output in temp_unfiltered_diff and
        # returns all the kinds computed so far -- confirm against DifferBase.process.
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': summary + '\n\n' + unified_report['text'] + '\n------------\n' + footer,
            'markdown': summary + '\n\n' + unified_report['markdown'] + '\n* * *\n' + footer,
            'html': (
                mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>')
                + '<br>'
                + unified_report['html']
                + '-----<br>'
                + f'<i><small>{footer}</small></i>'
            ),
        }
1276

1277

1278
class WdiffDiffer(DifferBase):
    """Word-by-word differ producing ``wdiff``-like output with :class:`difflib.Differ`.

    The old and new data are split into words (newlines are kept as a dedicated ``<\\n>`` token), compared word by
    word, and re-assembled into a single text in which added words are shown in green and removed words in red
    (ANSI escape sequences for the text/markdown reports, styled ``<span>`` elements for the HTML report).

    NOTE: as the runtime warning emitted by :meth:`differ` states, this differ is work in progress.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build the word-level diff between ``self.state.old_data`` and ``self.state.new_data``.

        :param directives: The differ's directives; ``context_lines`` (falls back to ``self.job.contextlines``, then
           to 3) and ``range_info`` (default ``True``) are read.
        :param report_kind: Not used by this implementation: all three report kinds are always built.
        :param _unfiltered_diff: Not used by this implementation.
        :param tz: Passed to ``self.make_timestamp`` when formatting the old and new timestamps of the header.
        :returns: A dict with the keys ``text``, ``markdown`` and ``html``; the ``markdown`` value is identical to
           the ``text`` value (ANSI escape sequences included).
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires old data to be a string, not '
                f'{type(self.state.old_data).__name__}.'
            )
        if not isinstance(self.state.new_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires new data to be a string, not '
                f'{type(self.state.new_data).__name__}.'
            )

        def tokenize(data: str) -> list[str]:
            """Split data into words on single spaces, turning each newline into a stand-alone '<\\n>' token."""
            if self.job.is_markdown:
                # Don't split spaces in link text, tokenize space as </s>
                data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', data)
            return data.replace('\n', ' <\\n> ').split(' ')

        words1 = tokenize(self.state.old_data)
        words2 = tokenize(self.state.new_data)

        # Generate a difference list. Each entry is a two-character code ('  ', '- ' or '+ ') followed by the word.
        # difflib.Differ also emits '? ' guide lines (carets/dashes pointing at intra-entry differences, terminated
        # by a newline) after similar-but-different entries; they are not words and must not reach the report.
        diff = [entry for entry in difflib.Differ().compare(words1, words2) if not entry.startswith('?')]

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
        new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
        sep = '—' * (len(old_timestamp) + 6)
        head_text = (
            'Differ: wdiff\n'
            f'\033[91m--- @ {old_timestamp}\033[0m\n'
            f'\033[92m+++ @ {new_timestamp}\033[0m\n'
            f'{sep}\n'
        )
        head_html = '<br>\n'.join(
            [
                '<span style="font-family:monospace;">Differ: wdiff',
                f'<span style="color:darkred;">--- @ {old_timestamp}</span>',
                f'<span style="color:darkgreen;">+++ @ {new_timestamp}</span>',
                f'{sep}</span>',
                '',
            ]
        )

        # Process the diff output to make it more wdiff-like.
        # The loop runs one word behind: the word handled in one iteration is only appended to the results in the
        # next one, so that a closing marker can still be attached to it once we know that a run of changes ended.
        # NOTE(review): words are inserted in the HTML report without escaping; confirm whether the data can contain
        # markup that should be escaped for non-markdown jobs.
        # NOTE(review): when a run of changes starts with a newline token, its opening marker is deferred to the next
        # word; if that next word is unchanged, a closing marker is emitted without a matching opener and the
        # deferred opener is attached to the unchanged word. Left as is pending a redesign of this state machine.
        result_text = []
        result_html = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''  # opening ANSI marker deferred from the previous iteration (after a newline token)
        next_html = ''  # opening <span> deferred from the previous iteration (after a newline token)
        add = False  # inside a run of added words
        rem = False  # inside a run of removed words

        # The trailing '  ' sentinel (an "unchanged" empty word) flushes the last real word and closes any open run
        for word_text in diff + ['  ']:
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    # start coloring after the newline rather than before it
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    # start coloring after the newline rather than before it
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word: the run of changes ends
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                # Close the run before the newline so that every output line carries its own markers
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        # No fix-up is needed here: the sentinel is an unchanged word, so `add` and `rem` are always False by now.

        # rebuild the text from words (skipping the initial empty placeholder), replacing the newline token
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            # A line is considered changed when it contains an ANSI color marker ('\033[91m' or '\033[92m')
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            if contextlines:
                lines_to_keep: set[int] = set()
                for i in lines_with_changes:
                    # out-of-range indices are harmless as they never match an actual line number below
                    lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            else:
                lines_to_keep = set(lines_with_changes)
            new_diff_text = []
            new_diff_html = []
            last_line = 0
            skip = False  # True when one or more lines were dropped since the last kept line
            for i, (line_text, line_html) in enumerate(zip(text_lines, diff_html.splitlines())):
                if i in lines_to_keep:
                    if range_info and skip:
                        new_diff_text.append(f'@@ {last_line}...{i + 1} @@')
                        new_diff_html.append(f'@@ {last_line}...{i + 1} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.job.is_markdown:
            # restore the spaces of link texts that were protected during tokenization
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc