• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 9519925538

14 Jun 2024 05:19PM UTC coverage: 78.056% (+0.2%) from 77.865%
9519925538

push

github

mborsetti
Version 3.24.1rc0

1744 of 2491 branches covered (70.01%)

Branch coverage included in aggregate %.

4392 of 5370 relevant lines covered (81.79%)

6.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.07
/webchanges/differs.py
1
"""Differs."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import base64
8✔
8
import difflib
8✔
9
import html
8✔
10
import logging
8✔
11
import math
8✔
12
import os
8✔
13
import re
8✔
14
import shlex
8✔
15
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
16
import tempfile
8✔
17
import traceback
8✔
18
import urllib.parse
8✔
19
import warnings
8✔
20
from base64 import b64encode
8✔
21
from datetime import datetime
8✔
22
from io import BytesIO
8✔
23
from pathlib import Path
8✔
24
from typing import Any, Iterator, Literal, Optional, TYPE_CHECKING
8✔
25
from zoneinfo import ZoneInfo
8✔
26

27
import html2text
8✔
28

29
from webchanges.util import linkify, mark_to_html, TrackSubClasses
8✔
30

31
try:
8✔
32
    from deepdiff import DeepDiff
8✔
33
    from deepdiff.model import DiffLevel
8✔
34
except ImportError as e:  # pragma: no cover
35
    DeepDiff = e.msg  # type: ignore[no-redef]
36

37
try:
8✔
38
    import httpx
8✔
39
except ImportError:  # pragma: no cover
40
    httpx = None  # type: ignore[assignment]
41
if httpx is not None:
8!
42
    try:
8✔
43
        import h2
8✔
44
    except ImportError:  # pragma: no cover
45
        h2 = None  # type: ignore[assignment]
46

47
try:
8✔
48
    import numpy as np
8✔
49
except ImportError as e:  # pragma: no cover
50
    np = e.msg  # type: ignore[assignment]
51

52
try:
8✔
53
    from PIL import Image, ImageChops, ImageEnhance, ImageStat
8✔
54
except ImportError as e:  # pragma: no cover
55
    Image = e.msg  # type: ignore[no-redef]
56

57
# https://stackoverflow.com/questions/712791
58
try:
8✔
59
    import simplejson as jsonlib
8✔
60
except ImportError:  # pragma: no cover
61
    import json as jsonlib  # type: ignore[no-redef]
62

63
try:
8✔
64
    import xmltodict
8✔
65
except ImportError as e:  # pragma: no cover
66
    xmltodict = e.msg  # type: ignore[no-redef]
67

68
# https://stackoverflow.com/questions/39740632
69
if TYPE_CHECKING:
70
    from webchanges.handler import JobState
71

72

73
logger = logging.getLogger(__name__)
8✔
74

75

76
class DifferBase(metaclass=TrackSubClasses):
    """The base class for differs."""

    # Registry of differ classes keyed by their __kind__; normalize_differ() and process() look differs up here.
    # NOTE(review): presumably populated by the TrackSubClasses metaclass -- confirm in webchanges.util.
    __subclasses__: dict[str, type[DifferBase]] = {}
    # NOTE(review): not referenced in the visible part of this file; presumably maintained by TrackSubClasses.
    __anonymous_subclasses__: list[type[DifferBase]] = []

    # The name of the differ, as selected through the 'name' sub-directive and shown in the documentation.
    __kind__: str = ''

    # Maps each supported sub-directive to its one-line description; normalize_differ() rejects any sub-directive
    # that is not listed here, unless the special key '<any>' is present.
    __supported_directives__: dict[str, str] = {}  # this must be present, even if empty

    # Inline CSS applied to added and deleted content in HTML diffs.
    css_added_style = 'background-color:#d1ffd1;color:#082b08;'
    css_deltd_style = 'background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;'
8✔
88

89
    def __init__(self, state: JobState) -> None:
8✔
90
        """
91

92
        :param state: the JobState.
93
        """
94
        self.job = state.job
8✔
95
        self.state = state
8✔
96

97
    @classmethod
    def differ_documentation(cls) -> str:
        """Builds the plain-text list of differs displayed by the --features command line argument.

        :returns: A string with one line per differ (its kind and docstring), each followed by one indented line
           per supported sub-directive, and a closing legend.
        """
        lines: list[str] = []
        for differ_cls in TrackSubClasses.sorted_by_kind(cls):
            lines.append(f'  * {differ_cls.__kind__} - {differ_cls.__doc__}')
            if hasattr(differ_cls, '__supported_directives__'):
                lines.extend(f'      {key} ... {doc}' for key, doc in differ_cls.__supported_directives__.items())
        lines.append('\n[] ... Parameter can be supplied as unnamed value\n')
        return '\n'.join(lines)
8✔
112

113
    @classmethod
8✔
114
    def normalize_differ(
8✔
115
        cls,
116
        differ_spec: Optional[dict[str, Any]],
117
        job_index_number: Optional[int] = None,
118
    ) -> tuple[str, dict[str, Any]]:
119
        """Checks the differ_spec for its validity and applies default values.
120

121
        :param differ_spec: The differ as entered by the user; use "unified" if empty.
122
        :param job_index_number: The job index number.
123
        :returns: A validated differ_kind, subdirectives (where subdirectives is a dict).
124
        """
125
        differ_spec = differ_spec or {'name': 'unified'}
8✔
126
        subdirectives = differ_spec.copy()
8✔
127
        differ_kind = subdirectives.pop('name', '')
8✔
128
        if not differ_kind:
8✔
129
            if list(subdirectives.keys()) == ['command']:
8!
130
                differ_kind = 'command'
8✔
131
            else:
132
                raise ValueError(
×
133
                    f"Job {job_index_number}: Differ directive must have a 'name' sub-directive: {differ_spec}."
134
                )
135

136
        differcls = cls.__subclasses__.get(differ_kind, None)
8✔
137
        if not differcls:
8✔
138
            raise ValueError(f'Job {job_index_number}: No differ named {differ_kind}.')
8✔
139

140
        if hasattr(differcls, '__supported_directives__'):
8!
141
            provided_keys = set(subdirectives.keys())
8✔
142
            allowed_keys = set(differcls.__supported_directives__.keys())
8✔
143
            unknown_keys = provided_keys.difference(allowed_keys)
8✔
144
            if unknown_keys and '<any>' not in allowed_keys:
8✔
145
                raise ValueError(
8✔
146
                    f'Job {job_index_number}: Differ {differ_kind} does not support sub-directive(s) '
147
                    f"{', '.join(unknown_keys)} (supported: {', '.join(sorted(allowed_keys))})."
148
                )
149

150
        return differ_kind, subdirectives
8✔
151

152
    @classmethod
    def process(
        cls,
        differ_kind: str,
        directives: dict[str, Any],
        job_state: JobState,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        tz: Optional[str] = None,
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Process the differ.

        :param differ_kind: The name of the differ.
        :param directives: The directives.
        :param job_state: The JobState.
        :param report_kind: The report kind required.
        :param tz: The timezone of the report.
        :param _unfiltered_diff: Any previous diffs generated by the same filter, which can be used to generate a
           diff for a different report_kind.
        :returns: The output of the differ, or an error message with traceback (for all three report kinds) if it
           fails; an empty dict if no differ named differ_kind exists.
        """
        logger.info(f'Job {job_state.job.index_number}: Applying differ {differ_kind}, directives {directives}')
        differcls: Optional[type[DifferBase]] = cls.__subclasses__.get(differ_kind)  # type: ignore[assignment]
        if not differcls:
            return {}

        try:
            return differcls(job_state).differ(directives, report_kind, _unfiltered_diff, tz)
        except Exception as e:
            # Differ failed
            logger.info(
                f'Job {job_state.job.index_number}: Differ {differ_kind} with {directives=} encountered '
                f'error {e}'
            )
            # Undo saving of new data since user won't see the diff
            job_state.delete_latest()

            job_state.exception = e
            job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
            directives_text = ', '.join(f'{key}={value}' for key, value in directives.items()) or 'None'
            error_text = job_state.traceback.strip()
            return {
                'text': (
                    f'Differ {differ_kind} with directive(s) {directives_text} encountered an '
                    f'error:\n\n{error_text}'
                ),
                'markdown': (
                    f'## Differ {differ_kind} with directive(s) {directives_text} '
                    f'encountered an error:\n```\n{error_text}\n```\n'
                ),
                # Tracebacks and directive values routinely contain '<', '>' and '&' (e.g. "<module>"); escape
                # them so that they display literally instead of corrupting the report's markup.
                'html': (
                    f'<span style="color:red;font-weight:bold">Differ {differ_kind} with directive(s) '
                    f'{html.escape(directives_text)} encountered an error:<br>\n<br>\n'
                    f'<span style="font-family:monospace;white-space:pre-wrap;">{html.escape(error_text)}'
                    f'</span></span>'
                ),
            }
8✔
208

209
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Create a diff from the data. Since this function could be called by different reporters of multiple report
        types ('text', 'markdown', 'html'), the differ outputs a dict with data for the report_kind it generated so
        that it can be reused.

        This base implementation only defines the interface and must be overridden by each differ.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Any previous diffs generated by the same filter, which can be used to generate a
           diff for a different report_kind.
        :param tz: The timezone of the report.
        :returns: An empty dict if there is no change, otherwise a dict with report_kind as key and diff as value
           (as a minimum for the report_kind requested).
        :raises RuntimeError: If the external diff tool returns an error.
        :raises NotImplementedError: Always, when called on this base implementation.
        """
        raise NotImplementedError()
230

231
    @staticmethod
8✔
232
    def make_timestamp(
8✔
233
        timestamp: float,
234
        tz: Optional[str] = None,
235
    ) -> str:
236
        """Creates a datetime string in RFC 5322 (email) format with the time zone name (if available) in the
237
        Comments and Folding White Space (CFWS) section.
238

239
        :param timestamp: The timestamp.
240
        :param tz: The IANA timezone of the report.
241
        :returns: A datetime string in RFC 5322 (email) format.
242
        """
243
        if timestamp:
8✔
244
            if tz:
8✔
245
                tz_info: Optional[ZoneInfo] = ZoneInfo(tz)
8✔
246
            else:
247
                tz_info = None
8✔
248
            dt = datetime.fromtimestamp(timestamp).astimezone(tz=tz_info)
8✔
249
            # add timezone name if known
250
            if dt.strftime('%Z') != dt.strftime('%z')[:3]:
8✔
251
                cfws = f" ({dt.strftime('%Z')})"
8✔
252
            else:
253
                cfws = ''
8✔
254
            return dt.strftime('%a, %d %b %Y %H:%M:%S %z') + cfws
8✔
255
        else:
256
            return 'NEW'
8✔
257

258
    @staticmethod
    def html2text(data: str) -> str:
        """Renders HTML as text using the html2text package, without line wrapping or images, and with trailing
        whitespace removed from every line.

        :param data: the string in html format.
        :returns: the string in text format.
        """
        converter = html2text.HTML2Text()
        converter.body_width = 0
        converter.wrap_links = False
        converter.single_line_break = True
        converter.ignore_images = True
        converter.unicode_snob = True
        text = converter.handle(data)
        return '\n'.join(map(str.rstrip, text.splitlines()))
8✔
272

273
    def raise_import_error(self, package_name: str, error_message: str) -> None:
8✔
274
        """Raise ImportError for missing package.
275

276
        :param package_name: The name of the module/package that could not be imported.
277
        :param error_message: The error message from ImportError.
278

279
        :raises: ImportError.
280
        """
281
        raise ImportError(
8✔
282
            f"Job {self.job.index_number}: Python package '{package_name}' is not installed; cannot use "
283
            f"'differ: {self.__kind__}' ({self.job.get_location()})\n{error_message}"
284
        )
285

286

287
class UnifiedDiffer(DifferBase):
    """(Default) Generates a unified diff."""

    # The name under which this differ is registered; also the default applied by normalize_differ().
    __kind__ = 'unified'

    # Sub-directives accepted by this differ and their descriptions (displayed in the differ documentation).
    __supported_directives__ = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }
296

297
    def unified_diff_to_html(self, diff: str) -> Iterator[str]:
        """
        Renders a unified diff as a colorized HTML table with one row per diff line, with styling and line
        processing driven by the job's ``monospace`` and ``is_markdown`` values.

        :param diff: the unified diff
        :returns: An iterator over the HTML: the opening table tag, one table row per diff line, the closing tag.
        """
        if self.job.monospace:
            yield '<table style="border-collapse:collapse;font-family:monospace;white-space:pre-wrap;">'
        else:
            yield '<table style="border-collapse:collapse;">'

        monospace_style = 'font-family:monospace;' if self.job.monospace else ''
        # CSS for the lines following the two header lines, selected by the line's first character.
        body_styles = {
            '+': f'{monospace_style}{self.css_added_style}',  # addition
            '-': f'{monospace_style}{self.css_deltd_style}',  # deletion
            ' ': monospace_style,  # context line
            '@': 'font-family:monospace;background-color:#fbfbfb;',  # range information
            '/': 'background-color:lightyellow;',  # informational header added by additions_only/deletions_only
        }

        def render_row(text: str, index: int) -> str:
            """
            Turns one line of the diff into an HTML table row.

            :param text: The line to render.
            :param index: The position of the line in the diff (0 and 1 are the two file header lines).

            :returns: The HTML table row string.
            """
            if index == 0:
                css = 'font-family:monospace;color:darkred;'
            elif index == 1:
                css = 'font-family:monospace;color:darkgreen;'
            else:
                css = body_styles.get(text[0])
                if css is None:
                    raise RuntimeError('Unified Diff does not comform to standard!')
            style_attr = f' style="{css}"' if css else ''

            # Headers and range information are emitted verbatim; all other lines lose their one-character
            # prefix and are converted to HTML.
            if index > 1 and text[0] != '@':
                if self.job.is_markdown or text[0] == '/':  # '/' marks our informational header
                    text = mark_to_html(text[1:], self.job.markdown_padded_tables)
                else:
                    text = linkify(text[1:])
            return f'<tr><td{style_attr}>{text}</td></tr>'

        for index, text in enumerate(diff.splitlines()):
            yield render_row(text, index)
        yield '</table>'
8✔
350

351
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates a unified diff of the old and new data, honoring the job's additions_only and deletions_only
        settings and the 'context_lines' and 'range_info' directives.

        :param directives: The directives.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; its 'text' entry, if present, is reused as the basis of
           an 'html' report.
        :param tz: The timezone of the report.
        :returns: A dict with the diff under 'text' and 'markdown' (when newly generated) and under 'html' (when
           requested); a dict of empty strings if there is nothing to report.
        """

        def drop_bodiless_ranges(lines: list[str]) -> list[str]:
            """Drops each '@' line that is immediately followed by another '@' line, then a trailing '@' line."""
            kept = [
                current
                for current, following in zip(lines, lines[1:] + [''])
                if not (current.startswith('@') and following.startswith('@'))
            ]
            return kept[:-1] if kept[-1].startswith('@') else kept

        result: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind == 'html' and _unfiltered_diff is not None and 'text' in _unfiltered_diff:
            diff_text = _unfiltered_diff['text']
        else:
            nothing_to_report: dict[Literal['text', 'markdown', 'html'], str] = {
                'text': '',
                'markdown': '',
                'html': '',
            }
            contextlines = directives.get('context_lines', self.job.contextlines)
            if contextlines is None:
                contextlines = 0 if (self.job.additions_only or self.job.deletions_only) else 3
            diff = list(
                difflib.unified_diff(
                    str(self.state.old_data).splitlines(),
                    str(self.state.new_data).splitlines(),
                    fromfile='@',
                    tofile='@',
                    fromfiledate=self.make_timestamp(self.state.old_timestamp, tz),
                    tofiledate=self.make_timestamp(self.state.new_timestamp, tz),
                    n=contextlines,
                    lineterm='',
                )
            )
            if not diff:
                self.state.verb = 'changed,no_report'
                return nothing_to_report
            # replace tabs in header lines
            for header_index in (0, 1):
                diff[header_index] = diff[header_index].replace('\t', ' ')

            if self.job.additions_only:
                old_length = len(self.state.old_data)
                if old_length and len(self.state.new_data) / old_length <= 0.25:
                    # Most of the content is gone: keep the full diff and explain why deletions are shown.
                    diff = [
                        *diff[:2],
                        '/**Comparison type: Additions only**',
                        '/**Deletions are being shown as 75% or more of the content has been deleted**',
                        *diff[2:],
                    ]
                else:
                    head = '---' + diff[0][3:]
                    diff = drop_bodiless_ranges([line for line in diff if line.startswith(('+', '@'))])
                    if len(diff) == 1 or sum(1 for line in diff if line.lstrip('+').rstrip()) == 2:
                        self.state.verb = 'changed,no_report'
                        return nothing_to_report
                    diff = [head, diff[0], '/**Comparison type: Additions only**', *diff[1:]]
            elif self.job.deletions_only:
                head = '--- @' + diff[1][3:]
                diff = drop_bodiless_ranges([line for line in diff if line.startswith(('-', '@'))])
                if len(diff) == 1 or sum(1 for line in diff if line.lstrip('-').rstrip()) == 2:
                    self.state.verb = 'changed,no_report'
                    return nothing_to_report
                diff = [diff[0], head, '/**Comparison type: Deletions only**', *diff[1:]]

            # remove range info lines if needed
            range_info = directives.get('range_info')
            if range_info is False or (
                range_info is None and self.job.additions_only and (len(diff) < 4 or diff[3][0] != '/')
            ):
                diff = [line for line in diff if not line.startswith('@@ ')]

            diff_text = '\n'.join(diff)
            result['text'] = diff_text
            result['markdown'] = diff_text

        if report_kind == 'html':
            result['html'] = '\n'.join(self.unified_diff_to_html(diff_text))

        return result
8✔
444

445

446
class TableDiffer(DifferBase):
    """Generates a Python HTML table diff."""

    # The name under which this differ is registered.
    __kind__ = 'table'

    # Sub-directives accepted by this differ and their descriptions (displayed in the differ documentation).
    __supported_directives__ = {
        'tabsize': 'tab stop spacing (default: 8)',
    }
454

455
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Generates a side-by-side HTML table diff with difflib.HtmlDiff; for 'text' and 'markdown' reports the
        table is converted to text.

        :param directives: The directives ('tabsize' is the only one used).
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; its 'html' entry, if present, is reused as the basis of
           a 'text' or 'markdown' report.
        :param tz: The timezone of the report.
        :returns: A dict with the table under 'html' (when newly generated) and its text conversion under 'text'
           and 'markdown' (when one of these is requested).
        """
        out_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        if report_kind in {'text', 'markdown'} and _unfiltered_diff is not None and 'html' in _unfiltered_diff:
            table = _unfiltered_diff['html']
        else:
            tabsize = int(directives.get('tabsize', 8))
            html_diff = difflib.HtmlDiff(tabsize=tabsize)
            table = html_diff.make_table(
                str(self.state.old_data).splitlines(keepends=True),
                str(self.state.new_data).splitlines(keepends=True),
                self.make_timestamp(self.state.old_timestamp, tz),
                self.make_timestamp(self.state.new_timestamp, tz),
                context=True,
                numlines=3,
            )
            # fix table formatting: replace the CSS classes emitted by difflib with inline styles
            for old, new in (
                ('<th ', '<th style="font-family:monospace" '),
                ('<td ', '<td style="font-family:monospace" '),
                (' nowrap="nowrap"', ''),
                ('<a ', '<a style="font-family:monospace;color:inherit" '),
                ('<span class="diff_add"', '<span style="color:green;background-color:lightgreen"'),
                # 'lightred' is not a CSS color keyword (the declaration would be dropped); use 'lightpink'
                ('<span class="diff_sub"', '<span style="color:red;background-color:lightpink"'),
                ('<span class="diff_chg"', '<span style="color:orange;background-color:lightyellow"'),
            ):
                table = table.replace(old, new)
            out_diff['html'] = table

        if report_kind in {'text', 'markdown'}:
            diff_text = self.html2text(table)
            out_diff['text'] = diff_text
            out_diff['markdown'] = diff_text

        return out_diff
8✔
496

497

498
class CommandDiffer(DifferBase):
    """Runs an external command to generate the diff."""

    # The name under which this differ is registered; normalize_differ() also selects it when a differ
    # specification consists of just a 'command' sub-directive.
    __kind__ = 'command'

    # Sub-directives accepted by this differ and their descriptions (displayed in the differ documentation).
    __supported_directives__ = {
        'command': 'The command to execute',
    }

    # NOTE(review): these patterns are not referenced in the visible part of this file -- confirm they are still
    # in use before relying on them.
    re_ptags = re.compile(r'^<p>|</p>$')  # a leading <p> or a trailing </p>
    re_htags = re.compile(r'<(/?)h\d>')  # an opening or closing heading tag (<h1>, </h2>, ...)
    re_tagend = re.compile(r'<(?!.*<).*>+$')  # the last tag of a string, when the string ends with it
8✔
510

511
    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Runs the external command on temporary files holding the old and the new data and reports its output.

        An exit code of 0 is taken as "nothing to report" and an exit code of 1 as "differences found"; any output
        to stderr or a higher exit code is an error.

        :param directives: The directives; 'command' is the command line to run, to which the paths of the two
           files are appended.
        :param report_kind: The report_kind for which a diff must be generated (at a minimum).
        :param _unfiltered_diff: Previously generated diffs; its 'text' entry, if present, is reused as the basis of
           an 'html' report unless the command starts with 'wdiff'.
        :param tz: The timezone of the report.
        :returns: A dict with the diff under 'text' and 'markdown' (when newly generated) and under 'html' (when
           requested); a dict of empty strings if there is nothing to report.
        :raises RuntimeError: If the external diff tool returns an error.
        """
        command = directives['command']
        result: dict[Literal['text', 'markdown', 'html'], str] = {}
        if (
            report_kind == 'html'
            and not command.startswith('wdiff')
            and _unfiltered_diff is not None
            and 'text' in _unfiltered_diff
        ):
            diff = _unfiltered_diff['text']
        else:
            old_data = self.state.old_data
            new_data = self.state.new_data
            if self.job.is_markdown:
                # protect the link anchor from being split (won't work)
                markdown_links_re = re.compile(r'\[(.*?)][(](.*?)[)]')

                def quote_anchor(match: re.Match) -> str:
                    return f'[{urllib.parse.quote(match.group(1))}]({match.group(2)})'

                old_data = markdown_links_re.sub(quote_anchor, str(old_data))
                new_data = markdown_links_re.sub(quote_anchor, str(new_data))

            # External diff tool
            with tempfile.TemporaryDirectory() as tmp_dir:
                old_file_path = Path(tmp_dir, 'old_file')
                new_file_path = Path(tmp_dir, 'new_file')
                for file_path, data in ((old_file_path, old_data), (new_file_path, new_data)):
                    if isinstance(data, str):
                        file_path.write_text(data)
                    else:
                        file_path.write_bytes(data)
                cmdline = [*shlex.split(command), str(old_file_path), str(new_file_path)]
                proc = subprocess.run(cmdline, capture_output=True, text=True)  # noqa: S603 subprocess call

            if proc.stderr or proc.returncode > 1:
                raise RuntimeError(
                    f"Job {self.job.index_number}: External differ '{directives}' returned '{proc.stderr.strip()}' "
                    f'({self.job.get_location()})'
                ) from subprocess.CalledProcessError(proc.returncode, cmdline)
            if proc.returncode == 0:
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

            head = (
                f'Using differ "{directives}"\n'
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}'
            )
            diff = proc.stdout
            if self.job.is_markdown:
                # undo the protection of the link anchor from being split
                diff = markdown_links_re.sub(lambda x: f'[{urllib.parse.unquote(x.group(1))}]({x.group(2)})', diff)
            if command.startswith('wdiff') and self.job.contextlines == 0:
                # remove lines that don't have any changes
                markers = ('{+', '+}', '[-', '-]')
                diff = ''.join(
                    line for line in diff.splitlines(keepends=True) if any(marker in line for marker in markers)
                )
            diff = f'{head}\n{diff}'
            result['text'] = diff
            result['markdown'] = diff

        if report_kind == 'html':
            if command.startswith('wdiff'):
                # colorize output of wdiff
                result['html'] = self.wdiff_to_html(diff)
            else:
                result['html'] = html.escape(diff)

        return result
8✔
597

598
    def wdiff_to_html(self, diff: str) -> str:
        """
        Converts the output of wdiff to HTML, turning its ``{+...+}`` and ``[-...-]`` markers into styled spans.

        :param diff: The output of the wdiff command.
        :returns: The colorized HTML output.
        """
        html_diff = html.escape(diff)
        if self.job.is_markdown:
            # detect and fix multiline additions or deletions: every line gets its own balanced pair of markers
            in_addition = False
            in_deletion = False
            balanced_lines = []
            for line in html_diff.splitlines():
                # re-open the marker that was still open at the end of the previous line
                if in_addition:
                    line = '{+' + line
                    in_addition = False
                elif in_deletion:
                    line = '[-' + line
                    in_deletion = False
                # track which marker (if any) is left open by this line
                for marker in re.findall(r'\[-|-]|{\+|\+}', line):
                    if marker in ('[-', '-]'):
                        in_deletion = marker == '[-'
                    else:
                        in_addition = marker == '{+'
                # close it, so that the marker does not span the line break
                if in_addition:
                    line += '+}'
                elif in_deletion:
                    line += '-]'
                balanced_lines.append(line)
            html_diff = '<br>\n'.join(balanced_lines)

        # wdiff colorization (cannot be done with global CSS class as Gmail overrides it)
        for pattern, css in (
            (r'\{\+(.*?)\+}', self.css_added_style),
            (r'\[-(.*?)-]', self.css_deltd_style),
        ):
            html_diff = re.sub(
                pattern,
                lambda x, css=css: f'<span style="{css}">{x.group(1)}</span>',
                html_diff,
                flags=re.DOTALL,
            )

        if not self.job.monospace:
            return html_diff
        return f'<span style="font-family:monospace;white-space:pre-wrap">{html_diff}</span>'
8✔
651

652

653
class DeepdiffDiffer(DifferBase):
    """Reports the differences between two JSON or XML documents as computed by the ``deepdiff`` package."""

    __kind__ = 'deepdiff'

    __supported_directives__ = {
        'data_type': "either 'json' (default) or 'xml'",
        'ignore_order': 'Whether to ignore the order in which the items have appeared (default: false)',
        'ignore_string_case': 'Whether to be case-sensitive or not when comparing strings (default: false)',
        'significant_digits': (
            'The number of digits AFTER the decimal point to be used in the comparison (default: no limit)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a human-readable list of the differences between the old and the new data.

        :param directives: The differ directives; ``data_type``, ``ignore_order``, ``ignore_string_case`` and
            ``significant_digits`` are read here.
        :param report_kind: The kind of report requested; 'html' yields a colorized HTML report, anything else a
            plain text report.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``make_timestamp`` when formatting the old and new timestamps.
        :returns: ``{'html': ...}`` when ``report_kind`` is 'html', otherwise a dict with identical 'text' and
            'markdown' entries. All entries are empty (and ``state.verb`` is set to 'changed,no_report') when
            deepdiff finds no difference. If the new data is not valid JSON, an error report is returned and the
            exception is recorded in ``state``.
        """
        if isinstance(DeepDiff, str):  # pragma: no cover
            self.raise_import_error('deepdiff', DeepDiff)

        span_added = f'<span style="{self.css_added_style}">'
        span_deltd = f'<span style="{self.css_deltd_style}">'

        def _pretty_deepdiff(ddiff: DeepDiff, report_kind: Literal['text', 'markdown', 'html']) -> str:
            """
            Customized version of deepdiff.serialization.SerializationMixin.pretty method, edited to include the
            values deleted or added and an option for colorized HTML output. The pretty human-readable string
            output for the diff object regardless of what view was used to generate the diff.
            """
            # NOTE(review): in the two 'set_item_*' templates below val_t1 and val_t2 look swapped relative to the
            # other templates (for an addition the new value is presumably t2) -- confirm against deepdiff before
            # relying on them; JSON and XML input is not expected to contain sets.
            if report_kind == 'html':
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        f'from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}</span>.'
                    ),
                    'values_changed': (
                        f'Value of {{diff_path}} changed from {span_deltd}{{val_t1}}</span> to {span_added}{{val_t2}}'
                        '</span>.'
                    ),
                    'dictionary_item_added': (
                        f'Item {{diff_path}} added to dictionary as {span_added}{{val_t2}}</span>.'
                    ),
                    'dictionary_item_removed': (
                        f'Item {{diff_path}} removed from dictionary (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'iterable_item_added': f'Item {{diff_path}} added to iterable as {span_added}{{val_t2}}</span>.',
                    'iterable_item_removed': (
                        f'Item {{diff_path}} removed from iterable (was {span_deltd}{{val_t1}}</span>).'
                    ),
                    'attribute_added': f'Attribute {{diff_path}} added as {span_added}{{val_t2}}</span>.',
                    'attribute_removed': f'Attribute {{diff_path}} removed (was {span_deltd}{{val_t1}}</span>).',
                    'set_item_added': f'Item root[{{val_t2}}] added to set as {span_added}{{val_t1}}</span>.',
                    'set_item_removed': (
                        f'Item root[{{val_t1}}] removed from set (was {span_deltd}{{val_t2}}</span>).'
                    ),
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }
            else:
                PRETTY_FORM_TEXTS = {
                    'type_changes': (
                        'Type of {diff_path} changed from {type_t1} to {type_t2} and value changed '
                        'from {val_t1} to {val_t2}.'
                    ),
                    'values_changed': 'Value of {diff_path} changed from {val_t1} to {val_t2}.',
                    'dictionary_item_added': 'Item {diff_path} added to dictionary as {val_t2}.',
                    'dictionary_item_removed': 'Item {diff_path} removed from dictionary (was {val_t1}).',
                    'iterable_item_added': 'Item {diff_path} added to iterable as {val_t2}.',
                    'iterable_item_removed': 'Item {diff_path} removed from iterable (was {val_t1}).',
                    'attribute_added': 'Attribute {diff_path} added as {val_t2}.',
                    'attribute_removed': 'Attribute {diff_path} removed (was {val_t1}).',
                    'set_item_added': 'Item root[{val_t2}] added to set as {val_t1}.',
                    'set_item_removed': 'Item root[{val_t1}] removed from set (was {val_t2}).',
                    'repetition_change': 'Repetition change for item {diff_path} ({val_t2}).',
                }

            def _escape(text: str) -> str:
                """Escape text taken from the compared documents when it is going into an HTML report."""
                return html.escape(text, quote=False) if report_kind == 'html' else text

            def _format_value(value: Any) -> str:
                """Render one side of a change: scalars are quoted, dicts pretty-printed, the rest stringified."""
                type_name = type(value).__name__
                if type_name in {'str', 'int', 'float'}:
                    text = f'"{value}"'
                elif type_name == 'dict':
                    text = jsonlib.dumps(value, ensure_ascii=False, indent=2)
                else:
                    text = str(value)
                return _escape(text)

            def _pretty_print_diff(ddiff: DiffLevel) -> str:
                """
                Customized version of deepdiff.serialization.pretty_print_diff() function, edited to include the
                values deleted or added.
                """
                # An unknown report type falls back to an empty template, i.e. a bare bullet.
                return '• ' + PRETTY_FORM_TEXTS.get(ddiff.report_type, '').format(
                    diff_path=_escape(ddiff.path(root='')),
                    type_t1=type(ddiff.t1).__name__,
                    type_t2=type(ddiff.t2).__name__,
                    val_t1=_format_value(ddiff.t1),
                    val_t2=_format_value(ddiff.t2),
                )

            result = []
            for report_type in ddiff.tree:
                for level in ddiff.tree[report_type]:
                    result.append(_pretty_print_diff(level))

            return '\n'.join(result)

        data_type = directives.get('data_type', 'json')
        old_data = ''
        new_data = ''
        if data_type == 'json':
            try:
                old_data = jsonlib.loads(self.state.old_data)
            except jsonlib.JSONDecodeError:
                # Old data that cannot be parsed is compared as if it were empty.
                old_data = ''
            try:
                new_data = jsonlib.loads(self.state.new_data)
            except jsonlib.JSONDecodeError as e:
                self.state.exception = e
                self.state.traceback = self.job.format_error(e, traceback.format_exc())
                logger.error(f'{self.job.index_number}: Invalid JSON data: {e.msg} ({self.job.get_location()})')
                return {
                    'text': f'Differ {self.__kind__} ERROR: New data is invalid JSON\n{e.msg}',
                    'markdown': f'Differ {self.__kind__} **ERROR: New data is invalid JSON**\n{e.msg}',
                    'html': f'Differ {self.__kind__} <b>ERROR: New data is invalid JSON</b>\n{e.msg}',
                }
        elif data_type == 'xml':
            if isinstance(xmltodict, str):  # pragma: no cover
                self.raise_import_error('xmltodict', xmltodict)

            old_data = xmltodict.parse(self.state.old_data)
            new_data = xmltodict.parse(self.state.new_data)

        ignore_order = directives.get('ignore_order')
        ignore_string_case = directives.get('ignore_string_case')
        significant_digits = directives.get('significant_digits')
        ddiff = DeepDiff(
            old_data,
            new_data,
            cache_size=500,
            cache_purge_level=0,
            cache_tuning_sample_size=500,
            ignore_order=ignore_order,
            ignore_string_type_changes=True,
            ignore_numeric_type_changes=True,
            ignore_string_case=ignore_string_case,
            significant_digits=significant_digits,
            # Derived from the logging level and clamped to 0..2: DEBUG (10) -> 2, INFO (20) -> 1, WARNING+ -> 0.
            verbose_level=min(2, max(0, math.ceil(3 - logger.getEffectiveLevel() / 10))),
        )
        diff_text = _pretty_deepdiff(ddiff, report_kind)
        if not diff_text:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}

        self.job.set_to_monospace()
        if report_kind == 'html':
            # diff_text has no trailing newline (it is built with '\n'.join), so it is emitted in full; slicing
            # off its last character would drop the final character of the report.
            html_diff = (
                f'<span style="font-family:monospace;white-space:pre-wrap;">\n'
                f'Differ: {self.__kind__} for {data_type}\n'
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>\n'
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>\n'
                + diff_text
                + '</span>'
            )
            return {'html': html_diff}
        else:
            text_diff = (
                f'Differ: {self.__kind__} for {data_type}\n'
                f'--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\n'
                f'+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\n'
                f'{diff_text}'
            )
            return {'text': text_diff, 'markdown': text_diff}
832

833

834
class ImageDiffer(DifferBase):
    """Compares two images providing an image outlining areas that have changed."""

    __kind__ = 'image'

    __supported_directives__ = {
        'data_type': (
            "'url' (to retrieve an image), 'ascii85' (Ascii85 data), 'base64' (Base64 data) or 'filename' (the path "
            "to an image file) (default: 'url')"
        ),
        'mse_threshold': (
            'the minimum mean squared error (MSE) between two images to consider them changed if numpy is installed '
            '(default: 2.5)'
        ),
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report embedding the new image and an image highlighting what changed from the old one.

        :param directives: The differ directives; ``data_type`` and ``mse_threshold`` are read here.
        :param report_kind: Not used: all three report kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``make_timestamp`` when formatting the old and new timestamps.
        :returns: A dict with 'text', 'markdown' and 'html' entries. All are empty when the images are identical
            (``state.verb`` is set to 'unchanged') or when their mean squared error is below ``mse_threshold``
            (``state.verb`` is set to 'changed,no_report').
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )
        if isinstance(Image, str):  # pragma: no cover
            self.raise_import_error('pillow', Image)

        def load_image_from_web(url: str) -> Image:
            """Fetches the image from an url."""
            logger.debug(f'Retrieving image from {url}')
            with httpx.stream('GET', url, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(b''.join(response.iter_bytes())))

        def load_image_from_file(filename: str) -> Image:
            """Load an image from a file."""
            logger.debug(f'Reading image from {filename}')
            return Image.open(filename)

        def load_image_from_base64(base_64: str) -> Image:
            """Load an image from a Base64-encoded string."""
            logger.debug('Retrieving image from a base64 string')
            return Image.open(BytesIO(base64.b64decode(base_64)))

        def load_image_from_ascii85(ascii85: str) -> Image:
            """Load an image from an Ascii85-encoded string."""
            logger.debug('Retrieving image from an ascii85 string')
            return Image.open(BytesIO(base64.a85decode(ascii85)))

        def compute_diff_image(img1: Image, img2: Image) -> tuple[Image, Optional[np.float64]]:
            """Compute the difference between two images.

            Returns the visualization of the difference and the mean squared error between the two images (None
            if numpy is not installed).
            """
            # Compute the absolute value of the pixel-by-pixel difference between the two images.
            diff_image = ImageChops.difference(img1, img2)

            # Compute the mean squared error between the images
            if not isinstance(np, str):
                # Convert to float before squaring: squaring 8-bit integer pixel data wraps around modulo 256
                # and would understate large differences.
                diff_array = np.asarray(diff_image, dtype=np.float64)
                mse_value = np.mean(np.square(diff_array))
            else:  # pragma: no cover
                mse_value = None

            # Create the diff image by overlaying this difference on a darkened greyscale background
            back_image = img1.convert('L')
            back_image_brightness = ImageStat.Stat(back_image).rms[0]
            back_image = ImageEnhance.Brightness(back_image).enhance(back_image_brightness / 225)

            # Tint the difference yellow with a 12-element conversion matrix: 4 values for each of the Red, Green
            # and Blue output channels. Red and Green both copy the first input channel (1.0) and Blue is zero.
            yellow_tint_matrix = (
                1.0,
                0.0,
                0.0,
                0.0,  # Red = 100% of the first input channel
                1.0,
                0.0,
                0.0,
                0.0,  # Green = 100% of the first input channel
                0.0,
                0.0,
                0.0,
                0.0,  # Blue = 0%
            )

            # Apply the conversion
            diff_colored = diff_image.convert('RGB').convert('RGB', matrix=yellow_tint_matrix)

            final_img = ImageChops.add(back_image.convert('RGB'), diff_colored)

            return final_img, mse_value

        data_type = directives.get('data_type', 'url')
        mse_threshold = directives.get('mse_threshold', 2.5)
        if not isinstance(self.state.old_data, str):
            raise ValueError('old_data is not a string')
        if not isinstance(self.state.new_data, str):
            raise ValueError('new_data is not a string')
        if data_type == 'url':
            # httpx is set to None (not to an error string) when it cannot be imported, and is only needed here.
            if httpx is None:  # pragma: no cover
                self.raise_import_error('httpx', "No module named 'httpx'")
            old_image = load_image_from_web(self.state.old_data)
            new_image = load_image_from_web(self.state.new_data)
            old_data = f' (<a href="{self.state.old_data}">Old image</a>)'
            new_data = f' (<a href="{self.state.new_data}">New image</a>)'
        elif data_type == 'ascii85':
            old_image = load_image_from_ascii85(self.state.old_data)
            new_image = load_image_from_ascii85(self.state.new_data)
            old_data = ''
            new_data = ''
        elif data_type == 'base64':
            old_image = load_image_from_base64(self.state.old_data)
            new_image = load_image_from_base64(self.state.new_data)
            old_data = ''
            new_data = ''
        else:  # 'filename'
            old_image = load_image_from_file(self.state.old_data)
            new_image = load_image_from_file(self.state.new_data)
            old_data = f' (<a href="file://{self.state.old_data}">Old image</a>)'
            new_data = f' (<a href="file://{self.state.new_data}">New image</a>)'

        # Check formats  TODO: is it needed? under which circumstances?
        # if new_image.format != old_image.format:
        #     logger.info(f'Image formats do not match: {old_image.format} vs {new_image.format}')
        # else:
        #     logger.debug(f'image format is {old_image.format}')

        # If needed, resize one image to the size of the other (sizes are compared as (width, height) tuples)
        if new_image.size != old_image.size:
            if new_image.size > old_image.size:
                logger.debug(f'Job {self.job.index_number}: Shrinking the new image')
                # resize() returns a new image, so the format is carried over by hand
                img_format = new_image.format
                new_image = new_image.resize(old_image.size, Image.LANCZOS)
                new_image.format = img_format

            else:
                logger.debug(f'Job {self.job.index_number}: Shrinking the old image')
                img_format = old_image.format
                old_image = old_image.resize(new_image.size, Image.LANCZOS)
                old_image.format = img_format

        if old_image == new_image:
            logger.info(f'Job {self.job.index_number}: New image is identical to the old one')
            self.state.verb = 'unchanged'
            return {'text': '', 'markdown': '', 'html': ''}

        diff_image, mse_value = compute_diff_image(old_image, new_image)
        # Test against None explicitly: an MSE of exactly 0.0 is a valid value that is below any positive threshold.
        if mse_value is not None:
            logger.debug(f'Job {self.job.index_number}: MSE value {mse_value:.2f}')

            if mse_value < mse_threshold:
                logger.info(
                    f'Job {self.job.index_number}: MSE value {mse_value:.2f} below the threshold of {mse_threshold}; '
                    f'considering changes not worthy of a report'
                )
                self.state.verb = 'changed,no_report'
                return {'text': '', 'markdown': '', 'html': ''}

        # Both embedded images are saved in the format of the new image
        img_format = new_image.format

        # Convert the difference image to a base64 object
        output_stream = BytesIO()
        diff_image.save(output_stream, format=img_format)
        encoded_diff = b64encode(output_stream.getvalue()).decode()

        # Convert the new image to a base64 object
        output_stream = BytesIO()
        new_image.save(output_stream, format=img_format)
        encoded_new = b64encode(output_stream.getvalue()).decode()

        # Prepare HTML output
        htm = [
            f'<span style="font-family:monospace">Differ: {self.__kind__} for {data_type}',
            f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}{old_data}</span>',
            f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}{new_data}'
            f'</span>',
            'New image:',
            f'<img src="data:image/{img_format.lower()};base64,{encoded_new}">',
            'Differences from old (in yellow):',
            # The MIME type must match the format the diff image was saved in (the new image's)
            f'<img src="data:image/{img_format.lower()};base64,{encoded_diff}">',
            '</span>',  # closes the monospace span opened in the first item
        ]

        return {
            'text': 'The image has changed; please see an HTML report for the visualization.',
            'markdown': 'The image has changed; please see an HTML report for the visualization.',
            'html': '<br>\n'.join(htm),
        }
1025

1026

1027
class AIGoogleDiffer(DifferBase):
    """(Default) Generates a summary using Google Generative AI (Gemini models).

    Calls Google Gemini APIs; documentation at https://ai.google.dev/api/rest and tutorial at
    https://ai.google.dev/tutorials/rest_quickstart

    """

    __kind__ = 'ai_google'

    __supported_directives__ = {
        'model': (
            'model name from https://ai.google.dev/gemini-api/docs/models/gemini (default: gemini-1.5-flash-latest)'
        ),
        'prompt': 'a custom prompt - {unified_diff}, {old_data} and {new_data} will be replaced; ask for markdown',
        'system_instructions': 'Optional tone and style instructions for the model (default: Respond in Markdown)',
        'prompt_ud_context_lines': 'the number of context lines for {unified_diff} (default: 9999)',
        'timeout': 'the number of seconds before timing out the API call (default: 300)',
        'max_output_tokens': "the maximum number of tokens returned by the model (default: None, i.e. model's default)",
        'temperature': "the model's Temperature parameter (default: 0.0)",
        'top_p': "the model's TopP parameter (default: None, i.e. model's default)",
        'top_k': "the model's TopK parameter (default: None, i.e. model's default)",
        'token_limit': (
            "the maximum number of tokens, if different from model's default (default: None, i.e. model's default)"
        ),
    }
    __default_subdirective__ = 'model'

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] | None = None,
        tz: str | None = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build a report made of an AI-generated summary of the changes followed by a unified diff.

        :param directives: The differ directives (see ``__supported_directives__``); the optional ``unified``
            entry is passed on to the unified differ. ``model`` is set to its default in this dict when missing.
        :param report_kind: Not used: all three report kinds are always returned.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed on to the unified differ.
        :returns: A dict with 'text', 'markdown' and 'html' entries. All are empty (and ``state.verb`` is set to
            'changed,no_report') when no summary is produced because the data did not change.
        """
        logger.info(f'Job {self.job.index_number}: Running the {self.__kind__} differ')
        warnings.warn(
            f'Job {self.job.index_number}: Using differ {self.__kind__}, which is BETA, may have bugs, and may '
            f'change in the future. Please report any problems or suggestions at '
            f'https://github.com/mborsetti/webchanges/discussions.',
            RuntimeWarning,
        )

        def get_ai_summary(prompt: str, system_instructions: str) -> str:
            """Generate AI summary from unified diff, or an error message"""
            GOOGLE_AI_API_KEY = os.environ.get('GOOGLE_AI_API_KEY', '').rstrip()
            if len(GOOGLE_AI_API_KEY) != 39:
                logger.error(
                    f'Job {self.job.index_number}: Environment variable GOOGLE_AI_API_KEY not found or is of the '
                    f'incorrect length {len(GOOGLE_AI_API_KEY)} ({self.job.get_location()})'
                )
                return (
                    f'## ERROR in summarizing the changes using {self.__kind__}:\n'
                    f'Environment variable GOOGLE_AI_API_KEY not found or is of the incorrect length '
                    f'{len(GOOGLE_AI_API_KEY)}.\n'
                )

            # Matched by prefix in insertion order, so more specific prefixes must come first.
            _models_token_limits = {  # from https://ai.google.dev/gemini-api/docs/models/gemini
                'gemini-1.5-pro-2m': 2097152,
                'gemini-1.5': 1048576,
                'gemini-1.0': 30720,
                'gemini-pro': 30720,  # legacy
            }
            if 'model' not in directives:
                directives['model'] = 'gemini-1.5-flash-latest'  # also for footer
            model = directives['model']
            token_limit = directives.get('token_limit')
            if not token_limit:
                for _model, _token_limit in _models_token_limits.items():
                    if model.startswith(_model):
                        token_limit = _token_limit
                        break
                if not token_limit:
                    logger.error(
                        f"Job {self.job.index_number}: Differ '{self.__kind__}' does not know `model: {model}` "
                        f"(supported models starting with: {', '.join(sorted(_models_token_limits))}) "
                        f'({self.job.get_location()})'
                    )
                    return f'## ERROR in summarizing the changes using {self.__kind__}:\nUnknown model {model}.\n'

            if '{unified_diff}' in prompt:
                context_lines = directives.get('prompt_ud_context_lines', 9999)
                unified_diff = '\n'.join(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        # '@',
                        # '@',
                        # self.make_timestamp(self.state.old_timestamp, tz),
                        # self.make_timestamp(self.state.new_timestamp, tz),
                        n=context_lines,
                    )
                )
                if not unified_diff:
                    # no changes
                    return ''
            else:
                unified_diff = ''

            def _api_error_message(response: Any) -> str:
                """Extract the error message from an API error response, tolerating bodies that are not JSON."""
                try:
                    return response.json().get('error', {}).get('message') or ''
                except (ValueError, AttributeError):
                    # Body is not JSON, or not a JSON object: fall back to the raw text
                    return response.text

            def _send_to_model(model_prompt: str, system_instructions: str) -> str:
                """Creates the summary request to the model"""
                api_version = '1beta'
                max_output_tokens = directives.get('max_output_tokens')
                temperature = directives.get('temperature', 0.0)
                top_p = directives.get('top_p')
                top_k = directives.get('top_k')
                data = {
                    'system_instruction': {'parts': [{'text': system_instructions}]},
                    'contents': [{'parts': [{'text': model_prompt}]}],
                    'generation_config': {
                        'max_output_tokens': max_output_tokens,
                        'temperature': temperature,
                        'top_p': top_p,
                        'top_k': top_k,
                    },
                }
                logger.info(f'Job {self.job.index_number}: Making summary request to Google model {model}')
                try:
                    timeout = directives.get('timeout', 300)
                    # The context manager closes the client (and its connections) once the response is read
                    with httpx.Client(http2=True) as client:
                        r = client.post(
                            f'https://generativelanguage.googleapis.com/v{api_version}/models/{model}:generateContent?'
                            f'key={GOOGLE_AI_API_KEY}',
                            json=data,
                            headers={'Content-Type': 'application/json'},
                            timeout=timeout,
                        )
                    if r.is_success:
                        result = r.json()
                        # A successful response may carry no candidate, or a candidate without content
                        candidates = result.get('candidates') or [{}]
                        candidate = candidates[0]
                        finish_reason = candidate.get('finishReason', 'unknown')
                        logger.info(f'Job {self.job.index_number}: AI generation finished by {finish_reason}')
                        try:
                            summary = candidate['content']['parts'][0]['text']
                        except (KeyError, IndexError):
                            summary = (
                                f'AI summary unavailable: Model did not return any text (finish reason: '
                                f'{finish_reason})'
                            )
                    elif r.status_code == 400:
                        summary = (
                            f'AI summary unavailable: Received error from {r.url.host}: ' f'{_api_error_message(r)}'
                        )
                    else:
                        summary = (
                            f'AI summary unavailable: Received error {r.status_code} {r.reason_phrase} from '
                            f'{r.url.host}'
                        )
                        if r.content:
                            summary += f': {_api_error_message(r)}'

                except httpx.HTTPError as e:
                    summary = (
                        f'AI summary unavailable: HTTP client error: {e.args[0]} when requesting data from '
                        f'{e.request.url.host}'
                    )

                return summary

            # check if data is different (for testing)
            if '{old_data}' in prompt and '{new_data}' in prompt and self.state.old_data == self.state.new_data:
                return ''

            model_prompt = prompt.format(
                unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
            )

            # The number of tokens is estimated as one token per 4 characters of prompt
            if len(model_prompt) / 4 < token_limit:
                summary = _send_to_model(model_prompt, system_instructions)
            elif '{unified_diff}' in prompt:
                logger.info(
                    f'Job {self.job.index_number}: Model prompt with full diff is too long: '
                    f'({len(model_prompt) / 4:,.0f} est. tokens exceeds limit of {token_limit:,.0f} tokens); '
                    f'recomputing with default contextlines'
                )
                unified_diff = '\n'.join(
                    difflib.unified_diff(
                        str(self.state.old_data).splitlines(),
                        str(self.state.new_data).splitlines(),
                        # '@',
                        # '@',
                        # self.make_timestamp(self.state.old_timestamp, tz),
                        # self.make_timestamp(self.state.new_timestamp, tz),
                    )
                )
                model_prompt = prompt.format(
                    unified_diff=unified_diff, old_data=self.state.old_data, new_data=self.state.new_data
                )
                if len(model_prompt) / 4 < token_limit:
                    summary = _send_to_model(model_prompt, system_instructions)
                else:
                    summary = (
                        f'AI summary unavailable (model prompt with unified diff is too long: '
                        f'{len(model_prompt) / 4:,.0f} est. tokens exceeds maximum of {token_limit:,.0f})'
                    )
            else:
                # Nothing can be shortened, so the request is attempted anyway
                logger.info(
                    f'The model prompt may be too long: {len(model_prompt) / 4:,.0f} est. tokens exceeds '
                    f'limit of {token_limit:,.0f} tokens'
                )
                summary = _send_to_model(model_prompt, system_instructions)
            return summary

        # A literal backslash followed by 'n' in the prompt is turned into a real newline
        prompt = directives.get(
            'prompt',
            'Identify the changes between the old document (enclosed by an <old> tag) and the new document ('
            'enclosed by a <new> tag) and output a summary of such changes:\n\n<old>\n{old_data}\n</old>\n\n<new>\n'
            '{new_data}\n</new>',
        ).replace('\\n', '\n')
        system_instructions = directives.get('system_instructions', 'Respond in Markdown')
        summary = get_ai_summary(prompt, system_instructions)
        if not summary:
            self.state.verb = 'changed,no_report'
            return {'text': '', 'markdown': '', 'html': ''}
        newline = '\n'  # For Python < 3.12 f-string compatibility
        back_n = '\\n'  # For Python < 3.12 f-string compatibility
        directives_text = (
            ', '.join(f'{key}={str(value).replace(newline, back_n)}' for key, value in directives.items()) or 'None'
        )
        footer = f'Summary generated by Google Generative AI (differ directive(s): {directives_text})'
        temp_unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
        # NOTE(review): only the result of the last call is kept, yet its 'text', 'markdown' and 'html' entries
        # are all read below; presumably DifferBase.process carries the earlier kinds over through
        # temp_unfiltered_diff -- confirm.
        for rep_kind in ['text', 'html']:  # markdown is same as text
            unified_report = DifferBase.process(
                'unified',
                directives.get('unified') or {},  # type: ignore[arg-type]
                self.state,
                rep_kind,  # type: ignore[arg-type]
                tz,
                temp_unfiltered_diff,
            )
        return {
            'text': summary + '\n\n' + unified_report['text'] + '\n------------\n' + footer,
            'markdown': summary + '\n\n' + unified_report['markdown'] + '\n* * *\n' + footer,
            'html': (
                mark_to_html(summary, extras={'tables'}).replace('<h2>', '<h3>').replace('</h2>', '</h3>')
                + '<br>'
                + unified_report['html']
                + '-----<br>'
                + f'<i><small>{footer}</small></i>'
            ),
        }
1263

1264

1265
class WdiffDiffer(DifferBase):
    """Word-by-word differ producing a ``wdiff``-like report.

    The old and new data are split into words (with newlines turned into a dedicated token), compared with
    :class:`difflib.Differ`, and the result is rebuilt into a text report using ANSI color escapes and an HTML report
    using inline-styled ``<span>`` elements. The ``markdown`` report is the same as the ``text`` one.
    """

    __kind__ = 'wdiff'

    __supported_directives__: dict[str, str] = {
        'context_lines': 'the number of context lines (default: 3)',
        'range_info': 'include range information lines (default: true)',
    }

    def differ(
        self,
        directives: dict[str, Any],
        report_kind: Literal['text', 'markdown', 'html'],
        _unfiltered_diff: Optional[dict[Literal['text', 'markdown', 'html'], str]] = None,
        tz: Optional[str] = None,
    ) -> dict[Literal['text', 'markdown', 'html'], str]:
        """Build the word-level diff between ``self.state.old_data`` and ``self.state.new_data``.

        :param directives: The differ's directives; ``context_lines`` and ``range_info`` are read here.
        :param report_kind: Not used by this differ; all three report kinds are always built.
        :param _unfiltered_diff: Not used by this differ.
        :param tz: Passed to ``self.make_timestamp`` when formatting the old and new timestamps in the header.
        :returns: A dict with the ``text``, ``markdown`` and ``html`` versions of the diff.
        :raises ValueError: If the old or the new data is not a string.
        """
        warnings.warn(
            f'Job {self.job.index_number}: Differ {self.__kind__} is WORK IN PROGRESS and has KNOWN bugs which '
            "are being worked on. DO NOT USE AS THE RESULTS WON'T BE CORRECT.",
            RuntimeWarning,
        )
        if not isinstance(self.state.old_data, str):
            raise ValueError(f'Differ {self.__kind__} requires old data to be a string.')
        if not isinstance(self.state.new_data, str):
            raise ValueError(f'Differ {self.__kind__} requires new data to be a string.')

        # Split the texts into words, tokenizing newline as <\n>
        old_data = self.state.old_data
        new_data = self.state.new_data
        if self.job.is_markdown:
            # Don't split spaces in link text: tokenize them as </s> (restored to spaces before returning)
            link_text = re.compile(r'\[(.*?)\]')

            def keep_link_text_together(match: re.Match[str]) -> str:
                return '[' + match.group(1).replace(' ', '</s>') + ']'

            old_data = link_text.sub(keep_link_text_together, old_data)
            new_data = link_text.sub(keep_link_text_together, new_data)
        words1 = old_data.replace('\n', ' <\\n> ').split(' ')
        words2 = new_data.replace('\n', ' <\\n> ').split(' ')

        # Generate a difference list. Every entry is a word prefixed by '- ' (removed), '+ ' (added) or '  '
        # (unchanged). difflib.Differ also emits '? ' guide lines (ending in a newline) when a removed and an added
        # word are similar; these are not part of either input and must not be treated as words, so drop them.
        diff = [entry for entry in difflib.Differ().compare(words1, words2) if not entry.startswith('? ')]

        # NOTE(review): words are inserted into the HTML report as-is (no HTML escaping is done in this method).
        add_html = '<span style="background-color:#d1ffd1;color:#082b08;">'
        rem_html = '<span style="background-color:#fff0f0;color:#9c1c1c;text-decoration:line-through;">'

        head_text = (
            f'Differ: wdiff\n\033[91m--- @ {self.make_timestamp(self.state.old_timestamp, tz)}\033[0m\n'
            f'\033[92m+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}\033[0m\n'
        )
        # NOTE(review): the outer monospace <span> opened in the first element is not closed within this method.
        head_html = '<br>\n'.join(
            [
                '<span style="font-family:monospace;">Differ: wdiff',
                f'<span style="color:darkred;">--- @ {self.make_timestamp(self.state.old_timestamp, tz)}</span>',
                f'<span style="color:darkgreen;">+++ @ {self.make_timestamp(self.state.new_timestamp, tz)}</span>',
                '',
            ]
        )

        # Process the diff output to make it more wdiff-like. The loop runs one word behind: each iteration emits the
        # previous word (so that a closing marker can still be attached to it) and stages the current one.
        result_text: list[str] = []
        result_html: list[str] = []
        prev_word_text = ''
        prev_word_html = ''
        next_text = ''  # opening marker deferred to the word following a changed newline token
        next_html = ''
        add = False  # currently inside a run of added words
        rem = False  # currently inside a run of removed words

        # The trailing '  ' entry is a sentinel "unchanged word" that flushes the last real word and closes any run
        # still open; it therefore always leaves both add and rem False when the loop ends.
        for word_text in diff + ['  ']:
            word_html = word_text
            pre_text = [next_text] if next_text else []
            pre_html = [next_html] if next_html else []
            next_text = ''
            next_html = ''

            if word_text[0] == '+' and not add:  # Beginning of additions
                if rem:
                    prev_word_html += '</span>'
                    rem = False
                if word_text[2:] == '<\\n>':
                    # Open the colored run after the newline rather than before it
                    next_text = '\033[92m'
                    next_html = add_html
                else:
                    pre_text.append('\033[92m')
                    pre_html.append(add_html)
                add = True
            elif word_text[0] == '-' and not rem:  # Beginning of deletions
                if add:
                    prev_word_html += '</span>'
                    add = False
                if word_text[2:] == '<\\n>':
                    # Open the colored run after the newline rather than before it
                    next_text = '\033[91m'
                    next_html = rem_html
                else:
                    pre_text.append('\033[91m')
                    pre_html.append(rem_html)
                rem = True
            elif word_text[0] == ' ' and (add or rem):  # Unchanged word
                if prev_word_text == '<\\n>':
                    # Close the run before the newline token
                    prev_word_text = '\033[0m<\\n>'
                    prev_word_html = '</span><\\n>'
                else:
                    prev_word_text += '\033[0m'
                    prev_word_html += '</span>'
                add = False
                rem = False
            elif word_text[2:] == '<\\n>':  # New line
                if add:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    add = False
                elif rem:
                    word_text = '  \033[0m<\\n>'
                    word_html = '  </span><\\n>'
                    rem = False

            result_text.append(prev_word_text)
            result_html.append(prev_word_html)
            pre_text.append(word_text[2:])
            pre_html.append(word_html[2:])
            prev_word_text = ''.join(pre_text)
            prev_word_html = ''.join(pre_html)

        # Rebuild the text from words, replacing the newline token; the first element is the initial empty
        # "previous word" and is skipped
        diff_text = ' '.join(result_text[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')
        diff_html = ' '.join(result_html[1:]).replace('<\\n> ', '\n').replace('<\\n>', '\n')

        # Keep only the lines with changes plus the requested number of context lines around them
        contextlines = directives.get('context_lines', self.job.contextlines)
        if contextlines is None:
            contextlines = 3
        range_info = directives.get('range_info', True)
        text_lines = diff_text.splitlines()
        if contextlines < len(text_lines):
            # A line is considered changed if it contains one of the ANSI color escapes inserted above
            lines_with_changes = [i for i, line in enumerate(text_lines) if '\033[9' in line]
            if contextlines:
                lines_to_keep: set[int] = set()
                for i in lines_with_changes:
                    lines_to_keep.update(range(i - contextlines, i + contextlines + 1))
            else:
                lines_to_keep = set(lines_with_changes)
            new_diff_text: list[str] = []
            new_diff_html: list[str] = []
            last_line = 0
            skip = False
            for i, (line_text, line_html) in enumerate(zip(text_lines, diff_html.splitlines())):
                if i in lines_to_keep:
                    if range_info and skip:
                        # Mark the gap left by the lines that were dropped
                        new_diff_text.append(f'@@ {last_line}...{i + 1} @@')
                        new_diff_html.append(f'@@ {last_line}...{i + 1} @@')
                        skip = False
                    new_diff_text.append(line_text)
                    new_diff_html.append(line_html)
                    last_line = i + 1
                else:
                    skip = True
            diff_text = '\n'.join(new_diff_text)
            diff_html = '\n'.join(new_diff_html)

        if self.job.is_markdown:
            # Restore the spaces in link text that were tokenized before splitting
            diff_text = diff_text.replace('</s>', ' ')
            diff_html = diff_html.replace('</s>', ' ')
            diff_html = mark_to_html(diff_html, self.job.markdown_padded_tables).replace('<p>', '').replace('</p>', '')

        if self.job.monospace:
            diff_html = f'<span style="font-family:monospace;white-space:pre-wrap">{diff_html}</span>'
        else:
            diff_html = diff_html.replace('\n', '<br>\n')

        return {
            'text': head_text + diff_text,
            'markdown': head_text + diff_text,
            'html': head_html + diff_html,
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc