• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 9088219918

15 May 2024 12:45AM UTC coverage: 77.86% (+0.05%) from 77.815%
9088219918

push

github

mborsetti
Version 3.23.0

1731 of 2487 branches covered (69.6%)

Branch coverage included in aggregate %.

7 of 8 new or added lines in 2 files covered. (87.5%)

1 existing line in 1 file now uncovered.

4374 of 5354 relevant lines covered (81.7%)

6.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.17
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import logging
8✔
11
import math
8✔
12
import os
8✔
13
import re
8✔
14
import shlex
8✔
15
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
16
import tempfile
8✔
17
import traceback
8✔
18
import urllib.parse
8✔
19
import warnings
8✔
20
from base64 import b64encode
8✔
21
from datetime import datetime
8✔
22
from io import BytesIO
8✔
23
from pathlib import Path
8✔
24
from typing import Any, Iterator, Literal, Optional, TYPE_CHECKING
8✔
25
from zoneinfo import ZoneInfo
8✔
26

27
import html2text
8✔
28

29
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
30

31
try:
8✔
32
    from deepdiff import DeepDiff
8✔
33
    from deepdiff.model import DiffLevel
8✔
34
except ImportError as e:  # pragma: no cover
35
    DeepDiff = e.msg  # type: ignore[no-redef]
36

37
try:
8✔
38
    import httpx
8✔
39
except ImportError:  # pragma: no cover
40
    httpx = None  # type: ignore[assignment]
41
if httpx is not None:
8!
42
    try:
8✔
43
        import h2
8✔
44
    except ImportError:  # pragma: no cover
45
        h2 = None  # type: ignore[assignment]
46

47
try:
8✔
48
    import numpy as np
8✔
49
except ImportError as e:  # pragma: no cover
50
    np = e.msg  # type: ignore[assignment]
51

52
try:
8✔
53
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
54
except ImportError as e:  # pragma: no cover
55
    Image = e.msg  # type: ignore[no-redef]
56

57
# https://stackoverflow.com/questions/712791
58
try:
8✔
59
    import simplejson as jsonlib
8✔
60
except ImportError:  # pragma: no cover
61
    import json as jsonlib  # type: ignore[no-redef]
62

63
try:
8✔
64
    import xmltodict
8✔
65
except ImportError as e:  # pragma: no cover
66
    xmltodict = e.msg  # type: ignore[no-redef]
67

68
# https://stackoverflow.com/questions/39740632
69
if TYPE_CHECKING:
70
    from webchanges.handler import JobState
71

72

73
logger = logging.getLogger(__name__)
8✔
74

75

76
class DifferBase(metaclass=TrackSubClasses):
    """The base class for differs."""

    # Registry of differ classes keyed by their __kind__; normalize_differ() and process() look differs up here.
    # NOTE(review): presumably filled in by the TrackSubClasses metaclass -- confirm in webchanges.util.
    __subclasses__: dict[str, type[DifferBase]] = {}
    __anonymous_subclasses__: list[type[DifferBase]] = []

    # The name by which a differ is selected (the 'name' sub-directive of a differ specification).
    __kind__: str = ''

    # Maps each sub-directive a differ accepts to its help text; the key '<any>' disables key validation.
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty

    # Inline CSS for added / deleted content in HTML output.
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'

    def __init__(self, state: JobState) -> None:
        """
        :param state: the JobState.
        """
        self.job = state.job
        self.state = state

    @classmethod
    def differ_documentation(cls) -> str:
        """Generates simple differ documentation for use in the --features command line argument.

        :returns: A string to display.
        """
        result: list[str] = []
        for sc in TrackSubClasses.sorted_by_kind(cls):
            # The class docstring doubles as the differ's one-line description.
            result.append(f'  * {sc.__kind__} - {sc.__doc__}')
            if hasattr(sc, '__supported_directives__'):
                for key, doc in sc.__supported_directives__.items():
                    result.append(f'      {key} ... {doc}')
        result.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(result)

    @classmethod
    def normalize_differ(
        cls,
        differ_spec: Optional[dict[str, Any]],
        job_index_number: Optional[int] = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number.
        :returns: A validated differ_kind, subdirectives (where subdirectives is a dict).
        :raises ValueError: If the specification has no name, names an unknown differ, or contains sub-directives
           that the differ does not support.
        """
        differ_spec = differ_spec or {'name': 'unified'}
        subdirectives = differ_spec.copy()  # work on a copy so that the caller's dict is not modified
        differ_kind = subdirectives.pop('name', '')
        if not differ_kind:
            # A specification consisting solely of 'command' is shorthand for the 'command' differ.
            if list(subdirectives.keys()) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls = cls.__subclasses__.get(differ_kind, None)
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        if hasattr(differcls, '__supported_directives__'):
            provided_keys = set(subdirectives.keys())
            allowed_keys = set(differcls.__supported_directives__.keys())
            unknown_keys = provided_keys.difference(allowed_keys)
            if unknown_keys and '<any>' not in allowed_keys:
                # Sort (as strings) so that the message is deterministic and non-string keys cannot break join().
                unknown_text = ', '.join(sorted(str(key) for key in unknown_keys))
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f"{unknown_text} (supported: {', '.join(sorted(allowed_keys))})."
                )

        return differ_kind, subdirectives

    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: Optional[str] = None,
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same differ, which can be used to generate a
           diff for a different report_kind.
        :returns: The output of the differ, or an error message with traceback (in all three report kinds) if it
           fails. An empty dict if no differ named differ_kind is registered.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: Optional[type[DifferBase]] = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            error_text = job_state.traceback.strip()
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{error_text}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{error_text}\n```\n'
                ),
                # Tracebacks routinely contain '<' and '>' (e.g. '<module>'), and directive values are user
                # supplied: escape both so that they are displayed verbatim instead of being parsed as markup.
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {differ_kind} with directive(s) '
                    f'{html.escape(directives_text)} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html.escape(error_text)}'
                    f'</span></span>'
                ),
            }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Create a diff from the data. Since this function could be called by different reporters of multiple report
        types ('text', 'markdown', 'html'), the differ outputs a dict with data for the report_kind it generated so
        that it can be reused.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same differ, which can be used to generate a
           diff for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises NotImplementedError: Always, in this base class; subclasses must override this method.
        :raises RuntimeError: If the external diff tool returns an error.
        """
        raise NotImplementedError()

    @staticmethod
    def make_timestamp(
        timestamp: float,
        tz: Optional[str] = None,
    ) -> str:
        """Creates a datetime string in RFC 5322 (email) format with the time zone name (if available) in the
        Comments and Folding White Space (CFWS) section.

        :param timestamp: The timestamp.
        :param tz: The IANA timezone of the report.
        :returns: A datetime string in RFC 5322 (email) format, or 'NEW' if the timestamp is falsy (e.g. 0).
        """
        if not timestamp:
            return 'NEW'

        tz_info: Optional[ZoneInfo] = ZoneInfo(tz) if tz else None
        # With tz_info None, astimezone() converts to the system's local time zone.
        dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz_info)
        # add timezone name if known: when %Z is merely the leading '+HH' / '-HH' of the numeric offset there is no
        # actual name to show
        tz_name = dt.strftime('%Z')
        cfws = f' ({tz_name})' if tz_name != dt.strftime('%z')[:3] else ''
        return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws

    @staticmethod
    def html2text(data: str) -> str:
        """Converts html to text.

        :param data: the string in html format.
        :returns: the string in text format, with trailing whitespace removed from every line.
        """
        # Inside this function body the name html2text resolves to the module, not to this method.
        parser = html2text.HTML2Text()
        parser.unicode_snob = True
        parser.body_width = 0
        parser.ignore_images = True
        parser.single_line_break = True
        parser.wrap_links = False
        return '\n'.join(line.rstrip() for line in parser.handle(data).splitlines())

    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Raise ImportError for missing package.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from ImportError.

        :raises: ImportError.
        """
        raise ImportError(
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
        )
285

286

287
class UnifiedDiffer(DifferBase):
    """(Default) Generates a unified diff."""

    __kind__ = 'unified'

    __supported_directives__ = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator over the HTML lines: the opening table tag, one table row per line of the diff, and
           the closing table tag.
        :raises RuntimeError: If a line of the diff body starts with an unexpected character.
        """

        def process_line(line: str, line_num: int, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            """
            # The style= string (or empty string) to add to an HTML tag. The first two lines are the '---' and
            # '+++' headers and are styled by position, before looking at their first character.
            if line_num == 0:
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:
                style = 'font-family:monospace;color:darkgreen;'
            elif line[0] == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif line[0] == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif line[0] == ' ':  # context line
                style = monospace_style
            elif line[0] == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif line[0] == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and line[0] != '@':  # don't apply to headers or range information
                # The first character (the diff marker) is dropped: the row's style conveys it instead.
                if self.job.is_markdown or line[0] == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, monospace_style)
        yield '</table>'

    @staticmethod
    def _keep_one_side(diff: list[str], sign: str) -> Optional[list[str]]:
        """Reduces a unified diff to the lines of one side only (additions or deletions).

        :param diff: The lines of the full unified diff, headers included.
        :param sign: '+' to keep additions or '-' to keep deletions.
        :returns: The lines starting with sign (this includes the header line starting with the same character)
           together with the range information lines that still precede a kept line, or None if nothing worth
           reporting is left.
        """
        kept = [line for line in diff if line.startswith((sign, '@'))]
        # A range information line immediately followed by another one has lost all the lines of its hunk: drop it.
        kept = [
            line
            for line, following in zip(kept, kept[1:] + [''])
            if not (line.startswith('@') and following.startswith('@'))
        ]
        # Same for a range information line left dangling at the very end.
        if kept[-1].startswith('@'):
            kept = kept[:-1]
        # Only the header is left, or only two lines have any content once the sign characters and surrounding
        # whitespace are stripped: treat it as nothing to report.
        if len(kept) == 1 or len([line for line in kept if line.lstrip(sign).rstrip()]) == 2:
            return None
        return kept

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Creates a unified diff of the old and new data, honoring the job's additions_only / deletions_only.

        :param directives: The directives ('context_lines' and 'range_info' are read).
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: A previously generated diff; if it has a 'text' entry and an 'html' report is
           requested, the HTML is built from it instead of recomputing the diff.
        :param tz: The timezone of the report.
        :returns: A dict with the diff under 'text' and 'markdown' (when computed here) and under 'html' (when
           requested). When there is nothing to report, all three keys map to an empty string and the state's verb
           is set to 'changed,no_report'.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            empty_return: dict[Literal['text', 'markdown', 'html'], str] = {'text': '', 'markdown': '', 'html': ''}
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                # No context by default when only one side of the diff is going to be shown.
                contextlines = 0 if (self.job.additions_only or self.job.deletions_only) else 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    '@',
                    '@',
                    self.make_timestamp(self.state.old_timestamp, tz),
                    self.make_timestamp(self.state.new_timestamp, tz),
                    contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return empty_return
            # replace tabs in header lines
            diff[0] = diff[0].replace('\t', ' ')
            diff[1] = diff[1].replace('\t', ' ')

            if self.job.additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # The new data is at most a quarter of the size of the old: show the complete diff, with a
                    # notice, rather than hiding such a large deletion.
                    diff = (
                        diff[:2]
                        + ['/**Comparison type: Additions only**']
                        + ['/**Deletions are being shown as 75% or more of the content has been deleted**']
                        + diff[2:]
                    )
                else:
                    head = '---' + diff[0][3:]
                    additions = self._keep_one_side(diff, '+')
                    if additions is None:
                        self.state.verb = 'changed,no_report'
                        return empty_return
                    # additions[0] is the '+++' header line, which passed the filter on its first character.
                    diff = [head, additions[0], '/**Comparison type: Additions only**'] + additions[1:]
            elif self.job.deletions_only:
                head = '--- @' + diff[1][3:]
                deletions = self._keep_one_side(diff, '-')
                if deletions is None:
                    self.state.verb = 'changed,no_report'
                    return empty_return
                # deletions[0] is the '---' header line, which passed the filter on its first character.
                diff = [deletions[0], head, '/**Comparison type: Deletions only**'] + deletions[1:]

            # remove range info lines if needed: when explicitly turned off or, if unspecified, for additions_only
            # output whose fourth line is not an informational ('/') line, i.e. unless deletions are being shown
            range_info = directives.get('range_info')
            if range_info is False or (
                range_info is None and self.job.additions_only and (len(diff) < 4 or diff[3][0] != '/')
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)

            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        if report_kind == 'html':
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return out_diff
444

445

446
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    # Ordered (search, replacement) pairs that turn the markup produced by difflib.HtmlDiff, which relies on CSS
    # classes and a stylesheet, into self-contained markup with inline styles.
    _inline_style_fixes: tuple[tuple[str, str], ...] = (
        ('<th ', '<th style="font-family:monospace" '),
        ('<td ', '<td style="font-family:monospace" '),
        (' nowrap="nowrap"', ''),
        ('<a ', '<a style="font-family:monospace;color:inherit" '),
        ('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"'),
        # 'lightred' is not a CSS color, so a declaration using it is ignored; #ffaaaa is the background that
        # difflib's own stylesheet assigns to the diff_sub class.
        ('<span class="diff_sub"', '<span style="color:red;background-color:#ffaaaa"'),
        ('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"'),
    )

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Creates a side-by-side HTML table diff of the old and new data using difflib.HtmlDiff.

        :param directives: The directives ('tabsize' is read).
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: A previously generated diff; if it has an 'html' entry and a 'text' or 'markdown'
           report is requested, that table is converted to text instead of being generated again.
        :param tz: The timezone of the report.
        :returns: A dict with the table under 'html' (when generated here) and its conversion to text under 'text'
           and 'markdown' (when one of these is the report_kind requested).
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        wants_text = report_kind in {'text', 'markdown'}
        if wants_text and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = int(directives.get('tabsize', 8))
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                context=True,
                numlines=3,
            )
            # fix table formatting
            for search, replacement in self._inline_style_fixes:
                table = table.replace(search, replacement)
            out_diff['html'] = table

        if wants_text:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        return out_diff
496

497

498
class CommandDiffer(DifferBase):
    """Runs an external command to generate the diff."""

    __kind__ = 'command'

    __supported_directives__ = {
        'command': 'The command to execute',
    }

    # NOTE(review): these patterns are not referenced by the methods of this class; kept in case code elsewhere
    # relies on them -- confirm before removing.
    re_ptags = re.compile(r'^<p>|</p>$')
    re_htags = re.compile(r'<(/?)h\d>')
    re_tagend = re.compile(r'<(?!.*<).*>+$')

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Writes the old and new data to temporary files and runs the external command on them.

        :param directives: The directives; 'command' (required) is the command line, to which the paths of the
           old and new files are appended.
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: A previously generated diff; if it has a 'text' entry, an 'html' report is
           requested and the command is not wdiff, the HTML is built from it without running the command again.
        :param tz: The timezone of the report.
        :returns: A dict with the diff under 'text' and 'markdown' (when the command was run) and under 'html'
           (when requested). When the command exits with code 0, all three keys map to an empty string and the
           state's verb is set to 'changed,no_report'.
        :raises RuntimeError: If the command writes to stderr or exits with a code greater than 1.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        command = directives['command']
        is_wdiff = command.startswith('wdiff')
        if report_kind == 'html' and not is_wdiff and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff = _unfiltered_diff['text']
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            markdown_links_re = re.compile(r'\[(.*?)][(](.*?)[)]')
            if self.job.is_markdown:
                # protect the link anchor from being split (won't work) by percent-encoding its text
                old_data = markdown_links_re.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(old_data)
                )
                new_data = markdown_links_re.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(new_data)
                )

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                old_file_path = tmp_path.joinpath('old_file')
                new_file_path = tmp_path.joinpath('new_file')
                for file_path, data in ((old_file_path, old_data), (new_file_path, new_data)):
                    if isinstance(data, str):
                        file_path.write_text(data)
                    else:
                        file_path.write_bytes(data)
                cmdline = shlex.split(command) + [str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
            # Exit codes are interpreted as follows: 0 = nothing to report, 1 = differences found, above 1 = error.
            if proc.stderr or proc.returncode > 1:
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}
            head = '\n'.join(
                [
                    f'Using differ "{directives}"',
                    f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}',
                    f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}',
                    '—' * 37,
                    '',  # ends the header with a newline so that the rule is not fused with the command's output
                ]
            )
            diff = proc.stdout
            if self.job.is_markdown:
                # undo the protection of the link anchor from being split
                diff = markdown_links_re.sub(lambda x: f'[{urllib.parse.unquote(x.group(1))}]({x.group(2)})', diff)
            if is_wdiff and self.job.contextlines == 0:
                # remove lines that don't have any changes
                change_markers = ('{+', '+}', '[-', '-]')
                diff = ''.join(
                    line
                    for line in diff.splitlines(keepends=True)
                    if any(marker in line for marker in change_markers)
                )
            diff = head + diff
            out_diff.update(
                {
                    'text': diff,
                    'markdown': diff,
                }
            )

        if report_kind == 'html':
            if is_wdiff:
                # colorize output of wdiff
                out_diff['html'] = self.wdiff_to_html(diff)
            else:
                out_diff['html'] = html.escape(diff)

        return out_diff

    def wdiff_to_html(self, diff: str) -> str:
        """
        Colorize output of wdiff.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        html_diff = html.escape(diff)
        if self.job.is_markdown:
            # detect and fix multiline additions or deletions: a marker still open at the end of a line is closed
            # there and reopened at the start of the next line, so that every line is self-contained
            is_add = False
            is_del = False
            new_diff = []
            for line in html_diff.splitlines():
                if is_add:
                    line = '{+' + line
                    is_add = False
                elif is_del:
                    line = '[-' + line
                    is_del = False
                for marker in re.findall(r'\[-|-]|{\+|\+}', line):
                    if marker == '[-':
                        is_del = True
                    elif marker == '-]':
                        is_del = False
                    elif marker == '{+':
                        is_add = True
                    elif marker == '+}':
                        is_add = False
                if is_add:
                    line += '+}'
                elif is_del:
                    line += '-]'
                new_diff.append(line)
            html_diff = '<br>\n'.join(new_diff)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        html_diff = re.sub(
            r'\{\+(.*?)\+}',
            lambda x: f'<span style="{self.css_added_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        html_diff = re.sub(
            r'\[-(.*?)-]',
            lambda x: f'<span style="{self.css_deltd_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        if self.job.monospace:
            return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
        return html_diff
652

653

654
class DeepdiffDiffer(DifferBase):
    """Reports the structural differences between two JSON or XML documents using the deepdiff package.

    Both snapshots are parsed into Python objects (``json`` data with ``jsonlib``, ``xml`` data with ``xmltodict``)
    and compared with ``DeepDiff``; every change found is rendered as one bulleted, human-readable line.
    """

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build the deepdiff report for the requested report kind.

        :param directives: The differ directives (see ``__supported_directives__``).
        :param report_kind: The kind of report being produced; ``'html'`` yields colorized HTML, anything else
           yields plain text (used for both ``'text'`` and ``'markdown'``).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when formatting the old and new timestamps.
        :returns: ``{'html': ...}`` for HTML reports, ``{'text': ..., 'markdown': ...}`` otherwise. All three keys
           are returned with empty strings when no difference is found (``self.state.verb`` is then set to
           ``'changed,no_report'``), and with an error message when the new data is not valid JSON.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            as_html = report_kind == 'html'

            # One set of templates serves every report kind: {old} and {new} are the old/new values, which for
            # HTML are wrapped in the deleted/added colorizing spans; {val_t1} and {val_t2} are always unwrapped.
            templates = {
                'type_changes': (
                    'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                    'from {old} to {new}.'
                ),
                'values_changed': 'Value of {diff_path} changed from {old} to {new}.',
                'dictionary_item_added': 'Item {diff_path} added to dictionary as {new}.',
                'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {old}).',
                'iterable_item_added': 'Item {diff_path} added to iterable as {new}.',
                'iterable_item_removed': 'Item {diff_path} removed from iterable (was {old}).',
                'attribute_added': 'Attribute {diff_path} added as {new}.',
                'attribute_removed': 'Attribute {diff_path} removed (was {old}).',
                # An added item is carried in t2 and a removed one in t1, like every other added/removed entry
                # above; the opposite side would print the "not present" placeholder instead of the item.
                'set_item_added': 'Item root[{val_t2}] added to set as {new}.',
                'set_item_removed': 'Item root[{val_t1}] removed from set (was {old}).',
                'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
            }

            def _render_value(value: Any) -> str:
                """Return the display form of a value: quoted scalars, indented JSON for dicts, str() otherwise."""
                type_name = type(value).__name__
                if type_name in {'str', 'int', 'float'}:
                    return f'"{value}"'
                if type_name == 'dict':
                    return jsonlib.dumps(value, ensure_ascii=False, indent=2)
                return str(value)

            def _pretty_print_diff(level: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                val_t1 = _render_value(level.t1)
                val_t2 = _render_value(level.t2)
                diff_path = level.path(root='')
                if as_html:
                    # The data being compared comes from the watched resource: escape it so that it cannot break
                    # (or inject markup into) the HTML report. Quotes are left alone as this is element content.
                    val_t1 = html.escape(val_t1, quote=False)
                    val_t2 = html.escape(val_t2, quote=False)
                    diff_path = html.escape(diff_path, quote=False)
                    old = f'{span_deltd}{val_t1}</span>'
                    new = f'{span_added}{val_t2}</span>'
                else:
                    old = val_t1
                    new = val_t2

                return '• ' + templates.get(level.report_type, '').format(
                    diff_path=diff_path,
                    type_t1=type(level.t1).__name__,
                    type_t2=type(level.t2).__name__,
                    val_t1=val_t1,
                    val_t2=val_t2,
                    old=old,
                    new=new,
                )

            return '\n'.join(
                _pretty_print_diff(level) for report_type in ddiff.tree for level in ddiff.tree[report_type]
            )

        data_type = directives.get('data_type', 'json')
        old_data = ''
        new_data = ''
        if data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                # Unparsable old data is treated as empty so that the new data is reported in full.
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'{self.job.index_number}: Invalid JSON data: {e.msg} ({self.job.get_location()})')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e.msg}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e.msg}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e.msg}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=directives.get('ignore_order'),
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=directives.get('ignore_string_case'),
            significant_digits=directives.get('significant_digits'),
            # The more verbose the logger, the more verbose DeepDiff (clamped to the 0..2 range).
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()

        old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
        new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
        separator = '—' * 37

        if report_kind == 'html':
            # diff_text is emitted in full: it is built with '\n'.join() and has no trailing newline to strip.
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">\n'
                f'Differ: {self.__kind__} for {data_type}\n'
                f'<span style="color:darkred;">--- @ {old_timestamp}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {new_timestamp}</span>\n'
                f'{separator}\n'
                f'{diff_text}</span>'
            )
            return {'html': html_diff}

        text_diff = (
            f'Differ: {self.__kind__} for {data_type}\n'
            f'--- @ {old_timestamp}\n'
            f'+++ @ {new_timestamp}\n'
            f'{separator}\n'
            f'{diff_text}'
        )
        return {'text': text_diff, 'markdown': text_diff}
835

836

837
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed if numpy is installed '
            '(default: 2.5)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report embedding the new image and an image highlighting its differences from the old one.

        :param directives: The differ directives (see ``__supported_directives__``).
        :param report_kind: Not used: the reports for all kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when formatting the old and new timestamps.
        :returns: A dict with the text, markdown and html reports; all are empty strings when the images are
           identical (``self.state.verb`` is set to ``'unchanged'``) or when their MSE is below the threshold
           (``self.state.verb`` is set to ``'changed,no_report'``).
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)

        def load_image_from_web(url: str) -> Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image, img2: Image) -> tuple[Image, Optional[np.float64]]:
            """Compute the difference between two images.

            :returns: The image visualizing the differences and, if numpy is available, the mean squared error
               between the two images (otherwise None).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                mse_value = np.mean(np.square(np.array(diff_image)))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Tint the difference yellow with a 12-element conversion matrix (4 coefficients for each of the Red,
            # Green and Blue output channels): Red and Green copy the first input channel, Blue is zeroed.
            # NOTE(review): only the first (red) input channel feeds the tint, so for multi-channel images a change
            # confined to the other channels would presumably not be highlighted -- confirm whether the difference
            # should be converted to 'L' first.
            yellow_tint_matrix = (
                1.0, 0.0, 0.0, 0.0,  # Red = 100% of the first input channel
                1.0, 0.0, 0.0, 0.0,  # Green = 100% of the first input channel
                0.0, 0.0, 0.0, 0.0,  # Blue = 0
            )  # fmt: skip
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)

            return final_img, mse_value

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)
        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')

        if data_type == 'url':
            # httpx is set to None (not to an error string) when it cannot be imported, and is only needed here.
            if httpx is None:  # pragma: no cover
                self.raise_import_error('httpx', "No module named 'httpx'")
            load_image = load_image_from_web
            link_prefix: Optional[str] = ''
        elif data_type == 'ascii85':
            load_image = load_image_from_ascii85
            link_prefix = None  # the data is inline: there is nothing to link to
        elif data_type == 'base64':
            load_image = load_image_from_base64
            link_prefix = None
        else:  # 'filename'
            load_image = load_image_from_file
            link_prefix = 'file://'

        old_image = load_image(self.state.old_data)
        new_image = load_image(self.state.new_data)
        if link_prefix is None:
            old_data = ''
            new_data = ''
        else:
            # Escape the locations as they end up inside an HTML attribute.
            old_data = f' (<a href="{link_prefix}{html.escape(self.state.old_data)}">Old image</a>)'
            new_data = f' (<a href="{link_prefix}{html.escape(self.state.new_data)}">New image</a>)'

        # TODO: is a check that old_image.format and new_image.format match needed? under which circumstances?

        # If needed, shrink the larger image (resize() does not carry the format over, so it is restored)
        if new_image.size != old_image.size:
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.LANCZOS)
                new_image.format = img_format
            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # mse_value is None only when numpy is unavailable; an MSE of exactly 0.0 is a valid (below-threshold) value
        # and must not be mistaken for "no value".
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')
            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

        # Both embedded images are encoded in the format of the new image
        image_format = new_image.format

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=image_format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=image_format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # Prepare HTML output
        new_timestamp = self.make_timestamp(self.state.new_timestamp, tz)
        old_timestamp = self.make_timestamp(self.state.old_timestamp, tz)
        mime_subtype = image_format.lower()
        htm = [
            f'<span style="font-family:monospace">Differ: {self.__kind__} for {data_type}',
            f'<span style="color:darkred;">--- @ {old_timestamp}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {new_timestamp}{new_data}</span>',
            '—' * 37 + '</span>',
            'New image:',
            f'<img src="data:image/{mime_subtype};base64,{encoded_new}">',
            'Differences from old (in yellow):',
            # The diff image was saved in the new image's format, so that is the MIME type to declare
            f'<img src="data:image/{mime_subtype};base64,{encoded_diff}">',
            '',
        ]

        return {
            'text': 'The image has changed; please see an HTML report for the visualization.',
            'markdown': 'The image has changed; please see an HTML report for the visualization.',
            'html': '<br>\n'.join(htm),
        }
1030

1031

1032
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': 'model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-1.5-flash)',
        'prompt': 'a custom prompt - {unified_diff}, {old_data} and {new_data} will be replaced; ask for markdown',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: None, i.e. model's default)",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'token_limit': (
            "the maximum number of tokens, if different from model's default (default: None, i.e. model's default)"
        ),
    }
    __default_subdirective__ = 'model'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: str | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report made of an AI-generated summary of the changes followed by the unified diff.

        :param directives: The differ directives (see ``__supported_directives__``); a missing ``model`` is filled
           in with the default so that it shows in the report's footer.
        :param report_kind: Not used: the reports for all kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to the 'unified' differ that generates the diff appended to the summary.
        :returns: A dict with the text, markdown and html reports; all are empty strings when no summary is
           produced, i.e. when there are no changes (``self.state.verb`` is then set to ``'changed,no_report'``).
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str) -> str:
            """Generate AI summary from unified diff, or an error message"""
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing the changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            api_version = '1beta'
            # Token limits by model name prefix, from https://ai.google.dev/gemini-api/docs/models/gemini
            _models_token_limits = {
                'gemini-1.5': 1048576,
                'gemini-1.0': 30720,
                'gemini-pro': 30720,  # legacy
            }

            if 'model' not in directives:
                directives['model'] = 'gemini-1.5-flash'  # also for footer
            model = directives['model']
            token_limit = directives.get('token_limit')
            if not token_limit:
                token_limit = next(
                    (limit for prefix, limit in _models_token_limits.items() if model.startswith(prefix)),
                    None,
                )
                if not token_limit:
                    logger.error(
                        f"Job {self.job.index_number}: Differ '{self.__kind__}' does not know `model: {model}` "
                        f"(supported models starting with: {', '.join(sorted(_models_token_limits))}) "
                        f'({self.job.get_location()})'
                    )
                    return f'## ERROR in summarizing the changes using {self.__kind__}:\nUnknown model {model}.\n'

            def _make_unified_diff(context_lines: int = 3) -> str:
                """Returns the unified diff of the old vs. new data (3 context lines is difflib's own default)."""
                return '\n'.join(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                    )
                )

            def _make_prompt(unified_diff: str) -> str:
                """Fills in the prompt's placeholders."""
                return prompt.format(
                    unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
                )

            uses_unified_diff = '{unified_diff}' in prompt
            if uses_unified_diff:
                unified_diff = _make_unified_diff(directives.get('prompt_ud_context_lines', 9999))
                if not unified_diff:
                    # no changes
                    return ''
            else:
                unified_diff = ''

            def _api_error_message(response: Any) -> str:
                """Extracts the error message from an API error response, or '' if its body has none."""
                try:
                    return response.json().get('error', {}).get('message') or ''
                except (ValueError, AttributeError):  # body is not JSON, or not a JSON object
                    return ''

            def _send_to_model(model_prompt: str) -> str:
                """Creates the summary request to the model"""
                if httpx is None:  # pragma: no cover
                    self.raise_import_error('httpx', "No module named 'httpx'")

                data = {
                    'system_instruction': {'parts': [{'text': 'Respond in Markdown'}]},
                    'contents': [{'parts': [{'text': model_prompt}]}],
                    'generation_config': {
                        'max_output_tokens': directives.get('max_output_tokens'),
                        'temperature': directives.get('temperature', 0.0),
                        'top_p': directives.get('top_p'),
                        'top_k': directives.get('top_k'),
                    },
                }
                logger.info(f'Job {self.job.index_number}: Making summary request to Google model {model}')
                try:
                    # The context manager closes the client's connections; HTTP/2 is used only if h2 is installed
                    with httpx.Client(http2=h2 is not None) as client:
                        r = client.post(
                            f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:'
                            f'generateContent?key={GOOGLE_AI_API_KEY}',
                            json=data,
                            headers={'Content-Type': 'application/json'},
                            timeout=directives.get('timeout', 300),
                        )
                except httpx.HTTPError as e:
                    return (
                        f'AI summary unavailable: HTTP client error: {e.args[0]} when requesting data from '
                        f'{e.request.url.host}'
                    )

                if r.is_success:
                    finish_reason = None
                    try:
                        candidate = r.json()['candidates'][0]
                        finish_reason = candidate.get('finishReason')
                        logger.info(f'Job {self.job.index_number}: AI generation finished by {finish_reason}')
                        return candidate['content']['parts'][0]['text']
                    except (ValueError, LookupError, TypeError, AttributeError):
                        # The body is not JSON or it carries no generated text
                        return (
                            f'AI summary unavailable: Response from {r.url.host} contained no generated text'
                            + (f' (finish reason: {finish_reason})' if finish_reason else '')
                        )

                if r.status_code == 400:
                    return f'AI summary unavailable: Received error from {r.url.host}: {_api_error_message(r)}'

                summary = f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from {r.url.host}'
                if r.content:
                    summary += f': {_api_error_message(r)}'
                return summary

            # The number of tokens is estimated as one every 4 characters
            model_prompt = _make_prompt(unified_diff)
            if len(model_prompt) / 4 < token_limit:
                return _send_to_model(model_prompt)

            if not uses_unified_diff:
                logger.info(
                    f'The model prompt may be too long: {len(model_prompt) / 4:,.0f} est. tokens exceeds '
                    f'limit of {token_limit:,.0f} tokens'
                )
                return _send_to_model(model_prompt)

            logger.info(
                f'Job {self.job.index_number}: Model prompt with full diff is too long: '
                f'({len(model_prompt) / 4:,.0f} est. tokens exceeds limit of {token_limit:,.0f} tokens); '
                f'recomputing with default contextlines'
            )
            model_prompt = _make_prompt(_make_unified_diff())
            if len(model_prompt) / 4 < token_limit:
                return _send_to_model(model_prompt)

            return (
                f'AI summary unavailable (model prompt with unified diff is too long: '
                f'{len(model_prompt) / 4:,.0f} est. tokens exceeds maximum of {token_limit:,.0f})'
            )

        prompt = directives.get(
            'prompt',
            'Analyze this unified diff and create a summary listing only the changes:\n\n{unified_diff}',
        )
        summary = get_ai_summary(prompt)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
        footer = f'Summary generated by Google Generative AI (differ directive(s): {directives_text})'
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # The same dict is handed to both runs; the report returned by the last one is the one read below.
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': summary + '\n\n' + unified_report['text'] + '\n------------\n' + footer,
            'markdown': summary + '\n\n' + unified_report['markdown'] + '\n* * *\n' + footer,
            'html': (
                mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>')
                + '<br>'
                + unified_report['html']
                + '-----<br>'
                + f'<i><small>{footer}</small></i>'
            ),
        }
1254

1255

1256
class WdiffDiffer(DifferBase):
    """Word-by-word differ that marks added and removed words inline (wdiff-like output).

    Both snapshots are split into words on single spaces, with each newline turned into its own ``<\\n>`` token,
    and compared with :class:`difflib.Differ`. Added words are wrapped in bright-green ANSI codes / a green HTML
    span and removed words in bright-red ANSI codes / a red strike-through HTML span.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build the word-level diff between ``self.state.old_data`` and ``self.state.new_data``.

        :param directives: The differ directives; ``context_lines`` (falls back to ``self.job.contextlines``, then
           to 3) and ``range_info`` (default ``True``) are read.
        :param report_kind: Not referenced by this differ; all report kinds are always produced.
        :param _unfiltered_diff: Not referenced by this differ.
        :param tz: Passed to ``self.make_timestamp`` when building the header lines.
        :returns: A dict with the keys ``text``, ``markdown`` and ``html``; the ``markdown`` value is identical to
           the ``text`` value.
        :raises ValueError: If either the old or the new data is not a ``str``.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires the old data to be a string, '
                f'not {type(self.state.old_data).__name__}.'
            )
        if not isinstance(self.state.new_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires the new data to be a string, '
                f'not {type(self.state.new_data).__name__}.'
            )

        # Split the texts into words tokenizing newline
        if self.job.is_markdown:
            # Don't split spaces in link text, tokenize space as </s>
            old_data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', self.state.old_data)
            words1 = old_data.replace('\n', ' <\\n> ').split(' ')
            new_data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', self.state.new_data)
            words2 = new_data.replace('\n', ' <\\n> ').split(' ')
        else:
            words1 = self.state.old_data.replace('\n', ' <\\n> ').split(' ')
            words2 = self.state.new_data.replace('\n', ' <\\n> ').split(' ')

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        head_text = (
            'Differ: wdiff\n'
            f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m\n'
            f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m\n' + '—' * 37 + '\n'
        )
        head_html = '<br>\n'.join(
            [
                '<span style="font-family:monospace;">Differ: wdiff',
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>',
                '—' * 37 + '</span>',
                '',
            ]
        )

        # Process the diff output to make it more wdiff-like.
        # Each iteration emits the PREVIOUS word (so that closing markers can still be appended to it once we know
        # what follows) and stages the current one. `next_*` carries an opening marker over to the following word
        # when a run of changes starts on a newline token.
        # NOTE(review): words are copied into the HTML output without HTML-escaping; confirm whether the input can
        # contain markup that must be neutralized.
        result: list[str] = []
        result_html: list[str] = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''
        next_html = ''
        add = False  # currently inside a run of added words
        rem = False  # currently inside a run of removed words
        # Each item yielded by Differ.compare() is a two-character code ('+ ', '- ', '  ' or '? ') followed by the word
        for word_text in difflib.Differ().compare(words1, words2):
            if len(word_text) < 3:
                # code without a word (empty token produced by consecutive spaces)
                continue
            if word_text[0] == '?':
                # intraline hint lines of difflib.Differ; not part of either text
                continue
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                # Close and reopen the HTML span around the newline; the two leading spaces stand in for the
                # two-character diff code that is sliced off below
                if add:
                    word_html = f'  </span><\\n> {add_html}'
                elif rem:
                    word_html = f'  </span><\\n> {rem_html}'

            result.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        result.append(prev_word_text)
        result_html.append(prev_word_html)
        if add or rem:
            result[-1] += '\033[0m'
            result_html[-1] += '</span>'

        # rebuild the text from words, replacing the newline token
        # (the first item is the empty placeholder emitted before the first word, hence the [1:])
        diff_text = ' '.join(result[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        # contextlines = 999
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            html_lines = diff_html.splitlines()
            # A line is considered changed if it contains the start of an ANSI color code (91 or 92)
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            if contextlines:
                lines_to_keep: set[int] = set()
                for i in lines_with_changes:
                    lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            else:
                lines_to_keep = set(lines_with_changes)
            new_diff_text = []
            new_diff_html = []
            last_line = 0
            skip = False
            for i, (line_text, line_html) in enumerate(zip(text_lines, html_lines)):
                if i in lines_to_keep:
                    if range_info and skip:
                        # Mark the gap left by the lines that were dropped
                        new_diff_text.append(f'@@ {last_line}...{i + 1} @@')
                        new_diff_html.append(f'@@ {last_line}...{i + 1} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.job.is_markdown:
            # Restore the spaces that were tokenized inside square brackets
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc