• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 13779290754

11 Mar 2025 02:50AM UTC coverage: 75.46%. First build
13779290754

push

github

mborsetti
3.28.2rc0

1738 of 2629 branches covered (66.11%)

Branch coverage included in aggregate %.

18 of 36 new or added lines in 7 files covered. (50.0%)

4572 of 5733 relevant lines covered (79.75%)

6.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.81
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import io
8✔
11
import logging
8✔
12
import math
8✔
13
import os
8✔
14
import re
8✔
15
import shlex
8✔
16
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
17
import tempfile
8✔
18
import traceback
8✔
19
import urllib.parse
8✔
20
import warnings
8✔
21
from base64 import b64encode
8✔
22
from concurrent.futures import ThreadPoolExecutor
8✔
23
from datetime import datetime
8✔
24
from io import BytesIO
8✔
25
from pathlib import Path
8✔
26
from typing import Any, Iterator, Literal, TYPE_CHECKING, TypedDict
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
import html2text
8✔
30

31
from webchanges.jobs import JobBase
8✔
32
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
33

34
try:
8✔
35
    from deepdiff import DeepDiff
8✔
36
    from deepdiff.model import DiffLevel
8✔
37
except ImportError as e:  # pragma: no cover
38
    DeepDiff = str(e)  # type: ignore[assignment,misc]
39

40
try:
8✔
41
    import httpx
8✔
42
except ImportError:  # pragma: no cover
43
    httpx = None  # type: ignore[assignment]
44
if httpx is not None:
8!
45
    try:
8✔
46
        import h2
8✔
47
    except ImportError:  # pragma: no cover
48
        h2 = None  # type: ignore[assignment]
49

50
try:
8✔
51
    import numpy as np
8✔
52
except ImportError as e:  # pragma: no cover
53
    np = str(e)  # type: ignore[assignment]
54

55
try:
8✔
56
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
57
except ImportError as e:  # pragma: no cover
58
    Image = str(e)  # type: ignore[assignment]
59

60
# https://stackoverflow.com/questions/712791
61
try:
8✔
62
    import simplejson as jsonlib
8✔
63
except ImportError:  # pragma: no cover
64
    import json as jsonlib  # type: ignore[no-redef]
65

66
try:
8✔
67
    import xmltodict
8✔
68
except ImportError as e:  # pragma: no cover
69
    xmltodict = str(e)  # type: ignore[no-redef]
70

71
# https://stackoverflow.com/questions/39740632
72
if TYPE_CHECKING:
73
    from webchanges.handler import JobState
74
    from webchanges.storage import _Config
75

76

77
logger = logging.getLogger(__name__)
8✔
78

79
AiGoogleDirectives = TypedDict(
8✔
80
    'AiGoogleDirectives',
81
    {
82
        'model': str,
83
        'additions_only': str,
84
        'system_instructions': str,
85
        'prompt': str,
86
        'prompt_ud_context_lines': int,
87
        'timeout': int,
88
        'max_output_tokens': int | None,
89
        'temperature': float | None,
90
        'top_p': float | None,
91
        'top_k': float | None,
92
        'tools': list[Any],
93
    },
94
    total=False,
95
)
96

97

98
class DifferBase(metaclass=TrackSubClasses):
    """The base class for differs."""

    # NOTE: differ_documentation() prints the class docstring of every differ it lists, so class docstrings must
    # remain short, user-facing one-liners.

    # Registries of differ classes; normalize_differ() and process() look differs up by kind in __subclasses__.
    # Presumably both are filled in by the TrackSubClasses metaclass (webchanges.util) -- TODO confirm.
    __subclasses__: dict[str, type[DifferBase]] = {}
    __anonymous_subclasses__: list[type[DifferBase]] = []

    # The name by which a differ is selected (the 'name' sub-directive of a job's differ).
    __kind__: str = ''

    # Maps each accepted sub-directive to its help text; the special key '<any>' disables validation of the keys.
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty

    # Inline CSS for added / deleted text in HTML reports.
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'

    def __init__(self, state: JobState) -> None:
        """

        :param state: the JobState.
        """
        self.job = state.job
        self.state = state

    @classmethod
    def differ_documentation(cls) -> str:
        """Generates simple differ documentation for use in the --features command line argument.

        :returns: A string to display.
        """
        result: list[str] = []
        for sc in TrackSubClasses.sorted_by_kind(cls):
            # A differ without a docstring has __doc__ set to None; show an empty description instead of 'None'.
            result.append(f"  * {sc.__kind__} - {sc.__doc__ or ''}")
            if hasattr(sc, '__supported_directives__'):
                for key, doc in sc.__supported_directives__.items():
                    result.append(f'      {key} ... {doc}')
        result.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(result)

    @classmethod
    def normalize_differ(
        cls,
        differ_spec: dict[str, Any] | None,
        job_index_number: int | None = None,
        config: _Config | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number (only used in error messages).
        :param config: The configuration, whose 'differ_defaults' entry supplies default directives per differ kind.
        :returns: A validated differ_kind, directives tuple.
        :raises ValueError: If no differ name can be determined, the named differ does not exist, or a sub-directive
           is not supported by the differ.
        """

        def directives_with_defaults(
            differ_kind: str, directives: dict[str, Any], config: _Config | None = None
        ) -> dict[str, Any]:
            """Obtain differ subdirectives that also contains defaults from the configuration.

            :param differ_kind: The differ kind.
            :param directives: The differ directives as stated in the job (updated in place).
            :param config: The configuration.
            :returns: directives inclusive of configuration defaults.
            """
            if config is None:
                logger.error('Cannot merge differ directives with defaults as no config object was passed')
                return directives
            cfg = config.get('differ_defaults')
            if isinstance(cfg, dict):
                defaults: dict[str, Any] = cfg.get(differ_kind)  # type: ignore[assignment]
                if defaults:
                    for key, value in defaults.items():
                        # directives stated in the job take precedence over configuration defaults
                        directives.setdefault(key, value)
            return directives

        differ_spec = differ_spec or {'name': 'unified'}
        directives = differ_spec.copy()  # work on a copy so that the job's own differ dict is never modified
        differ_kind = directives.pop('name', '')
        if not differ_kind:
            # shorthand: a differ made only of a 'command' sub-directive is a command differ
            if list(directives) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind, None)  # type: ignore[assignment]
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        # defaults are only merged in when the job states at least one directive of its own
        if directives:
            directives = directives_with_defaults(differ_kind, directives, config)

        if hasattr(differcls, '__supported_directives__'):
            provided_keys = set(directives.keys())
            allowed_keys = set(differcls.__supported_directives__.keys())
            unknown_keys = provided_keys.difference(allowed_keys)
            if unknown_keys and '<any>' not in allowed_keys:
                # sorted so that the error message is deterministic
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f"{', '.join(sorted(unknown_keys))} (supported: {', '.join(sorted(allowed_keys))})."
                )

        return differ_kind, directives

    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: ZoneInfo | None = None,
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same differ, which can be used to generate a diff
           for a different report_kind.
        :returns: The output of the differ or an error message with traceback if it fails; an empty dict if there is
           no differ named differ_kind.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            # Tracebacks routinely contain text such as "<module>" or "<class 'ValueError'>", which an HTML renderer
            # would swallow as (unknown) tags, so everything interpolated into the HTML variant is escaped.
            html_kind = html.escape(str(differ_kind))
            html_directives = html.escape(directives_text)
            html_traceback = html.escape(str(job_state.traceback))
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{job_state.traceback}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{job_state.traceback}\n```\n'
                ),
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {html_kind} with directive(s) '
                    f'{html_directives} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html_traceback}'
                    f'</span></span>'
                ),
            }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Create a diff from the data. Since this function could be called by different reporters of multiple report
        types ('text', 'markdown', 'html'), the differ outputs a dict with data for the report_kind it generated so
        that it can be reused.

        Must be implemented by each subclass; this base implementation always raises NotImplementedError.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same differ, which can be used to generate a diff
           for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises RuntimeError: If the external diff tool returns an error.
        """
        raise NotImplementedError()

    @staticmethod
    def make_timestamp(
        timestamp: float,
        tz: ZoneInfo | None = None,
    ) -> str:
        """Creates a datetime string in RFC 5322 (email) format with the time zone name (if available) in the
        Comments and Folding White Space (CFWS) section.

        :param timestamp: The timestamp; a falsy value (e.g. 0) yields the string 'NEW'.
        :param tz: The IANA timezone of the report.
        :returns: A datetime string in RFC 5322 (email) format, or 'NEW' if there is no timestamp.
        """
        if not timestamp:
            return 'NEW'

        dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz)
        tz_name = dt.strftime('%Z')
        # add timezone name if known, i.e. unless %Z merely repeats the first three characters of the numeric offset
        cfws = f' ({tz_name})' if tz_name != dt.strftime('%z')[:3] else ''
        return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws

    @staticmethod
    def html2text(data: str) -> str:
        """Converts html to text.

        :param data: the string in html format.
        :returns: the string in text format, with trailing whitespace removed from every line.
        """
        # within this body 'html2text' is the imported module, not this method (class scope is not searched)
        parser = html2text.HTML2Text()
        parser.unicode_snob = True
        parser.body_width = 0
        parser.ignore_images = True
        parser.single_line_break = True
        parser.wrap_links = False
        return '\n'.join(line.rstrip() for line in parser.handle(data).splitlines())

    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Raise ImportError for missing package.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from ImportError.

        :raises: ImportError.
        """
        raise ImportError(
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
        )
329

330

331
class UnifiedDiffer(DifferBase):
    """(Default) Generates a unified diff."""

    __kind__ = 'unified'

    __supported_directives__ = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
        'additions_only': 'keep only addition lines (default: false)',
        'deletions_only': 'keep only deletion lines (default: false)',
    }

    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator over the lines of HTML: the opening table tag, one table row per diff line, and the
           closing table tag.
        :raises RuntimeError: If a line of the diff does not start with one of the expected markers.
        """

        def process_line(line: str, line_num: int, is_markdown: bool, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param is_markdown: Whether the data being diffed is markdown.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            """
            # The style= string (or empty string) to add to an HTML tag.
            # startswith() is used instead of line[0] so that an unexpected empty line ends up in the explicit
            # RuntimeError below rather than in an IndexError.
            if line_num == 0:  # '---' header
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:  # '+++' header
                style = 'font-family:monospace;color:darkgreen;'
            elif line.startswith('+'):  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif line.startswith('-'):  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif line.startswith(' '):  # context line
                style = monospace_style
            elif line.startswith('@'):  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif line.startswith('/'):  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and not line.startswith('@'):  # don't apply to headers or range information
                # line[1:] strips the one-character diff marker
                if is_markdown or line.startswith('/'):  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        is_markdown = self.state.is_markdown()
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, is_markdown, monospace_style)
        yield '</table>'

    @staticmethod
    def _keep_marked(diff: list[str], marker: str) -> list[str]:
        """Keeps only the lines starting with marker ('+' or '-') and the range lines that still precede content.

        :param diff: The lines of the unified diff (including its two header lines).
        :param marker: The marker of the lines to keep.
        :returns: The filtered lines; the first one is the header line starting with the marker.
        """
        kept = [line for line in diff if line.startswith((marker, '@'))]
        # drop every range line that is immediately followed by another range line (its hunk is now empty) ...
        kept = [
            line
            for line, following in zip(kept, kept[1:] + [''])
            if not (line.startswith('@') and following.startswith('@'))
        ]
        # ... as well as a range line left dangling at the very end
        return kept[:-1] if kept[-1].startswith('@') else kept

    @staticmethod
    def _nothing_to_report(diff: list[str], marker: str) -> bool:
        """Whether a diff filtered by _keep_marked() is just a header, or a header, one range line and blank lines.

        :param diff: The filtered diff lines.
        :param marker: The marker used for filtering.
        """
        return len(diff) == 1 or sum(1 for line in diff if line.lstrip(marker).rstrip()) == 2

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates a unified diff of the old and new data of the job state.

        :param directives: The directives (see __supported_directives__); additions_only, deletions_only and
           context_lines fall back to the corresponding job attributes.
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: Previously generated diffs; its 'text' entry is reused to build the 'html' one.
        :param tz: The timezone of the report.
        :returns: A dict with the 'text' and 'markdown' diff (when generated here) and, if report_kind is 'html', the
           'html' one. All three values are empty strings (and the state's verb is set to 'changed,no_report') when
           there is nothing to report.
        """
        additions_only = directives.get('additions_only') or self.job.additions_only
        deletions_only = directives.get('deletions_only') or self.job.deletions_only
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            empty_return: dict[Literal['text', 'markdown', 'html'], str] = {'text': '', 'markdown': '', 'html': ''}
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                # context is of no use when only one side of the changes is displayed
                contextlines = 0 if additions_only or deletions_only else 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    '@',
                    '@',
                    self.make_timestamp(self.state.old_timestamp, tz),
                    self.make_timestamp(self.state.new_timestamp, tz),
                    contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return empty_return
            # replace tabs in header lines
            diff[0] = diff[0].replace('\t', ' ')
            diff[1] = diff[1].replace('\t', ' ')

            if additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # most of the content is gone: show the full diff (deletions included) with a notice
                    diff = (
                        diff[:2]
                        + ['/**Comparison type: Additions only**']
                        + ['/**Deletions are being shown as 75% or more of the content has been deleted**']
                        + diff[2:]
                    )
                else:
                    old_header = diff[0]  # the '---' header does not survive the filtering; re-inserted below
                    diff = self._keep_marked(diff, '+')
                    if self._nothing_to_report(diff, '+'):
                        self.state.verb = 'changed,no_report'
                        return empty_return
                    diff = [old_header, diff[0], '/**Comparison type: Additions only**'] + diff[1:]
            elif deletions_only:
                # The '+++' header does not survive the filtering; re-insert it unchanged so that the diff keeps a
                # well-formed '---' / '+++' header pair (it used to be rebuilt as '--- @ @ <timestamp>').
                new_header = diff[1]
                diff = self._keep_marked(diff, '-')
                if self._nothing_to_report(diff, '-'):
                    self.state.verb = 'changed,no_report'
                    return empty_return
                diff = [diff[0], new_header, '/**Comparison type: Deletions only**'] + diff[1:]

            # remove range info lines if needed: when explicitly disabled or, if unspecified, for an additions-only
            # diff (unless deletions are being shown, in which case the 4th line is the second '/**' notice)
            range_info = directives.get('range_info')
            if range_info is False or (
                range_info is None and additions_only and (len(diff) < 4 or not diff[3].startswith('/'))
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)

            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        if report_kind == 'html':
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return out_diff
8✔
491

492

493
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates a side-by-side HTML table diff with difflib.HtmlDiff.

        :param directives: The directives; only 'tabsize' is used.
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: Previously generated diffs; its 'html' entry is reused to build the text ones.
        :param tz: The timezone of the report.
        :returns: A dict with the 'html' table (when generated here) and, if report_kind is 'text' or 'markdown', the
           table converted to text under both the 'text' and 'markdown' keys.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = int(directives.get('tabsize', 8))
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                context=True,
                numlines=3,
            )
            # fix table formatting: the table is used without difflib's stylesheet, so styles are set inline
            table = table.replace('<th ', '<th style="font-family:monospace" ')
            table = table.replace('<td ', '<td style="font-family:monospace" ')
            table = table.replace(' nowrap="nowrap"', '')
            table = table.replace('<a ', '<a style="font-family:monospace;color:inherit" ')
            table = table.replace('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"')
            # 'lightred' is not a CSS color name (the declaration was ignored by browsers); 'lightpink' is
            table = table.replace('<span class="diff_sub"', '<span style="color:red;background-color:lightpink"')
            table = table.replace('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"')
            out_diff['html'] = table

        if report_kind in {'text', 'markdown'}:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        return out_diff
8✔
543

544

545
class CommandDiffer(DifferBase):
    """Runs an external command to generate the diff."""

    __kind__ = 'command'

    __supported_directives__ = {
        'command': 'The command to execute',
    }

    re_ptags = re.compile(r'^<p>|</p>$')
    re_htags = re.compile(r'<(/?)h\d>')
    re_tagend = re.compile(r'<(?!.*<).*>+$')
    # a markdown link; group 1 is the anchor text and group 2 the target
    re_markdown_links = re.compile(r'\[(.*?)][(](.*?)[)]')

    # the markers that wdiff places around inserted ({+ +}) and deleted ([- -]) text
    _wdiff_markers = ('{+', '+}', '[-', '-]')

    @staticmethod
    def _quote_link_anchor(match: re.Match[str]) -> str:
        """Percent-encodes the anchor text of a markdown link so that the diff tool sees it as a single word."""
        return f'[{urllib.parse.quote(match.group(1))}]({match.group(2)})'

    @staticmethod
    def _unquote_link_anchor(match: re.Match[str]) -> str:
        """Reverses _quote_link_anchor()."""
        return f'[{urllib.parse.unquote(match.group(1))}]({match.group(2)})'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Runs the external command on two temporary files holding the old and the new data.

        :param directives: The directives; 'command' (required) is the command line, to which the paths of the two
           files are appended.
        :param report_kind: The report_kind for which a diff must be generated.
        :param _unfiltered_diff: Previously generated diffs; its 'text' entry is reused to build the 'html' one
           (except for wdiff commands, which are always run again).
        :param tz: The timezone of the report.
        :returns: A dict with the 'text' and 'markdown' diff (when generated here) and, if report_kind is 'html', the
           'html' one. All three values are empty strings (and the state's verb is set to 'changed,no_report') when
           the command exits with code 0.
        :raises RuntimeError: If the command writes to stderr or exits with a code other than 0 or 1.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        command = directives['command']
        is_wdiff = command.startswith('wdiff')
        if report_kind == 'html' and not is_wdiff and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff = _unfiltered_diff['text']
        else:
            is_markdown = self.state.is_markdown()
            old_data = self.state.old_data
            new_data = self.state.new_data
            if is_markdown:
                # protect the link anchor from being split (won't work)
                old_data = self.re_markdown_links.sub(self._quote_link_anchor, str(old_data))
                new_data = self.re_markdown_links.sub(self._quote_link_anchor, str(new_data))

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                old_file_path = tmp_path.joinpath('old_file')
                new_file_path = tmp_path.joinpath('new_file')
                for file_path, data in ((old_file_path, old_data), (new_file_path, new_data)):
                    if isinstance(data, str):
                        file_path.write_text(data)
                    else:
                        file_path.write_bytes(data)
                cmdline = shlex.split(command) + [str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
            # By the convention of diff tools 0 means no differences and 1 differences found. Anything else is a
            # failure, including the negative codes reported when the child is terminated by a signal (POSIX).
            if proc.stderr or proc.returncode not in (0, 1):
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}
            head = '\n'.join(
                [
                    f'Using differ "{directives}"',
                    f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}',
                    f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}',
                ]
            )
            diff = proc.stdout
            if is_markdown:
                # undo the protection of the link anchor from being split
                diff = self.re_markdown_links.sub(self._unquote_link_anchor, diff)
            if is_wdiff and self.job.contextlines == 0:
                # remove lines that don't have any changes
                diff = ''.join(
                    line
                    for line in diff.splitlines(keepends=True)
                    if any(marker in line for marker in self._wdiff_markers)
                )
            diff = f'{head}\n{diff}'
            out_diff.update(
                {
                    'text': diff,
                    'markdown': diff,
                }
            )

        if report_kind == 'html':
            if is_wdiff:
                # colorize output of wdiff
                out_diff['html'] = self.wdiff_to_html(diff)
            else:
                out_diff['html'] = html.escape(diff)

        return out_diff

    def wdiff_to_html(self, diff: str) -> str:
        """
        Colorize output of wdiff.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        html_diff = html.escape(diff)
        if self.state.is_markdown():
            # detect and fix multiline additions or deletions: a change still open at the end of a line is closed
            # there and re-opened at the start of the next one, so that every line can be colorized on its own
            is_add = False
            is_del = False
            new_diff = []
            for line in html_diff.splitlines():
                if is_add:
                    line = '{+' + line
                    is_add = False
                elif is_del:
                    line = '[-' + line
                    is_del = False
                for match in re.findall(r'\[-|-]|{\+|\+}', line):
                    if match == '[-':
                        is_del = True
                    if match == '-]':
                        is_del = False
                    if match == '{+':
                        is_add = True
                    if match == '+}':
                        is_add = False
                if is_add:
                    line += '+}'
                elif is_del:
                    line += '-]'
                new_diff.append(line)
            html_diff = '<br>\n'.join(new_diff)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        html_diff = re.sub(
            r'\{\+(.*?)\+}',
            lambda x: f'<span style="{self.css_added_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        html_diff = re.sub(
            r'\[-(.*?)-]',
            lambda x: f'<span style="{self.css_deltd_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        if self.job.monospace:
            return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
        else:
            return html_diff
8✔
698

699

700
class DeepdiffDiffer(DifferBase):
    """Differ that compares two JSON or XML documents with the deepdiff package and lists each change found."""

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report of the structural differences between the old and the new data.

        :param directives: The differ's directives (see ``__supported_directives__``).
        :param report_kind: The kind of report requested; 'html' produces colorized HTML, anything else plain text.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed to ``make_timestamp`` for the header lines.
        :returns: For 'html', a dict with only the 'html' key; otherwise a dict with 'text' and 'markdown' keys. If
            the new data is not valid JSON an error message is returned under all three keys (and the exception is
            recorded in the job state). If no differences are found, all three keys hold an empty string and the
            state's verb is set to 'changed,no_report'.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _format_value(value: Any) -> str:
            """Render one side of a change: quoted scalars, indented JSON for containers, ``str()`` otherwise."""
            type_name = type(value).__name__
            if type_name in {'str', 'int', 'float'}:
                return f'"{value}"'
            if type_name in {'dict', 'list'}:
                return jsonlib.dumps(value, ensure_ascii=False, indent=2)
            return str(value)

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            as_html = report_kind == 'html'
            # NOTE(review): for the two 'set_item_*' templates below, deepdiff appears to carry an added item in t2
            # and a removed item in t1, so the "as {val_t1}" / "(was {val_t2})" tails may render the not-present
            # marker rather than the item - confirm against deepdiff before changing the wording.
            if as_html:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                val_t1 = _format_value(ddiff.t1)
                val_t2 = _format_value(ddiff.t2)
                diff_path = ddiff.path()  # type: ignore[no-untyped-call]
                if as_html:
                    # The values and keys come from the monitored documents: escape them so that markup characters
                    # are displayed literally instead of being interpreted by the HTML renderer. Quotes are left
                    # alone as this text is never placed inside an attribute.
                    val_t1 = html.escape(val_t1, quote=False)
                    val_t2 = html.escape(val_t2, quote=False)
                    diff_path = html.escape(str(diff_path), quote=False)

                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=diff_path,
                    type_t1=type(ddiff.t1).__name__,
                    type_t2=type(ddiff.t2).__name__,
                    val_t1=val_t1,
                    val_t2=val_t2,
                )

            result = []
            for key in ddiff.tree:
                for item_key in ddiff.tree[key]:
                    result.append(_pretty_print_diff(item_key))

            return '\n'.join(result)

        data_type = directives.get('data_type', 'json')
        old_data = ''
        new_data = ''
        if data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                # Unparsable old data is compared as an empty string rather than aborting the diff.
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'Job {self.job.index_number}: New data is invalid JSON: {e} ({self.job.get_location()})')
                logger.info(f'Job {self.job.index_number}: {self.state.new_data!r}')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ignore_order: bool = directives.get('ignore_order')  # type: ignore[assignment]
        ignore_string_case: bool = directives.get('ignore_string_case')  # type: ignore[assignment]
        significant_digits = directives.get('significant_digits')
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=ignore_order,
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=ignore_string_case,
            significant_digits=significant_digits,
            # Map the logger's effective level onto 0..2: DEBUG (10) -> 2, INFO (20) -> 1, WARNING (30) and up -> 0.
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">'
                # f'Differ: {self.__kind__} for {data_type}\n'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                # diff_text has no trailing newline, so it is used whole (slicing off the last character would drop
                # the final period); <wbr> lets long paths wrap between subscripts.
                + diff_text.replace('][', ']<wbr>[')
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                # f'Differ: {self.__kind__} for {data_type}\n'
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
                f'{diff_text}'
            )
            return {'text': text_diff, 'markdown': text_diff}
8✔
888

889

890
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed, if numpy is installed '
            '(default: 2.5)'
        ),
        'ai_google': 'Generative AI summary of changes (BETA)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report showing the new image and an image highlighting (in yellow) what changed from the old one.

        :param directives: The differ's directives (see ``__supported_directives__``).
        :param report_kind: The kind of report requested (all three kinds are always returned).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed to ``make_timestamp`` for the header lines.
        :returns: A dict with 'text', 'markdown' and 'html' keys. All three are empty strings when the images are
            identical (state verb set to 'unchanged') or when the mean squared error is below ``mse_threshold``
            (state verb set to 'changed,no_report').
        :raises ValueError: If the old or the new data held in the state is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)

        def _require_httpx() -> None:
            """Raise the differ's import error if httpx is unavailable (only URL loading and ai_google need it)."""
            # This module sets httpx to None when the import fails, so a check for a str alone would never fire.
            if httpx is None or isinstance(httpx, str):  # pragma: no cover
                self.raise_import_error(
                    'httpx',
                    httpx if isinstance(httpx, str) else "No module named 'httpx'",
                )

        def load_image_from_web(url: str) -> Image.Image:
            """Fetches the image from an url."""
            _require_httpx()
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image.Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image.Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image.Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image.Image, img2: Image.Image) -> tuple[Image.Image, np.float64 | None]:
            """Compute the difference between two images.

            :returns: The visualization image and the mean squared error (None when numpy is not available).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                diff_array = np.array(diff_image)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Color matrix with 12 elements: 4 for Red, 4 for Green, and 4 for Blue.
            # Output Red and Green both copy the first input channel (1.0) and Blue is zeroed, which gives a
            # yellow tint.
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the first channel
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the first channel
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0%
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)
            # Carry over the format so that the image can later be saved with format=<image>.format
            final_img.format = img2.format

            return final_img, mse_value

        def ai_google(
            old_image: Image.Image,
            new_image: Image.Image,
            diff_image: Image.Image,
            directives: AiGoogleDirectives,
        ) -> str:
            """Summarize changes in image using Generative AI (ALPHA).

            Uploads the three images to Google AI and asks the model for a summary; returns the summary, or an
            error message if the API key is unusable or an upload fails.
            """
            logger.info(f'Job {self.job.index_number}: Running ai_google for {self.__kind__} differ')
            warnings.warn(
                f'Job {self.job.index_number}: Using ai_google in differ {self.__kind__}, which is ALPHA, '
                f'may have bugs, and may change in the future. Please report any problems or suggestions at '
                f'https://github.com/mborsetti/webchanges/discussions.',
                RuntimeWarning,
            )
            _require_httpx()

            api_version = '1beta'
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            def _load_image(img_data: tuple[str, Image.Image]) -> dict[str, dict[str, str] | Exception | str]:
                """Upload one (name, image) pair; return its 'file_data' part or an 'error'/'img_name' dict."""
                img_name, image = img_data
                # Convert image to bytes
                img_byte_arr = io.BytesIO()
                image.save(img_byte_arr, format=image.format)
                image_data = img_byte_arr.getvalue()
                mime_type = f'image/{image.format.lower()}'  # type: ignore[union-attr]

                logger.info(
                    f'Job {self.job.index_number}: Loading {img_name} ({image.format}) to Google AI '
                    f'({len(image_data) / 1024:,.0f} kbytes)'
                )

                # Initial resumable upload request
                headers = {
                    'X-Goog-Upload-Protocol': 'resumable',
                    'X-Goog-Upload-Command': 'start',
                    'X-Goog-Upload-Header-Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Header-Content-Type': mime_type,
                    'Content-Type': 'application/json',
                }
                data = {'file': {'display_name': 'TEXT'}}

                try:
                    response = client.post(
                        f'https://generativelanguage.googleapis.com/upload/v{api_version}/files?'
                        f'key={GOOGLE_AI_API_KEY}',
                        headers=headers,
                        json=data,
                    )
                    # Report an HTTP error status as such instead of failing later on a missing header.
                    response.raise_for_status()
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}
                upload_url = response.headers['X-Goog-Upload-Url']

                # Upload the image data
                headers = {
                    'Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Offset': '0',
                    'X-Goog-Upload-Command': 'upload, finalize',
                }
                try:
                    response = client.post(upload_url, headers=headers, content=image_data)
                    response.raise_for_status()
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}

                # Extract file URI from response
                file_info = response.json()
                file_uri = file_info['file']['uri']
                logger.info(f'Job {self.job.index_number}: {img_name.capitalize()} loaded to {file_uri}')

                return {
                    'file_data': {
                        'mime_type': mime_type,
                        'file_uri': file_uri,
                    }
                }

            # upload to Google (concurrently); the context managers close the HTTP client and shut down the
            # worker threads once all three uploads have completed or failed
            with httpx.Client(http2=True, timeout=self.job.timeout) as client, ThreadPoolExecutor() as executor:
                uploads = list(
                    executor.map(
                        _load_image,
                        (
                            ('old image', old_image),
                            ('new image', new_image),
                            ('differences image', diff_image),
                        ),
                    )
                )

            additional_parts: list[dict[str, dict[str, str]]] = []
            for upload in uploads:
                if 'error' in upload:
                    logger.error(
                        f'Job {self.job.index_number}: ai_google for {self.__kind__} HTTP Client error '
                        f"{type(upload['error'])} when loading {upload['img_name']} to Google AI: "
                        f"{upload['error']}"
                    )
                    return (
                        f"HTTP Client error {type(upload['error'])} when loading "
                        f"{upload['img_name']} to Google AI: {upload['error']}"
                    )
                additional_parts.append(upload)  # type: ignore[arg-type]

            system_instructions = (
                'You are a skilled journalist tasked with summarizing the key differences between two versions '
                'of the same image. The audience for your summary is already familiar with the image, so you can '
                'focus on the most significant changes.'
            )
            model_prompt = (
                'You are a skilled visual analyst tasked with analyzing two versions of an image and summarizing the '
                'key differences between them. The audience for your summary is already familiar with the '
                "image's content, so you should focus only on the most significant differences.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the yellow areas in the image '
                f"{additional_parts[2]['file_data']['file_uri']}, identify the differences, and describe them.\n"
                f"2. Refer to the old version of the image {additional_parts[0]['file_data']['file_uri']} and the new "
                f" version {additional_parts[1]['file_data']['file_uri']}.\n"
                '3. You are only interested in those differences, such as additions, removals, or alterations, that '
                'modify the intended message or interpretation.\n'
                '4. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '5. If there are only additions to the image, then summarize the additions.\n'
                '6. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '7. Restrict your analysis and summary to the information provided within these images. Do '
                'not introduce external information or assumptions.\n'
            )
            summary, _ = AIGoogleDiffer._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                additional_parts=additional_parts,  # type: ignore[arg-type]
                directives=directives,
            )

            return summary

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)
        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')

        # The locations end up inside HTML attributes, so they are escaped (quotes included).
        old_location = html.escape(self.state.old_data)
        new_location = html.escape(self.state.new_data)
        if data_type == 'url':
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            old_data = f' (<a href="{old_location}" target="_blank">Old image</a>)'
            new_data = f' (<a href="{new_location}" target="_blank">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{old_location}" target="_blank">Old image</a>)'
            new_data = f' (<a href="file://{new_location}" target="_blank">New image</a>)'

        # Check formats  TODO: is it needed? under which circumstances?
        # if new_image.format != old_image.format:
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
        # else:
        #     logger.debug(f'image format is {old_image.format}')

        # If needed, shrink the larger image
        if new_image.size != old_image.size:
            # NOTE: sizes are (width, height) tuples, so this comparison is lexicographic (width first).
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.Resampling.LANCZOS)
                # The format is re-assigned after resizing as it is needed later for saving the image
                new_image.format = img_format

            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.Resampling.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # Test against None (numpy unavailable) rather than truthiness, so that an MSE of exactly 0.0 is still
        # logged and compared with the threshold.
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')

            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=diff_image.format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=new_image.format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # prepare AI summary
        summary = ''
        # An 'ai_google' directive given without sub-directives is treated as an empty mapping
        ai_google_directives = directives.get('ai_google') or {}
        if 'ai_google' in directives:
            summary = ai_google(old_image, new_image, diff_image, ai_google_directives)

        # Prepare HTML output
        htm = [
            f'<span style="font-family:monospace">'
            # f'Differ: {self.__kind__} for {data_type}',
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
            '</span>',
            '</span>',
            'New image:',
        ]
        if data_type == 'url':
            # Link to the NEW image, as announced by the 'New image:' caption above
            htm.append(f'<img src="{new_location}" style="max-width: 100%; display: block;">')
        else:
            htm.append(
                f'<img src="data:image/{(new_image.format or "").lower()};base64,{encoded_new}" '
                'style="max-width: 100%; display: block;">'
            )
        htm.extend(
            [
                'Differences from old (in yellow):',
                f'<img src="data:image/{(diff_image.format or "").lower()};base64,{encoded_diff}" '
                'style="max-width: 100%; display: block;">',
            ]
        )
        changed_text = 'The image has changed; please see an HTML report for the visualization.'
        if not summary:
            return {
                'text': changed_text,
                'markdown': changed_text,
                'html': '<br>\n'.join(htm),
            }

        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        directives_text = (
            ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in ai_google_directives.items())
            or 'None'
        )
        footer = f'Summary generated by Google Generative AI (ai_google directive(s): {directives_text})'
        return {
            'text': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n------------\n{footer}'
            ),
            'markdown': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n* * *\n{footer}'
            ),
            'html': '<br>\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '',
                ]
                + htm
                + [
                    '-----',
                    f'<i><small>{footer}</small></i>',
                ]
            ),
        }
1277

1278

1279
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': ('model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-2.0-flash)'),
        'system_instructions': (
            'Optional tone and style instructions for the model (default: see documentation at '
            'https://webchanges.readthedocs.io/en/stable/differs.html#ai-google-diff)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {unified_diff_new}, {old_text} and {new_text} will be replaced',
        'additions_only': 'summarizes only added lines (including as a result of a change)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': (
            "the model's TopP parameter (default: 1.0 if temperature is 0.0, otherwise None, i.e. model's default)"
        ),
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'tools': "data passed on to the API's 'tools' field (default: None)",
        'unified': 'directives passed to the unified differ (default: None)',
    }
    __default_directive__ = 'model'

    @staticmethod
    def _send_to_model(
        job: JobBase,
        system_instructions: str,
        model_prompt: str,
        additional_parts: list[dict[str, str | dict[str, str]]] | None = None,
        directives: AiGoogleDirectives | None = None,
    ) -> tuple[str, str]:
        """Creates the summary request to the model; returns the summary and the version of the actual model used.

        Failures (missing API key, transport errors, API errors, unusable responses) are never raised: they are
        reported as the text of the returned summary so that they end up in the report.

        :param job: The job being processed; used for log messages only.
        :param system_instructions: Text sent as the request's ``system_instruction``.
        :param model_prompt: Text sent as the first part of the request's ``contents``.
        :param additional_parts: Optional extra parts appended after the prompt in the same content entry.
        :param directives: The differ's directives (model, timeout, generation parameters, tools).
        :returns: A tuple of (summary or error message, model version). The model version is an empty string when
           the API key is unusable, and falls back to the requested model name when the API does not report one.
        """

        def api_error_message(response: httpx.Response) -> str:
            """Extracts the API's error message from a response, or '' if the body has none (or is not JSON)."""
            try:
                payload = response.json()
            except ValueError:  # includes json.JSONDecodeError, e.g. an HTML error page from a proxy
                return ''
            error = payload.get('error') if isinstance(payload, dict) else None
            if not isinstance(error, dict):
                return ''
            return error.get('message') or ''

        api_version = '1beta'
        if directives is None:
            directives = {}
        model = directives.get('model', 'gemini-2.0-flash')
        timeout = directives.get('timeout', 300)
        max_output_tokens = directives.get('max_output_tokens')
        temperature = directives.get('temperature', 0.0)
        # With a temperature of 0.0, pin top_p as well unless the user chose a value.
        top_p = directives.get('top_p', 1.0 if temperature == 0.0 else None)
        top_k = directives.get('top_k')
        api_key = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
        if len(api_key) != 39:
            logger.error(
                f'Job {job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                f'incorrect length {len(api_key)} ({job.get_location()})'
            )
            return (
                f'## ERROR in summarizing changes using Google AI:\n'
                f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                f'{len(api_key)}.\n',
                '',
            )

        model_version = model  # default, replaced by the version reported by the API when available
        if httpx is None:
            # The module-level import guard allows httpx to be missing; report it instead of crashing.
            logger.error(f'Job {job.index_number}: Python package httpx is required to call Google AI')
            return 'AI summary unavailable: Python package httpx is not installed', model_version

        data: dict[str, Any] = {
            'system_instruction': {'parts': [{'text': system_instructions}]},
            'contents': [{'parts': [{'text': model_prompt}]}],
            'generation_config': {
                'max_output_tokens': max_output_tokens,
                'temperature': temperature,
                'top_p': top_p,
                'top_k': top_k,
            },
        }
        if additional_parts:
            data['contents'][0]['parts'].extend(additional_parts)
        if directives.get('tools'):
            data['tools'] = directives['tools']
        logger.info(f'Job {job.index_number}: Making the content generation request to Google AI model {model}')
        try:
            # The context manager closes the client (and its connection pool) once the response has been read.
            # HTTP/2 is only requested when the optional h2 package could be imported.
            with httpx.Client(http2=h2 is not None) as client:
                r = client.post(
                    f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:generateContent',
                    json=data,
                    # The key travels in a header rather than in the query string so that it does not show up in
                    # logged request URLs.
                    headers={'Content-Type': 'application/json', 'x-goog-api-key': api_key},
                    timeout=timeout,
                )
        except httpx.HTTPError as e:
            return (
                f'AI summary unavailable: HTTP client error: {e} when requesting data from {e.request.url.host}',
                model_version,
            )

        if r.is_success:
            try:
                result = r.json()
            except ValueError:
                result = None
            if not isinstance(result, dict):
                return f'AI summary unavailable: Received an invalid (non-JSON) reply from {r.url.host}', model_version
            # A reply may carry no candidates at all, or a candidate without content/parts.
            candidates = result.get('candidates') or []
            candidate = candidates[0] if candidates else {}
            logger.info(f"Job {job.index_number}: AI generation finished by {candidate.get('finishReason')}")
            parts = (candidate.get('content') or {}).get('parts') or []
            text = parts[0].get('text') if parts else None
            if isinstance(text, str):
                summary = text.rstrip()
            else:
                summary = (
                    f'AI summary unavailable: Model did not return any candidate output:\n'
                    f'{jsonlib.dumps(result, ensure_ascii=True, indent=2)}'
                )
            model_version = result.get('modelVersion') or model
        elif r.status_code == 400:
            summary = f'AI summary unavailable: Received error from {r.url.host}: {api_error_message(r)}'
        else:
            summary = f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from {r.url.host}'
            if r.content:
                summary += f': {api_error_message(r)}'

        return summary, model_version

    def differ(
        self,
        directives: AiGoogleDirectives,  # type: ignore[override]
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Produces an AI-generated summary of the change followed by the output of the 'unified' differ.

        If no summary is produced (the old and new data do not differ), ``self.state.verb`` is set to
        'changed,no_report' and empty reports are returned.

        :param directives: The differ's directives; this mapping is not modified.
        :param report_kind: The kind of report requested (not used here: all three kinds are always built).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Timezone passed on to the 'unified' differ.
        :returns: A dict with the 'text', 'markdown' and 'html' versions of the report.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ from hooks.py')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> tuple[str, str]:
            """Generate AI summary from unified diff, or an error message, plus the model version."""
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n',
                    '',
                )

            diff_lines: list[str] = []
            if '{unified_diff' in prompt:  # matches unified_diff or unified_diff_new
                default_context_lines = 9999 if '{unified_diff}' in prompt else 0  # none if only unified_diff_new
                context_lines = directives.get('prompt_ud_context_lines', default_context_lines)
                diff_lines = list(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                        # The inputs carry no line endings, so the control lines must not either; otherwise
                        # joining with '\n' leaves a blank line after each header.
                        lineterm='',
                    )
                )
                if not diff_lines:
                    # no changes
                    return '', ''
            unified_diff = '\n'.join(diff_lines)

            if '{unified_diff_new}' in prompt:
                # Skip the two file header lines ('---' and '+++') so that the '+++' header is not mistaken for
                # an added line.
                unified_diff_new = '\n'.join(line[1:] for line in diff_lines[2:] if line.startswith('+'))
            else:
                unified_diff_new = ''

            # check if data is different (same data is sent during testing)
            if '{old_text}' in prompt and '{new_text}' in prompt and self.state.old_data == self.state.new_data:
                return '', ''

            model_prompt = prompt.format(
                unified_diff=unified_diff,
                unified_diff_new=unified_diff_new,
                old_text=self.state.old_data,
                new_text=self.state.new_data,
            )

            return self._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                directives=directives,
            )

        if directives.get('additions_only') or self.job.additions_only:
            default_system_instructions = (
                'You are a skilled journalist. Your task is to summarize the provided text in a clear and concise '
                'manner. Restrict your analysis and summary *only* to the text provided. Do not introduce any '
                'external information or assumptions.\n\n'
                'Format your summary using Markdown. Use headings, bullet points, and other Markdown elements where '
                'appropriate to create a well-structured and easily readable summary.'
            )
            default_prompt = '{unified_diff_new}'
        else:
            default_system_instructions = (
                'You are a skilled journalist tasked with analyzing two versions of a text and summarizing the key '
                'differences in meaning between them. The audience for your summary is already familiar with the '
                "text's content, so you can focus on the most significant changes.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the old version of the text, provided within the `<old_version>` and '
                '`</old_version>` tags.\n'
                '2. Carefully examine the new version of the text, provided within the `<new_version>` and '
                '`</new_version>` tags.\n'
                '3. Compare the two versions, identifying areas where the meaning differs. This includes additions, '
                'removals, or alterations that change the intended message or interpretation.\n'
                '4. Ignore changes that do not affect the overall meaning, even if the wording has been modified.\n'
                '5. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '6. If there are only additions to the text, then summarize the additions.\n'
                '7. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '8. Restrict your analysis and summary to the information provided within the `<old_version>` and '
                '`<new_version>` tags. Do not introduce external information or assumptions.\n'
            )
            default_prompt = '<old_version>\n{old_text}\n</old_version>\n\n<new_version>\n{new_text}\n</new_version>'
        system_instructions = directives.get('system_instructions', default_system_instructions)
        # Prompts from configuration files may spell newlines as a literal backslash-n.
        prompt = directives.get('prompt', default_prompt).replace('\\n', '\n')
        summary, model_version = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        # The model is reported through model_version, so it is left out of the directives listed in the footer.
        # A filtered copy is used so that the caller's directives are not modified.
        footer_directives = {key: value for key, value in directives.items() if key != 'model'}
        if footer_directives:
            directives_text = (
                ' (differ directive(s): '
                + ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in footer_directives.items())
                + ')'
            )
        else:
            directives_text = ''
        # NOTE(review): the footer is only emitted when directives other than 'model' are present; confirm whether
        # the attribution should also appear when only the model version is known.
        footer = (
            f"Summary by Google Generative AI's model {model_version}{directives_text}"
            if model_version and directives_text
            else ''
        )
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # The result of the last call (html) is the one used below for all three report kinds.
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': f"{summary}\n\n{unified_report['text']}" + (f'\n------------\n{footer}' if footer else ''),
            'markdown': f"{summary}\n\n{unified_report['markdown']}" + (f'\n* * *\n{footer}' if footer else ''),
            'html': '\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '<br>',
                    '<br>',
                    unified_report['html'],
                ]
                + (['-----<br>', f'<i><small>{footer}</small></i>'] if footer else [])
            ),
        }
1554

1555

1556
class WdiffDiffer(DifferBase):
    """Word-by-word differ (work in progress): marks added and removed words inline within the text."""

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Builds a word-level diff of the old and new data.

        Additions and deletions are wrapped in ANSI color codes for text/markdown and in styled ``<span>`` elements
        for html.

        :param directives: The differ's directives ('context_lines' and 'range_info').
        :param report_kind: The kind of report requested (not used here: all three kinds are always built).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Timezone used for the timestamps in the header.
        :returns: A dict with the 'text', 'markdown' and 'html' versions of the diff.
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(f'Differ {self.__kind__} requires the old data to be a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError(f'Differ {self.__kind__} requires the new data to be a string')

        # Split the texts into words tokenizing newline
        if self.state.is_markdown():
            # Don't split spaces in link text, tokenize space as </s>
            old_data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', self.state.old_data)
            words1 = old_data.replace('\n', ' <\\n> ').split(' ')
            new_data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', self.state.new_data)
            words2 = new_data.replace('\n', ' <\\n> ').split(' ')
        else:
            words1 = self.state.old_data.replace('\n', ' <\\n> ').split(' ')
            words2 = self.state.new_data.replace('\n', ' <\\n> ').split(' ')

        # Generate a difference list; each entry is a two-character code ('+ ', '- ', '  ' or '? ') followed by
        # the word
        diff = list(difflib.Differ().compare(words1, words2))

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        head_text = (
            # f'Differ: wdiff\n'
            f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m\n'
            f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m\n'
        )
        head_html = '<br>\n'.join(
            [
                # The opening monospace span is concatenated with the first header line (no <br> in between)
                '<span style="font-family:monospace;">'
                # 'Differ: wdiff',
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>'
                f'</span>',
                '',
            ]
        )
        # Process the diff output to make it more wdiff-like. Each word is emitted one iteration late (as
        # prev_word_*) so that a closing marker can still be attached to it once the next word is known.
        result_text: list[str] = []
        result_html: list[str] = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''
        next_html = ''
        add = False
        rem = False

        for word_text in diff + ['  ']:  # the extra unchanged empty word flushes the last real word
            if word_text[0] == '?':  # additional context line
                continue
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    # Start the marking on the word after the newline token
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    # Start the marking on the word after the newline token
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        if add or rem:
            result_text[-1] += '\033[0m'
            result_html[-1] += '</span>'

        # rebuild the text from words, replacing the newline token (the first entry is the initial empty
        # prev_word placeholder)
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            # Lines with changes are those containing an ANSI color start code
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            if contextlines:
                lines_to_keep: set[int] = set()
                for i in lines_with_changes:
                    # Out-of-range indices are harmless: the set is only used for membership tests
                    lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            else:
                lines_to_keep = set(lines_with_changes)
            new_diff_text = []
            new_diff_html = []
            last_line = 0
            skip = False
            i = 0
            for i, (line_text, line_html) in enumerate(zip(text_lines, diff_html.splitlines())):
                if i in lines_to_keep:
                    if range_info and skip:
                        # Range (1-based) of the lines omitted just before this one
                        new_diff_text.append(f'@@ {last_line + 1}...{i} @@')
                        new_diff_html.append(f'@@ {last_line + 1}...{i} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            if (i + 1) != last_line:
                # Lines were omitted at the end
                if range_info and skip:
                    new_diff_text.append(f'@@ {last_line + 1}...{i + 1} @@')
                    new_diff_html.append(f'@@ {last_line + 1}...{i + 1} @@')
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.state.is_markdown():
            # Restore the spaces in link text that were tokenized before splitting into words
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc