• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 13814211410

12 Mar 2025 02:43PM UTC coverage: 75.448% (-0.01%) from 75.46%
13814211410

push

github

mborsetti
Version 3.29.0rc1

1739 of 2631 branches covered (66.1%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

108 existing lines in 1 file now uncovered.

4573 of 5735 relevant lines covered (79.74%)

6.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.72
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import io
8✔
11
import logging
8✔
12
import math
8✔
13
import os
8✔
14
import re
8✔
15
import shlex
8✔
16
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
17
import tempfile
8✔
18
import traceback
8✔
19
import urllib.parse
8✔
20
import warnings
8✔
21
from base64 import b64encode
8✔
22
from concurrent.futures import ThreadPoolExecutor
8✔
23
from datetime import datetime
8✔
24
from io import BytesIO
8✔
25
from pathlib import Path
8✔
26
from typing import Any, Iterator, Literal, TYPE_CHECKING, TypedDict
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
import html2text
8✔
30

31
from webchanges.jobs import JobBase
8✔
32
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
33

34
try:
8✔
35
    from deepdiff import DeepDiff
8✔
36
    from deepdiff.model import DiffLevel
8✔
37
except ImportError as e:  # pragma: no cover
38
    DeepDiff = str(e)  # type: ignore[assignment,misc]
39

40
try:
8✔
41
    import httpx
8✔
42
except ImportError:  # pragma: no cover
43
    httpx = None  # type: ignore[assignment]
44
if httpx is not None:
8!
45
    try:
8✔
46
        import h2
8✔
47
    except ImportError:  # pragma: no cover
48
        h2 = None  # type: ignore[assignment]
49

50
try:
8✔
51
    import numpy as np
8✔
52
except ImportError as e:  # pragma: no cover
53
    np = str(e)  # type: ignore[assignment]
54

55
try:
8✔
56
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
57
except ImportError as e:  # pragma: no cover
58
    Image = str(e)  # type: ignore[assignment]
59

60
# https://stackoverflow.com/questions/712791
61
try:
8✔
62
    import simplejson as jsonlib
8✔
63
except ImportError:  # pragma: no cover
64
    import json as jsonlib  # type: ignore[no-redef]
65

66
try:
8✔
67
    import xmltodict
8✔
68
except ImportError as e:  # pragma: no cover
69
    xmltodict = str(e)  # type: ignore[no-redef]
70

71
# https://stackoverflow.com/questions/39740632
72
if TYPE_CHECKING:
73
    from webchanges.handler import JobState
74
    from webchanges.storage import _Config
75

76

77
logger = logging.getLogger(__name__)
8✔
78

79
# Typed description of a directives dictionary; every key is optional (total=False).
# Presumably these are the sub-directives of an AI-based differ defined further down in this file -- verify there.
# NOTE(review): 'top_k' is annotated as float | None; confirm whether an int is expected by the consumer.
AiGoogleDirectives = TypedDict(
    'AiGoogleDirectives',
    {
        'model': str,
        'additions_only': str,
        'system_instructions': str,
        'prompt': str,
        'prompt_ud_context_lines': int,
        'timeout': int,
        'max_output_tokens': int | None,
        'temperature': float | None,
        'top_p': float | None,
        'top_k': float | None,
        'tools': list[Any],
    },
    total=False,
)
96

97

98
class DifferBase(metaclass=TrackSubClasses):
8✔
99
    """The base class for differs."""
100

101
    __subclasses__: dict[str, type[DifferBase]] = {}
8✔
102
    __anonymous_subclasses__: list[type[DifferBase]] = []
8✔
103

104
    __kind__: str = ''
8✔
105

106
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty
8✔
107

108
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
8✔
109
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'
8✔
110

111
    def __init__(self, state: JobState) -> None:
        """Bind the differ to the job state it operates on.

        :param state: The JobState; its ``job`` attribute is also stored as ``self.job``.
        """
        self.job, self.state = state.job, state
8✔
118

119
    @classmethod
    def differ_documentation(cls) -> str:
        """Build the plain-text list of differs shown by the --features command line argument.

        Every differ kind is listed with its class docstring, followed by one indented line for each of its
        supported directives.

        :returns: A string to display.
        """
        lines: list[str] = []
        for differ_cls in TrackSubClasses.sorted_by_kind(cls):
            lines.append(f'  * {differ_cls.__kind__} - {differ_cls.__doc__}')
            if not hasattr(differ_cls, '__supported_directives__'):
                continue
            lines.extend(
                f'      {name} ... {description}'
                for name, description in differ_cls.__supported_directives__.items()
            )
        lines.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(lines)
8✔
134

135
    @classmethod
    def normalize_differ(
        cls,
        differ_spec: dict[str, Any] | None,
        job_index_number: int | None = None,
        config: _Config | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number (used in error messages only).
        :param config: The configuration holding the 'differ_defaults'; if None, no defaults are merged.
        :returns: A validated differ_kind, directives tuple.
        :raises ValueError: If the differ has no name, is not a known differ, or is given a sub-directive it does
           not support.
        """

        def directives_with_defaults(
            differ_kind: str, directives: dict[str, Any], config: _Config | None = None
        ) -> dict[str, Any]:
            """Obtain differ subdirectives that also contains defaults from the configuration.

            :param differ_kind: The differ kind.
            :param directives: The differ directives as stated in the job (updated in place).
            :param config: The configuration; its 'differ_defaults' entry for differ_kind supplies the defaults.
            :returns: directives inclusive of configuration defaults.
            """
            if config is None:
                logger.error('Cannot merge differ directives with defaults as no config object was passed')
                return directives
            cfg = config.get('differ_defaults')
            if isinstance(cfg, dict):
                defaults: dict[str, Any] = cfg.get(differ_kind)  # type: ignore[assignment]
                if defaults:
                    # values stated in the job always win over configuration defaults
                    for key, value in defaults.items():
                        directives.setdefault(key, value)
            return directives

        differ_spec = differ_spec or {'name': 'unified'}
        directives = differ_spec.copy()  # do not mutate the caller's dict
        differ_kind = directives.pop('name', '')
        if not differ_kind:
            # a spec made of only a 'command' sub-directive is shorthand for the command differ
            if list(directives.keys()) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind, None)  # type: ignore[assignment]
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        # NOTE(review): defaults are merged only when the job states at least one sub-directive besides 'name';
        # confirm that skipping them for a bare differ name is intended.
        if directives:
            directives = directives_with_defaults(differ_kind, directives, config)

        if hasattr(differcls, '__supported_directives__'):
            provided_keys = set(directives.keys())
            allowed_keys = set(differcls.__supported_directives__.keys())
            unknown_keys = provided_keys.difference(allowed_keys)
            if unknown_keys and '<any>' not in allowed_keys:
                # both lists are sorted so that the message is deterministic
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f"{', '.join(sorted(unknown_keys))} (supported: {', '.join(sorted(allowed_keys))})."
                )

        return differ_kind, directives
8✔
199

200
    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: ZoneInfo | None = None,
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        Any exception raised by the differ is caught: the latest saved data is deleted, the exception and its
        formatted traceback are stored on job_state, and an error message is returned in all three formats.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :returns: The output of the differ or an error message with traceback if it fails; an empty dict if no
           differ is registered under differ_kind.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            # Escape everything interpolated into the HTML variant: tracebacks routinely contain text such as
            # '<module>' which would otherwise be interpreted as markup.
            html_kind = html.escape(str(differ_kind))
            html_directives = html.escape(directives_text)
            html_traceback = html.escape(str(job_state.traceback))
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{job_state.traceback}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{job_state.traceback}\n```\n'
                ),
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {html_kind} with directive(s) '
                    f'{html_directives} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html_traceback}'
                    f'</span></span>'
                ),
            }
8✔
256

257
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a formatted diff representation of data changes (to be overridden by subclasses).

        Creates a diff representation in one or more output formats (text, markdown, or HTML).
        At minimum, an implementation must return output in the format specified by 'report_kind'.
        As results are memoized for performance optimization, it can generate up to all three formats simultaneously.

        The data to compare is not passed in: implementations read it from the JobState given to the constructor
        (``self.state``).

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises NotImplementedError: Always, in this base class.
        :raises RuntimeError: In implementations that run an external diff tool, if the tool returns an error.
        """
        raise NotImplementedError()
282

283
    @staticmethod
8✔
284
    def make_timestamp(
8✔
285
        timestamp: float,
286
        tz: ZoneInfo | None = None,
287
    ) -> str:
288
        """Format a timestamp as an RFC 5322 compliant datetime string.
289

290
        Converts a numeric timestamp to a formatted datetime string following the RFC 5322 (email) standard. When a
291
        timezone is provided, its full name (abbreviation), if known, is appended.
292

293
        :param timestamp: The timestamp.
294
        :param tz: The IANA timezone of the report.
295
        :returns: A datetime string in RFC 5322 (email) format or 'NEW' if timestamp is 0.
296
        """
297
        if timestamp:
8✔
298
            dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz)
8✔
299
            # add timezone name if known
300
            if dt.strftime('%Z') != dt.strftime('%z')[:3]:
8✔
301
                cfws = f" ({dt.strftime('%Z')})"
8✔
302
            else:
303
                cfws = ''
8✔
304
            return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws
8✔
305
        else:
306
            return 'NEW'
8✔
307

308
    @staticmethod
    def html2text(data: str) -> str:
        """Convert HTML to text using the html2text package, stripping trailing whitespace from each line.

        :param data: the string in html format.
        :returns: the string in text format.
        """
        converter = html2text.HTML2Text()
        converter.body_width = 0
        converter.wrap_links = False
        converter.ignore_images = True
        converter.unicode_snob = True
        converter.single_line_break = True
        rendered = converter.handle(data)
        return '\n'.join(map(str.rstrip, rendered.splitlines()))
8✔
322

323
    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Report a missing optional package by raising ImportError.

        The message identifies the job (index number and location) and this differ's kind.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from ImportError.

        :raises: ImportError.
        """
        message = (
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
        )
        raise ImportError(message)
335

336

337
class UnifiedDiffer(DifferBase):
8✔
338
    """(Default) Generates a unified diff."""
339

340
    __kind__ = 'unified'
8✔
341

342
    __supported_directives__ = {
8✔
343
        'context_lines': 'the number of context lines (default: 3)',
344
        'range_info': 'include range information lines (default: true)',
345
        'additions_only': 'keep only addition lines (default: false)',
346
        'deletions_only': 'keep only deletion lines (default: false)',
347
    }
348

349
    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        Yields the opening ``<table>`` tag, one ``<tr>`` per line of the diff, and the closing tag.

        :param diff: the unified diff
        :raises RuntimeError: If a line after the two header lines is empty or starts with an unexpected character.
        """

        def process_line(line: str, line_num: int, is_markdown: bool, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param is_markdown: Whether the data is markdown, in which case content is converted with mark_to_html
               instead of linkify.
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            :raises RuntimeError: If the line does not start with one of the expected marker characters.
            """
            # slicing (rather than indexing) so that an empty line reaches the RuntimeError below
            marker = line[:1]
            # The style= string (or empty string) to add to an HTML tag.
            if line_num == 0:
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:
                style = 'font-family:monospace;color:darkgreen;'
            elif marker == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif marker == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif marker == ' ':  # context line
                style = monospace_style
            elif marker == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif marker == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and marker != '@':  # don't apply to headers or range information
                if is_markdown or marker == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        is_markdown = self.state.is_markdown()
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, is_markdown, monospace_style)
        yield '</table>'
8✔
403

404
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a unified diff of the old and new data, optionally keeping only additions or only deletions.

        :param directives: The directives ('context_lines', 'range_info', 'additions_only', 'deletions_only').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; if it contains 'text' and report_kind is 'html', that
           text is converted to HTML instead of being recomputed.
        :param tz: The timezone used for the timestamps in the header lines.
        :returns: A dict with 'text' and 'markdown' (identical) and, for report_kind 'html', also 'html'; when the
           text diff is reused only 'html' is returned. When there is nothing to report, all three keys map to
           empty strings and ``self.state.verb`` is set to 'changed,no_report'.
        """
        additions_only = directives.get('additions_only') or self.job.additions_only
        deletions_only = directives.get('deletions_only') or self.job.deletions_only
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            empty_return: dict[Literal['text', 'markdown', 'html'], str] = {'text': '', 'markdown': '', 'html': ''}
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                # no context by default when only one side of the change is shown
                if additions_only or deletions_only:
                    contextlines = 0
                else:
                    contextlines = 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    '@',
                    '@',
                    self.make_timestamp(self.state.old_timestamp, tz),
                    self.make_timestamp(self.state.new_timestamp, tz),
                    contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return empty_return
            # replace tabs in header lines
            diff[0] = diff[0].replace('\t', ' ')
            diff[1] = diff[1].replace('\t', ' ')

            if additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # new data is at most a quarter of the old one: keep the full diff and explain why
                    diff = (
                        diff[:2]
                        + ['/**Comparison type: Additions only**']
                        + ['/**Deletions are being shown as 75% or more of the content has been deleted**']
                        + diff[2:]
                    )
                else:
                    # the '---' header is saved as the filter below drops every line starting with '-'
                    head = '---' + diff[0][3:]
                    diff = [line for line in diff if line.startswith('+') or line.startswith('@')]
                    # drop a range information line that is immediately followed by another one
                    diff = [
                        line1
                        for line1, line2 in zip([''] + diff, diff + [''])
                        if not (line1.startswith('@') and line2.startswith('@'))
                    ][1:]
                    # drop a trailing range information line
                    diff = diff[:-1] if diff[-1].startswith('@') else diff
                    # nothing to report if only the header is left, or if only two lines still have content once
                    # leading '+' and trailing whitespace are stripped
                    if len(diff) == 1 or len([line for line in diff if line.lstrip('+').rstrip()]) == 2:
                        self.state.verb = 'changed,no_report'
                        return empty_return
                    diff = [head, diff[0], '/**Comparison type: Additions only**'] + diff[1:]
            elif deletions_only:
                # the '+++' header is saved as the filter below drops every line starting with '+'
                # (it was previously rebuilt as '--- @' + diff[1][3:], which produced '--- @ @ <timestamp>')
                head = '+++' + diff[1][3:]
                diff = [line for line in diff if line.startswith('-') or line.startswith('@')]
                # drop a range information line that is immediately followed by another one
                diff = [
                    line1
                    for line1, line2 in zip([''] + diff, diff + [''])
                    if not (line1.startswith('@') and line2.startswith('@'))
                ][1:]
                # drop a trailing range information line
                diff = diff[:-1] if diff[-1].startswith('@') else diff
                # same "nothing to report" test as for additions, stripping leading '-' instead
                if len(diff) == 1 or len([line for line in diff if line.lstrip('-').rstrip()]) == 2:
                    self.state.verb = 'changed,no_report'
                    return empty_return
                diff = [diff[0], head, '/**Comparison type: Deletions only**'] + diff[1:]

            # remove range info lines if needed
            if directives.get('range_info') is False or (
                directives.get('range_info') is None and additions_only and (len(diff) < 4 or diff[3][0] != '/')
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)

            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        if report_kind == 'html':
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return out_diff
8✔
497

498

499
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a side-by-side HTML table diff with difflib.HtmlDiff (3 lines of context).

        :param directives: The directives; 'tabsize' sets the tab stop spacing (default: 8).
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; if it contains 'html' and report_kind is 'text' or
           'markdown', that table is converted to text instead of being recomputed.
        :param tz: The timezone used for the timestamps in the column headers.
        :returns: A dict with 'html' when the table is generated here and, for report_kind 'text' or 'markdown',
           'text' and 'markdown' holding the same text conversion of the table.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = int(directives.get('tabsize', 8))
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                True,
                3,
            )
            # fix table formatting: CSS classes are replaced by inline styles
            table = table.replace('<th ', '<th style="font-family:monospace" ')
            table = table.replace('<td ', '<td style="font-family:monospace" ')
            table = table.replace(' nowrap="nowrap"', '')
            table = table.replace('<a ', '<a style="font-family:monospace;color:inherit" ')
            table = table.replace('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"')
            # 'lightred' is not a CSS color name (the declaration would be ignored); #ffaaaa is the background
            # that difflib's own stylesheet uses for the diff_sub class
            table = table.replace('<span class="diff_sub"', '<span style="color:red;background-color:#ffaaaa"')
            table = table.replace('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"')
            out_diff['html'] = table

        if report_kind in {'text', 'markdown'}:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        return out_diff
8✔
549

550

551
class CommandDiffer(DifferBase):
8✔
552
    """Runs an external command to generate the diff."""
553

554
    __kind__ = 'command'
8✔
555

556
    __supported_directives__ = {
8✔
557
        'command': 'The command to execute',
558
        'is_html': 'Whether the output of the command is HTML',
559
    }
560

561
    re_ptags = re.compile(r'^<p>|</p>$')
8✔
562
    re_htags = re.compile(r'<(/?)h\d>')
8✔
563
    re_tagend = re.compile(r'<(?!.*<).*>+$')
8✔
564

565
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Run the external command on the old and new data and return its output as the diff.

        The old and new data are written to two files in a temporary directory, whose paths are appended to the
        command line.

        :param directives: The directives; 'command' (required) is the command to execute and 'is_html' states
           that its output is already HTML.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; if it contains 'text', report_kind is 'html' and the
           command does not start with 'wdiff', that text is reused instead of running the command again.
        :param tz: The timezone used for the timestamps in the header lines.
        :returns: A dict with the diff for the generated formats; if the command exits with code 0, all three
           keys map to empty strings and ``self.state.verb`` is set to 'changed,no_report'.
        :raises RuntimeError: If the command writes to stderr or exits with a code greater than 1.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        command = directives['command']
        if (
            report_kind == 'html'
            and not command.startswith('wdiff')
            and _unfiltered_diff is not None
            and 'text' in _unfiltered_diff
        ):
            diff = _unfiltered_diff['text']
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            if self.state.is_markdown():
                # protect the link anchor from being split (won't work)
                # the link text is percent-encoded here and decoded again in the command's output below
                markdown_links_re = re.compile(r'\[(.*?)][(](.*?)[)]')
                old_data = markdown_links_re.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(old_data)
                )
                new_data = markdown_links_re.sub(
                    lambda x: f'[{urllib.parse.quote(x.group(1))}]({x.group(2)})', str(new_data)
                )

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                old_file_path = tmp_path.joinpath('old_file')
                new_file_path = tmp_path.joinpath('new_file')
                # NOTE(review): write_text() is called without an explicit encoding -- confirm the platform
                # default is what the external tool expects
                if isinstance(old_data, str):
                    old_file_path.write_text(old_data)
                else:
                    old_file_path.write_bytes(old_data)
                if isinstance(new_data, str):
                    new_file_path.write_text(new_data)
                else:
                    new_file_path.write_bytes(new_data)
                # the command is split into an argument list and run without a shell
                cmdline = shlex.split(command) + [str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
            # exit codes 0 and 1 are accepted; any stderr output or a higher exit code is an error
            if proc.stderr or proc.returncode > 1:
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            # exit code 0 is treated as "nothing to report"
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}
            head = '\n'.join(
                [
                    f'Using differ "{directives}"',
                    f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}',
                    f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}',
                ]
            )
            diff = proc.stdout
            if self.state.is_markdown():
                # undo the protection of the link anchor from being split
                diff = markdown_links_re.sub(lambda x: f'[{urllib.parse.unquote(x.group(1))}]({x.group(2)})', diff)
            if command.startswith('wdiff') and self.job.contextlines == 0:
                # remove lines that don't have any changes
                keeplines = []
                for line in diff.splitlines(keepends=True):
                    if any(x in line for x in {'{+', '+}', '[-', '-]'}):
                        keeplines.append(line)
                diff = ''.join(keeplines)
            diff = f'{head}\n{diff}'
            out_diff.update(
                {
                    'text': diff,
                    'markdown': diff,
                }
            )

        if directives.get('is_html'):
            # the command's output is declared to be HTML already: pass it through unescaped
            out_diff['html'] = diff
        elif report_kind == 'html':
            if command.startswith('wdiff'):
                # colorize output of wdiff
                out_diff['html'] = self.wdiff_to_html(diff)
            else:
                out_diff['html'] = html.escape(diff)

        return out_diff
8✔
653

654
    def wdiff_to_html(self, diff: str) -> str:
        """
        Turn wdiff output into HTML in which additions and deletions are colorized with inline styles.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        escaped = html.escape(diff)

        if self.state.is_markdown():
            # detect and fix multiline additions or deletions: a marker left open at the end of a line is closed
            # there and re-opened at the start of the following line
            adding = deleting = False
            fixed_lines: list[str] = []
            for text in escaped.splitlines():
                if adding:
                    text = '{+' + text
                    adding = False
                elif deleting:
                    text = '[-' + text
                    deleting = False
                for token in re.findall(r'\[-|-]|{\+|\+}', text):
                    if token == '[-':
                        deleting = True
                    elif token == '-]':
                        deleting = False
                    elif token == '{+':
                        adding = True
                    else:  # '+}'
                        adding = False
                if adding:
                    text += '+}'
                elif deleting:
                    text += '-]'
                fixed_lines.append(text)
            escaped = '<br>\n'.join(fixed_lines)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        for pattern, css in (
            (r'\{\+(.*?)\+}', self.css_added_style),
            (r'\[-(.*?)-]', self.css_deltd_style),
        ):
            escaped = re.sub(
                pattern,
                lambda found, css=css: f'<span style="{css}">{found.group(1)}</span>',
                escaped,
                flags=re.DOTALL,
            )

        if not self.job.monospace:
            return escaped
        return f'<span style="font-family:monospace;white-space:pre-wrap">{escaped}</span>'
8✔
707

708

709
class DeepdiffDiffer(DifferBase):
    """Reports the differences between two JSON or XML documents as computed by the ``deepdiff`` package.

    The old and new data are parsed (``jsonlib.loads`` for JSON, ``xmltodict.parse`` for XML), compared with
    ``DeepDiff`` and rendered as one bullet point per reported change.
    """

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build the report of the changes between the old and the new data.

        :param directives: The differ's directives; ``data_type``, ``ignore_order``, ``ignore_string_case`` and
            ``significant_digits`` are read.
        :param report_kind: The kind of report requested; 'html' yields colorized HTML, any other value plain text.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when rendering the old and new timestamps.
        :returns: A dict keyed by report kind. It holds only 'html' when ``report_kind`` is 'html', otherwise 'text'
            and 'markdown' (same content). All three keys are present when no difference is reported (empty values)
            or when the new data is not valid JSON (error message).
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            as_html = report_kind == 'html'
            # For additions deepdiff holds the new item in t2 (t1 is a placeholder) and for removals the old item
            # is in t1, hence added items are rendered from val_t2 and removed ones from val_t1.
            if as_html:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t2}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t2}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t1}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _format_value(value: Any, type_name: str) -> str:
                """Render one side of a change: scalars in double quotes, dicts/lists as indented JSON."""
                if type_name in {'str', 'int', 'float'}:
                    return f'"{value}"'
                if type_name in {'dict', 'list'}:
                    return jsonlib.dumps(value, ensure_ascii=False, indent=2)
                return str(value)

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                type_t1 = type(ddiff.t1).__name__
                type_t2 = type(ddiff.t2).__name__
                val_t1 = _format_value(ddiff.t1, type_t1)
                val_t2 = _format_value(ddiff.t2, type_t2)
                diff_path = ddiff.path()  # type: ignore[no-untyped-call]

                if as_html:
                    # Values and keys come from the monitored document and must not be interpreted as markup.
                    # They end up in element content only, so quotes are left untouched.
                    val_t1 = html.escape(val_t1, quote=False)
                    val_t2 = html.escape(val_t2, quote=False)
                    diff_path = html.escape(diff_path, quote=False)

                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=diff_path,
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=val_t1,
                    val_t2=val_t2,
                )

            result: list[str] = []
            for report_type in ddiff.tree:
                result.extend(_pretty_print_diff(level) for level in ddiff.tree[report_type])

            return '\n'.join(result)

        data_type = directives.get('data_type', 'json')
        # NOTE(review): a data_type other than 'json' or 'xml' leaves both sides empty, which ends up below as
        # 'changed,no_report' rather than as an error.
        old_data = ''
        new_data = ''
        if data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                # Unparsable old data is compared as an empty string so that the new data is still reported.
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'Job {self.job.index_number}: New data is invalid JSON: {e} ({self.job.get_location()})')
                logger.info(f'Job {self.job.index_number}: {self.state.new_data!r}')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ignore_order = bool(directives.get('ignore_order', False))
        ignore_string_case = bool(directives.get('ignore_string_case', False))
        significant_digits = directives.get('significant_digits')
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=ignore_order,
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=ignore_string_case,
            significant_digits=significant_digits,
            # The more verbose the logger's effective level, the higher the value (clamped to 0..2).
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">'
                # f'Differ: {self.__kind__} for {data_type}\n'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                # diff_text has no trailing newline, so it is used in full; <wbr> lets long paths wrap between
                # consecutive subscripts.
                + diff_text.replace('][', ']<wbr>[')
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                # f'Differ: {self.__kind__} for {data_type}\n'
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
                f'{diff_text}'
            )
            return {'text': text_diff, 'markdown': text_diff}
8✔
897

898

899
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed, if numpy in installed '
            '(default: 2.5)'
        ),
        'ai_google': 'Generative AI summary of changes (BETA)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report showing the new image and an image highlighting (in yellow) what changed from the old one.

        :param directives: The differ's directives; ``data_type``, ``mse_threshold`` and ``ai_google`` are read.
        :param report_kind: Not used; all three kinds of report are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when rendering the old and new timestamps.
        :returns: A dict with the 'text', 'markdown' and 'html' reports. All values are empty when the images are
            equal (``self.state.verb`` is set to 'unchanged') or when the MSE is below ``mse_threshold``
            (``self.state.verb`` is set to 'changed,no_report').
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)

        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)
        # A failed import leaves httpx set to None. It is only required to download images and to call the AI
        # service, so other data types keep working without it.
        needs_httpx = data_type == 'url' or 'ai_google' in directives
        if needs_httpx and (httpx is None or isinstance(httpx, str)):  # pragma: no cover
            self.raise_import_error('httpx', httpx or "No module named 'httpx'")

        def load_image_from_web(url: str) -> Image.Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image.Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image.Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image.Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image.Image, img2: Image.Image) -> tuple[Image.Image, np.float64 | None]:
            """Compute the difference between two images.

            :returns: The image visualizing the differences and the mean squared error between the two images
                (None if numpy is not available).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                diff_array = np.array(diff_image)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Convert the difference to 'RGB' using a matrix that applies a yellow tint
            # The matrix has 12 elements: 4 for Red, 4 for Green, and 4 for Blue.
            # For yellow, we want Red and Green to copy the first channel (1.0) and Blue to be zero.
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the first channel
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the first channel
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0%
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)
            # The format is needed later to save the image and to build its MIME type.
            final_img.format = img2.format

            return final_img, mse_value

        def ai_google(
            old_image: Image.Image,
            new_image: Image.Image,
            diff_image: Image.Image,
            directives: AiGoogleDirectives,
        ) -> str:
            """Summarize changes in image using Generative AI (ALPHA).

            :returns: The summary produced by the model, or an error message if the API key is not valid or an
                image cannot be uploaded.
            """
            logger.info(f'Job {self.job.index_number}: Running ai_google for {self.__kind__} differ')
            warnings.warn(
                f'Job {self.job.index_number}: Using ai_google in differ {self.__kind__}, which is ALPHA, '
                f'may have bugs, and may change in the future. Please report any problems or suggestions at '
                f'https://github.com/mborsetti/webchanges/discussions.',
                RuntimeWarning,
            )

            api_version = '1beta'
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            def _load_image(img_data: tuple[str, Image.Image]) -> dict[str, dict[str, str] | Exception | str]:
                """Upload one (name, image) pair with ``client`` (bound below, before this function is called).

                :returns: A ``file_data`` part for the model request, or a dict with keys 'error' and 'img_name'
                    if an HTTP request raises.
                """
                img_name, image = img_data
                # Convert image to bytes
                img_byte_arr = io.BytesIO()
                image.save(img_byte_arr, format=image.format)
                image_data = img_byte_arr.getvalue()
                mime_type = f'image/{image.format.lower()}'  # type: ignore[union-attr]

                logger.info(
                    f'Job {self.job.index_number}: Loading {img_name} ({image.format}) to Google AI '
                    f'({len(image_data) / 1024:,.0f} kbytes)'
                )

                # Initial resumable upload request
                headers = {
                    'X-Goog-Upload-Protocol': 'resumable',
                    'X-Goog-Upload-Command': 'start',
                    'X-Goog-Upload-Header-Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Header-Content-Type': mime_type,
                    'Content-Type': 'application/json',
                }
                data = {'file': {'display_name': 'TEXT'}}

                try:
                    response = client.post(
                        f'https://generativelanguage.googleapis.com/upload/v{api_version}/files?'
                        f'key={GOOGLE_AI_API_KEY}',
                        headers=headers,
                        json=data,
                    )
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}
                # NOTE(review): the status code is not checked; an error response without this header raises
                # KeyError here.
                upload_url = response.headers['X-Goog-Upload-Url']

                # Upload the image data
                headers = {
                    'Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Offset': '0',
                    'X-Goog-Upload-Command': 'upload, finalize',
                }
                try:
                    response = client.post(upload_url, headers=headers, content=image_data)
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}

                # Extract file URI from response
                file_info = response.json()
                file_uri = file_info['file']['uri']
                logger.info(f'Job {self.job.index_number}: {img_name.capitalize()} loaded to {file_uri}')

                return {
                    'file_data': {
                        'mime_type': mime_type,
                        'file_uri': file_uri,
                    }
                }

            # upload to Google
            additional_parts: list[dict[str, dict[str, str]]] = []
            # The executor is entered last so that on exit it is shut down (waiting for uploads still in flight)
            # before the client those uploads use is closed.
            with httpx.Client(http2=True, timeout=self.job.timeout) as client, ThreadPoolExecutor() as executor:
                for additional_part in executor.map(
                    _load_image,
                    (
                        ('old image', old_image),
                        ('new image', new_image),
                        ('differences image', diff_image),
                    ),
                ):
                    if 'error' not in additional_part:
                        additional_parts.append(additional_part)  # type: ignore[arg-type]
                    else:
                        logger.error(
                            f'Job {self.job.index_number}: ai_google for {self.__kind__} HTTP Client error '
                            f"{type(additional_part['error'])} when loading {additional_part['img_name']} to "
                            f"Google AI: {additional_part['error']}"
                        )
                        return (
                            f"HTTP Client error {type(additional_part['error'])} when loading "
                            f"{additional_part['img_name']} to Google AI: {additional_part['error']}"
                        )

            system_instructions = (
                'You are a skilled journalist tasked with summarizing the key differences between two versions '
                'of the same image. The audience for your summary is already familiar with the image, so you can '
                'focus on the most significant changes.'
            )
            model_prompt = (
                'You are a skilled visual analyst tasked with analyzing two versions of an image and summarizing the '
                'key differences between them. The audience for your summary is already familiar with the '
                "image's content, so you should focus only on the most significant differences.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the yellow areas in the image '
                f"{additional_parts[2]['file_data']['file_uri']}, identify the differences, and describe them.\n"
                f"2. Refer to the old version of the image {additional_parts[0]['file_data']['file_uri']} and the new "
                f" version {additional_parts[1]['file_data']['file_uri']}.\n"
                '3. You are only interested in those differences, such as additions, removals, or alterations, that '
                'modify the intended message or interpretation.\n'
                '4. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '5. If there are only additions to the image, then summarize the additions.\n'
                '6. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '7. Restrict your analysis and summary to the information provided within these images. Do '
                'not introduce external information or assumptions.\n'
            )
            summary, _ = AIGoogleDiffer._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                additional_parts=additional_parts,  # type: ignore[arg-type]
                directives=directives,
            )

            return summary

        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')
        # old_data and new_data hold the HTML links (if any) appended to the report's header lines; URLs and paths
        # are escaped as they are placed inside attribute values.
        if data_type == 'url':
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            old_data = f' (<a href="{html.escape(self.state.old_data)}" target="_blank">Old image</a>)'
            new_data = f' (<a href="{html.escape(self.state.new_data)}" target="_blank">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{html.escape(self.state.old_data)}" target="_blank">Old image</a>)'
            new_data = f' (<a href="file://{html.escape(self.state.new_data)}" target="_blank">New image</a>)'

        # Check formats  TODO: is it needed? under which circumstances?
        # if new_image.format != old_image.format:
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
        # else:
        #     logger.debug(f'image format is {old_image.format}')

        # If needed, shrink the larger image
        # NOTE(review): sizes are tuples, so '>' compares them element by element starting from the first one.
        if new_image.size != old_image.size:
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.Resampling.LANCZOS)
                # The format is carried over by hand as it is needed later to save the image.
                new_image.format = img_format

            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.Resampling.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # None means that the MSE could not be computed; a value of 0.0 is a valid MSE.
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')

        if mse_value is not None and mse_value < mse_threshold:
            logger.info(
                f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                f'considering changes not worthy of a report'
            )
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=diff_image.format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=new_image.format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # prepare AI summary
        summary = ''
        if 'ai_google' in directives:
            summary = ai_google(old_image, new_image, diff_image, directives.get('ai_google', {}))

        # Prepare HTML output
        htm = [
            '<span style="font-family:monospace">'
            # f'Differ: {self.__kind__} for {data_type}',
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
            '</span>',
            '</span>',
            'New image:',
        ]
        if data_type == 'url':
            # Link to the new image rather than embedding it.
            htm.append(f'<img src="{html.escape(self.state.new_data)}" style="max-width: 100%; display: block;">')
        else:
            htm.append(
                f'<img src="data:image/{(new_image.format or "").lower()};base64,{encoded_new}" '
                'style="max-width: 100%; display: block;">'
            )
        htm.extend(
            [
                'Differences from old (in yellow):',
                f'<img src="data:image/{(diff_image.format or "").lower()};base64,{encoded_diff}" '
                'style="max-width: 100%; display: block;">',
            ]
        )
        changed_text = 'The image has changed; please see an HTML report for the visualization.'
        if not summary:
            return {
                'text': changed_text,
                'markdown': changed_text,
                'html': '<br>\n'.join(htm),
            }

        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        directives_text = (
            ', '.join(
                f'{key}={str(value).replace(newline, back_n)}' for key, value in directives.get('ai_google', {}).items()
            )
            or 'None'
        )
        footer = f'Summary generated by Google Generative AI (ai_google directive(s): {directives_text})'
        return {
            'text': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n------------\n{footer}'
            ),
            'markdown': (
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
                f'\n* * *\n{footer}'
            ),
            'html': '<br>\n'.join(
                [
                    # Headings in the summary are demoted from <h2> to <h3>.
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '',
                ]
                + htm
                + [
                    '-----',
                    f'<i><small>{footer}</small></i>',
                ]
            ),
        }
1286

1287

1288
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': ('model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-2.0-flash)'),
        'system_instructions': (
            'Optional tone and style instructions for the model (default: see documentation at '
            'https://webchanges.readthedocs.io/en/stable/differs.html#ai-google-diff)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {unified_diff_new}, {old_text} and {new_text} will be replaced',
        'additions_only': 'summarizes only added lines (including as a result of a change)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: None, i.e. model's default)",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'tools': "data passed on to the API's 'tools' field (default: None)",
        'unified': 'directives passed to the unified differ (default: None)',
    }
    __default_directive__ = 'model'

    @staticmethod
    def _api_error_message(response: httpx.Response) -> str:
        """Extracts the 'error' -> 'message' text from an API error response.

        :param response: The (already fully read) response received from the API.
        :returns: The error message, or an empty string if the body is not JSON or lacks that field.
        """
        try:
            payload = response.json()
        except ValueError:  # body is empty, not JSON or not decodable
            return ''
        error = payload.get('error') if isinstance(payload, dict) else None
        if not isinstance(error, dict):
            return ''
        return error.get('message') or ''

    @staticmethod
    def _send_to_model(
        job: JobBase,
        system_instructions: str,
        model_prompt: str,
        additional_parts: list[dict[str, str | dict[str, str]]] | None = None,
        directives: AiGoogleDirectives | None = None,
    ) -> tuple[str, str]:
        """Creates the summary request to the model; returns the summary and the version of the actual model used.

        :param job: The job being processed (used for logging).
        :param system_instructions: The text sent in the request's 'system_instruction' field.
        :param model_prompt: The fully expanded prompt sent as the first part of the request's 'contents'.
        :param additional_parts: Optional extra parts appended after the prompt in the request's 'contents'.
        :param directives: The differ's directives (model, timeout, temperature, etc.); defaults apply if None.
        :returns: A tuple of the summary (or of a text describing why it is unavailable) and the model version. The
           model version is an empty string when the API key is missing or malformed, and falls back to the
           requested model name when the API does not report one.
        """
        api_version = '1beta'
        if directives is None:
            directives = {}
        model = directives.get('model', 'gemini-2.0-flash')
        timeout = directives.get('timeout', 300)
        max_output_tokens = directives.get('max_output_tokens')
        temperature = directives.get('temperature', 0.0)
        top_p = directives.get('top_p', 1.0 if temperature == 0.0 else None)
        top_k = directives.get('top_k')
        GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
        if len(GOOGLE_AI_API_KEY) != 39:
            logger.error(
                f'Job {job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                f'incorrect length {len(GOOGLE_AI_API_KEY)} ({job.get_location()})'
            )
            return (
                f'## ERROR in summarizing changes using Google AI:\n'
                f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                f'{len(GOOGLE_AI_API_KEY)}.\n',
                '',
            )

        data: dict[str, Any] = {
            'system_instruction': {'parts': [{'text': system_instructions}]},
            'contents': [{'parts': [{'text': model_prompt}]}],
            'generation_config': {
                'max_output_tokens': max_output_tokens,
                'temperature': temperature,
                'top_p': top_p,
                'top_k': top_k,
            },
        }
        if additional_parts:
            data['contents'][0]['parts'].extend(additional_parts)
        if directives.get('tools'):
            data['tools'] = directives['tools']
        logger.info(f'Job {job.index_number}: Making the content generation request to Google AI model {model}')
        model_version = model  # default
        try:
            # The context manager closes the client (and its connection pool) once the response has been read.
            with httpx.Client(http2=True) as client:
                r = client.post(  # noqa: S113 Call to httpx without timeout
                    f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:generateContent?'
                    f'key={GOOGLE_AI_API_KEY}',
                    json=data,
                    headers={'Content-Type': 'application/json'},
                    timeout=timeout,
                )
            if r.is_success:
                result = r.json()
                # 'candidates' can be missing or empty (e.g. when the prompt itself is blocked).
                candidate = (result.get('candidates') or [{}])[0]
                logger.info(f"Job {job.index_number}: AI generation finished by {candidate.get('finishReason')}")
                try:
                    summary: str = candidate['content']['parts'][0]['text'].rstrip()
                except (KeyError, IndexError, TypeError):
                    summary = (
                        f'AI summary unavailable: Model did not return any candidate output:\n'
                        f'{jsonlib.dumps(result, ensure_ascii=True, indent=2)}'
                    )
                model_version = result.get('modelVersion', model)

            elif r.status_code == 400:
                summary = (
                    f'AI summary unavailable: Received error from {r.url.host}: '
                    f'{AIGoogleDiffer._api_error_message(r)}'
                )
            else:
                summary = (
                    f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from ' f'{r.url.host}'
                )
                if r.content:
                    summary += f': {AIGoogleDiffer._api_error_message(r)}'

        except httpx.HTTPError as e:
            summary = (
                f'AI summary unavailable: HTTP client error: {e} when requesting data from ' f'{e.request.url.host}'
            )

        return summary, model_version

    def differ(
        self,
        directives: AiGoogleDirectives,  # type: ignore[override]
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Summarizes the changes with the AI model and returns the summary followed by a unified diff report.

        :param directives: The differ's directives (see __supported_directives__); not modified by this method.
        :param report_kind: The kind of report requested (not used here: all kinds are returned).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed on to the unified differ.
        :returns: A dict with the 'text', 'markdown' and 'html' versions of the report; all three are empty strings
           (and the state's verb is set to 'changed,no_report') when there is nothing to summarize.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ from hooks.py')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> tuple[str, str]:
            """Generate AI summary from unified diff, or an error message, plus the model version."""
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n',
                    '',
                )

            diff_lines: list[str] = []
            if '{unified_diff' in prompt:  # matches unified_diff or unified_diff_new
                default_context_lines = 9999 if '{unified_diff}' in prompt else 0  # none if only unified_diff_new
                context_lines = directives.get('prompt_ud_context_lines', default_context_lines)
                diff_lines = list(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                    )
                )
                if not diff_lines:
                    # no changes
                    return '', ''
            unified_diff = '\n'.join(diff_lines)

            if '{unified_diff_new}' in prompt:
                # The first two lines of a unified diff are the '---' and '+++' file headers: skip them so that the
                # '+++' header is not mistaken for an added line.
                unified_diff_new = '\n'.join(line[1:] for line in diff_lines[2:] if line.startswith('+'))
            else:
                unified_diff_new = ''

            # check if data is different (same data is sent during testing)
            if '{old_text}' in prompt and '{new_text}' in prompt and self.state.old_data == self.state.new_data:
                return '', ''

            model_prompt = prompt.format(
                unified_diff=unified_diff,
                unified_diff_new=unified_diff_new,
                old_text=self.state.old_data,
                new_text=self.state.new_data,
            )

            return self._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                directives=directives,
            )

        if directives.get('additions_only') or self.job.additions_only:
            default_system_instructions = (
                'You are a skilled journalist. Your task is to summarize the provided text in a clear and concise '
                'manner. Restrict your analysis and summary *only* to the text provided. Do not introduce any '
                'external information or assumptions.\n\n'
                'Format your summary using Markdown. Use headings, bullet points, and other Markdown elements where '
                'appropriate to create a well-structured and easily readable summary.'
            )
            default_prompt = '{unified_diff_new}'
        else:
            default_system_instructions = (
                'You are a skilled journalist tasked with analyzing two versions of a text and summarizing the key '
                'differences in meaning between them. The audience for your summary is already familiar with the '
                "text's content, so you can focus on the most significant changes.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the old version of the text, provided within the `<old_version>` and '
                '`</old_version>` tags.\n'
                '2. Carefully examine the new version of the text, provided within the `<new_version>` and '
                '`</new_version>` tags.\n'
                '3. Compare the two versions, identifying areas where the meaning differs. This includes additions, '
                'removals, or alterations that change the intended message or interpretation.\n'
                '4. Ignore changes that do not affect the overall meaning, even if the wording has been modified.\n'
                '5. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '6. If there are only additions to the text, then summarize the additions.\n'
                '7. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '8. Restrict your analysis and summary to the information provided within the `<old_version>` and '
                '`<new_version>` tags. Do not introduce external information or assumptions.\n'
            )
            default_prompt = '<old_version>\n{old_text}\n</old_version>\n\n<new_version>\n{new_text}\n</new_version>'
        system_instructions = directives.get('system_instructions', default_system_instructions)
        prompt = directives.get('prompt', default_prompt).replace('\\n', '\n')
        summary, model_version = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        # The model is reported separately in the footer; filter it into a copy so that the caller's directives are
        # left untouched.
        footer_directives = {key: value for key, value in directives.items() if key != 'model'}
        directives_text = (
            ' (differ directive(s): '
            + ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in footer_directives.items())
            + ')'
            if footer_directives
            else ''
        )
        # Attribute the summary to the model whenever its version is known, with or without extra directives.
        footer = f"Summary by Google Generative AI's model {model_version}{directives_text}" if model_version else ''
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': f"{summary}\n\n{unified_report['text']}" + (f'\n------------\n{footer}' if footer else ''),
            'markdown': f"{summary}\n\n{unified_report['markdown']}" + (f'\n* * *\n{footer}' if footer else ''),
            'html': '\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '<br>',
                    '<br>',
                    unified_report['html'],
                ]
                + (['-----<br>', f'<i><small>{footer}</small></i>'] if footer else [])
            ),
        }
1563

1564

1565
class WdiffDiffer(DifferBase):
    """(Work in progress) Word-by-word differ that marks added and removed words inline.

    Additions and deletions are highlighted with ANSI color codes in the text/markdown reports and with styled
    <span> elements in the HTML report.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Builds a word-level diff between the old and the new data.

        :param directives: The differ's directives ('context_lines' and 'range_info').
        :param report_kind: The kind of report requested (not used here: all kinds are returned).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone used for the timestamps in the diff's header.
        :returns: A dict with the 'text', 'markdown' and 'html' versions of the diff.
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires the old data to be a string, not '
                f'{type(self.state.old_data).__name__}.'
            )
        if not isinstance(self.state.new_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires the new data to be a string, not '
                f'{type(self.state.new_data).__name__}.'
            )

        def protect_link_text(text: str) -> str:
            """Replaces spaces within [...] with the </s> token so that link text is kept as a single word."""
            return re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', text)

        # Split the texts into words tokenizing newline
        if self.state.is_markdown():
            # Don't split spaces in link text, tokenize space as </s>
            old_data = protect_link_text(self.state.old_data)
            new_data = protect_link_text(self.state.new_data)
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
        words1 = old_data.replace('\n', ' <\\n> ').split(' ')
        words2 = new_data.replace('\n', ' <\\n> ').split(' ')

        # Generate a difference list; each entry is a word prefixed by a two-character code ('+ ', '- ', '  ' or
        # '? ')
        diff = list(difflib.Differ().compare(words1, words2))

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        head_text = (
            # f'Differ: wdiff\n'
            f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m\n'
            f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m\n'
        )
        head_html = '<br>\n'.join(
            [
                '<span style="font-family:monospace;">'
                # 'Differ: wdiff',
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>'
                f'</span>',
                '',
            ]
        )
        # Process the diff output to make it more wdiff-like
        # NOTE(review): words are inserted into the HTML report without escaping -- confirm that this is intended
        # for non-markdown data.
        result_text = []
        result_html = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''  # opening color code deferred to the word following a newline token
        next_html = ''
        add = False  # currently inside a run of added words
        rem = False  # currently inside a run of removed words

        # Each word is emitted one iteration late, so that the closing codes can still be appended to it once the
        # next word is known; the extra '  ' entry flushes the last real word.
        for word_text in diff + ['  ']:
            if word_text[0] == '?':  # additional context line
                continue
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        if add or rem:
            result_text[-1] += '\033[0m'
            result_html[-1] += '</span>'

        # rebuild the text from words, replacing the newline token
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            html_lines = diff_html.splitlines()
            # A line is considered changed when it contains the start of an ANSI color code
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            lines_to_keep: set[int] = set()
            for i in lines_with_changes:
                lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            new_diff_text = []
            new_diff_html = []
            last_line = 0
            skip = False
            i = 0
            for i, (line_text, line_html) in enumerate(zip(text_lines, html_lines)):
                if i in lines_to_keep:
                    if range_info and skip:
                        # report the range of the lines omitted before this one
                        new_diff_text.append(f'@@ {last_line + 1}...{i} @@')
                        new_diff_html.append(f'@@ {last_line + 1}...{i} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            if (i + 1) != last_line:
                # lines were omitted at the end
                if range_info and skip:
                    new_diff_text.append(f'@@ {last_line + 1}...{i + 1} @@')
                    new_diff_html.append(f'@@ {last_line + 1}...{i + 1} @@')
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.state.is_markdown():
            # restore the spaces within link text
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc