• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 21856489627

10 Feb 2026 07:57AM UTC coverage: 73.228% (-0.09%) from 73.318%
21856489627

push

github

mborsetti
Version 3.34.0rc0

1424 of 2298 branches covered (61.97%)

Branch coverage included in aggregate %.

4766 of 6155 relevant lines covered (77.43%)

11.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.09
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4

5
from __future__ import annotations
15✔
6

7
import base64
15✔
8
import difflib
15✔
9
import html
15✔
10
import logging
15✔
11
import math
15✔
12
import os
15✔
13
import re
15✔
14
import shlex
15✔
15
import subprocess
15✔
16
import sys
15✔
17
import tempfile
15✔
18
import traceback
15✔
19
import urllib.parse
15✔
20
import warnings
15✔
21
from base64 import b64encode
15✔
22
from concurrent.futures import ThreadPoolExecutor
15✔
23
from datetime import datetime
15✔
24
from io import BytesIO
15✔
25
from pathlib import Path
15✔
26
from typing import TYPE_CHECKING, Any, Iterator, Literal, TypedDict
15✔
27
from xml.parsers.expat import ExpatError
15✔
28

29
import html2text
15✔
30
import yaml
15✔
31

32
from webchanges.util import TrackSubClasses, linkify, mark_to_html
15✔
33

34
if TYPE_CHECKING:
35
    from zoneinfo import ZoneInfo
36

37
    from webchanges.jobs import JobBase
38

39

40
try:
15✔
41
    from deepdiff import DeepDiff
15✔
42

43
    if TYPE_CHECKING:
44
        from deepdiff.model import DiffLevel
45
except ImportError as e:  # pragma: no cover
46
    DeepDiff = str(e)  # type: ignore[assignment,misc]
47

48

49
try:
15✔
50
    import httpx
15✔
51
except ImportError:  # pragma: no cover
52
    httpx = None  # type: ignore[assignment]
53
if httpx is not None:
15!
54
    try:
15✔
55
        import h2
15✔
56
    except ImportError:  # pragma: no cover
57
        h2 = None  # type: ignore[assignment]
58

59
try:
15✔
60
    import numpy as np
15✔
61
except ImportError as e:  # pragma: no cover
62
    np = str(e)  # type: ignore[assignment]
63

64
try:
15✔
65
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
15✔
66
except ImportError as e:  # pragma: no cover
67
    Image = str(e)  # type: ignore[assignment]
68

69
# https://stackoverflow.com/questions/712791
70
try:
15✔
71
    import simplejson as jsonlib
15✔
72
except ImportError:  # pragma: no cover
73
    import json as jsonlib
74

75
try:
15✔
76
    import xmltodict
15✔
77
except ImportError as e:  # pragma: no cover
78
    xmltodict = str(e)  # type: ignore[assignment]
79

80
# https://stackoverflow.com/questions/39740632
81
if TYPE_CHECKING:
82
    from webchanges.handler import JobState
83
    from webchanges.storage import _ConfigDifferDefaults
84

85

86
logger = logging.getLogger(__name__)
15✔
87

88
# Typed description of the sub-directives understood by a Google AI based differ (presumably defined further down in
# this module -- TODO confirm). The functional TypedDict syntax is used and ``total=False`` makes every key optional.
AiGoogleDirectives = TypedDict(
    'AiGoogleDirectives',
    {
        'model': str,
        'additions_only': str,
        'system_instructions': str,
        'prompt': str,
        'prompt_ud_context_lines': int,
        'timeout': int,
        'max_output_tokens': int | None,
        'media_resolution': Literal[
            'media_resolution_low', 'media_resolution_medium', 'media_resolution_high', 'media_resolution_ultra_high'
        ],
        'thinking_level': Literal['low', 'medium', 'high'],
        'temperature': float | None,
        'top_p': float | None,
        'top_k': float | None,
        'thinking_budget': float | None,
        'tools': list[Any],
    },
    total=False,
)

# The report formats a differ can be asked to produce; used as the key type of the dicts returned by differs.
ReportKind = Literal['plain', 'markdown', 'html']
15✔
112

113

114
class DifferBase(metaclass=TrackSubClasses):
    """The base class for differs.

    A differ generates a textual diff representation of data changes in the textual format requested, which can be
    either plain, markdown, or html.

    The diff text (before any filtering) is memoized to prevent unneeded resource wastage when running multiple
    reporters, or running reporters that require multiple formats, such as HTML smtp email (requires text in both html
    and plain formats).
    """

    # Registry of differ classes keyed by their __kind__; read by normalize_differ() and process().
    # (Presumably populated by the TrackSubClasses metaclass -- TODO confirm.)
    __subclasses__: dict[str, type[DifferBase]] = {}
    __anonymous_subclasses__: list[type[DifferBase]] = []

    # The name under which a differ is selected in a job's differ directive.
    __kind__: str = ''

    # Maps every accepted sub-directive to its help text; the special key '<any>' disables validation.
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty

    # Inline CSS used when rendering diffs as HTML.
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'
    css_remvd_style = 'text-decoration:line-through;'

    def __init__(self, state: JobState) -> None:
        """:param state: the JobState."""
        self.job = state.job
        self.state = state

    @classmethod
    def differ_documentation(cls) -> str:
        """Generates simple differ documentation for use in the --features command line argument.

        :returns: A string to display.
        """
        result: list[str] = []
        for sc in TrackSubClasses.sorted_by_kind(cls):
            result.append(f'  * {sc.__kind__} - {sc.__doc__}')
            if hasattr(sc, '__supported_directives__'):
                result.extend(f'      {key} ... {doc}' for key, doc in sc.__supported_directives__.items())
        result.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(result)

    @staticmethod
    def debugger_attached() -> bool:
        """Checks if the code is currently running within an external debugger (e.g. IDE).

        :returns: True if an external debugger is attached, False otherwise.
        """
        # A debugger replaces sys.breakpointhook with its own callable, which then lives in another module.
        return sys.breakpointhook.__module__ != 'sys'

    @classmethod
    def normalize_differ(
        cls,
        differ_spec: dict[str, Any] | None,
        job_index_number: int | None = None,
        differ_defaults: _ConfigDifferDefaults | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        The mapping passed in as ``differ_spec`` is never modified, including any dict nested inside it.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number (only used in error messages).
        :param differ_defaults: The differ defaults from the configuration, keyed by differ name; if None, no defaults
           are applied.
        :returns: A validated differ_kind, directives tuple.
        :raises ValueError: If the differ has no name, is unknown, or is given unsupported sub-directives.
        """

        def directives_with_defaults(
            kind: str, directives: dict[str, Any], differ_defaults: _ConfigDifferDefaults | None = None
        ) -> dict[str, Any]:
            """Obtain differ subdirectives that also contains defaults from the configuration.

            :param kind: The name of the differ.
            :param directives: The differ directives as stated in the job (a private copy; updated in place).
            :param differ_defaults: The differ defaults from the configuration.
            :returns: directives inclusive of configuration defaults.
            """
            if differ_defaults is None:
                logger.info('No configuration object found to look for differ defaults')
                return directives

            differ_default = differ_defaults.get(kind, {})
            if isinstance(differ_default, dict):
                # merge defaults from configuration (including dicts) into differ directives without overwriting them
                for key, value in differ_default.items():
                    if key not in directives:
                        directives[key] = value
                        continue
                    current = directives[key]
                    if current is None:  # for speed
                        directives[key] = value
                    elif isinstance(value, dict) and isinstance(current, dict):
                        missing = {subkey: subvalue for subkey, subvalue in value.items() if subkey not in current}
                        if missing:
                            # Build a new dict: 'current' is still shared with the caller's differ_spec (the copy
                            # made of it is shallow) and must not be modified.
                            directives[key] = {**current, **missing}

            return directives

        differ_spec = differ_spec or {'name': 'unified'}
        directives = differ_spec.copy()
        differ_kind = directives.pop('name', '')
        if not differ_kind:
            # A spec consisting of only a 'command' sub-directive is shorthand for the 'command' differ.
            if list(directives.keys()) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind, None)
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        directives = directives_with_defaults(differ_kind, directives, differ_defaults)

        if hasattr(differcls, '__supported_directives__'):
            provided_keys = set(directives.keys())
            allowed_keys = set(differcls.__supported_directives__.keys())
            unknown_keys = provided_keys.difference(allowed_keys)
            if unknown_keys and '<any>' not in allowed_keys:
                # Sort (as text) so that the message is deterministic and non-string keys cannot break the join.
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f'{", ".join(sorted(map(str, unknown_keys)))} (supported: {", ".join(sorted(allowed_keys))}).'
                )

        return differ_kind, directives

    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: ReportKind = 'plain',
        tz: ZoneInfo | None = None,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
    ) -> dict[ReportKind, str]:
        """Process the differ.

        If the differ raises, the newly saved data is deleted from ``job_state``, the exception and its formatted
        traceback are stored on ``job_state``, and an error report in all three formats is returned instead. When an
        external debugger is attached the exception is re-raised.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same differ, which can be used to generate a
           diff for a different report_kind.
        :returns: The output of the differ, an error message with traceback if it fails, or an empty dict if there is
           no differ named ``differ_kind``.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind)
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            if cls.debugger_attached():
                raise
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = (
                ', '.join(f'{key}={value}' for key, value in directives.items()) if directives else 'None'
            )
            # Tracebacks contain text such as '<module>' and directive values are free text: both must be escaped
            # before being placed inside HTML or they corrupt (or inject) markup.
            directives_html = html.escape(directives_text, quote=False)
            traceback_html = html.escape(str(job_state.traceback), quote=False)
            return {
                'plain': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{job_state.traceback}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{job_state.traceback}\n```\n'
                ),
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {differ_kind} with directive(s) '
                    f'{directives_html} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{traceback_html}'
                    f'</span></span>'
                ),
            }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Generate a formatted diff representation of data changes.

        Creates a diff representation in one or more output formats (text, markdown, or HTML).
        At minimum, this function must return output in the format specified by 'report_kind'.
        As results are memoized for performance optimization, it can generate up to all three formats simultaneously.

        The data to compare is taken from the JobState stored in ``self.state``. This base implementation only
        raises; subclasses override it.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same differ, which can be used to generate a
           diff for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises NotImplementedError: Always, in this base class.
        :raises RuntimeError: If the external diff tool returns an error.
        """
        raise NotImplementedError

    @staticmethod
    def make_timestamp(
        timestamp: float,
        tz: ZoneInfo | None = None,
    ) -> str:
        """Format a timestamp as an RFC 5322 compliant datetime string.

        Converts a numeric timestamp to a formatted datetime string following the RFC 5322 (email) standard. When a
        timezone is provided, its full name, if known, is appended.

        :param timestamp: The timestamp.
        :param tz: The IANA timezone of the report.
        :returns: A datetime string in RFC 5322 (email) format or 'NEW' if timestamp is 0.
        """
        if not timestamp:
            return 'NEW'
        dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz)
        # add timezone name if known (i.e. when %Z yields something other than the numeric offset, such as '+03')
        tz_name = dt.strftime('%Z')
        cfws = f' ({tz_name})' if tz_name != dt.strftime('%z')[:3] else ''
        return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws

    @staticmethod
    def html2text(data: str) -> str:
        """Converts html to text.

        :param data: the string in html format.
        :returns: the string in text format, with trailing whitespace removed from every line.
        """
        # Inside this function 'html2text' resolves to the imported module, not to this method.
        parser = html2text.HTML2Text()
        parser.unicode_snob = True
        parser.body_width = 0
        parser.ignore_images = True
        parser.single_line_break = True
        parser.wrap_links = False
        return '\n'.join(line.rstrip() for line in parser.handle(data).splitlines())

    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Raise ImportError for missing package.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from ImportError.

        :raises: ImportError.
        """
        raise ImportError(
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
        )
377

378

379
class UnifiedDiffer(DifferBase):
    """(Default) Generates a unified diff."""

    __kind__ = 'unified'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
        'additions_only': 'keep only addition lines (default: false)',
        'deletions_only': 'keep only deletion lines (default: false)',
    }

    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator over the lines of HTML: the opening table tag, one table row per line of the diff,
           and the closing table tag.
        :raises RuntimeError: If a line of the diff body does not start with one of the expected markers.
        """

        def process_line(line: str, line_num: int, is_markdown: bool, monospace_style: str) -> str:
            """Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param is_markdown: Whether the data being diffed is markdown.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            """
            # Slice rather than index so that an empty line is reported as a malformed diff (RuntimeError below)
            # instead of failing with an IndexError.
            marker = line[:1]

            # The style= string (or empty string) to add to an HTML tag.
            if line_num == 0:  # '---' header
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:  # '+++' header
                style = 'font-family:monospace;color:darkgreen;'
            elif marker == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif marker == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif marker == ' ':  # context line
                style = monospace_style
            elif marker == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif marker == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and marker != '@':  # don't apply to headers or range information
                # The leading marker character is dropped; the row's style conveys the kind of line.
                if is_markdown or marker == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = ' style="border-collapse:collapse;"'
        # table_style = (
        #     ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
        #     if self.job.monospace
        #     else ' style="border-collapse:collapse;"'
        # )
        yield f'<table{table_style}>'
        is_markdown = self.state.is_markdown()
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, is_markdown, monospace_style)
        yield '</table>'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Generate a unified diff of the old and new data held in ``self.state``.

        :param directives: The directives ('context_lines', 'range_info', 'additions_only', 'deletions_only').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by this differ; when an 'html' report is requested and
           a 'plain' diff is present, it is converted instead of diffing again.
        :param tz: The timezone of the report.
        :returns: A dict with the diff under 'plain' and 'markdown' (unless converted from ``_unfiltered_diff``) and,
           when requested, 'html'. If there is nothing to report, all three keys are present with empty strings and
           ``self.state.verb`` is set to 'changed,no_report'.
        """
        additions_only = directives.get('additions_only') or self.job.additions_only
        deletions_only = directives.get('deletions_only') or self.job.deletions_only
        out_diff: dict[ReportKind, str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'plain' in _unfiltered_diff:
            diff_text = _unfiltered_diff['plain']
        else:
            empty_return: dict[ReportKind, str] = {'plain': '', 'markdown': '', 'html': ''}
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                # No context by default when only one side of the change is shown.
                contextlines = 0 if additions_only or deletions_only else 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    '@',
                    '@',
                    self.make_timestamp(self.state.old_timestamp, tz),
                    self.make_timestamp(self.state.new_timestamp, tz),
                    contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return empty_return
            # replace tabs in header lines
            diff[0] = diff[0].replace('\t', ' ')
            diff[1] = diff[1].replace('\t', ' ')

            if additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # The data shrank to a quarter or less: keep the full diff and flag it, so that a massive
                    # deletion is not hidden by the additions-only filter.
                    diff = [
                        *diff[:2],
                        '/**Comparison type: Additions only**',
                        '/**Deletions are being shown as 75% or more of the content has been deleted**',
                        *diff[2:],
                    ]
                else:
                    head = '---' + diff[0][3:]
                    # Keep the '+++' header, the additions and the range information lines...
                    diff = [line for line in diff if line.startswith(('+', '@'))]
                    # ...then drop every range information line that is directly followed by another one
                    # (i.e. a hunk left with no additions)...
                    diff = [
                        line1
                        for line1, line2 in zip(['', *diff], [*diff, ''], strict=False)
                        if not (line1.startswith('@') and line2.startswith('@'))
                    ][1:]
                    # ...including a trailing one.
                    diff = diff[:-1] if diff[-1].startswith('@') else diff
                    # Nothing to report if only the header is left, or if besides two non-blank lines all the
                    # additions are blank.
                    if len(diff) == 1 or len([line for line in diff if line.removeprefix('+').rstrip()]) == 2:
                        self.state.verb = 'changed,no_report'
                        return empty_return
                    diff = [head, diff[0], '/**Comparison type: Additions only**', *diff[1:]]
            elif deletions_only:
                # NOTE(review): this builds the second header line from the remainder of the '+++' line prefixed
                # with '--- @', which appears to yield '--- @ @ <timestamp>' -- confirm this is intended.
                head = '--- @' + diff[1][3:]
                # Same filtering as for additions only, but keeping the '---' header and the deletions.
                diff = [line for line in diff if line.startswith(('-', '@'))]
                diff = [
                    line1
                    for line1, line2 in zip(['', *diff], [*diff, ''], strict=False)
                    if not (line1.startswith('@') and line2.startswith('@'))
                ][1:]
                diff = diff[:-1] if diff[-1].startswith('@') else diff
                if len(diff) == 1 or len([line for line in diff if line.removeprefix('-').rstrip()]) == 2:
                    self.state.verb = 'changed,no_report'
                    return empty_return
                diff = [diff[0], head, '/**Comparison type: Deletions only**', *diff[1:]]

            # remove range info lines if needed: when explicitly disabled or, if unspecified, for additions-only
            # diffs that are not showing deletions (those have a second '/' informational line at index 3)
            if directives.get('range_info') is False or (
                directives.get('range_info') is None and additions_only and (len(diff) < 4 or diff[3][0] != '/')
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)

            out_diff.update(
                {
                    'plain': diff_text,
                    'markdown': diff_text,
                }
            )  # ty:ignore[no-matching-overload]

        if report_kind == 'html':
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return out_diff
15✔
535

536

537
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__: dict[str, str] = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Generate a side-by-side HTML table diff of the old and new data held in ``self.state``.

        :param directives: The directives ('tabsize').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by this differ; when a 'plain' or 'markdown' report is
           requested and an 'html' diff is present, it is converted to text instead of diffing again.
        :param tz: The timezone of the report.
        :returns: A dict with the table under 'html' (unless taken from ``_unfiltered_diff``) and, when a 'plain' or
           'markdown' report is requested, its text conversion under both 'plain' and 'markdown'.
        """
        out_diff: dict[ReportKind, str] = {}
        wants_text = report_kind in {'plain', 'markdown'}
        if wants_text and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = directives.get('tabsize')
            # An explicit null in the job or configuration means "use the default", not a crash in int().
            tabsize = 8 if tabsize is None else int(tabsize)
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                fromdesc=self.make_timestamp(self.state.old_timestamp, tz),
                todesc=self.make_timestamp(self.state.new_timestamp, tz),
                context=True,
                numlines=3,
            )
            # fix table formatting: make_table() emits only class names (no stylesheet), so replace them with inline
            # styles
            table = table.replace('<th ', '<th style="font-family:monospace" ')
            table = table.replace('<td ', '<td style="font-family:monospace" ')
            table = table.replace(' nowrap="nowrap"', '')
            table = table.replace('<a ', '<a style="font-family:monospace;color:inherit" ')
            table = table.replace('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"')
            # 'lightred' is not a CSS named color (the declaration would be ignored), hence the hex value.
            table = table.replace('<span class="diff_sub"', '<span style="color:red;background-color:#ffaaaa"')
            table = table.replace('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"')
            out_diff['html'] = table

        if wants_text:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'plain': diff_text,
                    'markdown': diff_text,
                }
            )  # ty:ignore[no-matching-overload]

        return out_diff
15✔
587

588

589
class CommandDiffer(DifferBase):
    """Runs an external command to generate the diff.

    The command is run with two extra arguments appended: the paths of two temporary files holding the old and the
    new data. Its exit code is interpreted as follows: 0 means nothing to report, 1 means a diff was written to
    stdout, and anything greater than 1 (or any output on stderr) is treated as a failure.
    """

    __kind__ = 'command'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines if command starts with wdiff (default: 3)',
        'command': 'The command to execute',
        'is_html': 'Whether the output of the command is HTML',
    }

    # Matches a '<p>' at the start or a '</p>' at the end of a string.
    re_ptags = re.compile(r'^<p>|</p>$')
    # Matches an opening or closing '<hN>' heading tag, capturing the optional '/'.
    re_htags = re.compile(r'<(/?)h\d>')
    # Matches from the last '<' in a string through one or more '>' at its very end.
    re_tagend = re.compile(r'<(?!.*<).*>+$')

    # Matches a Markdown link '[text](target)', capturing the text and the target.
    _re_markdown_link = re.compile(r'\[(.*?)][(](.*?)[)]')
    # Matches any of the four wdiff markers: '[-', '-]', '{+' and '+}'.
    _re_wdiff_marker = re.compile(r'\[-|-]|{\+|\+}')

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Generate the diff by running the external command in ``directives['command']``.

        :param directives: The differ's directives; ``command`` is required and ``is_html`` is optional.
        :param report_kind: The kind of report being generated.
        :param _unfiltered_diff: Diffs already generated for other report kinds. When an 'html' report is requested
            and a 'plain' diff is present, that text (minus its two header lines) is reused instead of running the
            command again.
        :param tz: The timezone passed to make_timestamp() for the header lines.

        :returns: A dict mapping report kinds to diff text; all values are empty strings when the command exits
            with 0.
        :raises RuntimeError: If the command writes to stderr or exits with a return code greater than 1.
        """
        if self.job.monospace:
            # The outer <span> is left open on purpose: it wraps the diff and is closed at the end of this method.
            # Because of 'white-space:pre-wrap' every newline is rendered, so there must be none right after the
            # opening tag (it would show as an empty first line) and there must be one after the '+++' line (or the
            # first line of the diff would be glued to the header).
            head_html = '<span style="font-family:monospace;white-space:pre-wrap;">' + '\n'.join(
                [
                    f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                    f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>',
                    '',
                ]
            )
        else:
            head_html = '<br>\n'.join(
                [
                    '<span style="font-family:monospace;">',
                    f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                    f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>',
                    '</span>',
                ]
            )

        out_diff: dict[ReportKind, str] = {}
        command = directives['command']
        if report_kind == 'html' and _unfiltered_diff is not None and 'plain' in _unfiltered_diff:
            # Reuse the text diff already generated, dropping its two header lines ('---' and '+++').
            diff_text = ''.join(_unfiltered_diff['plain'].splitlines(keepends=True)[2:])
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            is_markdown = self.state.is_markdown()
            if is_markdown:
                # protect the link anchor from being split (won't work)
                old_data = self._re_markdown_link.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(old_data)
                )
                new_data = self._re_markdown_link.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(new_data)
                )

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                old_file_path = tmp_path.joinpath('old_file')
                new_file_path = tmp_path.joinpath('new_file')
                if isinstance(old_data, str):
                    old_file_path.write_text(old_data)
                else:
                    old_file_path.write_bytes(old_data)
                if isinstance(new_data, str):
                    new_file_path.write_text(new_data)
                else:
                    new_file_path.write_bytes(new_data)
                cmdline = [*shlex.split(command), str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, check=False, capture_output=True, text=True)  # noqa: S603 subprocess call
            if proc.stderr or proc.returncode > 1:
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                logger.info(
                    f"Job {self.job.index_number}: Command in differ 'command' returned 0 (no report) "
                    f'({self.job.get_location()})'
                )
                return {'plain': '', 'markdown': '', 'html': ''}
            head_text = '\n'.join(
                [
                    f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m',
                    f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m',
                    '',
                ]
            )
            diff = proc.stdout
            if is_markdown:
                # undo the protection of the link anchor from being split
                diff = self._re_markdown_link.sub(
                    lambda x: f'[{urllib.parse.unquote(x.group(1))}]({x.group(2)})', diff
                )
            if command.startswith('wdiff'):
                logger.warning(
                    f"Job {self.job.index_number}: Using external wdiff; note that a 'wdiff' differ is now available "
                    'within webchanges'
                )
                if self.job.contextlines == 0:
                    # remove lines that don't have any changes
                    keeplines = [
                        line
                        for line in diff.splitlines(keepends=True)
                        if any(x in line for x in ('{+', '+}', '[-', '-]'))
                    ]
                    diff = ''.join(keeplines)

            if directives.get('is_html'):
                # The command's output is already HTML: use it as-is for 'html' and convert it for the text kinds.
                diff_text = self.html2text(diff)
                out_diff.update(
                    {
                        'plain': head_text + diff_text,
                        'markdown': head_text + diff_text,
                        'html': head_html + diff,
                    }
                )  # ty:ignore[no-matching-overload]
            else:
                diff_text = diff
                out_diff.update(
                    {
                        'plain': head_text + diff_text,
                        'markdown': head_text + diff_text,
                    }
                )  # ty:ignore[no-matching-overload]

        if report_kind == 'html' and 'html' not in out_diff:
            if command.startswith('wdiff'):
                # colorize output of wdiff
                out_diff['html'] = head_html + self.wdiff_to_html(diff_text)
            else:
                out_diff['html'] = head_html + html.escape(diff_text)

        if self.job.monospace and 'html' in out_diff:
            # close the outer <span> opened in head_html
            out_diff['html'] += '</span>'

        return out_diff

    def wdiff_to_html(self, diff: str) -> str:
        """Colorize output of wdiff.

        The text is HTML-escaped first; the wdiff markers ``{+...+}`` and ``[-...-]`` are then replaced by ``<span>``
        elements carrying the added/deleted inline styles.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        html_diff = html.escape(diff)
        if self.state.is_markdown():
            # detect and fix multiline additions or deletions: an addition/deletion still open at the end of a line
            # is closed there and re-opened at the start of the next line, so that each line is self-contained
            is_add = False
            is_del = False
            new_diff = []
            for line in html_diff.splitlines():
                if is_add:
                    line = '{+' + line
                    is_add = False
                elif is_del:
                    line = '[-' + line
                    is_del = False
                for match in self._re_wdiff_marker.findall(line):
                    if match == '[-':
                        is_del = True
                    if match == '-]':
                        is_del = False
                    if match == '{+':
                        is_add = True
                    if match == '+}':
                        is_add = False
                if is_add:
                    line += '+}'
                elif is_del:
                    line += '-]'
                new_diff.append(line)
            html_diff = '<br>\n'.join(new_diff)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        html_diff = re.sub(
            r'\{\+(.*?)\+}',
            lambda x: f'<span style="{self.css_added_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        html_diff = re.sub(
            r'\[-(.*?)-]',
            lambda x: f'<span style="{self.css_deltd_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        if self.job.monospace:
            return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
        return html_diff
15✔
782

783

784
class DeepdiffDiffer(DifferBase):
    """Compares deserialized (JSON, YAML or XML) data using the deepdiff library and reports the differences."""

    __kind__ = 'deepdiff'

    __supported_directives__: dict[str, str] = {
        'data_type': "either 'json' (default), 'yaml', or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
        'compact': 'Whether to output a compact representation that also ignores changes of types (default: false)',
    }

    def differ(  # noqa: C901 mccabe complexity too high
        self,
        directives: dict[str, Any],
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Deserialize the old and the new data, compare them with DeepDiff and format the result.

        :param directives: The differ's directives (see __supported_directives__).
        :param report_kind: The kind of report being generated.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone used for the header timestamps and passed to DeepDiff as default_timezone.

        :returns: A dict mapping report kinds to diff text: only 'html' for an 'html' report, otherwise 'plain' and
            'markdown'. If the data cannot be deserialized, a dict of error messages for all three kinds; if
            DeepDiff finds nothing to report, a dict of empty strings for all three kinds.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)
            raise RuntimeError  # for type checker

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'
        span_remvd = f'<span style="{self.css_remvd_style}">'

        def _pretty_deepdiff(
            ddiff: DeepDiff,
            report_kind: ReportKind,
            compact: bool,
        ) -> str:
            """Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.

            :param ddiff: The diff object.
            :param report_kind: The report kind.
            :param compact: Whether to return diff text in compact mode.

            :returns: One bullet line per difference, joined by newlines (empty string if there are none).
            """
            # Edited strings originally in deepdiff.serialization._get_pretty_form_text
            # See https://github.com/seperman/deepdiff/blob/master/deepdiff/serialization.py
            if compact:
                root = '⊤'  # noqa: RUF001 DOWN TACK
                if report_kind == 'html':
                    pretty_form_texts = {
                        'type_changes': (
                            f'{{diff_path}}: {span_deltd}{{val_t1}}</span> ⮕ {span_added}{{val_t2}}</span>'
                        ),
                        'values_changed': (
                            f'{{diff_path}}: {span_deltd}{{val_t1}}</span> ⮕ {span_added}{{val_t2}}</span>'
                        ),
                        'dictionary_item_added': f'{{diff_path}}: {span_added}{{val_t2}}</span>',
                        'dictionary_item_removed': f'{span_deltd}{{diff_path}}: {{val_t1}}</span>',
                        'iterable_item_added': f'{{diff_path}}: {span_added}{{val_t2}}</span>',
                        'iterable_item_removed': f'{span_deltd}{{diff_path}}: {{val_t1}}</span>',
                        'attribute_added': f'{{diff_path}}: {span_added}{{val_t2}}</span>',
                        'attribute_removed': f'{span_remvd}{{diff_path}}</span>: {span_deltd}{{val_t1}}</span>',
                        'set_item_added': f'⊤[{{val_t2}}]: {span_added}{{val_t1}}</span>',  # noqa: RUF001 DOWN TACK
                        'set_item_removed': (
                            f'{span_remvd}⊤[{{val_t1}}]</span>: {span_deltd}{{val_t2}}</span>'  # noqa: RUF001
                        ),
                        'repetition_change': (
                            f'{{diff_path}}: repetition change {span_deltd}{{val_t1}}</span> ⮕ '
                            f'{span_added}{{val_t2}}</span>'
                        ),
                    }
                else:
                    pretty_form_texts = {
                        'type_changes': '{diff_path}: {val_t1} → {val_t2}',
                        'values_changed': '{diff_path}: {val_t1} → {val_t2}',
                        'dictionary_item_added': '{diff_path}: new {val_t2}',
                        'dictionary_item_removed': '{diff_path}: removed {val_t1}',
                        'iterable_item_added': '{diff_path}: new {val_t2}',
                        'iterable_item_removed': '{diff_path}: removed {val_t1}',
                        'attribute_added': '{diff_path}: new {val_t2}',
                        'attribute_removed': '{diff_path}: removed {val_t1}',
                        'set_item_added': '⊤[{val_t2}]: new {val_t1}',  # noqa: RUF001 DOWN TACK
                        'set_item_removed': '⊤[{val_t1}]: removed {val_t2}',  # noqa: RUF001 DOWN TACK
                        'repetition_change': '{diff_path}: repetition change {val_t1} → {val_t2}',
                    }
            else:  # not compact
                root = 'root'
                if report_kind == 'html':
                    pretty_form_texts = {
                        'type_changes': (
                            'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                            f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                        ),
                        'values_changed': (
                            f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}'
                            '{val_t2}</span>.'
                        ),
                        'dictionary_item_added': (
                            f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                        ),
                        'dictionary_item_removed': (
                            f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                        ),
                        'iterable_item_added': (
                            f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.'
                        ),
                        'iterable_item_removed': (
                            f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                        ),
                        'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                        'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                        'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                        'set_item_removed': (
                            f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                        ),
                        'repetition_change': (
                            f'Repetition change for item {{diff_path}} ({span_deltd}{{val_t2}}</span>).'
                        ),
                    }
                else:
                    pretty_form_texts = {
                        'type_changes': (
                            'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                            'from {val_t1} to {val_t2}.'
                        ),
                        'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                        'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                        'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                        'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                        'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                        'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                        'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                        'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                        'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                        'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                    }

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                # NOTE(review): when report_kind is 'html' the values and the path are interpolated into the HTML
                # templates above as-is (no html.escape()) -- confirm whether they need escaping for untrusted data.

                def stringify_value(value: Any, value_type: str) -> str:  # noqa: ANN401 Dynamically typed expressions Any are disallowed
                    if value_type in {'str', 'int', 'float'}:
                        if compact:
                            return f"'{value}'"
                        return f'"{value}"'
                    if value_type in {'dict', 'list'}:
                        if compact:
                            value_string = yaml.safe_dump(
                                value,
                                default_flow_style=False,
                                width=999,
                                allow_unicode=True,
                                sort_keys=False,
                            )
                            value_list = value_string.splitlines(keepends=True)
                            if len(value_list) < 2:
                                return value_string
                            # multi-line YAML: start on a new line and indent every line by four spaces
                            value_string = '\n    ' + '    '.join(value_list)
                            return value_string.rstrip()
                        return jsonlib.dumps(value, ensure_ascii=False, indent=2)
                    return str(value)

                type_t1 = type(ddiff.t1).__name__
                val_t1 = stringify_value(ddiff.t1, type_t1)
                type_t2 = type(ddiff.t2).__name__
                val_t2 = stringify_value(ddiff.t2, type_t2)

                diff_path = ddiff.path(root=root)
                return '• ' + pretty_form_texts.get(
                    ddiff.report_type or '',
                    '',
                ).format(
                    diff_path=diff_path,
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=val_t1,
                    val_t2=val_t2,
                )

            def _pretty_print_diff_markdown_to_html(ddiff: DiffLevel) -> str:
                """Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added and to convert markdown into html.
                """

                def stringify_value(value: Any, value_type: str) -> str:  # noqa: ANN401 Dynamically typed expressions Any are disallowed
                    if value_type in {'str', 'int', 'float'}:
                        return f"'{mark_to_html(str(value))}'"
                    if value_type in {'dict', 'list'}:
                        if compact:
                            value_string = yaml.safe_dump(
                                value,
                                default_flow_style=False,
                                width=999,
                                allow_unicode=True,
                                sort_keys=False,
                            )
                            value_list = value_string.splitlines(keepends=True)
                            if len(value_list) < 2:
                                return value_string
                            # multi-line YAML: start on a new line and indent every line by four spaces
                            value_string = mark_to_html('\n    ' + '    '.join(value_list))
                            return value_string.rstrip()
                        return mark_to_html(jsonlib.dumps(value, ensure_ascii=False, indent=2))
                    return mark_to_html(str(value))

                type_t1 = type(ddiff.t1).__name__
                val_t1 = stringify_value(ddiff.t1, type_t1)
                type_t2 = type(ddiff.t2).__name__
                val_t2 = stringify_value(ddiff.t2, type_t2)

                diff_path = ddiff.path(root=root)
                return '• ' + pretty_form_texts.get(
                    ddiff.report_type or '',
                    '',
                ).format(
                    diff_path=diff_path,
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=val_t1,
                    val_t2=val_t2,
                )

            result = (
                [
                    _pretty_print_diff_markdown_to_html(item_key)
                    for tree_item in ddiff.tree.values()
                    for item_key in tree_item
                ]
                if report_kind == 'html' and self.state.is_markdown()
                else [_pretty_print_diff(item_key) for tree_item in ddiff.tree.values() for item_key in tree_item]
            )

            return '\n'.join(result)

        def _serialize_method(
            mime_type: str | None, data_label: Literal['Old', 'New']
        ) -> Literal['json', 'yaml', 'xml', 'text'] | None:
            """Parses the media type (formerly known as MIME type) of the data and determine if it's a known
            serialization method.

            Uses data from https://www.iana.org/assignments/media-types/media-types.xhtml as well as various internet
            searches.

            :param mime_type: The media type (formerly known as MIME type) of the data.
            :param data_label: Either old or new, used for error reporting.

            :returns: Known serialization method or None.
            """
            if not mime_type:
                logger.info(
                    f"Differ {self.__kind__} data_type for {data_label} data defaulted to 'json' as media type is "
                    'missing.'
                )
                return 'json'

            # Drop any parameters (e.g. '; charset=utf-8') and normalize case (type and subtype names are
            # case-insensitive) so that e.g. 'text/xml; charset=utf-8' or 'Application/JSON' are recognized.
            essence = mime_type.split(';', 1)[0].strip().lower()
            # partition() tolerates a value without '/': subtype is then empty instead of raising ValueError.
            media_type, _, subtype = essence.partition('/')
            subtype = subtype.removeprefix('x-')  # 'x-' is deprecated per RFC6648 and needs to be removed
            subtype = subtype.split('.')[-1]  # remove facet name; see RFC6838
            subtype, subtype_suffix = subtype.split('+', 1) if '+' in subtype else (subtype, None)

            if media_type not in ('text', 'application'):
                return None
            if {'yaml', 'yml'} & {subtype, subtype_suffix}:
                return 'yaml'
            if 'xml' in (subtype, subtype_suffix):
                return 'xml'
            if 'json' in (subtype, subtype_suffix):
                return 'json'
            if media_type == 'application':
                logger.info(
                    f'Differ {self.__kind__} could not determine known serialization type of {data_label} data from '
                    f"media type {mime_type}; defaulting to 'json'."
                )
                return 'json'
            logger.info(
                f'Differ {self.__kind__} could not determine data type of {data_label} data from media '
                f"type {mime_type}; defaulting to 'text'."
            )
            return 'text'

        def deserialize_data(
            data: str | bytes,
            media_type: str | None,
            data_type: Literal['json', 'yaml', 'xml', 'text'] | None,
            data_label: Literal['Old', 'New'],
        ) -> tuple[Any, dict | None]:
            """Deserializes the stored data.

            On a parsing error the exception and its formatted traceback are stored in self.state and the error is
            logged.

            :param data: The stored data.
            :param media_type: The media type (formerly MIME type) of the data.
            :param data_type: The value of the data_type sub-parameter (overrides media type)
            :param data_label: Either old or new, used for error reporting

            :returns: The deserialized data, any errors
            """
            if not data:
                # nothing to deserialize; empty data is compared as-is
                return data, None
            deserialize_method = data_type or _serialize_method(media_type, data_label)
            if deserialize_method == 'json':
                try:
                    return jsonlib.loads(data), None
                except jsonlib.JSONDecodeError as e:
                    self.state.exception = e
                    self.state.traceback = self.job.format_error(e, traceback.format_exc())
                    logger.error(
                        f'Job {self.job.index_number}: {data_label} data is invalid JSON: {e} '
                        f'({self.job.get_location()})'
                    )
                    logger.info(f'Job {self.job.index_number}: {data!r}')
                    return None, {
                        'plain': f'Differ {self.__kind__} ERROR: {data_label} data is invalid JSON\n{e}',
                        'markdown': f'Differ {self.__kind__} **ERROR: {data_label} data is invalid JSON**\n{e}',
                        'html': f'Differ {self.__kind__} <b>ERROR: {data_label} data is invalid JSON</b>\n{e}',
                    }
            if deserialize_method == 'yaml':
                try:
                    return yaml.safe_load(data), None
                except yaml.YAMLError as e:
                    self.state.exception = e
                    self.state.traceback = self.job.format_error(e, traceback.format_exc())
                    logger.error(
                        f'Job {self.job.index_number}: {data_label} data is invalid YAML: {e} '
                        f'({self.job.get_location()})'
                    )
                    logger.info(f'Job {self.job.index_number}: {data!r}')
                    return None, {
                        'plain': f'Differ {self.__kind__} ERROR: {data_label} data is invalid YAML\n{e}',
                        'markdown': f'Differ {self.__kind__} **ERROR: {data_label} data is invalid YAML**\n{e}',
                        'html': f'Differ {self.__kind__} <b>ERROR: {data_label} data is invalid YAML</b>\n{e}',
                    }
            if deserialize_method == 'xml':
                if isinstance(xmltodict, str):  # pragma: no cover
                    self.raise_import_error('xmltodict', xmltodict)
                    raise RuntimeError  # for type checker
                try:
                    return xmltodict.parse(data), None
                except ExpatError as e:
                    self.state.exception = e
                    self.state.traceback = self.job.format_error(e, traceback.format_exc())
                    logger.error(
                        f'Job {self.job.index_number}: {data_label} data is invalid XML: {e} '
                        f'({self.job.get_location()})'
                    )
                    logger.info(f'Job {self.job.index_number}: {data!r}')
                    return None, {
                        'plain': f'Differ {self.__kind__} ERROR: {data_label} data is invalid XML\n{e}',
                        'markdown': f'Differ {self.__kind__} **ERROR: {data_label} data is invalid XML**\n{e}',
                        'html': f'Differ {self.__kind__} <b>ERROR: {data_label} data is invalid XML</b>\n{e}',
                    }
            if deserialize_method == 'text':
                return data, None
            return None, {
                'plain': f'Differ {self.__kind__} ERROR: data_type {data_type} is not supported',
                'markdown': f'Differ {self.__kind__} **ERROR: data_type {data_type} is not supported**',
                'html': f'Differ {self.__kind__} <b>ERROR: data_type {data_type} is not supported</b>',
            }

        old_data, err = deserialize_data(
            self.state.old_data,
            self.state.old_mime_type,
            directives.get('data_type'),
            'Old',
        )
        if err:
            return err
        new_data, err = deserialize_data(
            self.state.new_data,
            self.state.new_mime_type,
            directives.get('data_type'),
            'New',
        )
        if err:
            return err
        ignore_order = bool(directives.get('ignore_order'))
        ignore_string_case = bool(directives.get('ignore_string_case'))
        significant_digits = directives.get('significant_digits')
        compact = bool(directives.get('compact'))
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_purge_level=0,
            cache_size=500,
            cache_tuning_sample_size=500,
            default_timezone=tz,  # ty:ignore[invalid-argument-type]
            ignore_numeric_type_changes=True,
            ignore_order=ignore_order,
            ignore_string_case=ignore_string_case,
            ignore_string_type_changes=True,
            significant_digits=significant_digits,
            # derived from the logger's effective level: DEBUG (10) -> 2, INFO (20) -> 1, WARNING (30) and above -> 0
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind, compact)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'plain': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                # allow long paths such as root['a']['b'] to wrap between their bracketed components
                + diff_text.replace('][', ']<wbr>[')
                + '</span>'
            )
            return {'html': html_diff}
        text_diff = (
            f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
            f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
            f'{diff_text}'
        )
        return {'plain': text_diff, 'markdown': text_diff}
15✔
1196

1197

1198
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__: dict[str, str] = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed, if numpy in installed '
            '(default: 2.5)'
        ),
        'ai_google': 'Generative AI summary of changes',
    }

    def differ(  # noqa: C901 mccabe complexity too high
        self,
        directives: dict[str, Any],
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Build a visual (and optionally AI-summarized) report of the differences between two images.

        The old and new images are loaded according to the ``data_type`` directive, the larger one is resized to
        the size of the other, and a "difference" image (changes tinted in yellow over a darkened greyscale copy of
        the old image) is computed.  When numpy is available the mean squared error (MSE) of the difference is
        compared against the ``mse_threshold`` directive and differences below it are not reported.

        :param directives: The differ's directives (``data_type``, ``mse_threshold``, ``ai_google``).
        :param report_kind: The kind of report being generated (not read by this differ, which always returns all
            kinds).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed to ``make_timestamp`` for the report header.
        :returns: A dict with 'plain', 'markdown' and 'html' entries; all are empty strings when the images are
            identical or the change is below the MSE threshold.
        :raises ValueError: If the old or the new data held in ``self.state`` is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
            stacklevel=1,
        )
        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)
            raise RuntimeError  # for type checker
        # NOTE(review): the module header sets httpx to None (not to a str) when its import fails, so this guard
        # looks like it can never fire -- confirm and align with how raise_import_error expects to be called.
        if isinstance(httpx, str):  # pragma: no cover
            self.raise_import_error('httpx', httpx)
            raise RuntimeError  # for type checker

        def load_image_from_web(url: str) -> Image.Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image.Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image.Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image.Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image.Image, img2: Image.Image) -> tuple[Image.Image, np.float64 | None]:
            """Compute the difference between two images.

            :returns: The visualization image and the MSE of the difference (None if numpy is not available).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                diff_array = np.array(diff_image)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Convert the 'L' image to 'RGB' using a matrix that applies to yellow tint
            # The matrix has 12 elements: 4 for Red, 4 for Green, and 4 for Blue.
            # For yellow, we want Red and Green to copy the L values (1.0) and Blue to be zero.
            # The matrix is: [R, G, B, A] for each of the three output channels
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the grayscale value
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the grayscale value
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0% of the grayscale value
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)
            # Carry the format over so that the result can later be saved with format=final_img.format
            final_img.format = img2.format

            return final_img, mse_value

        def ai_google(
            old_image: Image.Image,
            new_image: Image.Image,
            diff_image: Image.Image,
            directives: AiGoogleDirectives,
        ) -> tuple[str, str]:
            """Summarize changes in image using Generative AI (ALPHA).  Returns summary and model name.

            On a configuration or upload error, the first element is an error message and the second is ''.
            ``diff_image`` is currently not uploaded to the model.
            """
            logger.info(f'Job {self.job.index_number}: Running ai_google for {self.__kind__} differ')
            warnings.warn(
                f'Job {self.job.index_number}: Using differ {self.__kind__} with ai_google, which is ALPHA, '
                f'may have bugs, and may change in the future. Please report any problems or suggestions at '
                f'https://github.com/mborsetti/webchanges/discussions.',
                RuntimeWarning,
                stacklevel=1,
            )

            api_version = '1beta'
            # GOOGLE_AI_API_KEY deprecated end of 2025
            gemini_api_key = os.environ.get('GEMINI_API_KEY', '').rstrip()
            if not gemini_api_key:
                gemini_api_key = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
                if gemini_api_key:
                    warnings.warn(
                        'The environment variable GOOGLE_AI_API_KEY is deprecated; please use GEMINI_API_KEY instead.',
                        DeprecationWarning,
                        stacklevel=1,
                    )
            if len(gemini_api_key) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GEMINI_API_KEY not found or is of the '
                    f'incorrect length {len(gemini_api_key)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using Google AI:\n'
                    f'Environment variable GEMINI_API_KEY not found or is of the incorrect length '
                    f'{len(gemini_api_key)}.\n',
                    '',
                )

            def _load_image(img_data: tuple[str, Image.Image]) -> dict[str, dict[str, str] | Exception | str]:
                """Upload one (name, image) pair to Google AI.

                Returns a ``{'file_data': {...}}`` part on success, or ``{'error': ..., 'img_name': ...}`` if an
                httpx.HTTPError is raised by either request.
                """
                img_name, image = img_data
                # Convert image to bytes
                img_byte_arr = BytesIO()
                image.save(img_byte_arr, format=image.format)
                image_data = img_byte_arr.getvalue()
                mime_type = f'image/{image.format.lower()}'  # type: ignore[union-attr]

                logger.info(
                    f'Job {self.job.index_number}: Loading {img_name} ({image.format}) to Google AI '
                    f'({len(image_data) / 1024:,.0f} kbytes)'
                )

                # Initial resumable upload request
                headers = {
                    'X-Goog-Upload-Protocol': 'resumable',
                    'X-Goog-Upload-Command': 'start',
                    'X-Goog-Upload-Header-Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Header-Content-Type': mime_type,
                    'Content-Type': 'application/json',
                }
                data = {'file': {'display_name': 'TEXT'}}

                with httpx.Client(http2=h2 is not None, timeout=self.job.timeout) as http_client:
                    try:
                        response = http_client.post(
                            f'https://generativelanguage.googleapis.com/upload/v{api_version}/files?'
                            f'key={gemini_api_key}',
                            headers=headers,
                            json=data,
                        )
                    except httpx.HTTPError as e:
                        return {'error': e, 'img_name': img_name}
                    upload_url = response.headers['X-Goog-Upload-Url']

                    # Upload the image data
                    headers = {
                        'Content-Length': str(len(image_data)),
                        'X-Goog-Upload-Offset': '0',
                        'X-Goog-Upload-Command': 'upload, finalize',
                    }
                    try:
                        response = http_client.post(upload_url, headers=headers, content=image_data)
                    except httpx.HTTPError as e:
                        return {'error': e, 'img_name': img_name}

                # Extract file URI from response
                file_info = response.json()
                file_uri = file_info['file']['uri']
                logger.info(f'Job {self.job.index_number}: {img_name.capitalize()} loaded to {file_uri}')

                return {
                    'file_data': {
                        'mime_type': mime_type,
                        'file_uri': file_uri,
                    }
                }

            # Upload to Google.  executor.map() yields results in submission order, so additional_parts[0] is the
            # old image and additional_parts[1] is the new image.  The context manager guarantees that the pool is
            # shut down on every exit path, including the early return on error.
            additional_parts: list[dict[str, dict[str, str]]] = []
            with ThreadPoolExecutor() as executor:
                for additional_part in executor.map(
                    _load_image,
                    (
                        ('old image', old_image),
                        ('new image', new_image),
                        # ('differences image', diff_image),
                    ),
                ):
                    if 'error' in additional_part:
                        logger.error(
                            f'Job {self.job.index_number}: ai_google for {self.__kind__} HTTP Client error '
                            f'{type(additional_part["error"])} when loading {additional_part["img_name"]} to Google '
                            f'AI: {additional_part["error"]}'
                        )
                        return (
                            f'HTTP Client error {type(additional_part["error"])} when loading '
                            f'{additional_part["img_name"]} to Google AI: {additional_part["error"]}',
                            '',
                        )
                    additional_parts.append(additional_part)  # type: ignore[arg-type]

            system_instructions = (
                'You are a meticulous visual comparison agent. Your task is to analyze two images: an "old '
                'version" and a "new version". Your entire focus is on identifying and listing the concrete, '
                'factual differences between them.'
            )
            # The "new version" is the second uploaded part and the "old version" the first (see upload order above).
            model_prompt = (
                '**Instructions:**\n'
                '\n'
                f'1.  **Identify Changes:** Directly compare the "new version" '
                f'{additional_parts[1]["file_data"]["file_uri"]} to the "old version" '
                f'{additional_parts[0]["file_data"]["file_uri"]} and identify all additions, removals, and alterations '
                'of visual elements.\n'
                '\n'
                '2.  **Filter for Significance:** From your initial list of changes, you must filter out any that '
                'are minor or cosmetic. A difference is only significant if it alters the core subject matter or '
                'the main message of the image.\n'
                '    *   **IGNORE:** Minor shifts in layout, small changes in color saturation or brightness, or '
                'other cosmetic adjustments that do not change what the image is depicting.\n'
                '    *   **FOCUS ON:** Tangible changes such as added objects, removed people, or altered text.\n'
                '\n'
                '3.  **Summarize the Differences:**\n'
                '    *   Present the significant differences as a bulleted list under the heading "Summary of '
                'Changes".\n'
                '    *   For each point, state the difference factually and concisely (e.g., "An apple was added '
                "to the table,\" \"The text on the sign was changed from 'Open' to 'Closed'\").\n"
                '    *   Only if a change directly and clearly alters the primary message or interpretation of the '
                'image, you may add a brief, one-sentence explanation of this shift. Do not speculate on deeper '
                'meanings.\n'
                '\n'
                '4.  **No Differences Found:** If you analyze both images and find no significant differences '
                'according to the criteria above, you must respond with only the phrase: "No significant '
                'differences were found between the two images." Do not attempt to find minor differences to report.\n'
                '\n'
                '5.  **Grounding:** Your entire analysis must be based solely on the visual information present in '
                'the two images. Do not make assumptions or introduce any external information.'
            )
            summary, model_version = AIGoogleDiffer._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                additional_parts=additional_parts,  # type: ignore[arg-type]
                directives=directives,
            )

            return summary, model_version

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)
        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')

        # Load both images; old_data / new_data are the HTML link snippets appended to the report's header lines.
        if data_type == 'url':
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            old_data = f' (<a href="{self.state.old_data}" target="_blank">Old image</a>)'
            new_data = f' (<a href="{self.state.new_data}" target="_blank">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{self.state.old_data}" target="_blank">Old image</a>)'
            new_data = f' (<a href="file://{self.state.new_data}" target="_blank">New image</a>)'

        # Check formats  TODO: is it needed? under which circumstances?
        # if new_image.format != old_image.format:
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
        # else:
        #     logger.debug(f'image format is {old_image.format}')

        # Convert the images to a base64 object for HTML (before shrinking etc.)
        output_stream = BytesIO()
        old_image.save(output_stream, format=old_image.format)
        encoded_old = b64encode(output_stream.getvalue()).decode()
        if data_type == 'url':
            # The new image is referenced by its URL in the report, so no inline copy is needed.
            encoded_new = ''
        else:
            output_stream = BytesIO()
            new_image.save(output_stream, format=new_image.format)
            encoded_new = b64encode(output_stream.getvalue()).decode()

        # If needed, shrink the larger image
        if new_image.size != old_image.size:
            # resize() returns a new image without the format attribute set, hence it is saved and restored.
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.Resampling.LANCZOS)
                new_image.format = img_format

            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.Resampling.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'plain': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # mse_value is None only when numpy is unavailable; an MSE of exactly 0.0 is a legitimate value that must
        # still be checked against the threshold, hence the explicit comparison with None instead of truthiness.
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')
            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'plain': '', 'markdown': '', 'html': ''}

        # prepare AI summary
        summary = ''
        model_version = ''
        ai_directives = directives.get('ai_google') or {}
        if 'ai_google' in directives:
            summary, model_version = ai_google(old_image, new_image, diff_image, ai_directives)

        # Prepare HTML output
        htm = [
            # No comma after the first literal: the opening <span> is deliberately concatenated with the '---' line.
            f'<span style="font-family:monospace">'
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
            '</span>',
            '</span>',
            'New image:',
        ]
        if data_type == 'url':
            htm.append(f'<img src="{self.state.new_data}" style="max-width: 100%; display: block;">')
        else:
            htm.append(
                f'<img src="data:image/{(new_image.format or "").lower()};base64,{encoded_new}" '
                'style="max-width: 100%; display: block;">'
            )
        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=diff_image.format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()
        htm.extend(
            [
                'Differences from old (in yellow):',
                f'<img src="data:image/{(diff_image.format or "").lower()};base64,{encoded_diff}" '
                'style="max-width: 100%; display: block;">',
                'Old image:',
                f'<img src="data:image/{(old_image.format or "").lower()};base64,{encoded_old}" '
                'style="max-width: 100%; display: block;">',
            ]
        )
        changed_text = 'The image has changed; please see an HTML report for the visualization.'
        if not summary:
            return {
                'plain': changed_text,
                'markdown': changed_text,
                'html': '<br>\n'.join(htm),
            }

        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        # The footer describes the ai_google sub-directives (not the image differ's own directives); the model is
        # reported separately and a custom prompt is masked.
        directives_for_str = (
            {key: value for key, value in ai_directives.items() if key != 'model'}
            if isinstance(ai_directives, dict)
            else {}
        )
        if 'prompt' in directives_for_str:
            directives_for_str['prompt'] = '«custom»'
        directives_text = (
            (
                ' (ai_google directive(s): '
                + ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in directives_for_str.items())
                + ')'
            )
            if directives_for_str
            else ''
        )
        footer = f"Summary by Google Generative AI's model {model_version}{directives_text}."
        return {
            'plain': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n------------\n{footer}'
            ),
            'markdown': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n* * *\n{footer}'
            ),
            'html': '<br>\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '',
                    *htm,
                    '-----',
                    f'<i><small>{footer}</small></i>',
                ]
            ),
        }
1649

1650

1651
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    # Maps each directive name accepted by this differ to its human-readable description.
    __supported_directives__: dict[str, str] = {
        'model': ('model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-2.0-flash)'),
        'system_instructions': (
            # Trailing space is required: the two literals are concatenated and the URL would otherwise be glued to
            # the preceding word.
            'Optional tone and style instructions for the model (default: see documentation at '
            'https://webchanges.readthedocs.io/en/stable/differs.html#ai-google-diff)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {unified_diff_new}, {old_text} and {new_text} will be replaced',
        'additions_only': 'summarizes only added lines (including as a result of a change)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'media_resolution': 'a control of the maximum number of tokens allocated per input image or video frame',
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'thinking_budget': "only for Gemini 2.5: The model's thinking budget",
        'thinking_level': (
            "For Gemini 3, the maximum depth of the model's internal reasoning process before it produces a response"
        ),
        'top_p': "the model's TopP parameter (default: None, i.e. model's default)",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'tools': "data passed on to the API's 'tools' field (default: None)",
        'unified': 'directives passed to the unified differ (default: None)',
    }
    # NOTE(review): presumably the directive a bare (non-dict) differ value is assigned to; confirm in DifferBase.
    __default_directive__ = 'model'
15✔
1684

1685
    @staticmethod
    def _send_to_model(
        job: JobBase,
        system_instructions: str,
        model_prompt: str,
        additional_parts: list[dict[str, str | dict[str, str]]] | None = None,
        directives: AiGoogleDirectives | None = None,
    ) -> tuple[str, str]:
        """Creates the summary request to the model; returns the summary and the version of the actual model used.

        Problems (missing API key, HTTP client errors, API errors, responses without output) are not raised: they
        are returned as a Markdown-formatted error message in place of the summary.

        :param job: The job being processed; only used for log messages.
        :param system_instructions: Text sent in the request's ``system_instruction`` field.
        :param model_prompt: Text sent as the first part of the request's contents.
        :param additional_parts: Optional extra parts appended after the prompt part.
        :param directives: The differ's directives (model, timeout, temperature, etc.); defaults are used for the
            ones not present.
        :returns: A tuple of the summary (or error message) and the model version. The model version is the one
            reported by the API when available, otherwise the requested model name, or an empty string when the
            API key is invalid.
        """
        error_header = '## ERROR in summarizing changes using Google AI:\n'

        def api_error_message(response: httpx.Response) -> str:
            """Returns the error message reported by the API, or the raw body if it is not the expected JSON."""
            try:
                return response.json().get('error', {}).get('message') or ''
            except (ValueError, AttributeError):  # body is not JSON, or is not shaped like an API error object
                return response.text.strip()

        api_version = '1beta'
        if directives is None:
            directives = {}
        model = directives.get('model', 'gemini-2.0-flash')
        timeout = directives.get('timeout', 300)
        max_output_tokens = directives.get('max_output_tokens')
        temperature = directives.get('temperature', 0.0)
        top_p = directives.get('top_p', 1.0 if temperature == 0.0 else None)
        top_k = directives.get('top_k')
        # GOOGLE_AI_API_KEY deprecated end of 2025
        gemini_api_key = os.environ.get('GEMINI_API_KEY', '').rstrip()
        if not gemini_api_key:
            gemini_api_key = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if gemini_api_key:
                warnings.warn(
                    'The environment variable GOOGLE_AI_API_KEY is deprecated; please use GEMINI_API_KEY instead.',
                    DeprecationWarning,
                    stacklevel=1,
                )
        if len(gemini_api_key) != 39:
            logger.error(
                f'Job {job.index_number}: Environment variable GEMINI_API_KEY not found or is of the '
                f'incorrect length {len(gemini_api_key)} ({job.get_location()})'
            )
            return (
                f'{error_header}'
                f'Environment variable GEMINI_API_KEY not found or is of the incorrect length '
                f'{len(gemini_api_key)}.',
                '',
            )

        data: dict[str, Any] = {
            'system_instruction': {'parts': [{'text': system_instructions}]},
            'contents': [{'parts': [{'text': model_prompt}]}],
            'generationConfig': {
                'maxOutputTokens': max_output_tokens,
                'temperature': temperature,
                'topP': top_p,
                'topK': top_k,
            },
        }
        if additional_parts:
            data['contents'][0]['parts'].extend(additional_parts)
        if directives.get('media_resolution'):
            data['contents'][0]['parts'][0]['mediaResolution'] = {'level': directives['media_resolution']}
        if directives.get('tools'):
            data['tools'] = directives['tools']
        # thinking_level takes precedence over thinking_budget when both are given
        if directives.get('thinking_level'):
            data['generationConfig'].update({'thinkingConfig': {'thinkingLevel': directives['thinking_level']}})
        elif directives.get('thinking_budget'):
            data['generationConfig'].update({'thinkingConfig': {'thinkingBudget': directives['thinking_budget']}})
        logger.info(f'Job {job.index_number}: Making the content generation request to Google AI model {model}')
        model_version = model  # default
        if httpx is None:
            # httpx is an optional import of this module; report the problem instead of failing on None.Client
            return f"{error_header}Python package 'httpx' is required but is not installed.", model_version
        with httpx.Client(http2=h2 is not None) as http_client:
            try:
                r = http_client.post(
                    f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:generateContent',
                    json=data,
                    # The key is sent as a header rather than as a URL query parameter so that it does not end up
                    # in logged request URLs.
                    headers={'Content-Type': 'application/json', 'x-goog-api-key': gemini_api_key},
                    timeout=timeout,
                )
            except httpx.HTTPError as e:
                summary = f'{error_header}HTTP client error: {e} when requesting data from {e.request.url.host}'
                return summary, model_version

        summary: str
        if r.is_success:
            result = r.json()
            # Fields are read defensively: e.g. a blocked prompt yields a response without any candidates.
            candidate = (result.get('candidates') or [{}])[0]
            finish_reason = candidate.get('finishReason')
            model_version = result.get('modelVersion', model)
            usage = result.get('usageMetadata', {})
            logger.info(f'Job {job.index_number}: AI generation finished by {finish_reason} using {model_version}')
            logger.debug(
                f'Job {job.index_number}: Used {usage.get("totalTokenCount", 0):,} tokens, '
                f'{usage.get("promptTokenCount", 0):,} of which for the prompt.'
            )
            if 'content' in candidate:
                if 'parts' in candidate['content']:
                    summary = candidate['content']['parts'][0]['text'].rstrip()
                else:
                    summary = (
                        f'{error_header}'
                        f'Model did not return any candidate output:\n'
                        f'finishReason={finish_reason}\n'
                        f'{jsonlib.dumps(usage, ensure_ascii=True, indent=2)}'
                    )
            else:
                summary = (
                    f'{error_header}'
                    f'Model did not return any candidate output:\n'
                    f'{jsonlib.dumps(result, ensure_ascii=True, indent=2)}'
                )

        elif r.status_code == 400:
            summary = f'{error_header}Received error from {r.url.host}: {api_error_message(r)}'
        else:
            summary = f'{error_header}Received error {r.status_code} {r.reason_phrase} from {r.url.host}'
            if r.content:
                summary += f': {api_error_message(r)}'

        return summary, model_version
×
1806

1807
    def differ(
        self,
        directives: AiGoogleDirectives,
        report_kind: ReportKind,
        _unfiltered_diff: dict[ReportKind, str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[ReportKind, str]:
        """Generates an AI summary of the changes followed by the report of the 'unified' differ.

        If no summary is produced (e.g. the data did not change), ``self.state.verb`` is set to
        ``'changed,no_report'`` and empty reports are returned.

        :param directives: The differ's directives (see ``__supported_directives__``).
        :param report_kind: Not used by this method's own code; all report kinds are returned.
        :param _unfiltered_diff: Not used by this method's own code.
        :param tz: Passed on to the 'unified' differ.
        :returns: A dict with the 'plain', 'markdown' and 'html' reports.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ')

        def get_ai_summary(prompt: str, system_instructions: str) -> tuple[str, str]:
            """Generate AI summary from unified diff, or an error message, plus the model version."""
            # Checked here (in addition to _send_to_model) so that no diff is computed when the key is unusable.
            # GOOGLE_AI_API_KEY deprecated end of 2025
            gemini_api_key = os.environ.get('GEMINI_API_KEY', '').rstrip()
            if not gemini_api_key:
                gemini_api_key = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
                if gemini_api_key:
                    warnings.warn(
                        'The environment variable GOOGLE_AI_API_KEY is deprecated; please use GEMINI_API_KEY instead.',
                        DeprecationWarning,
                        stacklevel=1,
                    )
            if len(gemini_api_key) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GEMINI_API_KEY not found or is of the '
                    f'incorrect length {len(gemini_api_key)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using Google AI:\n'
                    f'Environment variable GEMINI_API_KEY not found or is of the incorrect length '
                    f'{len(gemini_api_key)}.\n',
                    '',
                )

            diff_lines: list[str] = []
            if '{unified_diff' in prompt:  # matches unified_diff or unified_diff_new
                default_context_lines = 9999 if '{unified_diff}' in prompt else 0  # none if only unified_diff_new
                context_lines = directives.get('prompt_ud_context_lines', default_context_lines)
                diff_lines = list(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                        # The input lines have no trailing newlines, so the control lines must not have one
                        # either or joining with '\n' produces spurious blank lines after the headers.
                        lineterm='',
                    )
                )
                if not diff_lines:
                    # no changes
                    return '', ''
            unified_diff = '\n'.join(diff_lines)

            if '{unified_diff_new}' in prompt:
                # The first two lines are the '---' / '+++' file headers; skipping them by position prevents the
                # '+++' header from being mistaken for an added line.
                unified_diff_new = '\n'.join(line[1:] for line in diff_lines[2:] if line.startswith('+'))
            else:
                unified_diff_new = ''

            # check if data is different (same data is sent during testing)
            if '{old_text}' in prompt and '{new_text}' in prompt and self.state.old_data == self.state.new_data:
                return '', ''

            model_prompt = prompt.format(
                unified_diff=unified_diff,
                unified_diff_new=unified_diff_new,
                old_text=self.state.old_data,
                new_text=self.state.new_data,
            )

            summary, model_version = self._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                directives=directives,
            )

            return summary, model_version

        default_system_instructions = ''
        if directives.get('additions_only') or self.job.additions_only:
            default_prompt = '\n'.join(
                (
                    'You are an expert analyst AI, specializing in the meticulous summarization of change documents. '
                    'Your task is to summarize the provided unified diff in a clear and concise manner with 100% '
                    'fidelity. Restrict your analysis and summary *only* to the diff provided. Do not introduce any '
                    'external information or assumptions.',
                    '',
                    'Format your summary using Markdown. Use headings, bullet points, and other Markdown elements '
                    'where appropriate to create a well-structured and easily readable summary.',
                    '',
                    '{unified_diff_new}',
                )
            )
        else:
            default_prompt = '\n'.join(
                (
                    'You are an expert analyst AI, specializing in the meticulous comparison of documents. Your task '
                    'is to identify and summarize only the substantive differences between two versions of a text. '
                    'Your audience is already familiar with the original document and needs a concise summary of the '
                    'most significant changes in meaning or information.',
                    '',
                    '**Instructions:**',
                    '',
                    '1.  **Analyze the Texts:** Carefully review the document provided in the `<old_version>` and '
                    '`</old_version>` tags and the one in the `<new_version>` and `</new_version>` tags.',
                    '',
                    '2.  **Identify Substantive Changes:** Compare the two versions to identify all substantive '
                    'changes. A "substantive change" is defined as any modification that alters the core meaning, '
                    'intent, instructions, or factual information presented in the text. This includes, but is not '
                    'limited to:',
                    '*   Additions of new concepts, data, or requirements.',
                    '*   Deletions of existing information, arguments, or clauses.',
                    '*   Alterations to definitions, conclusions, instructions, or key takeaways.',
                    '',
                    '3.  **Exclude Non-Substantive Changes:** You must disregard any changes that are purely cosmetic, '
                    'typographical, or structural and do not alter the substantive meaning of the document. Explicitly '
                    'ignore the following:',
                    '*   Changes in page numbers, section/chapter numbering, or paragraph numbering.',
                    '*   Corrections of spelling, punctuation, or grammatical errors.',
                    '*   Modifications in formatting, layout, or font.',
                    '*   Rewording or rephrasing that does not change the underlying meaning or intent.',
                    '',
                    '4.  **Summarize Material Differences:** Create a summary of the identified substantive changes '
                    'with 100% fidelity. For each change, provide:',
                    '*   A clear heading identifying the relevant section (e.g., "Section 4: User Guidelines" or '
                    '"Chapter on Methodology").',
                    '*   A concise description of the modification, explaining whether it is an addition, deletion, or '
                    'alteration.',
                    '*   A brief analysis of how the change impacts the overall message or instructions, if not '
                    'immediately obvious.',
                    '',
                    '5.  **Output Format:**',
                    '*   Use Markdown for clear and structured presentation (e.g., headings and bullet points).',
                    '*   If no substantive changes are found, state this clearly.',
                    '*   If the changes consist only of additions, summarize the new content.',
                    '',
                    '6.  **Scope Limitation:** Base your analysis strictly on the provided text excerpts. Do not '
                    'infer or introduce any external context or information.',
                    '',
                    '<old_version>',
                    '{old_text}',
                    '</old_version>',
                    '',
                    '<new_version>',
                    '{new_text}',
                    '</new_version>',
                )
            )

        system_instructions = directives.get('system_instructions', default_system_instructions)
        # A literal backslash-n sequence in a custom prompt is turned into a real newline
        prompt = directives.get('prompt', default_prompt).replace('\\n', '\n')
        summary, model_version = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'plain': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        directives_for_str = {key: value for key, value in directives.items() if key != 'model'}
        if 'prompt' in directives_for_str:
            # do not reproduce a (possibly long) custom prompt in the footer
            directives_for_str['prompt'] = '«custom»'
        directives_text = (
            (
                ' (differ directive(s): '
                + ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in directives_for_str.items())
                + ')'
            )
            if directives_for_str
            else ''
        )
        footer = (
            f"Summary by Google Generative AI's model {model_version}{directives_text}."
            if model_version or directives_text
            else ''
        )
        temp_unfiltered_diff: dict[ReportKind, str] = {}
        # NOTE(review): only the value returned by the last call is used below; presumably process() accumulates
        # into temp_unfiltered_diff and returns all report kinds computed so far - confirm against DifferBase.process.
        for rep_kind in ('plain', 'html'):  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},
                self.state,
                rep_kind,
                tz,
                temp_unfiltered_diff,
            )
        return {
            'plain': (f'{summary}\n\n{unified_report["plain"]}' + (f'\n------------\n{footer}' if footer else '')),
            'markdown': (f'{summary}\n\n{unified_report["markdown"]}' + (f'\n* * *\n{footer}' if footer else '')),
            'html': '\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '<br>',
                    '<br>',
                    unified_report['html'],
                ]
                + (['-----<br>', f'<i><small>{footer}</small></i>'] if footer else [])
            ),
        }
2012

2013

2014
class WdiffDiffer(DifferBase):
    """Differ of kind 'wdiff'.

    Splits the old and new data into tokens (words and runs of whitespace, see the ``tokenize_*`` methods) and
    compares the two token sequences with :class:`difflib.SequenceMatcher`.
    """

    __kind__ = 'wdiff'

    # Maps each directive name accepted by this differ to its human-readable description.
    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
        'color': 'colorize text output (default: true)',
    }
2022

2023
    @staticmethod
15✔
2024
    def tokenize_markdown(markdown_string: str) -> list[str]:
15✔
2025
        # Escape spaces inside brackets to prevent them being split
2026
        string = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', markdown_string)
15✔
2027
        # Use split with capturing group to keep the whitespace
2028
        tokens = re.split(r'(\s+)', string)
15✔
2029
        return [t.replace('\n', '<\\n>') for t in tokens if t]
15✔
2030
        # # Split by tags, keeping the tags in the resulting list
2031
        # string = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', markdown_string)
2032
        # string = string.replace('\n', ' <\\n> ')
2033
        # return string.split(' ')
2034

2035
    @staticmethod
15✔
2036
    def tokenize_html(html_string: str) -> list[str]:
15✔
2037
        # Split by tags, keeping the tags in the resulting list
2038
        parts = re.split(r'(<[^>]+>)', html_string)
15✔
2039

2040
        tokens = []
15✔
2041
        for part in parts:
15✔
2042
            if not part:
15✔
2043
                continue
15✔
2044
            if part.startswith('<') and part.endswith('>'):
15✔
2045
                # Keep tags as a single token
2046
                tokens.append(part)
15✔
2047
            else:
2048
                # Split text content by whitespace
2049
                tokens.extend(re.split(r'(\s+)', part))
15✔
2050

2051
        return [t.replace('\n', '<\\n>') for t in tokens if t]
15✔
2052

2053
    @staticmethod
15✔
2054
    def tokenize_plain(plain_string: str) -> list[str]:
15✔
2055
        # Split by tags, keeping the tags in the resulting list
2056
        tokens = re.split(r'(\s+)', plain_string)
15✔
2057
        return [t.replace('\n', '<\\n>') for t in tokens if t]
15✔
2058

2059
    def differ(
15✔
2060
        self,
2061
        directives: dict[str, Any],
2062
        report_kind: ReportKind,
2063
        _unfiltered_diff: dict[ReportKind, str] | None = None,
2064
        tz: ZoneInfo | None = None,
2065
    ) -> dict[ReportKind, str]:
2066
        if not isinstance(self.state.old_data, str):
15!
2067
            raise ValueError("The differ 'wdiff' accepts strings only as input")
×
2068
        if not isinstance(self.state.new_data, str):
15!
2069
            raise ValueError
×
2070

2071
        # Split the texts into words tokenizing newline
2072
        if 'markdown' in self.state.old_mime_type:
15✔
2073
            a = self.tokenize_markdown(self.state.old_data)
15✔
2074
        elif 'html' in self.state.old_mime_type:
15✔
2075
            a = self.tokenize_html(self.state.old_data)
15✔
2076
        else:
2077
            a = self.tokenize_plain(self.state.old_data)
15✔
2078
        if 'markdown' in self.state.new_mime_type:
15✔
2079
            b = self.tokenize_markdown(self.state.new_data)
15✔
2080
        elif 'html' in self.state.new_mime_type:
15✔
2081
            b = self.tokenize_html(self.state.new_data)
15✔
2082
        else:
2083
            b = self.tokenize_plain(self.state.new_data)
15✔
2084

2085
        # Create a Differ object
2086
        import difflib
15✔
2087

2088
        # Generate a difference list
2089
        diff = difflib.SequenceMatcher(a=a, b=b, autojunk=False)
15✔
2090

2091
        if directives.get('color', True):
15✔
2092
            text_rem = '\033[91m'
15✔
2093
            text_rem_end = '\033[0m '
15✔
2094
            text_add = '\033[92m'
15✔
2095
            text_add_end = '\033[0m'
15✔
2096
        else:
2097
            text_rem = '[-'
15✔
2098
            text_rem_end = '-] '
15✔
2099
            text_add = '{+'
15✔
2100
            text_add_end = '+}'
15✔
2101

2102
        html_rem = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'
15✔
2103
        html_rem_end = '</span> '
15✔
2104
        html_add = '<span style="background-color:#d1ffd1;color:#082b08;">'
15✔
2105
        html_add_end = '</span>'
15✔
2106

2107
        head_text = '\n'.join(
15✔
2108
            [
2109
                # f'Differ: wdiff',
2110
                f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m',
2111
                f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m',
2112
                '',
2113
            ]
2114
        )
2115
        head_html = '<br>\n'.join(
15✔
2116
            [
2117
                '<span style="font-family:monospace;">'
2118
                # 'Differ: wdiff',
2119
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
2120
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>'
2121
                f'</span>',
2122
                '',
2123
            ]
2124
        )
2125

2126
        result_text = []
15✔
2127
        result_html = []
15✔
2128

2129
        def append_chunk(tokens: list[str], text_start: str, text_end: str, html_start: str, html_end: str) -> None:
15✔
2130
            # We use an empty string join because tokens now include their own spaces
2131
            segment = ''.join(tokens)
15✔
2132

2133
            # Handle the newline marker replacement without adding extra spaces
2134
            parts = segment.split('<\\n>')
15✔
2135
            for i, part in enumerate(parts):
15✔
2136
                if part:
15✔
2137
                    result_text.append(f'{text_start}{part}{text_end}')
15✔
2138
                    result_html.append(f'{html_start}{part}{html_end}')
15✔
2139

2140
                # If not the last element, it means there was a <\n>
2141
                if i < len(parts) - 1:
15✔
2142
                    result_text.append('\n')
15✔
2143
                    result_html.append('\n')
15✔
2144

2145
        # def append_chunk(tokens: list[str], ansi_color: str | None, html_start: str | None) -> None:
2146
        #     """
2147
        #     Appends tokens to results. If style is provided, wraps tokens in style.
2148
        #     Breaks style application at newlines (<\\n>) to maintain clean output formatting.
2149
        #     """
2150
        #     current_line_tokens = []
2151

2152
        #     def flush() -> None:
2153
        #         if current_line_tokens:
2154
        #             # Join tokens with space for this segment
2155
        #             segment = ' '.join(current_line_tokens)
2156

2157
        #             if ansi_color:
2158
        #                 result_text.append(f'{ansi_color}{segment}{ansi_end}')
2159
        #                 result_html.append(f'{html_start}{segment}{html_end}')
2160
        #             else:
2161
        #                 result_text.append(segment)
2162
        #                 result_html.append(segment)
2163
        #             current_line_tokens.clear()
2164

2165
        #     for token in tokens:
2166
        #         if token == '<\\n>':
2167
        #             flush()
2168
        #             # Append newline marker without styling
2169
        #             result_text.append('<\\n>')
2170
        #             result_html.append('<\\n>')
2171
        #         else:
2172
        #             current_line_tokens.append(token)
2173
        #     flush()
2174

2175
        # Process the diff opcodes
2176
        for tag, i1, i2, j1, j2 in diff.get_opcodes():
15✔
2177
            """
15✔
2178
            The tags are strings, with these meanings:
2179

2180
            'replace':  a[i1:i2] should be replaced by b[j1:j2]
2181
            'delete':   a[i1:i2] should be deleted.
2182
                        Note that j1==j2 in this case.
2183
            'insert':   b[j1:j2] should be inserted at a[i1:i1].
2184
                        Note that i1==i2 in this case.
2185
            'equal':    a[i1:i2] == b[j1:j2]
2186
            """
2187
            if tag == 'equal':
15✔
2188
                append_chunk(a[i1:i2], '', '', '', '')
15✔
2189
            elif tag == 'delete':
15✔
2190
                append_chunk(a[i1:i2], text_rem, text_rem_end, html_rem, html_rem_end)
15✔
2191
            elif tag == 'insert':
15✔
2192
                append_chunk(b[j1:j2], text_add, text_add_end, html_add, html_add_end)
15✔
2193
            elif tag == 'replace':
15!
2194
                append_chunk(a[i1:i2], text_rem, text_rem_end, html_rem, html_rem_end)
15✔
2195
                append_chunk(b[j1:j2], text_add, text_add_end, html_add, html_add_end)
15✔
2196

2197
        # rebuild the text from words, replacing the newline token
2198
        diff_text = ''.join(result_text).rstrip(' ')
15✔
2199
        diff_html = ''.join(result_html).rstrip(' ')
15✔
2200

2201
        # d = difflib.Differ()
2202

2203
        # # Generate a difference list
2204
        # diff = list(d.compare(old_tokens, new_tokens))
2205

2206
        # add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
2207
        # rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'
2208

2209
        # head_text = '\n'.join(
2210
        #     [
2211
        #         # f'Differ: wdiff',
2212
        #         f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m',
2213
        #         f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m',
2214
        #         '',
2215
        #     ]
2216
        # )
2217
        # head_html = '<br>\n'.join(
2218
        #     [
2219
        #         '<span style="font-family:monospace;">'
2220
        #         # 'Differ: wdiff',
2221
        #         f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
2222
        #         f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>'
2223
        #         f'</span>',
2224
        #         '',
2225
        #     ]
2226
        # )
2227
        # # Process the diff output to make it more wdiff-like
2228
        # result_text = []
2229
        # result_html = []
2230
        # prev_word_text = ''
2231
        # prev_word_html = ''
2232
        # next_text = ''
2233
        # next_html = ''
2234
        # add = False
2235
        # rem = False
2236

2237
        # for word_text in [*diff, '  ']:
2238
        #     if word_text[0] == '?':  # additional context line
2239
        #         continue
2240
        #     word_html = word_text
2241
        #     pre_text = [next_text] if next_text else []
2242
        #     pre_html = [next_html] if next_html else []
2243
        #     next_text = ''
2244
        #     next_html = ''
2245

2246
        #     if word_text[0] == '+' and not add:  # Beginning of additions
2247
        #         if rem:
2248
        #             prev_word_html += '</span>'
2249
        #             rem = False
2250
        #         if word_text[2:] == '<\\n>':
2251
        #             next_text = '\033[92m'
2252
        #             next_html = add_html
2253
        #         else:
2254
        #             pre_text.append('\033[92m')
2255
        #             pre_html.append(add_html)
2256
        #         add = True
2257
        #     elif word_text[0] == '-' and not rem:  # Beginning of deletions
2258
        #         if add:
2259
        #             prev_word_html += '</span>'
2260
        #             add = False
2261
        #         if word_text[2:] == '<\\n>':
2262
        #             next_text = '\033[91m'
2263
        #             next_html = rem_html
2264
        #         else:
2265
        #             pre_text.append('\033[91m')
2266
        #             pre_html.append(rem_html)
2267
        #         rem = True
2268
        #     elif word_text[0] == ' ' and (add or rem):  # Unchanged word
2269
        #         if prev_word_text == '<\\n>':
2270
        #             prev_word_text = '\033[0m<\\n>'
2271
        #             prev_word_html = '</span><\\n>'
2272
        #         else:
2273
        #             prev_word_text += '\033[0m'
2274
        #             prev_word_html += '</span>'
2275
        #         add = False
2276
        #         rem = False
2277
        #     elif word_text[2:] == '<\\n>':  # New line
2278
        #         if add:
2279
        #             word_text = '  \033[0m<\\n>'
2280
        #             word_html = '  </span><\\n>'
2281
        #             add = False
2282
        #         elif rem:
2283
        #             word_text = '  \033[0m<\\n>'
2284
        #             word_html = '  </span><\\n>'
2285
        #             rem = False
2286

2287
        #     result_text.append(prev_word_text)
2288
        #     result_html.append(prev_word_html)
2289
        #     pre_text.append(word_text[2:])
2290
        #     pre_html.append(word_html[2:])
2291
        #     prev_word_text = ''.join(pre_text)
2292
        #     prev_word_html = ''.join(pre_html)
2293
        # if add or rem:
2294
        #     result_text[-1] += '\033[0m'
2295
        #     result_html[-1] += '</span>'
2296

2297
        # # rebuild the text from words, replacing the newline token
2298
        # diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
2299
        # diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
2300

2301
        # keep only changed lines plus `contextlines` lines of context on either side of each
2302
        contextlines = directives.get('context_lines', self.job.contextlines)
15✔
2303
        # contextlines = 999
2304
        if contextlines is None:
15!
2305
            contextlines = 3
15✔
2306
        range_info = directives.get('range_info', True)
15✔
2307
        if contextlines < len(diff_text.splitlines()):
15!
2308
            lines_with_changes = []
×
2309
            for i, line in enumerate(diff_text.splitlines()):
×
2310
                if '\033[9' in line:
×
2311
                    lines_with_changes.append(i)
×
2312
            if contextlines:
×
2313
                lines_to_keep: set[int] = set()
×
2314
                for i in lines_with_changes:
×
2315
                    lines_to_keep.update(r for r in range(i - contextlines, i + contextlines + 1))
×
2316
            else:
2317
                lines_to_keep = set(lines_with_changes)
×
2318
            new_diff_text = []
×
2319
            new_diff_html = []
×
2320
            last_line = 0
×
2321
            skip = False
×
2322
            i = 0
×
2323
            for i, (line_text, line_html) in enumerate(
×
2324
                zip(diff_text.splitlines(), diff_html.splitlines(), strict=False)
2325
            ):
2326
                if i in lines_to_keep:
×
2327
                    if range_info and skip:
×
2328
                        new_diff_text.append(f'@@ {last_line + 1}...{i} @@')
×
2329
                        new_diff_html.append(f'@@ {last_line + 1}...{i} @@')
×
2330
                        skip = False
×
2331
                    new_diff_text.append(line_text)
×
2332
                    new_diff_html.append(line_html)
×
2333
                    last_line = i + 1
×
2334
                else:
2335
                    skip = True
×
2336
            if (i + 1) != last_line and range_info and skip:
×
2337
                new_diff_text.append(f'@@ {last_line + 1}...{i + 1} @@')
×
2338
                new_diff_html.append(f'@@ {last_line + 1}...{i + 1} @@')
×
2339
            diff_text = '\n'.join(new_diff_text)
×
2340
            diff_html = '\n'.join(new_diff_html)
×
2341

2342
        if self.state.is_markdown():
15✔
2343
            diff_text = diff_text.replace('</s>', ' ')
15✔
2344
            diff_html = diff_html.replace('</s>', ' ')
15✔
2345
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')
15✔
2346

2347
        if self.job.monospace:
15!
2348
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
×
2349
        else:
2350
            diff_html = diff_html.replace('\n', '<br>\n')
15✔
2351

2352
        return {
15✔
2353
            'plain': head_text + diff_text,
2354
            'markdown': head_text + diff_text,
2355
            'html': head_html + diff_html,
2356
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc