• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 14020917399

23 Mar 2025 04:35PM UTC coverage: 75.4% (-0.05%) from 75.448%
14020917399

push

github

mborsetti
Version 3.29.0rc2

1739 of 2632 branches covered (66.07%)

Branch coverage included in aggregate %.

4575 of 5742 relevant lines covered (79.68%)

6.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.5
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import io
8✔
11
import logging
8✔
12
import math
8✔
13
import os
8✔
14
import re
8✔
15
import shlex
8✔
16
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
17
import tempfile
8✔
18
import traceback
8✔
19
import urllib.parse
8✔
20
import warnings
8✔
21
from base64 import b64encode
8✔
22
from concurrent.futures import ThreadPoolExecutor
8✔
23
from datetime import datetime
8✔
24
from io import BytesIO
8✔
25
from pathlib import Path
8✔
26
from typing import Any, Iterator, Literal, TYPE_CHECKING, TypedDict
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
import html2text
8✔
30

31
from webchanges.jobs import JobBase
8✔
32
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
33

34
try:
8✔
35
    from deepdiff import DeepDiff
8✔
36
    from deepdiff.model import DiffLevel
8✔
37
except ImportError as e:  # pragma: no cover
38
    DeepDiff = str(e)  # type: ignore[assignment,misc]
39

40
try:
8✔
41
    import httpx
8✔
42
except ImportError:  # pragma: no cover
43
    httpx = None  # type: ignore[assignment]
44
if httpx is not None:
8!
45
    try:
8✔
46
        import h2
8✔
47
    except ImportError:  # pragma: no cover
48
        h2 = None  # type: ignore[assignment]
49

50
try:
8✔
51
    import numpy as np
8✔
52
except ImportError as e:  # pragma: no cover
53
    np = str(e)  # type: ignore[assignment]
54

55
try:
8✔
56
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
57
except ImportError as e:  # pragma: no cover
58
    Image = str(e)  # type: ignore[assignment]
59

60
# https://stackoverflow.com/questions/712791
61
try:
8✔
62
    import simplejson as jsonlib
8✔
63
except ImportError:  # pragma: no cover
64
    import json as jsonlib  # type: ignore[no-redef]
65

66
try:
8✔
67
    import xmltodict
8✔
68
except ImportError as e:  # pragma: no cover
69
    xmltodict = str(e)  # type: ignore[no-redef]
70

71
# https://stackoverflow.com/questions/39740632
72
if TYPE_CHECKING:
73
    from webchanges.handler import JobState
74
    from webchanges.storage import _Config
75

76

77
logger = logging.getLogger(__name__)
8✔
78

79
# Typed description of a directives mapping, declared with the functional ``TypedDict`` syntax.
# ``total=False`` makes every key optional, so a mapping holding any subset of these keys type-checks.
# NOTE(review): presumably the sub-directives of an AI-based differ defined further down in this file; that code is
# not visible here, so the meaning and defaults of the individual keys should be confirmed against it.
AiGoogleDirectives = TypedDict(
    'AiGoogleDirectives',
    {
        'model': str,
        'additions_only': str,
        'system_instructions': str,
        'prompt': str,
        'prompt_ud_context_lines': int,
        'timeout': int,
        'max_output_tokens': int | None,
        'temperature': float | None,
        'top_p': float | None,
        'top_k': float | None,
        'tools': list[Any],
    },
    total=False,
)
96

97

98
class DifferBase(metaclass=TrackSubClasses):
    """The base class for differs.

    A concrete differ subclasses this class, sets ``__kind__`` to the name by which it is selected, lists the
    sub-directives it accepts in ``__supported_directives__`` and implements :meth:`differ`.
    """

    # Registry of differ classes keyed by their ``__kind__``; looked up by normalize_differ() and process().
    # NOTE(review): presumably populated by the TrackSubClasses metaclass, which is not visible here.
    __subclasses__: dict[str, type[DifferBase]] = {}
    __anonymous_subclasses__: list[type[DifferBase]] = []

    # The name identifying the differ (the value of the 'name' sub-directive).
    __kind__: str = ''

    # Maps each accepted sub-directive to its help text; the special key '<any>' disables validation of keys.
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty

    # Inline CSS used to mark added and deleted content in HTML output.
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'

    def __init__(self, state: JobState) -> None:
        """Store the job state (and its job) the differ operates on.

        :param state: the JobState.
        """
        self.job = state.job
        self.state = state

    @classmethod
    def differ_documentation(cls) -> str:
        """Generates simple differ documentation for use in the --features command line argument.

        :returns: A string to display.
        """
        result: list[str] = []
        for sc in TrackSubClasses.sorted_by_kind(cls):
            result.append(f'  * {sc.__kind__} - {sc.__doc__}')
            if hasattr(sc, '__supported_directives__'):
                for key, doc in sc.__supported_directives__.items():
                    result.append(f'      {key} ... {doc}')
        result.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(result)

    @classmethod
    def normalize_differ(
        cls,
        differ_spec: dict[str, Any] | None,
        job_index_number: int | None = None,
        config: _Config | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number (only used in error messages).
        :param config: The configuration, whose 'differ_defaults' entry supplies default sub-directives.
        :returns: A validated differ_kind, directives tuple.
        :raises ValueError: If the differ has no name, is unknown, or is given unsupported sub-directives.
        """

        def directives_with_defaults(
            differ_spec: str, directives: dict[str, Any], config: _Config | None = None
        ) -> dict[str, Any]:
            """Obtain differ subdirectives that also contains defaults from the configuration.

            :param differ_spec: The name (kind) of the differ whose defaults are looked up.
            :param directives: The differ directives as stated in the job (updated in place).
            :param config: The configuration.
            :returns: directives inclusive of configuration defaults.
            """
            if config is None:
                logger.info('No configuration object found to look for differ defaults')
                return directives
            cfg = config.get('differ_defaults')
            if isinstance(cfg, dict):
                defaults: dict[str, Any] = cfg.get(differ_spec)  # type: ignore[assignment]
                if defaults:
                    for key, value in defaults.items():
                        # values stated in the job take precedence over configuration defaults
                        if key not in directives:
                            directives[key] = value
            return directives

        differ_spec = differ_spec or {'name': 'unified'}
        directives = differ_spec.copy()  # do not mutate the caller's mapping
        differ_kind = directives.pop('name', '')
        if not differ_kind:
            # shorthand: a spec consisting solely of 'command' selects the command differ
            if list(directives.keys()) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls: DifferBase | None = cls.__subclasses__.get(differ_kind, None)  # type: ignore[assignment]
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        if directives:
            directives = directives_with_defaults(differ_kind, directives, config)

        if hasattr(differcls, '__supported_directives__'):
            provided_keys = set(directives.keys())
            allowed_keys = set(differcls.__supported_directives__.keys())
            unknown_keys = provided_keys.difference(allowed_keys)
            if unknown_keys and '<any>' not in allowed_keys:
                # Sort (as strings) so that the message is deterministic and a non-string key still results in
                # the intended ValueError rather than a TypeError from str.join().
                unknown_text = ', '.join(sorted(str(key) for key in unknown_keys))
                supported_text = ', '.join(sorted(allowed_keys))
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f'{unknown_text} (supported: {supported_text}).'
                )

        return differ_kind, directives

    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: ZoneInfo | None = None,
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        If the differ raises, the latest saved snapshot is deleted, the exception and formatted traceback are
        recorded on ``job_state`` and an error message is returned for all three report kinds.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :returns: The output of the differ or an error message with traceback if it fails; an empty dict if no
           differ named ``differ_kind`` exists.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if differcls:
            try:
                return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
            except Exception as e:
                # Differ failed
                logger.info(
                    f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                    f'error {e}'
                )
                # Undo saving of new data since user won't see the diff
                job_state.delete_latest()

                job_state.exception = e
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
                directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
                # Text interpolated into the HTML fragment must be escaped: tracebacks routinely contain
                # markup-like text (e.g. "<module>") that would otherwise be interpreted as tags.
                html_kind = html.escape(str(differ_kind))
                html_directives = html.escape(directives_text)
                html_traceback = html.escape(str(job_state.traceback))
                return {
                    'text': (
                        f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                        f'error:\n\n{job_state.traceback}'
                    ),
                    'markdown': (
                        f'## Differ {differ_kind} with directive(s) {directives_text} '
                        f'encountered an error:\n```\n{job_state.traceback}\n```\n'
                    ),
                    'html': (
                        f'<span style="color:red;font-weight:bold">Differ {html_kind} with directive(s) '
                        f'{html_directives} encountered an error:<br>\n<br>\n'
                        f'<span style="font-family:monospace;white-space:pre-wrap;">{html_traceback}'
                        f'</span></span>'
                    ),
                }
        else:
            return {}

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a formatted diff representation of data changes.

        Creates a diff representation in one or more output formats (text, markdown, or HTML).
        At minimum, this function must return output in the format specified by 'report_kind'.
        As results are memoized for performance optimization, it can generate up to all three formats simultaneously.

        The data to compare is read from ``self.state``. This base implementation must be overridden.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises RuntimeError: If the external diff tool returns an error.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @staticmethod
    def make_timestamp(
        timestamp: float,
        tz: ZoneInfo | None = None,
    ) -> str:
        """Format a timestamp as an RFC 5322 compliant datetime string.

        Converts a numeric timestamp to a formatted datetime string following the RFC 5322 (email) standard. When a
        timezone is provided, its full name, if known, is appended.

        :param timestamp: The timestamp.
        :param tz: The IANA timezone of the report.
        :returns: A datetime string in RFC 5322 (email) format or 'NEW' if timestamp is 0.
        """
        if timestamp:
            dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz)
            # add timezone name if known, i.e. if it is not merely the start of the numeric UTC offset
            if dt.strftime('%Z') != dt.strftime('%z')[:3]:
                cfws = f" ({dt.strftime('%Z')})"
            else:
                cfws = ''
            return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws
        else:
            return 'NEW'

    @staticmethod
    def html2text(data: str) -> str:
        """Converts html to text.

        :param data: the string in html format.
        :returns: the string in text format, with trailing whitespace removed from every line.
        """
        # Within this body 'html2text' resolves to the imported module, not to this method.
        parser = html2text.HTML2Text()
        parser.unicode_snob = True
        parser.body_width = 0
        parser.ignore_images = True
        parser.single_line_break = True
        parser.wrap_links = False
        return '\n'.join(line.rstrip() for line in parser.handle(data).splitlines())

    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Raise ImportError for missing package.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from ImportError.

        :raises: ImportError.
        """
        raise ImportError(
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
        )
336

337

338
class UnifiedDiffer(DifferBase):
    """(Default) Generates a unified diff."""

    __kind__ = 'unified'

    __supported_directives__ = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
        'additions_only': 'keep only addition lines (default: false)',
        'deletions_only': 'keep only deletion lines (default: false)',
    }

    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator over the lines of HTML: the opening table tag, one table row per diff line and the
           closing table tag.
        """

        def process_line(line: str, line_num: int, is_markdown: bool, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param is_markdown: Whether the line's content is to be converted from Markdown to HTML.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            :raises RuntimeError: If the line does not start with one of the expected unified diff markers.
            """
            # The style= string (or empty string) to add to an HTML tag.
            if line_num == 0:  # header line for the old data
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:  # header line for the new data
                style = 'font-family:monospace;color:darkgreen;'
            elif line[0] == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif line[0] == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif line[0] == ' ':  # context line
                style = monospace_style
            elif line[0] == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif line[0] == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not comform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and line[0] != '@':  # don't apply to headers or range information
                # the leading marker character is dropped; the row's style conveys the type of line
                if is_markdown or line[0] == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        is_markdown = self.state.is_markdown()
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, is_markdown, monospace_style)
        yield '</table>'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a unified diff between the old and the new data of the job state.

        The 'text' and 'markdown' outputs are the same unified diff; the 'html' output (generated only when
        ``report_kind`` is 'html') is that diff rendered as a table. If an HTML report is requested and a text diff
        is available in ``_unfiltered_diff``, it is converted instead of diffing again.

        :param directives: The directives ('context_lines', 'range_info', 'additions_only', 'deletions_only').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter.
        :param tz: The timezone of the report.
        :returns: A dict with the diff keyed by report kind; all values are empty strings (and the state's verb is
           set to 'changed,no_report') when there is nothing to report.
        """
        additions_only = directives.get('additions_only') or self.job.additions_only
        deletions_only = directives.get('deletions_only') or self.job.deletions_only
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            empty_return: dict[Literal['text', 'markdown', 'html'], str] = {'text': '', 'markdown': '', 'html': ''}
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                # context is of no use when only additions or only deletions are reported
                if additions_only or deletions_only:
                    contextlines = 0
                else:
                    contextlines = 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    '@',
                    '@',
                    self.make_timestamp(self.state.old_timestamp, tz),
                    self.make_timestamp(self.state.new_timestamp, tz),
                    contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return empty_return
            # replace tabs in header lines
            diff[0] = diff[0].replace('\t', ' ')
            diff[1] = diff[1].replace('\t', ' ')

            if additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # so much content has disappeared that the full diff is reported as a safeguard
                    diff = (
                        diff[:2]
                        + ['/**Comparison type: Additions only**']
                        + ['/**Deletions are being shown as 75% or more of the content has been deleted**']
                        + diff[2:]
                    )
                else:
                    head = '---' + diff[0][3:]
                    # keep additions and range information (this also retains the '+++' header line)
                    diff = [line for line in diff if line.startswith(('+', '@'))]
                    # of consecutive range information lines, keep only the last one
                    diff = [
                        line1
                        for line1, line2 in zip([''] + diff, diff + [''])
                        if not (line1.startswith('@') and line2.startswith('@'))
                    ][1:]
                    diff = diff[:-1] if diff[-1].startswith('@') else diff
                    # nothing to report if only the header (plus at most one other non-blank line) is left
                    if len(diff) == 1 or len([line for line in diff if line.lstrip('+').rstrip()]) == 2:
                        self.state.verb = 'changed,no_report'
                        return empty_return
                    diff = [head, diff[0], '/**Comparison type: Additions only**'] + diff[1:]
            elif deletions_only:
                # Header line for the new data. It previously was built as '--- @' + diff[1][3:], which, since
                # diff[1][3:] already starts with ' @ ', produced a malformed '--- @ @ <timestamp>' line.
                head = '+++' + diff[1][3:]
                # keep deletions and range information (this also retains the '---' header line)
                diff = [line for line in diff if line.startswith(('-', '@'))]
                # of consecutive range information lines, keep only the last one
                diff = [
                    line1
                    for line1, line2 in zip([''] + diff, diff + [''])
                    if not (line1.startswith('@') and line2.startswith('@'))
                ][1:]
                diff = diff[:-1] if diff[-1].startswith('@') else diff
                # nothing to report if only the header (plus at most one other non-blank line) is left
                if len(diff) == 1 or len([line for line in diff if line.lstrip('-').rstrip()]) == 2:
                    self.state.verb = 'changed,no_report'
                    return empty_return
                diff = [diff[0], head, '/**Comparison type: Deletions only**'] + diff[1:]

            # remove range info lines if needed
            if directives.get('range_info') is False or (
                directives.get('range_info') is None and additions_only and (len(diff) < 4 or diff[3][0] != '/')
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)

            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        if report_kind == 'html':
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return out_diff
8✔
498

499

500
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a side-by-side HTML table diff (and, if requested, its conversion to text).

        If a text or markdown report is requested and an HTML table is available in ``_unfiltered_diff``, it is
        converted instead of diffing again.

        :param directives: The directives ('tabsize').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter.
        :param tz: The timezone of the report.
        :returns: A dict with the diff keyed by report kind.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            # a directive present but without a value (None) falls back to the default
            tabsize = directives.get('tabsize')
            tabsize = 8 if tabsize is None else int(tabsize)
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                True,  # context: only show the differences and the lines around them
                3,  # numlines: the number of context lines
            )
            # fix table formatting: replace attributes and CSS classes with inline styles
            table = table.replace('<th ', '<th style="font-family:monospace" ')
            table = table.replace('<td ', '<td style="font-family:monospace" ')
            table = table.replace(' nowrap="nowrap"', '')
            table = table.replace('<a ', '<a style="font-family:monospace;color:inherit" ')
            table = table.replace('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"')
            # 'lightred' (used previously) is not a CSS named color, so the background was never applied
            table = table.replace('<span class="diff_sub"', '<span style="color:red;background-color:mistyrose"')
            table = table.replace('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"')
            out_diff['html'] = table

        if report_kind in {'text', 'markdown'}:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        return out_diff
8✔
550

551

552
class CommandDiffer(DifferBase):
    """Runs an external command to generate the diff."""

    __kind__ = 'command'

    __supported_directives__ = {
        'command': 'The command to execute',
        'is_html': 'Whether the output of the command is HTML',
    }

    re_ptags = re.compile(r'^<p>|</p>$')
    re_htags = re.compile(r'<(/?)h\d>')
    re_tagend = re.compile(r'<(?!.*<).*>+$')

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a diff by running an external command on files containing the old and the new data.

        The data is written to two files in a temporary directory, whose paths are appended to the command's
        arguments. A return code of 0 is taken to mean "nothing to report" and one of 1 to mean that the standard
        output holds the diff; anything else, or any output to standard error, is an error. If an HTML report is
        requested and a text diff is available in ``_unfiltered_diff``, it is converted instead of running the
        command again.

        :param directives: The directives ('command' is required; 'is_html' is optional).
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter.
        :param tz: The timezone of the report.
        :returns: A dict with the diff keyed by report kind; all values are empty strings (and the state's verb is
           set to 'changed,no_report') when the command returns 0.
        :raises RuntimeError: If the external diff tool returns an error.
        """
        if self.job.monospace:
            # the outer <span> is left open here; it is closed at the very end of this method
            head_html = '\n'.join(
                [
                    '<span style="font-family:monospace;white-space:pre-wrap;">',
                    f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                    f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>',
                ]
            )
        else:
            head_html = '<br>\n'.join(
                [
                    '<span style="font-family:monospace;">',
                    f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                    f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>',
                    '</span>',
                ]
            )

        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        command = directives['command']
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            # reuse the text diff, dropping its two header lines (head_html is used in their place)
            diff_text = ''.join(_unfiltered_diff['text'].splitlines(keepends=True)[2:])
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            if self.state.is_markdown():
                # protect the link anchor from being split (won't work)
                markdown_links_re = re.compile(r'\[(.*?)][(](.*?)[)]')
                old_data = markdown_links_re.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(old_data)
                )
                new_data = markdown_links_re.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(new_data)
                )

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                old_file_path = tmp_path.joinpath('old_file')
                new_file_path = tmp_path.joinpath('new_file')
                if isinstance(old_data, str):
                    old_file_path.write_text(old_data)
                else:
                    old_file_path.write_bytes(old_data)
                if isinstance(new_data, str):
                    new_file_path.write_text(new_data)
                else:
                    new_file_path.write_bytes(new_data)
                cmdline = shlex.split(command) + [str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
            # Only 0 and 1 are successful outcomes. Testing for membership (instead of '> 1') also catches the
            # negative return codes reported on POSIX when the command is terminated by a signal.
            if proc.stderr or proc.returncode not in (0, 1):
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                logger.info(
                    f"Job {self.job.index_number}: Command in differ 'command' returned 0 (no report) "
                    f'({self.job.get_location()})'
                )
                return {'text': '', 'markdown': '', 'html': ''}
            # header lines in red and green (ANSI escape sequences)
            head_text = '\n'.join(
                [
                    f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m',
                    f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m',
                    '',
                ]
            )
            diff = proc.stdout
            if self.state.is_markdown():
                # undo the protection of the link anchor from being split
                diff = markdown_links_re.sub(lambda x: f'[{urllib.parse.unquote(x.group(1))}]({x.group(2)})', diff)
            if command.startswith('wdiff') and self.job.contextlines == 0:
                # remove lines that don't have any changes
                keeplines = []
                for line in diff.splitlines(keepends=True):
                    if any(x in line for x in {'{+', '+}', '[-', '-]'}):
                        keeplines.append(line)
                diff = ''.join(keeplines)
            if directives.get('is_html'):
                diff_text = self.html2text(diff)
                out_diff.update(
                    {
                        'text': head_text + diff_text,
                        'markdown': head_text + diff_text,
                        'html': head_html + diff,
                    }
                )
            else:
                diff_text = diff
                out_diff.update(
                    {
                        'text': head_text + diff_text,
                        'markdown': head_text + diff_text,
                    }
                )

        if report_kind == 'html' and 'html' not in out_diff:
            if command.startswith('wdiff'):
                # colorize output of wdiff
                out_diff['html'] = head_html + self.wdiff_to_html(diff_text)
            else:
                out_diff['html'] = head_html + html.escape(diff_text)

        if self.job.monospace and 'html' in out_diff:
            # close the outer <span> opened in head_html
            out_diff['html'] += '</span>'

        return out_diff

    def wdiff_to_html(self, diff: str) -> str:
        """
        Colorize output of wdiff.

        The diff is HTML-escaped, then text between the ``{+ +}`` and ``[- -]`` markers is wrapped in spans styled
        as additions and deletions respectively.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        html_diff = html.escape(diff)
        if self.state.is_markdown():
            # detect and fix multiline additions or deletions
            is_add = False
            is_del = False
            new_diff = []
            for line in html_diff.splitlines():
                # re-open at the start of this line a marker left open (and closed) on the previous one
                if is_add:
                    line = '{+' + line
                    is_add = False
                elif is_del:
                    line = '[-' + line
                    is_del = False
                # track which marker, if any, is still open at the end of the line
                for match in re.findall(r'\[-|-]|{\+|\+}', line):
                    if match == '[-':
                        is_del = True
                    if match == '-]':
                        is_del = False
                    if match == '{+':
                        is_add = True
                    if match == '+}':
                        is_add = False
                # close a marker still open so that every line is marked up on its own
                if is_add:
                    line += '+}'
                elif is_del:
                    line += '-]'
                new_diff.append(line)
            html_diff = '<br>\n'.join(new_diff)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        html_diff = re.sub(
            r'\{\+(.*?)\+}',
            lambda x: f'<span style="{self.css_added_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        html_diff = re.sub(
            r'\[-(.*?)-]',
            lambda x: f'<span style="{self.css_deltd_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        if self.job.monospace:
            return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
        else:
            return html_diff
8✔
739

740

741
class DeepdiffDiffer(DifferBase):
    """Reports the structural differences between two JSON or XML documents using the deepdiff library."""

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a human-readable report of the differences found by DeepDiff between the old and the new data.

        :param directives: The differ's directives; the keys 'data_type', 'ignore_order', 'ignore_string_case' and
            'significant_digits' are read here.
        :param report_kind: The kind of report requested; 'html' produces colorized HTML, anything else plain text.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed to ``make_timestamp`` for the '---' / '+++' header lines.
        :returns: A dict with key 'html' (when ``report_kind`` is 'html') or keys 'text' and 'markdown' (otherwise).
            When the new data is invalid JSON, an error message is returned under all three keys; when DeepDiff
            reports nothing, all three keys hold an empty string and ``self.state.verb`` is set to
            'changed,no_report'.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            # NOTE(review): the set_item_* templates below use val_t1/val_t2 the other way round from the
            # dictionary/iterable ones; this looks swapped but is left unchanged -- confirm against deepdiff.
            if report_kind == 'html':
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _format_value(value: Any) -> str:
                """Render a value for the report: quoted scalars, indented JSON for dicts/lists, str() otherwise."""
                # The check is on the exact type name, so e.g. a bool is not treated as an int.
                type_name = type(value).__name__
                if type_name in {'str', 'int', 'float'}:
                    return f'"{value}"'
                if type_name in {'dict', 'list'}:
                    return jsonlib.dumps(value, ensure_ascii=False, indent=2)
                return str(value)

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                type_t1 = type(ddiff.t1).__name__
                type_t2 = type(ddiff.t2).__name__
                val_t1 = _format_value(ddiff.t1)
                val_t2 = _format_value(ddiff.t2)
                diff_path = ddiff.path()  # type: ignore[no-untyped-call]

                if report_kind == 'html':
                    # The values and paths come from the monitored document and are placed inside HTML markup, so
                    # they must be escaped; quotes are left alone as the text never lands inside an attribute.
                    val_t1 = html.escape(val_t1, quote=False)
                    val_t2 = html.escape(val_t2, quote=False)
                    diff_path = html.escape(str(diff_path), quote=False)

                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=diff_path,
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=val_t1,
                    val_t2=val_t2,
                )

            result = []
            for key in ddiff.tree:
                for item in ddiff.tree[key]:
                    result.append(_pretty_print_diff(item))

            return '\n'.join(result)

        data_type = directives.get('data_type', 'json')
        old_data = ''
        new_data = ''
        if data_type == 'json':
            # Invalid old data is compared as an empty string; invalid new data is reported as an error.
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'Job {self.job.index_number}: New data is invalid JSON: {e} ({self.job.get_location()})')
                logger.info(f'Job {self.job.index_number}: {self.state.new_data!r}')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ignore_order: bool = directives.get('ignore_order')  # type: ignore[assignment]
        ignore_string_case: bool = directives.get('ignore_string_case')  # type: ignore[assignment]
        significant_digits = directives.get('significant_digits')
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=ignore_order,
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=ignore_string_case,
            significant_digits=significant_digits,
            # Maps the logger's effective level to 0..2: DEBUG (10) -> 2, INFO (20) -> 1, WARNING (30) and above -> 0.
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                # diff_text has no trailing newline, so it is used whole (slicing it would drop the final period);
                # <wbr> lets long paths such as root['a']['b'] wrap between brackets.
                + diff_text.replace('][', ']<wbr>[')
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
                f'{diff_text}'
            )
            return {'text': text_diff, 'markdown': text_diff}
8✔
929

930

931
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed, if numpy is installed '
            '(default: 2.5)'
        ),
        'ai_google': 'Generative AI summary of changes (BETA)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Load the old and the new image, compute a visual difference and build the reports.

        :param directives: The differ's directives; the keys 'data_type', 'mse_threshold' and 'ai_google' are read
            here.
        :param report_kind: The kind of report requested (all three kinds are returned regardless).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed to ``make_timestamp`` for the '---' / '+++' header lines.
        :returns: A dict with keys 'text', 'markdown' and 'html'. All are empty strings when the images are
            identical (``self.state.verb`` set to 'unchanged') or when the MSE is below the threshold
            (``self.state.verb`` set to 'changed,no_report').
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)
        # NOTE(review): the module sets httpx to None (not to a string) when its import fails, so this check does
        # not appear able to trigger for a missing httpx -- confirm and align with how the import is guarded.
        if isinstance(httpx, str):  # pragma: no cover
            self.raise_import_error('httpx', httpx)

        def load_image_from_web(url: str) -> Image.Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image.Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image.Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image.Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image.Image, img2: Image.Image) -> tuple[Image.Image, np.float64 | None]:
            """Compute the difference between two images.

            :returns: The visualization image and the mean squared error (None if numpy is not available).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                diff_array = np.array(diff_image)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Color matrix used to tint the difference yellow. It has 12 elements, 4 for each of the Red, Green and
            # Blue output channels: Red and Green both copy the first input channel, Blue is set to zero.
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the first channel
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the first channel
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0%
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)
            # Carry over the format so that the result can later be saved with format=final_img.format
            final_img.format = img2.format

            return final_img, mse_value

        def ai_google(
            old_image: Image.Image,
            new_image: Image.Image,
            diff_image: Image.Image,
            directives: AiGoogleDirectives,
        ) -> str:
            """Summarize changes in image using Generative AI (ALPHA).

            Uploads the three images to Google AI and asks the model for a summary.

            :returns: The summary, or an error message if the API key is unusable or an upload fails.
            """
            logger.info(f'Job {self.job.index_number}: Running ai_google for {self.__kind__} differ')
            warnings.warn(
                f'Job {self.job.index_number}: Using ai_google in differ {self.__kind__}, which is ALPHA, '
                f'may have bugs, and may change in the future. Please report any problems or suggestions at '
                f'https://github.com/mborsetti/webchanges/discussions.',
                RuntimeWarning,
            )

            api_version = '1beta'
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            def _load_image(img_data: tuple[str, Image.Image]) -> dict[str, dict[str, str] | Exception | str]:
                """Upload one (name, image) pair to Google AI using the ``client`` opened below.

                :returns: A 'file_data' part on success, or a dict with keys 'error' and 'img_name' on HTTP failure.
                """
                img_name, image = img_data
                # Convert image to bytes
                img_byte_arr = io.BytesIO()
                image.save(img_byte_arr, format=image.format)
                image_data = img_byte_arr.getvalue()
                mime_type = f'image/{image.format.lower()}'  # type: ignore[union-attr]

                logger.info(
                    f'Job {self.job.index_number}: Loading {img_name} ({image.format}) to Google AI '
                    f'({len(image_data) / 1024:,.0f} kbytes)'
                )

                # Initial resumable upload request
                headers = {
                    'X-Goog-Upload-Protocol': 'resumable',
                    'X-Goog-Upload-Command': 'start',
                    'X-Goog-Upload-Header-Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Header-Content-Type': mime_type,
                    'Content-Type': 'application/json',
                }
                data = {'file': {'display_name': 'TEXT'}}

                try:
                    response = client.post(
                        f'https://generativelanguage.googleapis.com/upload/v{api_version}/files?'
                        f'key={GOOGLE_AI_API_KEY}',
                        headers=headers,
                        json=data,
                    )
                    # Turn an error status into an HTTPError instead of failing later on the missing header
                    response.raise_for_status()
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}
                upload_url = response.headers['X-Goog-Upload-Url']

                # Upload the image data
                headers = {
                    'Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Offset': '0',
                    'X-Goog-Upload-Command': 'upload, finalize',
                }
                try:
                    response = client.post(upload_url, headers=headers, content=image_data)
                    response.raise_for_status()
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}

                # Extract file URI from response
                file_info = response.json()
                file_uri = file_info['file']['uri']
                logger.info(f'Job {self.job.index_number}: {img_name.capitalize()} loaded to {file_uri}')

                return {
                    'file_data': {
                        'mime_type': mime_type,
                        'file_uri': file_uri,
                    }
                }

            # upload to Google; the context managers close the HTTP client and the thread pool when done, including
            # on the early return below
            additional_parts: list[dict[str, dict[str, str]]] = []
            with httpx.Client(http2=True, timeout=self.job.timeout) as client, ThreadPoolExecutor() as executor:
                for additional_part in executor.map(
                    _load_image,
                    (
                        ('old image', old_image),
                        ('new image', new_image),
                        ('differences image', diff_image),
                    ),
                ):
                    if 'error' not in additional_part:
                        additional_parts.append(additional_part)  # type: ignore[arg-type]
                    else:
                        logger.error(
                            f'Job {self.job.index_number}: ai_google for {self.__kind__} HTTP Client error '
                            f"{type(additional_part['error'])} when loading {additional_part['img_name']} to Google "
                            f"AI: {additional_part['error']}"
                        )
                        return (
                            f"HTTP Client error {type(additional_part['error'])} when loading "
                            f"{additional_part['img_name']} to Google AI: {additional_part['error']}"
                        )

            system_instructions = (
                'You are a skilled journalist tasked with summarizing the key differences between two versions '
                'of the same image. The audience for your summary is already familiar with the image, so you can '
                'focus on the most significant changes.'
            )
            # additional_parts holds, in order, the old image, the new image and the differences image
            model_prompt = (
                'You are a skilled visual analyst tasked with analyzing two versions of an image and summarizing the '
                'key differences between them. The audience for your summary is already familiar with the '
                "image's content, so you should focus only on the most significant differences.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the yellow areas in the image '
                f"{additional_parts[2]['file_data']['file_uri']}, identify the differences, and describe them.\n"
                f"2. Refer to the old version of the image {additional_parts[0]['file_data']['file_uri']} and the new "
                f"version {additional_parts[1]['file_data']['file_uri']}.\n"
                '3. You are only interested in those differences, such as additions, removals, or alterations, that '
                'modify the intended message or interpretation.\n'
                '4. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '5. If there are only additions to the image, then summarize the additions.\n'
                '6. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '7. Restrict your analysis and summary to the information provided within these images. Do '
                'not introduce external information or assumptions.\n'
            )
            summary, _ = AIGoogleDiffer._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                additional_parts=additional_parts,  # type: ignore[arg-type]
                directives=directives,
            )

            return summary

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)
        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')

        # old_data / new_data are the HTML link snippets appended to the header lines (empty for inline data)
        if data_type == 'url':
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            old_data = f' (<a href="{self.state.old_data}" target="_blank">Old image</a>)'
            new_data = f' (<a href="{self.state.new_data}" target="_blank">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{self.state.old_data}" target="_blank">Old image</a>)'
            new_data = f' (<a href="file://{self.state.new_data}" target="_blank">New image</a>)'

        # TODO: is a check that old_image.format and new_image.format match needed? under which circumstances?

        # If needed, resize one image to the size of the other; size tuples are compared as (width, height) and
        # the one comparing greater is resized. resize() returns a new image, so its format is carried over by hand.
        if new_image.size != old_image.size:
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.Resampling.LANCZOS)
                new_image.format = img_format
            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.Resampling.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # mse_value is None only when numpy is unavailable; an MSE of exactly 0.0 is a valid (below-threshold) value
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')
            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=diff_image.format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=new_image.format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # prepare AI summary
        ai_google_directives = directives.get('ai_google') or {}
        summary = ''
        if 'ai_google' in directives:
            summary = ai_google(old_image, new_image, diff_image, ai_google_directives)

        # Prepare HTML output
        htm = [
            # The first item opens the monospace span and holds the '---' line; the span is closed two items below.
            '<span style="font-family:monospace">'
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
            '</span>',
            '</span>',
            'New image:',
        ]
        if data_type == 'url':
            # Link to the new image (this is the 'New image:' section) rather than embedding it
            htm.append(f'<img src="{self.state.new_data}" style="max-width: 100%; display: block;">')
        else:
            htm.append(
                f'<img src="data:image/{(new_image.format or "").lower()};base64,{encoded_new}" '
                'style="max-width: 100%; display: block;">'
            )
        htm.extend(
            [
                'Differences from old (in yellow):',
                f'<img src="data:image/{(diff_image.format or "").lower()};base64,{encoded_diff}" '
                'style="max-width: 100%; display: block;">',
            ]
        )
        changed_text = 'The image has changed; please see an HTML report for the visualization.'
        if not summary:
            return {
                'text': changed_text,
                'markdown': changed_text,
                'html': '<br>\n'.join(htm),
            }

        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        directives_text = (
            ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in ai_google_directives.items())
            or 'None'
        )
        footer = f'Summary generated by Google Generative AI (ai_google directive(s): {directives_text})'
        return {
            'text': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n------------\n{footer}'
            ),
            'markdown': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n* * *\n{footer}'
            ),
            'html': '<br>\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '',
                ]
                + htm
                + [
                    '-----',
                    f'<i><small>{footer}</small></i>',
                ]
            ),
        }
1318

1319

1320
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    The report consists of the model's summary followed by the output of the ``unified`` differ and, when the
    model version is known, a footer naming the model and any non-default directives used.
    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': ('model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-2.0-flash)'),
        'system_instructions': (
            'Optional tone and style instructions for the model (default: see documentation at '
            'https://webchanges.readthedocs.io/en/stable/differs.html#ai-google-diff)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {unified_diff_new}, {old_text} and {new_text} will be replaced',
        'additions_only': 'summarizes only added lines (including as a result of a change)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: 1.0 if temperature is 0.0, otherwise None, i.e. model's default)",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'tools': "data passed on to the API's 'tools' field (default: None)",
        'unified': 'directives passed to the unified differ (default: None)',
    }
    __default_directive__ = 'model'

    @staticmethod
    def _api_error_message(response: Any) -> str:
        """Returns the ``error.message`` field of an API error response, or '' if it cannot be extracted.

        :param response: The httpx response received from the API.
        """
        try:
            payload = response.json()
        except ValueError:  # the body is not valid JSON (JSONDecodeError and UnicodeDecodeError are ValueErrors)
            return ''
        error = payload.get('error') if isinstance(payload, dict) else None
        if not isinstance(error, dict):
            return ''
        return str(error.get('message') or '')

    @staticmethod
    def _send_to_model(
        job: JobBase,
        system_instructions: str,
        model_prompt: str,
        additional_parts: list[dict[str, str | dict[str, str]]] | None = None,
        directives: AiGoogleDirectives | None = None,
    ) -> tuple[str, str]:
        """Creates the summary request to the model; returns the summary and the version of the actual model used.

        Failures are never raised: they are reported as text in the returned summary.

        :param job: The job being processed (used for log messages).
        :param system_instructions: The text sent in the request's ``system_instruction`` field.
        :param model_prompt: The fully formatted prompt, sent as the first part of ``contents``.
        :param additional_parts: Optional extra parts appended after the prompt in ``contents``.
        :param directives: The differ's directives; read for ``model``, ``timeout``, ``max_output_tokens``,
           ``temperature``, ``top_p``, ``top_k`` and ``tools``.
        :returns: A tuple of the summary (or an error message) and the model version. The model version is an empty
           string when no request could be made, and the requested model name when the API does not report one.
        """
        api_version = '1beta'
        api_host = 'generativelanguage.googleapis.com'
        if directives is None:
            directives = {}
        model = directives.get('model', 'gemini-2.0-flash')
        timeout = directives.get('timeout', 300)
        max_output_tokens = directives.get('max_output_tokens')
        temperature = directives.get('temperature', 0.0)
        top_p = directives.get('top_p', 1.0 if temperature == 0.0 else None)
        top_k = directives.get('top_k')
        GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
        if len(GOOGLE_AI_API_KEY) != 39:
            logger.error(
                f'Job {job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                f'incorrect length {len(GOOGLE_AI_API_KEY)} ({job.get_location()})'
            )
            return (
                f'## ERROR in summarizing changes using Google AI:\n'
                f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                f'{len(GOOGLE_AI_API_KEY)}.\n',
                '',
            )
        if httpx is None:  # optional dependency failed to import at module load
            logger.error(f"Job {job.index_number}: Python package 'httpx' is not installed ({job.get_location()})")
            return (
                "## ERROR in summarizing changes using Google AI:\nPython package 'httpx' is not installed.\n",
                '',
            )

        data: dict[str, Any] = {
            'system_instruction': {'parts': [{'text': system_instructions}]},
            'contents': [{'parts': [{'text': model_prompt}]}],
            'generation_config': {
                'max_output_tokens': max_output_tokens,
                'temperature': temperature,
                'top_p': top_p,
                'top_k': top_k,
            },
        }
        if additional_parts:
            data['contents'][0]['parts'].extend(additional_parts)
        if directives.get('tools'):
            data['tools'] = directives['tools']
        logger.info(f'Job {job.index_number}: Making the content generation request to Google AI model {model}')
        model_version = model  # default, used whenever the API does not report the actual version

        try:
            # The client is used as a context manager so that its connection pool is always released. HTTP/2 is
            # only requested when the optional 'h2' package is available, as httpx raises otherwise.
            with httpx.Client(http2=h2 is not None) as client:
                r = client.post(
                    f'https://{api_host}/v{api_version}/models/{model}:generateContent',
                    json=data,
                    # The key is sent as a header rather than in the query string to keep it out of URLs, which
                    # end up in log lines.
                    headers={'Content-Type': 'application/json', 'x-goog-api-key': GOOGLE_AI_API_KEY},
                    timeout=timeout,
                )
        except httpx.HTTPError as e:
            return (
                f'AI summary unavailable: HTTP client error: {e} when requesting data from {api_host}',
                model_version,
            )

        if r.is_success:
            try:
                result = r.json()
            except ValueError:
                result = None
            if not isinstance(result, dict):
                return (
                    f'AI summary unavailable: Received a response from {r.url.host} that is not valid JSON',
                    model_version,
                )
            # A response may carry no candidates (or a candidate with no content); treat missing pieces as empty.
            candidates = result.get('candidates') or [{}]
            candidate = candidates[0]
            logger.info(f"Job {job.index_number}: AI generation finished by {candidate.get('finishReason')}")
            parts = (candidate.get('content') or {}).get('parts') or []
            text = parts[0].get('text') if parts else None
            if isinstance(text, str):
                summary: str = text.rstrip()
            else:
                summary = (
                    f'AI summary unavailable: Model did not return any candidate output:\n'
                    f'{jsonlib.dumps(result, ensure_ascii=True, indent=2)}'
                )
            model_version = result.get('modelVersion', model)
        elif r.status_code == 400:
            summary = (
                f'AI summary unavailable: Received error from {r.url.host}: '
                f'{AIGoogleDiffer._api_error_message(r)}'
            )
        else:
            summary = f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from {r.url.host}'
            if r.content:
                summary += f': {AIGoogleDiffer._api_error_message(r)}'

        return summary, model_version

    def differ(
        self,
        directives: AiGoogleDirectives,  # type: ignore[override]
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates the AI summary of the changes and appends the output of the ``unified`` differ to it.

        :param directives: The differ's directives (see ``__supported_directives__``). Not modified.
        :param report_kind: The kind of report requested (not used here: all kinds are generated).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed on to the ``unified`` differ.
        :returns: The report for each report kind. All values are empty strings, and ``self.state.verb`` is set to
           ``'changed,no_report'``, when there is no summary (e.g. old and new data do not differ).
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ from hooks.py')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> tuple[str, str]:
            """Generate AI summary from unified diff, or an error message, plus the model version."""
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n',
                    '',
                )

            diff_lines: list[str] = []
            if '{unified_diff' in prompt:  # matches unified_diff or unified_diff_new
                default_context_lines = 9999 if '{unified_diff}' in prompt else 0  # none if only unified_diff_new
                context_lines = directives.get('prompt_ud_context_lines', default_context_lines)
                diff_lines = list(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                        # The input lines carry no trailing newline, so the control lines must not either;
                        # otherwise joining with '\n' inserts a blank line after every header and hunk line.
                        lineterm='',
                    )
                )
                if not diff_lines:
                    # no changes
                    return '', ''
            unified_diff = '\n'.join(diff_lines)

            if '{unified_diff_new}' in prompt:
                # The first two lines are the '---' / '+++' file headers, which are not additions.
                unified_diff_new = '\n'.join(line[1:] for line in diff_lines[2:] if line.startswith('+'))
            else:
                unified_diff_new = ''

            # check if data is different (same data is sent during testing)
            if '{old_text}' in prompt and '{new_text}' in prompt and self.state.old_data == self.state.new_data:
                return '', ''

            model_prompt = prompt.format(
                unified_diff=unified_diff,
                unified_diff_new=unified_diff_new,
                old_text=self.state.old_data,
                new_text=self.state.new_data,
            )

            return self._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                directives=directives,
            )

        if directives.get('additions_only') or self.job.additions_only:
            default_system_instructions = (
                'You are a skilled journalist. Your task is to summarize the provided text in a clear and concise '
                'manner. Restrict your analysis and summary *only* to the text provided. Do not introduce any '
                'external information or assumptions.\n\n'
                'Format your summary using Markdown. Use headings, bullet points, and other Markdown elements where '
                'appropriate to create a well-structured and easily readable summary.'
            )
            default_prompt = '{unified_diff_new}'
        else:
            default_system_instructions = (
                'You are a skilled journalist tasked with analyzing two versions of a text and summarizing the key '
                'differences in meaning between them. The audience for your summary is already familiar with the '
                "text's content, so you can focus on the most significant changes.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the old version of the text, provided within the `<old_version>` and '
                '`</old_version>` tags.\n'
                '2. Carefully examine the new version of the text, provided within the `<new_version>` and '
                '`</new_version>` tags.\n'
                '3. Compare the two versions, identifying areas where the meaning differs. This includes additions, '
                'removals, or alterations that change the intended message or interpretation.\n'
                '4. Ignore changes that do not affect the overall meaning, even if the wording has been modified.\n'
                '5. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '6. If there are only additions to the text, then summarize the additions.\n'
                '7. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '8. Restrict your analysis and summary to the information provided within the `<old_version>` and '
                '`<new_version>` tags. Do not introduce external information or assumptions.\n'
            )
            default_prompt = '<old_version>\n{old_text}\n</old_version>\n\n<new_version>\n{new_text}\n</new_version>'
        system_instructions = directives.get('system_instructions', default_system_instructions)
        # A literal backslash-n typed in the configuration is turned into a real newline.
        prompt = directives.get('prompt', default_prompt).replace('\\n', '\n')
        summary, model_version = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        # The model is named separately in the footer. A filtered copy is used so that the caller's directives
        # are left untouched.
        shown_directives = {key: value for key, value in directives.items() if key != 'model'}
        if shown_directives:
            directives_text = (
                ' (differ directive(s): '
                + ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in shown_directives.items())
                + ')'
            )
        else:
            directives_text = ''
        # The footer is shown whenever the model is known; model_version is empty only when no request was made.
        footer = f"Summary by Google Generative AI's model {model_version}{directives_text}" if model_version else ''

        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # The unified differ is run for both kinds; the result of the last call is the one used below and is
        # expected to hold all three report kinds (presumably accumulated through temp_unfiltered_diff).
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': f"{summary}\n\n{unified_report['text']}" + (f'\n------------\n{footer}' if footer else ''),
            'markdown': f"{summary}\n\n{unified_report['markdown']}" + (f'\n* * *\n{footer}' if footer else ''),
            'html': '\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '<br>',
                    '<br>',
                    unified_report['html'],
                ]
                + (['-----<br>', f'<i><small>{footer}</small></i>'] if footer else [])
            ),
        }
1595

1596

1597
class WdiffDiffer(DifferBase):
    """(Work in progress) Word-by-word differ producing wdiff-like output.

    Additions and deletions are marked inline: with ANSI color escape sequences in the text and markdown reports
    and with styled ``<span>`` elements in the HTML report. The differ emits a warning on every run stating that
    its results are not yet correct.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates a word-level diff of the old and new data.

        :param directives: The differ's directives; ``context_lines`` and ``range_info`` are read.
        :param report_kind: The kind of report requested (not used here: all kinds are generated).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone used for the timestamps in the header.
        :returns: The diff for each report kind; 'markdown' is identical to 'text'.
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} can only compare text, but the old data is '
                f'of type {type(self.state.old_data).__name__}.'
            )
        if not isinstance(self.state.new_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} can only compare text, but the new data is '
                f'of type {type(self.state.new_data).__name__}.'
            )

        def protect_link_text(data: str) -> str:
            """Replaces the spaces inside [...] with the </s> token so that link text is kept as one word."""
            return re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', data)

        # Split the texts into words tokenizing newline
        if self.state.is_markdown():
            # Don't split spaces in link text, tokenize space as </s>
            words1 = protect_link_text(self.state.old_data).replace('\n', ' <\\n> ').split(' ')
            words2 = protect_link_text(self.state.new_data).replace('\n', ' <\\n> ').split(' ')
        else:
            words1 = self.state.old_data.replace('\n', ' <\\n> ').split(' ')
            words2 = self.state.new_data.replace('\n', ' <\\n> ').split(' ')

        # Generate a difference list; each entry is a two-character code ('  ', '+ ', '- ' or '? ') plus the word
        diff = list(difflib.Differ().compare(words1, words2))

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        head_text = '\n'.join(
            [
                f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m',
                f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m',
                '',
            ]
        )
        head_html = '<br>\n'.join(
            [
                # the opening monospace span is part of the first item and its closing tag of the second one
                '<span style="font-family:monospace;">'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>'
                '</span>',
                '',
            ]
        )

        # Process the diff output to make it more wdiff-like. Each word is emitted one iteration late (as
        # prev_word_*), so that the marker closing a run of changes can still be appended to the run's last word.
        result_text = []
        result_html = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''  # opening marker deferred to the following word (when a run starts on a newline token)
        next_html = ''
        add = False  # inside a run of added words
        rem = False  # inside a run of removed words

        for word_text in diff + ['  ']:  # the extra unchanged empty word flushes the last real word
            if word_text[0] == '?':  # additional context line
                continue
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        if add or rem:
            result_text[-1] += '\033[0m'
            result_html[-1] += '</span>'

        # rebuild the text from words, replacing the newline token (the first entry is the initial empty word)
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            # a line has changes if it contains the start of one of the ANSI color sequences used above
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            if contextlines:
                lines_to_keep: set[int] = set()
                for i in lines_with_changes:
                    lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            else:
                lines_to_keep = set(lines_with_changes)
            new_diff_text = []
            new_diff_html = []
            last_line = 0  # 1-based number of the last line kept
            skip = False  # whether lines have been dropped since the last line kept
            i = 0
            for i, (line_text, line_html) in enumerate(zip(text_lines, diff_html.splitlines())):
                if i in lines_to_keep:
                    if range_info and skip:
                        new_diff_text.append(f'@@ {last_line + 1}...{i} @@')
                        new_diff_html.append(f'@@ {last_line + 1}...{i} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            if (i + 1) != last_line and range_info and skip:  # trailing lines were dropped
                new_diff_text.append(f'@@ {last_line + 1}...{i + 1} @@')
                new_diff_html.append(f'@@ {last_line + 1}...{i + 1} @@')
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.state.is_markdown():
            # restore the spaces in link text that were tokenized before splitting into words
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc