• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 14147994698

29 Mar 2025 05:43PM UTC coverage: 75.068% (-0.3%) from 75.4%
14147994698

push

github

mborsetti
Version 3.30.0.rc1

1756 of 2660 branches covered (66.02%)

Branch coverage included in aggregate %.

19 of 24 new or added lines in 3 files covered. (79.17%)

249 existing lines in 5 files now uncovered.

4606 of 5815 relevant lines covered (79.21%)

6.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.09
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import io
8✔
11
import logging
8✔
12
import math
8✔
13
import os
8✔
14
import re
8✔
15
import shlex
8✔
16
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
17
import tempfile
8✔
18
import traceback
8✔
19
import urllib.parse
8✔
20
import warnings
8✔
21
from base64 import b64encode
8✔
22
from concurrent.futures import ThreadPoolExecutor
8✔
23
from datetime import datetime
8✔
24
from io import BytesIO
8✔
25
from pathlib import Path
8✔
26
from typing import Any, Iterator, Literal, TYPE_CHECKING, TypedDict
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
import html2text
8✔
30
import yaml
8✔
31

32
from webchanges.jobs import JobBase
8✔
33
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
34

35
try:
8✔
36
    from deepdiff import DeepDiff
8✔
37
    from deepdiff.model import DiffLevel
8✔
38
except ImportError as e:  # pragma: no cover
39
    DeepDiff = str(e)  # type: ignore[assignment,misc]
40

41
try:
8✔
42
    import httpx
8✔
43
except ImportError:  # pragma: no cover
44
    httpx = None  # type: ignore[assignment]
45
if httpx is not None:
8!
46
    try:
8✔
47
        import h2
8✔
48
    except ImportError:  # pragma: no cover
49
        h2 = None  # type: ignore[assignment]
50

51
try:
8✔
52
    import numpy as np
8✔
53
except ImportError as e:  # pragma: no cover
54
    np = str(e)  # type: ignore[assignment]
55

56
try:
8✔
57
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
58
except ImportError as e:  # pragma: no cover
59
    Image = str(e)  # type: ignore[assignment]
60

61
# https://stackoverflow.com/questions/712791
62
try:
8✔
63
    import simplejson as jsonlib
8✔
64
except ImportError:  # pragma: no cover
65
    import json as jsonlib  # type: ignore[no-redef]
66

67
try:
8✔
68
    from xml.parsers.expat import ExpatError
8✔
69

70
    import xmltodict
8✔
71
except ImportError as e:  # pragma: no cover
72
    xmltodict = str(e)  # type: ignore[no-redef,assignment]
73

74
# https://stackoverflow.com/questions/39740632
75
if TYPE_CHECKING:
76
    from webchanges.handler import JobState
77
    from webchanges.storage import _Config
78

79

80
logger = logging.getLogger(__name__)
8✔
81

82
AiGoogleDirectives = TypedDict(
8✔
83
    'AiGoogleDirectives',
84
    {
85
        'model': str,
86
        'additions_only': str,
87
        'system_instructions': str,
88
        'prompt': str,
89
        'prompt_ud_context_lines': int,
90
        'timeout': int,
91
        'max_output_tokens': int | None,
92
        'temperature': float | None,
93
        'top_p': float | None,
94
        'top_k': float | None,
95
        'tools': list[Any],
96
    },
97
    total=False,
98
)
99

100

101
class DifferBase(metaclass=TrackSubClasses):
    """The base class for differs."""

    # Class-level registries; ``normalize_differ`` and ``process`` look differ classes up in ``__subclasses__`` by
    # their ``__kind__``.  NOTE(review): presumably populated by the TrackSubClasses metaclass -- confirm.
    __subclasses__: dict[str, type[DifferBase]] = {}
    __anonymous_subclasses__: list[type[DifferBase]] = []

    __kind__: str = ''

    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty

    # Inline styles shared by the differs for added / deleted content in HTML output.
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'

    def __init__(self, state: JobState) -> None:
        """Keep a reference to the job state and to its job.

        :param state: the JobState.
        """
        self.job = state.job
        self.state = state

    @classmethod
    def differ_documentation(cls) -> str:
        """Generates simple differ documentation for use in the --features command line argument.

        :returns: A string to display.
        """
        result: list[str] = []
        for sc in TrackSubClasses.sorted_by_kind(cls):
            result.append(f'  * {sc.__kind__} - {sc.__doc__}')
            if hasattr(sc, '__supported_directives__'):
                result.extend(f'      {key} ... {doc}' for key, doc in sc.__supported_directives__.items())
        result.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(result)

    @classmethod
    def normalize_differ(
        cls,
        differ_spec: dict[str, Any] | None,
        job_index_number: int | None = None,
        config: _Config | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """Checks the differ_spec for its validity and applies default values.

        :param differ_spec: The differ as entered by the user; use "unified" if empty.
        :param job_index_number: The job index number (only used in error messages).
        :param config: The configuration; its 'differ_defaults' entry (if any) supplies default sub-directives.
        :returns: A validated differ_kind, directives tuple.
        :raises ValueError: If the differ has no name, is not a known differ, or is given sub-directives that the
           differ does not support.
        """

        def directives_with_defaults(
            differ_spec: str, directives: dict[str, Any], config: _Config | None = None
        ) -> dict[str, Any]:
            """Obtain differ subdirectives that also contains defaults from the configuration.

            :param differ_spec: The name (kind) of the differ whose defaults are looked up.
            :param directives: The differ directives as stated in the job; updated in place.
            :param config: The configuration.
            :returns: directives inclusive of configuration defaults.
            """
            if config is None:
                logger.info('No configuration object found to look for differ defaults')
                return directives
            cfg = config.get('differ_defaults')
            if isinstance(cfg, dict):
                defaults: dict[str, Any] = cfg.get(differ_spec)  # type: ignore[assignment]
                if defaults:
                    for key, value in defaults.items():
                        # values stated in the job take precedence over configuration defaults
                        directives.setdefault(key, value)
            return directives

        differ_spec = differ_spec or {'name': 'unified'}
        directives = differ_spec.copy()  # never mutate the caller's dict
        differ_kind = directives.pop('name', '')
        if not differ_kind:
            # a spec consisting solely of 'command' is shorthand for the 'command' differ
            if list(directives) == ['command']:
                differ_kind = 'command'
            else:
                raise ValueError(
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
                )

        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind, None)  # type: ignore[assignment]
        if not differcls:
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')

        if directives:
            directives = directives_with_defaults(differ_kind, directives, config)

        if hasattr(differcls, '__supported_directives__'):
            provided_keys = set(directives.keys())
            allowed_keys = set(differcls.__supported_directives__.keys())
            unknown_keys = provided_keys.difference(allowed_keys)
            if unknown_keys and '<any>' not in allowed_keys:
                # sorted so that the message is deterministic (set iteration order is not)
                raise ValueError(
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
                    f"{', '.join(sorted(unknown_keys))} (supported: {', '.join(sorted(allowed_keys))})."
                )

        return differ_kind, directives

    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: ZoneInfo | None = None,
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :returns: The output of the differ or an error message with traceback if it fails; an empty dict if no
           differ named differ_kind is registered.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: type[DifferBase] | None = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            # The traceback and the directive values are arbitrary text (e.g. a frame named '<module>'); they must
            # be escaped before being embedded in HTML or they are parsed as markup.
            html_directives_text = html.escape(directives_text)
            html_traceback = html.escape(str(job_state.traceback))
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{job_state.traceback}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{job_state.traceback}\n```\n'
                ),
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {differ_kind} with directive(s) '
                    f'{html_directives_text} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html_traceback}'
                    f'</span></span>'
                ),
            }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a formatted diff representation of data changes.

        Creates a diff representation in one or more output formats (text, markdown, or HTML).
        At minimum, this function must return output in the format specified by 'report_kind'.
        As results are memoized for performance optimization, it can generate up to all three formats simultaneously.

        Subclasses must override this method; the base implementation only raises.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter, who can be used to generate a diff
           for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises NotImplementedError: Always, in this base class.
        :raises RuntimeError: If the external diff tool returns an error.
        """
        raise NotImplementedError()

    @staticmethod
    def make_timestamp(
        timestamp: float,
        tz: ZoneInfo | None = None,
    ) -> str:
        """Format a timestamp as an RFC 5322 compliant datetime string.

        Converts a numeric timestamp to a formatted datetime string following the RFC 5322 (email) standard. When a
        timezone is provided, its full name, if known, is appended.

        :param timestamp: The timestamp.
        :param tz: The IANA timezone of the report.
        :returns: A datetime string in RFC 5322 (email) format or 'NEW' if timestamp is 0.
        """
        if not timestamp:
            return 'NEW'

        dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz)
        # add timezone name if known: when %Z merely repeats the start of the numeric offset (%z) there is no
        # name worth appending
        if dt.strftime('%Z') != dt.strftime('%z')[:3]:
            cfws = f" ({dt.strftime('%Z')})"
        else:
            cfws = ''
        return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws

    @staticmethod
    def html2text(data: str) -> str:
        """Converts html to text.

        :param data: the string in html format.
        :returns: the string in text format, with trailing whitespace removed from every line.
        """
        # Inside the function body the name 'html2text' resolves to the imported module, not to this method.
        parser = html2text.HTML2Text()
        parser.unicode_snob = True
        parser.body_width = 0
        parser.ignore_images = True
        parser.single_line_break = True
        parser.wrap_links = False
        return '\n'.join(line.rstrip() for line in parser.handle(data).splitlines())

    def raise_import_error(self, package_name: str, error_message: str) -> None:
        """Raise ImportError for missing package.

        :param package_name: The name of the module/package that could not be imported.
        :param error_message: The error message from ImportError.

        :raises ImportError: Always.
        """
        raise ImportError(
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
        )
339

340

341
class UnifiedDiffer(DifferBase):
    """(Default) Generates a unified diff."""

    __kind__ = 'unified'

    __supported_directives__ = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
        'additions_only': 'keep only addition lines (default: false)',
        'deletions_only': 'keep only deletion lines (default: false)',
    }

    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Generates a colorized HTML table from unified diff, applying styles and processing based on job values.

        :param diff: the unified diff
        :returns: An iterator over the HTML fragments: the opening table tag, one row per diff line, and the closing
           table tag.
        :raises RuntimeError: If a line past the two header lines does not start with one of the expected markers.
        """

        def process_line(line: str, line_num: int, is_markdown: bool, monospace_style: str) -> str:
            """
            Processes each line for HTML output, handling special cases and styles.

            :param line: The line to analyze.
            :param line_num: The line number in the document.
            :param is_markdown: Whether the content is markdown (and is therefore converted with mark_to_html).
            :param monospace_style: Additional style string for monospace text.

            :returns: The line processed into an HTML table row string.
            :raises RuntimeError: If the line (past the two headers) does not start with a known marker.
            """
            # First character of the line, or '' for an empty line: an empty line then reaches the RuntimeError
            # below instead of failing with a bare IndexError.
            marker = line[:1]

            # The style= string (or empty string) to add to an HTML tag.
            if line_num == 0:
                style = 'font-family:monospace;color:darkred;'
            elif line_num == 1:
                style = 'font-family:monospace;color:darkgreen;'
            elif marker == '+':  # addition
                style = f'{monospace_style}{self.css_added_style}'
            elif marker == '-':  # deletion
                style = f'{monospace_style}{self.css_deltd_style}'
            elif marker == ' ':  # context line
                style = monospace_style
            elif marker == '@':  # range information
                style = 'font-family:monospace;background-color:#fbfbfb;'
            elif marker == '/':  # informational header added by additions_only or deletions_only filters
                style = 'background-color:lightyellow;'
            else:
                raise RuntimeError('Unified Diff does not conform to standard!')
            style = f' style="{style}"' if style else ''

            if line_num > 1 and marker != '@':  # don't apply to headers or range information
                # the leading marker character is dropped; the row style conveys it instead
                if is_markdown or marker == '/':  # our informational header
                    line = mark_to_html(line[1:], self.job.markdown_padded_tables)
                else:
                    line = linkify(line[1:])
            return f'<tr><td{style}>{line}</td></tr>'

        table_style = (
            ' style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;"'
            if self.job.monospace
            else ' style="border-collapse:collapse;"'
        )
        yield f'<table{table_style}>'
        is_markdown = self.state.is_markdown()
        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        for i, line in enumerate(diff.splitlines()):
            yield process_line(line, i, is_markdown, monospace_style)
        yield '</table>'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a unified diff of the old and new data.

        :param directives: The directives ('context_lines', 'range_info', 'additions_only', 'deletions_only').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; when an 'html' report is requested and a 'text' diff is
           present, the HTML is built from that text instead of diffing again.
        :param tz: The timezone of the report.
        :returns: A dict with 'text' and 'markdown' (identical) and, for an 'html' report, 'html'. When there is
           nothing to report, all three keys map to empty strings and the state's verb is set to
           'changed,no_report'.
        """
        additions_only = directives.get('additions_only') or self.job.additions_only
        deletions_only = directives.get('deletions_only') or self.job.deletions_only
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            empty_return: dict[Literal['text', 'markdown', 'html'], str] = {'text': '', 'markdown': '', 'html': ''}
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                # context is dropped by default when only one side of the change is reported
                contextlines = 0 if additions_only or deletions_only else 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    '@',
                    '@',
                    self.make_timestamp(self.state.old_timestamp, tz),
                    self.make_timestamp(self.state.new_timestamp, tz),
                    contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return empty_return
            # replace tabs in header lines
            diff[0] = diff[0].replace('\t', ' ')
            diff[1] = diff[1].replace('\t', ' ')

            if additions_only:
                if len(self.state.old_data) and len(self.state.new_data) / len(self.state.old_data) <= 0.25:
                    # new data is at most a quarter the size of the old: keep the full diff and say why
                    diff = (
                        diff[:2]
                        + ['/**Comparison type: Additions only**']
                        + ['/**Deletions are being shown as 75% or more of the content has been deleted**']
                        + diff[2:]
                    )
                else:
                    head = '---' + diff[0][3:]
                    # keep additions (this includes the '+++' header) and range lines
                    diff = [line for line in diff if line.startswith(('+', '@'))]
                    # drop a range line that is immediately followed by another range line; the padded zip pairs
                    # every line with its successor and [1:] removes the padding element
                    diff = [
                        line1
                        for line1, line2 in zip([''] + diff, diff + [''])
                        if not (line1.startswith('@') and line2.startswith('@'))
                    ][1:]
                    # drop a trailing range line
                    diff = diff[:-1] if diff[-1].startswith('@') else diff
                    # nothing to report if only the header is left, or if only two lines still have content once
                    # the '+' markers and trailing whitespace are stripped
                    if len(diff) == 1 or len([line for line in diff if line.lstrip('+').rstrip()]) == 2:
                        self.state.verb = 'changed,no_report'
                        return empty_return
                    diff = [head, diff[0], '/**Comparison type: Additions only**'] + diff[1:]
            elif deletions_only:
                head = '--- @' + diff[1][3:]
                # keep deletions (this includes the '---' header) and range lines
                diff = [line for line in diff if line.startswith(('-', '@'))]
                # drop a range line that is immediately followed by another range line (see additions_only above)
                diff = [
                    line1
                    for line1, line2 in zip([''] + diff, diff + [''])
                    if not (line1.startswith('@') and line2.startswith('@'))
                ][1:]
                # drop a trailing range line
                diff = diff[:-1] if diff[-1].startswith('@') else diff
                if len(diff) == 1 or len([line for line in diff if line.lstrip('-').rstrip()]) == 2:
                    self.state.verb = 'changed,no_report'
                    return empty_return
                diff = [diff[0], head, '/**Comparison type: Deletions only**'] + diff[1:]

            # remove range info lines if needed: when explicitly disabled, or by default for additions_only unless
            # the fourth line is an informational '/' line (i.e. the full diff is being shown)
            if directives.get('range_info') is False or (
                directives.get('range_info') is None
                and additions_only
                and (len(diff) < 4 or not diff[3].startswith('/'))
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)

            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        if report_kind == 'html':
            out_diff['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return out_diff
501

502

503
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    __kind__ = 'table'

    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generate a side-by-side table diff with difflib.HtmlDiff.

        :param directives: The directives ('tabsize').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; when a 'text' or 'markdown' report is requested and an
           'html' diff is present, the text is derived from that HTML instead of diffing again.
        :param tz: The timezone of the report.
        :returns: A dict with 'html' when the table is (re)generated, plus 'text' and 'markdown' (identical, the
           table converted to text) when a 'text' or 'markdown' report is requested.
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = int(directives.get('tabsize', 8))
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                context=True,
                numlines=3,
            )
            # fix table formatting: HtmlDiff relies on CSS classes, which are replaced here by inline styles
            table = table.replace('<th ', '<th style="font-family:monospace" ')
            table = table.replace('<td ', '<td style="font-family:monospace" ')
            table = table.replace(' nowrap="nowrap"', '')
            table = table.replace('<a ', '<a style="font-family:monospace;color:inherit" ')
            table = table.replace('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"')
            # 'lightred' is not a CSS named colour (the declaration would be ignored); 'lightpink' is
            table = table.replace('<span class="diff_sub"', '<span style="color:red;background-color:lightpink"')
            table = table.replace('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"')
            out_diff['html'] = table

        if report_kind in {'text', 'markdown'}:
            diff_text = self.html2text(table)
            out_diff.update(
                {
                    'text': diff_text,
                    'markdown': diff_text,
                }
            )

        return out_diff
553

554

555
class CommandDiffer(DifferBase):
    """Runs an external command to generate the diff."""

    __kind__ = 'command'

    __supported_directives__ = {
        'command': 'The command to execute',
        'is_html': 'Whether the output of the command is HTML',
    }

    re_ptags = re.compile(r'^<p>|</p>$')
    re_htags = re.compile(r'<(/?)h\d>')
    re_tagend = re.compile(r'<(?!.*<).*>+$')
    # Markdown links "[anchor](target)"; compiled once for the class rather than on every differ() call.
    re_markdown_links = re.compile(r'\[(.*?)][(](.*?)[)]')

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Run the external command on the old and new data and report its output as the diff.

        The data are written to two files in a temporary directory whose paths are appended to the command line.
        An exit code of 0 means "no report", 1 means "report the output"; anything written to stderr or an exit
        code above 1 is an error.

        :param directives: The directives ('command', 'is_html').
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; when an 'html' report is requested and a 'text' diff is
           present, the HTML is built from that text (less its two header lines) instead of re-running the command.
        :param tz: The timezone of the report.
        :returns: A dict with 'text' and 'markdown' (identical) and, when requested or when the command's output is
           HTML, 'html'. When the command exits with 0, all three keys map to empty strings and the state's verb is
           set to 'changed,no_report'.
        :raises RuntimeError: If the external diff tool returns an error.
        """
        old_stamp = self.make_timestamp(self.state.old_timestamp, tz)
        new_stamp = self.make_timestamp(self.state.new_timestamp, tz)
        if self.job.monospace:
            # the outer span is left open here; it is closed at the very end of this method
            head_html = '\n'.join(
                [
                    '<span style="font-family:monospace;white-space:pre-wrap;">',
                    f'<span style="color:darkred;">--- @ {old_stamp}</span>',
                    f'<span style="color:darkgreen;">+++ @ {new_stamp}</span>',
                ]
            )
        else:
            head_html = '<br>\n'.join(
                [
                    '<span style="font-family:monospace;">',
                    f'<span style="color:darkred;">--- @ {old_stamp}</span>',
                    f'<span style="color:darkgreen;">+++ @ {new_stamp}</span>',
                    '</span>',
                ]
            )

        def recode_link_anchors(text: str, recode: Any) -> str:
            """Apply recode (urllib.parse.quote or unquote) to the anchor text of every markdown link in text."""
            return self.re_markdown_links.sub(lambda x: f'[{recode(x.group(1))}]({x.group(2)})', text)

        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        command = directives['command']
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            # reuse the text diff, dropping the two '---' / '+++' header lines
            diff_text = ''.join(_unfiltered_diff['text'].splitlines(keepends=True)[2:])
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            if self.state.is_markdown():
                # protect the link anchor from being split (won't work)
                old_data = recode_link_anchors(str(old_data), urllib.parse.quote)
                new_data = recode_link_anchors(str(new_data), urllib.parse.quote)

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                old_file_path = tmp_path.joinpath('old_file')
                new_file_path = tmp_path.joinpath('new_file')
                if isinstance(old_data, str):
                    old_file_path.write_text(old_data)
                else:
                    old_file_path.write_bytes(old_data)
                if isinstance(new_data, str):
                    new_file_path.write_text(new_data)
                else:
                    new_file_path.write_bytes(new_data)
                # an argument list without a shell: the command string is split, never interpreted by a shell
                cmdline = shlex.split(command) + [str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call
            if proc.stderr or proc.returncode > 1:
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                logger.info(
                    f"Job {self.job.index_number}: Command in differ 'command' returned 0 (no report) "
                    f'({self.job.get_location()})'
                )
                return {'text': '', 'markdown': '', 'html': ''}
            # header lines coloured with ANSI escape sequences (91 = bright red, 92 = bright green)
            head_text = '\n'.join(
                [
                    f'\033[91m--- @ {old_stamp}\033[0m',
                    f'\033[92m+++ @ {new_stamp}\033[0m',
                    '',
                ]
            )
            diff = proc.stdout
            if self.state.is_markdown():
                # undo the protection of the link anchor from being split
                diff = recode_link_anchors(diff, urllib.parse.unquote)
            if command.startswith('wdiff') and self.job.contextlines == 0:
                # remove lines that don't have any changes
                wdiff_markers = ('{+', '+}', '[-', '-]')
                diff = ''.join(
                    line for line in diff.splitlines(keepends=True) if any(x in line for x in wdiff_markers)
                )
            if directives.get('is_html'):
                diff_text = self.html2text(diff)
                out_diff.update(
                    {
                        'text': head_text + diff_text,
                        'markdown': head_text + diff_text,
                        'html': head_html + diff,
                    }
                )
            else:
                diff_text = diff
                out_diff.update(
                    {
                        'text': head_text + diff_text,
                        'markdown': head_text + diff_text,
                    }
                )

        if report_kind == 'html' and 'html' not in out_diff:
            if command.startswith('wdiff'):
                # colorize output of wdiff
                out_diff['html'] = head_html + self.wdiff_to_html(diff_text)
            else:
                out_diff['html'] = head_html + html.escape(diff_text)

        if self.job.monospace and 'html' in out_diff:
            # close the outer span opened in head_html
            out_diff['html'] += '</span>'

        return out_diff

    def wdiff_to_html(self, diff: str) -> str:
        """
        Colorize output of wdiff.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        html_diff = html.escape(diff)
        if self.state.is_markdown():
            # detect and fix multiline additions or deletions: a marker left open at the end of a line is closed
            # there and re-opened at the start of the next line, so that every line is self-contained
            is_add = False
            is_del = False
            new_diff = []
            for line in html_diff.splitlines():
                if is_add:
                    line = '{+' + line
                    is_add = False
                elif is_del:
                    line = '[-' + line
                    is_del = False
                # replay the markers in order to learn which one (if any) is still open at the end of the line
                for match in re.findall(r'\[-|-]|{\+|\+}', line):
                    if match == '[-':
                        is_del = True
                    if match == '-]':
                        is_del = False
                    if match == '{+':
                        is_add = True
                    if match == '+}':
                        is_add = False
                if is_add:
                    line += '+}'
                elif is_del:
                    line += '-]'
                new_diff.append(line)
            html_diff = '<br>\n'.join(new_diff)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        html_diff = re.sub(
            r'\{\+(.*?)\+}',
            lambda x: f'<span style="{self.css_added_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        html_diff = re.sub(
            r'\[-(.*?)-]',
            lambda x: f'<span style="{self.css_deltd_style}">{x.group(1)}</span>',
            html_diff,
            flags=re.DOTALL,
        )
        if self.job.monospace:
            return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
        else:
            return html_diff
742

743

744
class DeepdiffDiffer(DifferBase):
    """Differ that parses the old and new data as JSON, YAML or XML and reports the changes found by DeepDiff."""

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default), 'yaml', or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a human-readable list of the differences between the parsed old and new data.

        :param directives: The differ directives; see ``__supported_directives__``. When ``data_type`` is not given,
            the type of each side is derived from its media type, falling back to 'json'.
        :param report_kind: The kind of report being generated. 'html' yields a dict with only the 'html' key; any
            other kind yields the same string under 'text' and 'markdown'.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to ``self.make_timestamp`` when formatting the header timestamps.
        :returns: A dict of report kind to diff. All three values are empty strings when DeepDiff reports no
            differences (``self.state.verb`` is then set to 'changed,no_report'). If the new data cannot be parsed,
            the exception is recorded in ``self.state`` and the dict contains an error message instead of a diff.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            # NOTE(review): the two 'set_item_*' templates of each table label t1/t2 the other way round from every
            # other template; presumably the placeholders are swapped -- confirm against deepdiff before changing.
            if report_kind == 'html':
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _as_report_text(text: str) -> str:
                """Escape text originating from the compared data when it is going into an HTML report."""
                return html.escape(text) if report_kind == 'html' else text

            def _describe(value: Any) -> tuple[str, str]:
                """Return the type name of a value and its representation for the report."""
                type_name = type(value).__name__
                if type_name in {'str', 'int', 'float'}:
                    text = f'"{value}"'
                elif type_name in {'dict', 'list'}:
                    # default=str: YAML can yield values (e.g. dates) that JSON cannot serialize natively; this is
                    # for display only, so falling back to their string form is preferable to raising TypeError.
                    text = jsonlib.dumps(value, ensure_ascii=False, indent=2, default=str)
                else:
                    text = str(value)
                return type_name, _as_report_text(text)

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                type_t1, val_t1 = _describe(ddiff.t1)
                type_t2, val_t2 = _describe(ddiff.t2)
                diff_path = _as_report_text(ddiff.path())  # type: ignore[no-untyped-call]
                # A report type without a template yields just the bullet.
                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=diff_path,
                    type_t1=type_t1,
                    type_t2=type_t2,
                    val_t1=val_t1,
                    val_t2=val_t2,
                )

            return '\n'.join(_pretty_print_diff(item) for items in ddiff.tree.values() for item in items)

        def _data_type_from_media_type(media_type: str | None, which: Literal['old', 'new']) -> str:
            """Derive 'json', 'yaml' or 'xml' from a media type, defaulting to 'json' (with a log entry)."""
            if not media_type:
                logger.info(
                    f"Differ {self.__kind__} data_type for {which} data defaulted to 'json' as media type is missing"
                )
                return 'json'
            # Media types are case-insensitive and may carry parameters ('application/xml; charset=utf-8').
            essence = media_type.split(';', 1)[0].strip().lower()
            media_subtype = essence.split('/')[-1].split('+')[-1].split('x-')[-1]
            if media_subtype in {'yaml', 'yml'}:
                return 'yaml'
            if media_subtype in {'xml', 'json'}:
                return media_subtype
            logger.info(
                f'Differ {self.__kind__} could not determine data type of {which} data from media type '
                f"{media_type}; defaulting to 'json'"
            )
            return 'json'

        def _invalid_new_data(exc: Exception, fmt: str) -> dict[Literal['text', 'markdown', 'html'], str]:
            """Record and log a parsing failure of the new data (call from the ``except`` block) and report it."""
            self.state.exception = exc
            self.state.traceback = self.job.format_error(exc, traceback.format_exc())
            logger.error(
                f'Job {self.job.index_number}: New data is invalid {fmt}: {exc} ({self.job.get_location()})'
            )
            logger.info(f'Job {self.job.index_number}: {self.state.new_data!r}')
            return {
                'text': f'Differ {self.__kind__} ERROR: New data is invalid {fmt}\n{exc}',
                'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid {fmt}**\n{exc}',
                # The parser message may quote a snippet of the data, so it is escaped.
                'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid {fmt}</b>\n{html.escape(str(exc))}',
            }

        if directives.get('data_type'):
            old_data_type = directives['data_type']
            new_data_type = directives['data_type']
        else:
            old_data_type = _data_type_from_media_type(self.state.old_mime_type, 'old')
            new_data_type = _data_type_from_media_type(self.state.new_mime_type, 'new')

        # Old data that cannot be parsed is compared as an empty string instead of failing the job.
        old_data: Any = ''
        if old_data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                pass
        elif old_data_type == 'yaml':
            try:
                old_data = yaml.safe_load(self.state.old_data)
            except yaml.YAMLError:
                pass
        elif old_data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)
            try:
                old_data = xmltodict.parse(self.state.old_data)
            except ExpatError:
                pass

        # New data that cannot be parsed is an error, reported in place of the diff.
        new_data: Any = ''
        if new_data_type == 'json':
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                return _invalid_new_data(e, 'JSON')
        elif new_data_type == 'yaml':
            try:
                new_data = yaml.safe_load(self.state.new_data)
            except yaml.YAMLError as e:
                return _invalid_new_data(e, 'YAML')
        elif new_data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)
            try:
                new_data = xmltodict.parse(self.state.new_data)
            except ExpatError as e:
                return _invalid_new_data(e, 'XML')

        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=bool(directives.get('ignore_order')),
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=bool(directives.get('ignore_string_case')),
            significant_digits=directives.get('significant_digits'),
            # The more verbose the logging level, the higher DeepDiff's verbosity (clamped to 0..2).
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            html_diff = (
                '<span style="font-family:monospace;white-space:pre-wrap;">'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                # diff_text has no trailing newline, so it is used whole; <wbr> lets long paths wrap between keys.
                + diff_text.replace('][', ']<wbr>[')
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
                f'{diff_text}'
            )
            return {'text': text_diff, 'markdown': text_diff}
8✔
1010

1011

1012
class ImageDiffer(DifferBase):
8✔
1013
    """Compares two images providing an image outlining areas that have changed."""
1014

1015
    __kind__ = 'image'
8✔
1016

1017
    __supported_directives__ = {
8✔
1018
        'data_type': (
1019
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
1020
            "to an image file) (default: 'url')"
1021
        ),
1022
        'mse_threshold': (
1023
            'the minimum mean squared error (MSE) between two images to consider them changed, if numpy in installed '
1024
            '(default: 2.5)'
1025
        ),
1026
        'ai_google': 'Generative AI summary of changes (BETA)',
1027
    }
1028

1029
    def differ(
8✔
1030
        self,
1031
        directives: dict[str, Any],
1032
        report_kind: Literal['text', 'markdown', 'html'],
1033
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
1034
        tz: ZoneInfo | None = None,
1035
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
1036
        warnings.warn(
2✔
1037
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
1038
            f'change in the future. Please report any problems or suggestions at '
1039
            f'https://github.com/mborsetti/webchanges/discussions.',
1040
            RuntimeWarning,
1041
        )
1042
        if isinstance(Image, str):  # pragma: no cover
1043
            self.raise_import_error('pillow', Image)
1044
        if isinstance(httpx, str):  # pragma: no cover
1045
            self.raise_import_error('httpx', httpx)
1046

1047
        def load_image_from_web(url: str) -> Image.Image:
            """Download the image at ``url`` (10-second timeout) and open it from memory.

            An HTTP error status is raised as an exception by ``response.raise_for_status()``.
            """
            # Log through the module logger, like the rest of this module, rather than through the root logger.
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))
2✔
1053

1054
        def load_image_from_file(filename: str) -> Image.Image:
            """Open the image stored in the file at path ``filename``."""
            # Log through the module logger, like the rest of this module, rather than through the root logger.
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)
2✔
1058

1059
        def load_image_from_base64(base_64: str) -> Image.Image:
            """Decode a Base64 string and open the resulting bytes as an image."""
            # Log through the module logger, like the rest of this module, rather than through the root logger.
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))
2✔
1063

1064
        def load_image_from_ascii85(ascii85: str) -> Image.Image:
            """Decode an Ascii85 string and open the resulting bytes as an image."""
            # Log through the module logger, like the rest of this module, rather than through the root logger.
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))
2✔
1068

1069
        def compute_diff_image(img1: Image.Image, img2: Image.Image) -> tuple[Image.Image, np.float64]:
            """Build an image highlighting where ``img2`` differs from ``img1`` and measure how much they differ.

            :returns: A tuple of the highlight image (carrying the format of ``img2``) and the mean squared error
                of the pixel differences, which is None when numpy could not be imported.
            """
            # Absolute pixel-by-pixel difference between the two images.
            delta = ImageChops.difference(img1, img2)

            # Mean squared error, i.e. the mean of the squared differences (requires numpy).
            if isinstance(np, str):  # pragma: no cover
                mse_value = None
            else:
                mse_value = np.mean(np.square(np.array(delta)))

            # Background: a greyscale copy of the first image with its brightness scaled by (RMS brightness / 225).
            grey = img1.convert('L')
            rms_brightness = ImageStat.Stat(grey).rms[0]
            background = ImageEnhance.Brightness(grey).enhance(rms_brightness / 225)

            # RGB -> RGB conversion matrix: one row of four coefficients per output band. Both the red and the green
            # output bands copy the first input band and the blue output band is zero, which tints the differences
            # yellow.
            to_yellow = (
                *(1.0, 0.0, 0.0, 0.0),  # red output
                *(1.0, 0.0, 0.0, 0.0),  # green output
                *(0.0, 0.0, 0.0, 0.0),  # blue output
            )
            highlights = delta.convert('RGB').convert('RGB', matrix=to_yellow)

            # Overlay the highlights on the background.
            final_img = ImageChops.add(background.convert('RGB'), highlights)
            final_img.format = img2.format

            return final_img, mse_value
2✔
1112

1113
        def ai_google(
            old_image: Image.Image,
            new_image: Image.Image,
            diff_image: Image.Image,
            directives: AiGoogleDirectives,
        ) -> str:
            """Summarize changes in image using Generative AI (ALPHA).

            Uploads the three images to Google AI concurrently, then asks the model (via
            ``AIGoogleDiffer._send_to_model``) to describe the differences.

            :param old_image: The old version of the image.
            :param new_image: The new version of the image.
            :param diff_image: The image highlighting the differences between the two versions.
            :param directives: The ``ai_google`` directives, passed on to the model call.
            :returns: The summary produced by the model, or an error message if the GOOGLE_AI_API_KEY environment
                variable is missing or malformed or if an upload fails.
            """
            logger.info(f'Job {self.job.index_number}: Running ai_google for {self.__kind__} differ')
            warnings.warn(
                f'Job {self.job.index_number}: Using ai_google in differ {self.__kind__}, which is ALPHA, '
                f'may have bugs, and may change in the future. Please report any problems or suggestions at '
                f'https://github.com/mborsetti/webchanges/discussions.',
                RuntimeWarning,
            )

            api_version = '1beta'
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )
            client = httpx.Client(http2=True, timeout=self.job.timeout)

            def _load_image(img_data: tuple[str, Image.Image]) -> dict[str, dict[str, str] | Exception | str]:
                """Upload one (name, image) pair; return its 'file_data' part, or 'error' and 'img_name' on failure."""
                img_name, image = img_data
                # Convert image to bytes
                img_byte_arr = io.BytesIO()
                image.save(img_byte_arr, format=image.format)
                image_data = img_byte_arr.getvalue()
                mime_type = f'image/{image.format.lower()}'  # type: ignore[union-attr]

                logger.info(
                    f'Job {self.job.index_number}: Loading {img_name} ({image.format}) to Google AI '
                    f'({len(image_data) / 1024:,.0f} kbytes)'
                )

                # Initial resumable upload request
                headers = {
                    'X-Goog-Upload-Protocol': 'resumable',
                    'X-Goog-Upload-Command': 'start',
                    'X-Goog-Upload-Header-Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Header-Content-Type': mime_type,
                    'Content-Type': 'application/json',
                }
                data = {'file': {'display_name': 'TEXT'}}

                try:
                    response = client.post(
                        f'https://generativelanguage.googleapis.com/upload/v{api_version}/files?'
                        f'key={GOOGLE_AI_API_KEY}',
                        headers=headers,
                        json=data,
                    )
                    # Report an HTTP error status as such instead of failing later on the missing header.
                    response.raise_for_status()
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}
                upload_url = response.headers['X-Goog-Upload-Url']

                # Upload the image data
                headers = {
                    'Content-Length': str(len(image_data)),
                    'X-Goog-Upload-Offset': '0',
                    'X-Goog-Upload-Command': 'upload, finalize',
                }
                try:
                    response = client.post(upload_url, headers=headers, content=image_data)
                    # Report an HTTP error status as such instead of failing later on the response body.
                    response.raise_for_status()
                except httpx.HTTPError as e:
                    return {'error': e, 'img_name': img_name}

                # Extract file URI from response
                file_info = response.json()
                file_uri = file_info['file']['uri']
                logger.info(f'Job {self.job.index_number}: {img_name.capitalize()} loaded to {file_uri}')

                return {
                    'file_data': {
                        'mime_type': mime_type,
                        'file_uri': file_uri,
                    }
                }

            # upload to Google
            additional_parts: list[dict[str, dict[str, str]]] = []
            try:
                # The context manager shuts the pool down (waiting for running uploads) on every exit path.
                with ThreadPoolExecutor() as executor:
                    # executor.map yields results in input order, so indexes 0, 1 and 2 below are the old, new and
                    # differences images respectively.
                    for additional_part in executor.map(
                        _load_image,
                        (
                            ('old image', old_image),
                            ('new image', new_image),
                            ('differences image', diff_image),
                        ),
                    ):
                        if 'error' not in additional_part:
                            additional_parts.append(additional_part)  # type: ignore[arg-type]
                        else:
                            logger.error(
                                f'Job {self.job.index_number}: ai_google for {self.__kind__} HTTP Client error '
                                f"{type(additional_part['error'])} when loading {additional_part['img_name']} to "
                                f"Google AI: {additional_part['error']}"
                            )
                            return (
                                f"HTTP Client error {type(additional_part['error'])} when loading "
                                f"{additional_part['img_name']} to Google AI: {additional_part['error']}"
                            )
            finally:
                # The client is only needed for the uploads; release its connections.
                client.close()

            system_instructions = (
                'You are a skilled journalist tasked with summarizing the key differences between two versions '
                'of the same image. The audience for your summary is already familiar with the image, so you can '
                'focus on the most significant changes.'
            )
            model_prompt = (
                'You are a skilled visual analyst tasked with analyzing two versions of an image and summarizing the '
                'key differences between them. The audience for your summary is already familiar with the '
                "image's content, so you should focus only on the most significant differences.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the yellow areas in the image '
                f"{additional_parts[2]['file_data']['file_uri']}, identify the differences, and describe them.\n"
                f"2. Refer to the old version of the image {additional_parts[0]['file_data']['file_uri']} and the new "
                f" version {additional_parts[1]['file_data']['file_uri']}.\n"
                '3. You are only interested in those differences, such as additions, removals, or alterations, that '
                'modify the intended message or interpretation.\n'
                '4. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '5. If there are only additions to the image, then summarize the additions.\n'
                '6. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '7. Restrict your analysis and summary to the information provided within these images. Do '
                'not introduce external information or assumptions.\n'
            )
            summary, _ = AIGoogleDiffer._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                additional_parts=additional_parts,  # type: ignore[arg-type]
                directives=directives,
            )

            return summary
×
1257

1258
        data_type = directives.get('data_type', 'url')
2✔
1259
        mse_threshold = directives.get('mse_threshold', 2.5)
2✔
1260
        if not isinstance(self.state.old_data, str):
2!
UNCOV
1261
            raise ValueError('old_data is not a string')
×
1262
        if not isinstance(self.state.new_data, str):
2!
UNCOV
1263
            raise ValueError('new_data is not a string')
×
1264
        if data_type == 'url':
2✔
1265
            old_image = load_image_from_web(self.state.old_data)
2✔
1266
            new_image = load_image_from_web(self.state.new_data)
2✔
1267
            old_data = f' (<a href="{self.state.old_data}" target="_blank">Old image</a>)'
2✔
1268
            new_data = f' (<a href="{self.state.new_data}" target="_blank">New image</a>)'
2✔
1269
        elif data_type == 'ascii85':
2✔
1270
            old_image = load_image_from_ascii85(self.state.old_data)
2✔
1271
            new_image = load_image_from_ascii85(self.state.new_data)
2✔
1272
            old_data = ''
2✔
1273
            new_data = ''
2✔
1274
        elif data_type == 'base64':
2✔
1275
            old_image = load_image_from_base64(self.state.old_data)
2✔
1276
            new_image = load_image_from_base64(self.state.new_data)
2✔
1277
            old_data = ''
2✔
1278
            new_data = ''
2✔
1279
        else:  # 'filename'
1280
            old_image = load_image_from_file(self.state.old_data)
2✔
1281
            new_image = load_image_from_file(self.state.new_data)
2✔
1282
            old_data = f' (<a href="file://{self.state.old_data}" target="_blank">Old image</a>)'
2✔
1283
            new_data = f' (<a href="file://{self.state.new_data}" target="_blank">New image</a>)'
2✔
1284

1285
        # Check formats  TODO: is it needed? under which circumstances?
1286
        # if new_image.format != old_image.format:
1287
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
1288
        # else:
1289
        #     logger.debug(f'image format is {old_image.format}')
1290

1291
        # If needed, shrink the larger image
1292
        if new_image.size != old_image.size:
2✔
1293
            if new_image.size > old_image.size:
2✔
1294
                logging.debug(f'Job {self.job.index_number}: Shrinking the new image')
2✔
1295
                img_format = new_image.format
2✔
1296
                new_image = new_image.resize(old_image.size, Image.Resampling.LANCZOS)
2✔
1297
                new_image.format = img_format
2✔
1298

1299
            else:
1300
                logging.debug(f'Job {self.job.index_number}: Shrinking the old image')
2✔
1301
                img_format = old_image.format
2✔
1302
                old_image = old_image.resize(new_image.size, Image.Resampling.LANCZOS)
2✔
1303
                old_image.format = img_format
2✔
1304

1305
        if old_image == new_image:
2✔
1306
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
2✔
1307
            self.state.verb = 'unchanged'
2✔
1308
            return {'text': '', 'markdown': '', 'html': ''}
2✔
1309

1310
        diff_image, mse_value = compute_diff_image(old_image, new_image)
2✔
1311
        if mse_value:
2!
1312
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')
2✔
1313

1314
        if mse_value and mse_value < mse_threshold:
2✔
1315
            logger.info(
2✔
1316
                f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
1317
                f'considering changes not worthy of a report'
1318
            )
1319
            self.state.verb = 'changed,no_report'
2✔
1320
            return {'text': '', 'markdown': '', 'html': ''}
2✔
1321

1322
        # Convert the difference image to a base64 object
1323
        output_stream = BytesIO()
2✔
1324
        diff_image.save(output_stream, format=diff_image.format)
2✔
1325
        encoded_diff = b64encode(output_stream.getvalue()).decode()
2✔
1326

1327
        # Convert the new image to a base64 object
1328
        output_stream = BytesIO()
2✔
1329
        new_image.save(output_stream, format=new_image.format)
2✔
1330
        encoded_new = b64encode(output_stream.getvalue()).decode()
2✔
1331

1332
        # prepare AI summary
1333
        summary = ''
2✔
1334
        if 'ai_google' in directives:
2!
UNCOV
1335
            summary = ai_google(old_image, new_image, diff_image, directives.get('ai_google', {}))
×
1336

1337
        # Prepare HTML output
1338
        htm = [
2✔
1339
            f'<span style="font-family:monospace">'
1340
            # f'Differ: {self.__kind__} for {data_type}',
1341
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
1342
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
1343
            '</span>',
1344
            '</span>',
1345
            'New image:',
1346
        ]
1347
        if data_type == 'url':
2✔
1348
            htm.append(f'<img src="{self.state.old_data}" style="max-width: 100%; display: block;">')
2✔
1349
        else:
1350
            htm.append(
2✔
1351
                f'<img src="data:image/{(new_image.format or "").lower()};base64,{encoded_new}" '
1352
                'style="max-width: 100%; display: block;">'
1353
            )
1354
        htm.extend(
2✔
1355
            [
1356
                'Differences from old (in yellow):',
1357
                f'<img src="data:image/{(diff_image.format or "").lower()};base64,{encoded_diff}" '
1358
                'style="max-width: 100%; display: block;">',
1359
            ]
1360
        )
1361
        changed_text = 'The image has changed; please see an HTML report for the visualization.'
2✔
1362
        if not summary:
2!
1363
            return {
2✔
1364
                'text': changed_text,
1365
                'markdown': changed_text,
1366
                'html': '<br>\n'.join(htm),
1367
            }
1368

1369
        newline = '\n'  # For Python < 3.12 f-string {} compatibility
×
UNCOV
1370
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
×
UNCOV
1371
        directives_text = (
×
1372
            ', '.join(
1373
                f'{key}={str(value).replace(newline, back_n)}' for key, value in directives.get('ai_google', {}).items()
1374
            )
1375
            or 'None'
1376
        )
UNCOV
1377
        footer = f'Summary generated by Google Generative AI (ai_google directive(s): {directives_text})'
×
UNCOV
1378
        return {
×
1379
            'text': (
1380
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
1381
                f'\n------------\n{footer}'
1382
            ),
1383
            'markdown': (
1384
                f'{summary}\n\n\nA visualization of differences is available in {__package__} HTML reports.'
1385
                f'\n* * *\n{footer}'
1386
            ),
1387
            'html': '<br>\n'.join(
1388
                [
1389
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
1390
                    '',
1391
                ]
1392
                + htm
1393
                + [
1394
                    '-----',
1395
                    f'<i><small>{footer}</small></i>',
1396
                ]
1397
            ),
1398
        }
1399

1400

1401
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    The API key is read from the environment variable ``GOOGLE_AI_API_KEY``.
    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': ('model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-2.0-flash)'),
        'system_instructions': (
            'Optional tone and style instructions for the model (default: see documentation at '
            'https://webchanges.readthedocs.io/en/stable/differs.html#ai-google-diff)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {unified_diff_new}, {old_text} and {new_text} will be replaced',
        'additions_only': 'summarizes only added lines (including as a result of a change)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: None, i.e. model's default)",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'tools': "data passed on to the API's 'tools' field (default: None)",
        'unified': 'directives passed to the unified differ (default: None)',
    }
    __default_directive__ = 'model'

    @staticmethod
    def _send_to_model(
        job: JobBase,
        system_instructions: str,
        model_prompt: str,
        additional_parts: list[dict[str, str | dict[str, str]]] | None = None,
        directives: AiGoogleDirectives | None = None,
    ) -> tuple[str, str]:
        """Creates the summary request to the model; returns the summary and the version of the actual model used.

        Failures (missing API key, HTTP errors, unexpected response payloads) are not raised: they are reported as
        the text of the returned summary.

        :param job: The job being processed (used for log messages).
        :param system_instructions: The system instructions sent to the model.
        :param model_prompt: The fully formatted prompt, sent as the first 'part' of the request.
        :param additional_parts: Optional extra 'parts' appended after the prompt.
        :param directives: The ai_google directives (model, timeout, generation parameters, tools).
        :returns: A tuple of the summary (or an error message) and the model version reported by the API (the
           requested model name if the API did not report one; an empty string if the API key is invalid).
        """

        def api_error_message(response: Any) -> str:
            """Extracts the API's error message from a response, or '' if the body is not the expected JSON."""
            try:
                return response.json().get('error', {}).get('message') or ''
            except (ValueError, AttributeError):  # body is not JSON, or JSON of an unexpected shape
                return ''

        api_version = '1beta'
        if directives is None:
            directives = {}
        model = directives.get('model', 'gemini-2.0-flash')
        timeout = directives.get('timeout', 300)
        max_output_tokens = directives.get('max_output_tokens')
        temperature = directives.get('temperature', 0.0)
        top_p = directives.get('top_p', 1.0 if temperature == 0.0 else None)
        top_k = directives.get('top_k')
        api_key = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
        if len(api_key) != 39:
            logger.error(
                f'Job {job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                f'incorrect length {len(api_key)} ({job.get_location()})'
            )
            return (
                f'## ERROR in summarizing changes using Google AI:\n'
                f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                f'{len(api_key)}.\n',
                '',
            )

        data: dict[str, Any] = {
            'system_instruction': {'parts': [{'text': system_instructions}]},
            'contents': [{'parts': [{'text': model_prompt}]}],
            'generation_config': {
                'max_output_tokens': max_output_tokens,
                'temperature': temperature,
                'top_p': top_p,
                'top_k': top_k,
            },
        }
        if additional_parts:
            data['contents'][0]['parts'].extend(additional_parts)
        if directives.get('tools'):
            data['tools'] = directives['tools']
        logger.info(f'Job {job.index_number}: Making the content generation request to Google AI model {model}')
        model_version = model  # default
        try:
            # The context manager closes the client's connection pool once the (fully read) response is returned.
            with httpx.Client(http2=True) as client:
                r = client.post(
                    f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:generateContent',
                    json=data,
                    # The API key is sent as a header rather than in the URL's query string so that it does not
                    # end up in logged request URLs.
                    headers={'Content-Type': 'application/json', 'x-goog-api-key': api_key},
                    timeout=timeout,
                )
            if r.is_success:
                result = r.json()
                # 'candidates' can be missing or empty (e.g. when the prompt is blocked), and a candidate can lack
                # 'content' or its 'parts'; all these cases fall through to the "no candidate output" message.
                candidates = result.get('candidates') or [{}]
                candidate = candidates[0]
                logger.info(f"Job {job.index_number}: AI generation finished by {candidate.get('finishReason')}")
                parts = (candidate.get('content') or {}).get('parts') or []
                if parts and 'text' in parts[0]:
                    summary: str = parts[0]['text'].rstrip()
                else:
                    summary = (
                        f'AI summary unavailable: Model did not return any candidate output:\n'
                        f'{jsonlib.dumps(result, ensure_ascii=True, indent=2)}'
                    )
                model_version = result.get('modelVersion', model)

            elif r.status_code == 400:
                summary = f'AI summary unavailable: Received error from {r.url.host}: {api_error_message(r)}'
            else:
                summary = (
                    f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from ' f'{r.url.host}'
                )
                if r.content:
                    summary += f': {api_error_message(r)}'

        except httpx.HTTPError as e:
            summary = (
                f'AI summary unavailable: HTTP client error: {e} when requesting data from ' f'{e.request.url.host}'
            )

        return summary, model_version

    def differ(
        self,
        directives: AiGoogleDirectives,  # type: ignore[override]
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates an AI summary of the changes and prepends it to the report of the 'unified' differ.

        :param directives: The ai_google directives; this dict is not modified.
        :param report_kind: The kind of report requested (all three kinds are always returned).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone passed on to the unified differ.
        :returns: A dict with the 'text', 'markdown' and 'html' reports; all values are empty strings (and the
           state's verb is set to 'changed,no_report') when no summary was generated.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> tuple[str, str]:
            """Generate AI summary from unified diff, or an error message, plus the model version."""
            # Checked here (in addition to _send_to_model) so that no diffing work is done without a usable key.
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n',
                    '',
                )

            diff_lines: list[str] = []
            if '{unified_diff' in prompt:  # matches unified_diff or unified_diff_new
                default_context_lines = 9999 if '{unified_diff}' in prompt else 0  # none if only unified_diff_new
                context_lines = directives.get('prompt_ud_context_lines', default_context_lines)
                diff_lines = list(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        n=context_lines,
                        # The input lines have no trailing newlines, so the control lines must not have one either
                        # (otherwise joining with '\n' leaves a blank line after each of them).
                        lineterm='',
                    )
                )
                if not diff_lines:
                    # no changes
                    return '', ''
            unified_diff = '\n'.join(diff_lines)

            if '{unified_diff_new}' in prompt:
                # The first two lines are the '---' and '+++' file headers; skipping them keeps the '+++' header
                # from being mistaken for an added line.
                unified_diff_new = '\n'.join(line[1:] for line in diff_lines[2:] if line.startswith('+'))
            else:
                unified_diff_new = ''

            # check if data is different (same data is sent during testing)
            if '{old_text}' in prompt and '{new_text}' in prompt and self.state.old_data == self.state.new_data:
                return '', ''

            model_prompt = prompt.format(
                unified_diff=unified_diff,
                unified_diff_new=unified_diff_new,
                old_text=self.state.old_data,
                new_text=self.state.new_data,
            )

            return self._send_to_model(
                self.job,
                system_instructions,
                model_prompt,
                directives=directives,
            )

        if directives.get('additions_only') or self.job.additions_only:
            default_system_instructions = (
                'You are a skilled journalist. Your task is to summarize the provided text in a clear and concise '
                'manner. Restrict your analysis and summary *only* to the text provided. Do not introduce any '
                'external information or assumptions.\n\n'
                'Format your summary using Markdown. Use headings, bullet points, and other Markdown elements where '
                'appropriate to create a well-structured and easily readable summary.'
            )
            default_prompt = '{unified_diff_new}'
        else:
            default_system_instructions = (
                'You are a skilled journalist tasked with analyzing two versions of a text and summarizing the key '
                'differences in meaning between them. The audience for your summary is already familiar with the '
                "text's content, so you can focus on the most significant changes.\n\n"
                '**Instructions:**\n\n'
                '1. Carefully examine the old version of the text, provided within the `<old_version>` and '
                '`</old_version>` tags.\n'
                '2. Carefully examine the new version of the text, provided within the `<new_version>` and '
                '`</new_version>` tags.\n'
                '3. Compare the two versions, identifying areas where the meaning differs. This includes additions, '
                'removals, or alterations that change the intended message or interpretation.\n'
                '4. Ignore changes that do not affect the overall meaning, even if the wording has been modified.\n'
                '5. Summarize the identified differences, except those ignored, in a clear and concise manner, '
                'explaining how the meaning has shifted or evolved in the new version compared to the old version only '
                'when necessary. Be specific and provide examples to illustrate your points when needed.\n'
                '6. If there are only additions to the text, then summarize the additions.\n'
                '7. Use Markdown formatting to structure your summary effectively. Use headings, bullet points, '
                'and other Markdown elements as needed to enhance readability.\n'
                '8. Restrict your analysis and summary to the information provided within the `<old_version>` and '
                '`<new_version>` tags. Do not introduce external information or assumptions.\n'
            )
            default_prompt = '<old_version>\n{old_text}\n</old_version>\n\n<new_version>\n{new_text}\n</new_version>'
        system_instructions = directives.get('system_instructions', default_system_instructions)
        # A literal backslash-n typed in the directive is turned into a real newline.
        prompt = directives.get('prompt', default_prompt).replace('\\n', '\n')
        summary, model_version = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string {} compatibility
        back_n = '\\n'  # For Python < 3.12 f-string {} compatibility
        # The model is reported separately in the footer; filter it out of a copy so that the caller's directives
        # are left untouched.
        footer_directives = {key: value for key, value in directives.items() if key != 'model'}
        if footer_directives:
            directives_text = (
                ' (differ directive(s): '
                + ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in footer_directives.items())
                + ')'
            )
        else:
            directives_text = ''
        footer = (
            f"Summary by Google Generative AI's model {model_version}{directives_text}"
            if model_version or directives_text
            else ''
        )
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # NOTE(review): the result of the last call is used for all three report kinds below; this presumably relies
        # on DifferBase.process accumulating the kinds through temp_unfiltered_diff -- confirm against DifferBase.
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': f"{summary}\n\n{unified_report['text']}" + (f'\n------------\n{footer}' if footer else ''),
            'markdown': f"{summary}\n\n{unified_report['markdown']}" + (f'\n* * *\n{footer}' if footer else ''),
            'html': '\n'.join(
                [
                    mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>'),
                    '<br>',
                    unified_report['html'],
                ]
                + (['-----<br>', f'<i><small>{footer}</small></i>'] if footer else [])
            ),
        }
1675

1676

1677
class WdiffDiffer(DifferBase):
    """Word-by-word differ producing wdiff-like output (WORK IN PROGRESS, see the warning issued by differ()).

    Additions and deletions are marked with ANSI color codes in the text and markdown reports and with styled
    ``<span>`` elements in the html report.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: ZoneInfo | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Builds a word-level diff between the old and the new data.

        :param directives: The wdiff directives ('context_lines' and 'range_info').
        :param report_kind: The kind of report requested (all three kinds are always returned).
        :param _unfiltered_diff: Not used by this differ.
        :param tz: The timezone used for the timestamps in the header.
        :returns: A dict with the 'text', 'markdown' and 'html' reports ('markdown' is the same as 'text').
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires text data, but the old data is of '
                f'type {type(self.state.old_data).__name__}'
            )
        if not isinstance(self.state.new_data, str):
            raise ValueError(
                f'Job {self.job.index_number}: Differ {self.__kind__} requires text data, but the new data is of '
                f'type {type(self.state.new_data).__name__}'
            )

        # Split the texts into words tokenizing newline
        if self.state.is_markdown():
            # Don't split spaces in link text, tokenize space as </s>
            old_data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', self.state.old_data)
            words1 = old_data.replace('\n', ' <\\n> ').split(' ')
            new_data = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace(' ', '</s>') + ']', self.state.new_data)
            words2 = new_data.replace('\n', ' <\\n> ').split(' ')
        else:
            words1 = self.state.old_data.replace('\n', ' <\\n> ').split(' ')
            words2 = self.state.new_data.replace('\n', ' <\\n> ').split(' ')

        # Generate a difference list; each entry is a two-character code ('+ ', '- ', '  ' or '? ') followed by
        # the word
        diff = list(difflib.Differ().compare(words1, words2))

        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        head_text = '\n'.join(
            [
                # f'Differ: wdiff',
                f'\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m',
                f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m',
                '',
            ]
        )
        head_html = '<br>\n'.join(
            [
                '<span style="font-family:monospace;">'
                # 'Differ: wdiff',
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>'
                f'</span>',
                '',
            ]
        )
        # Process the diff output to make it more wdiff-like.
        # Each word is emitted one iteration late (as prev_word_*), so that the marker closing a run of additions or
        # deletions can still be appended to the last word of the run once the following word is seen.
        result_text = []
        result_html = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''  # opening marker deferred to the next word (when a run starts on a newline token)
        next_html = ''
        add = False  # inside a run of additions
        rem = False  # inside a run of deletions

        # The '  ' sentinel (an unchanged empty word) flushes the last real word and closes any run still open.
        for word_text in diff + ['  ']:
            if word_text[0] == '?':  # additional context line
                continue
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)
        # No run can still be open here: the sentinel is an unchanged word, which resets both 'add' and 'rem'.

        # rebuild the text from words, replacing the newline token (the first element is the initial empty word)
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # build contextlines
        contextlines = directives.get('context_lines', self.job.contextlines)
        # contextlines = 999
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            html_lines = diff_html.splitlines()
            # Changed lines are recognized by the ANSI color codes (ESC[91m / ESC[92m) inserted above
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            if contextlines:
                lines_to_keep: set[int] = set()
                for i in lines_with_changes:
                    lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            else:
                lines_to_keep = set(lines_with_changes)
            new_diff_text = []
            new_diff_html = []
            last_line = 0  # 1-based number of the last line kept
            skip = False  # whether lines were skipped since the last line kept
            i = 0
            for i, (line_text, line_html) in enumerate(zip(text_lines, html_lines)):
                if i in lines_to_keep:
                    if range_info and skip:
                        # 1-based range of the lines skipped just before this one
                        new_diff_text.append(f'@@ {last_line + 1}...{i} @@')
                        new_diff_html.append(f'@@ {last_line + 1}...{i} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            if (i + 1) != last_line:
                # lines were skipped at the end of the diff
                if range_info and skip:
                    new_diff_text.append(f'@@ {last_line + 1}...{i + 1} @@')
                    new_diff_html.append(f'@@ {last_line + 1}...{i + 1} @@')
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.state.is_markdown():
            # restore the spaces tokenized inside link text
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc