• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 16548352850

27 Jul 2025 06:53AM UTC coverage: 74.68% (-0.4%) from 75.068%
16548352850

push

github

mborsetti
Version 3.31.0rc0

1799 of 2750 branches covered (65.42%)

Branch coverage included in aggregate %.

4 of 4 new or added lines in 1 file covered. (100.0%)

799 existing lines in 8 files now uncovered.

4669 of 5911 relevant lines covered (78.99%)

6.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.39
/webchanges/handler.py
1
"""Handles the running of jobs and, afterward, of the reports."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import logging
8✔
8
import subprocess  # noqa: S404 Consider possible security implications
8✔
9
import sys
8✔
10
import time
8✔
11
import traceback
8✔
12
from concurrent.futures import Future
8✔
13
from pathlib import Path
8✔
14
from types import TracebackType
8✔
15
from typing import Any, ContextManager, Iterator, Literal, NamedTuple, TYPE_CHECKING, TypedDict
8✔
16
from zoneinfo import ZoneInfo
8✔
17

18
from webchanges.differs import DifferBase
8✔
19
from webchanges.filters import FilterBase
8✔
20
from webchanges.jobs import NotModifiedError
8✔
21
from webchanges.reporters import ReporterBase
8✔
22

23
# https://stackoverflow.com/questions/39740632
24
if TYPE_CHECKING:
25
    from webchanges.jobs import JobBase
26
    from webchanges.main import Urlwatch
27
    from webchanges.storage import _Config, _ConfigDifferDefaults, SsdbStorage
28

29
logger = logging.getLogger(__name__)
8✔
30

31

32
class Snapshot(NamedTuple):
    """A single stored snapshot of a job, as exchanged with the snapshot database.

    Fields, in positional order:

    * 0: data: str | bytes
    * 1: timestamp: float
    * 2: tries: int
    * 3: etag: str
    * 4: mime_type: str
    * 5: error_data: ErrorData
    """

    # The positional order matters: JobState.load() tuple-unpacks a Snapshot in exactly this order.
    data: str | bytes
    timestamp: float
    tries: int
    etag: str
    mime_type: str
    error_data: ErrorData
49

50

51
# The closed set of outcome labels that Report._result() stores in JobState.verb.
Verb = Literal[
    'new',  # new job
    'changed',  # valid data received, and it has changed
    'changed,no_report',  # valid data received, and it has changed, but filtered diff yields no report
    'unchanged',  # valid data received, no changes
    'error_ended',  # valid data received, no changes from the last data received before an error
    'error',  # error, prior state was different (either data or different error)
    'repeated_error',  # error, same as before
]
# Error details kept alongside a snapshot; total=False makes both keys optional, so {} is a valid "no error" value.
ErrorData = TypedDict('ErrorData', {'type': str, 'message': str}, total=False)
61

62

63
class JobState(ContextManager):
    """The JobState class, which contains run information about a job.

    An instance is bound to one job and one snapshot database. ``process()`` loads the previous snapshot, retrieves
    and filters the new data and records any error; ``save()`` writes the outcome back to the database; ``get_diff()``
    produces (and caches) the diff used by the reports.
    """

    # NOTE: the dict-valued class-level defaults below (new_error_data, old_error_data) are only ever rebound on the
    # instance by the methods of this class, never mutated in place.
    _http_client_used: str | None = None
    error_ignored: bool | str
    exception: Exception | None = None
    generated_diff: dict[Literal['text', 'markdown', 'html'], str]
    history_dic_snapshots: dict[str | bytes, Snapshot]
    new_data: str | bytes = ''
    new_error_data: ErrorData = {}
    new_etag: str = ''
    new_mime_type: str = ''
    new_timestamp: float
    old_snapshot = Snapshot(
        data='',
        timestamp=1605147837.511478,  # initialized to the first release of webchanges!
        tries=0,
        etag='',
        mime_type='text/plain',
        error_data={},
    )
    old_data: str | bytes = ''
    old_error_data: ErrorData = {}
    old_etag: str = ''
    old_mime_type: str = 'text/plain'
    old_timestamp: float = 1605147837.511478  # initialized to the first release of webchanges!
    traceback: str
    tries: int = 0  # if >1, an error; value is the consecutive number of runs leading to an error
    unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str]
    verb: Verb

    def __init__(self, snapshots_db: SsdbStorage, job: JobBase) -> None:
        """
        Initializes the class

        :param snapshots_db: The SsdbStorage object with the snapshot database methods.
        :param job: A JobBase object with the job information.
        """
        self.snapshots_db = snapshots_db
        self.job = job

        # Per-instance caches; created here so that they are never shared between JobState objects.
        self.generated_diff = {}
        self.unfiltered_diff = {}
        self.history_dic_snapshots = {}

    def __enter__(self) -> 'JobState':
        """Context manager invoked on entry to the body of a with statement. Currently performs no work (the legacy
        call to the job's main_thread_enter method is disabled).

        :returns: Class object.
        """
        # Below is legacy code that now does nothing, so it's being skipped
        # try:
        #     self.job.main_thread_enter()
        # except Exception as e:
        #     logger.info(f'Job {self.job.index_number}: Exception while creating resources for job', exc_info=True)
        #     self.exception = e
        #     self.traceback = self.job.format_error(e, traceback.format_exc())

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> bool | None:
        """Context manager invoked on exit from the body of a with statement. Currently performs no work (the legacy
        call to the job's main_thread_exit method is disabled) and never suppresses an exception.

        :param exc_type: The type of the exception raised in the with body, if any.
        :param exc_value: The exception raised in the with body, if any.
        :param tb: The traceback of the exception raised in the with body, if any.
        :returns: None.
        """
        # Below is legacy code that now does nothing, so it's being skipped
        # try:
        #     self.job.main_thread_exit()
        # except Exception:
        #     # We don't want exceptions from releasing resources to override job run results
        #     logger.warning(f'Job {self.index_number}: Exception while releasing resources for job', exc_info=True)
        # if isinstance(exc_value, subprocess.CalledProcessError):
        #     raise subprocess.SubprocessError(exc_value.stderr)
        # elif isinstance(exc_value, FileNotFoundError):
        #     raise OSError(exc_value)
        return None

    @staticmethod
    def debugger_attached() -> bool:
        """Checks if the code is currently running within an external debugger (e.g. IDE).

        The check is whether ``sys.breakpointhook`` has been replaced by a callable defined outside the ``sys`` module.

        :returns: True if an external debugger is attached, False otherwise.
        """
        return sys.breakpointhook.__module__ != 'sys'

    def added_data(self) -> dict[str, bool | str | bytes | Exception | float | None]:
        """Returns a dict with the data added in the processing of the job.

        Attributes that have not been set (e.g. ``error_ignored`` when no error occurred) are left out.
        """
        attrs = ('error_ignored', 'exception', 'new_data', 'new_etag', 'new_timestamp')
        return {attr: getattr(self, attr) for attr in attrs if hasattr(self, attr)}

    def load(self) -> None:
        """Loads from the database the last snapshot(s) for the job."""
        guid = self.job.guid
        self.old_snapshot = self.snapshots_db.load(guid)
        # TODO: remove these
        (
            self.old_data,
            self.old_timestamp,
            self.tries,
            self.old_etag,
            self.old_mime_type,
            self.old_error_data,
        ) = self.old_snapshot
        if self.job.compared_versions and self.job.compared_versions > 1:
            # Index the historical snapshots by their data.
            self.history_dic_snapshots = {
                s.data: s for s in self.snapshots_db.get_history_snapshots(guid, self.job.compared_versions)
            }

    def save(self) -> None:
        """Saves the outcome of the job run into the snapshot database.

        If an error was recorded (``new_error_data`` is not empty), the old data, ETag and MIME type are saved again
        together with the new timestamp, the number of tries and the error data; otherwise the newly retrieved data,
        ETag and MIME type are saved.
        """
        if self.new_error_data:  # have encountered an exception, so save the old data
            new_snapshot = Snapshot(
                data=self.old_data,
                timestamp=self.new_timestamp,
                tries=self.tries,
                etag=self.old_etag,
                mime_type=self.old_mime_type,
                error_data=self.new_error_data,
            )
        else:
            new_snapshot = Snapshot(
                data=self.new_data,
                timestamp=self.new_timestamp,
                tries=self.tries,
                etag=self.new_etag,
                mime_type=self.new_mime_type,
                error_data=self.new_error_data,
            )
        self.snapshots_db.save(guid=self.job.guid, snapshot=new_snapshot)
        logger.info(f'Job {self.job.index_number}: Saved new data to database')

    def delete_latest(self, temporary: bool = True) -> None:
        """Removes the last instance in the snapshot database.

        :param temporary: Passed through unchanged to the snapshot database's ``delete_latest`` method.
        """
        self.snapshots_db.delete_latest(guid=self.job.guid, temporary=temporary)

    def process(self, headless: bool = True) -> JobState:
        """Processes the job: loads it (i.e. runs it) and handles Exceptions (errors).

        :param headless: Passed through unchanged to the job's ``retrieve`` method.
        :returns: a JobState object containing information of the job run.
        :raises Exception: Only when running under an external debugger, in which case errors are re-raised instead of
           being recorded.
        """
        logger.info(f'{self.job.get_indexed_location()} started processing ({type(self.job).__name__})')
        logger.debug(f'Job {self.job.index_number}: {self.job}')

        # An exception recorded before processing started: record it as error data and stop here.
        if self.exception and not isinstance(self.exception, NotModifiedError):
            self.new_timestamp = time.time()
            self.new_error_data = {
                'type': type(self.exception).__name__,
                'message': str(self.exception),
            }
            logger.info(f'{self.job.get_indexed_location()} ended processing due to exception: {self.exception}')
            return self

        try:
            try:
                self.load()

                self.new_timestamp = time.time()
                data, self.new_etag, mime_type = self.job.retrieve(self, headless)
                logger.debug(
                    f'Job {self.job.index_number}: Retrieved data '
                    f'{dict(data=data, etag=self.new_etag, mime_type=mime_type)}'
                )

            except Exception as e:
                # Job has a chance to format and ignore its error
                if self.debugger_attached():
                    logger.warning('Running in a debugger: raising the exception instead of processing it')
                    raise
                self.new_timestamp = time.time()
                self.error_ignored = self.job.ignore_error(e)
                if not (self.error_ignored or isinstance(e, NotModifiedError)):
                    self.exception = e
                    self.traceback = self.job.format_error(e, traceback.format_exc())
                    self.tries += 1
                    self.new_error_data = {
                        'type': e.__class__.__name__,
                        'message': str(e),
                    }
                    logger.info(
                        f'Job {self.job.index_number}: Job ended with error; incrementing cumulative error runs to '
                        f'{self.tries}'
                    )

            else:
                # Apply automatic filters first
                filtered_data, mime_type = FilterBase.auto_process(self, data, mime_type)

                # Apply any specified filters
                for filter_kind, subfilter in FilterBase.normalize_filter_list(self.job.filters, self.job.index_number):
                    filtered_data, mime_type = FilterBase.process(
                        filter_kind, subfilter, self, filtered_data, mime_type
                    )

                self.new_data = filtered_data
                self.new_mime_type = mime_type

        except Exception as e:
            # Processing error or job failed its chance to handle error
            if self.debugger_attached():
                logger.warning('Running in a debugger: raising the exception instead of processing it')
                raise
            self.new_timestamp = time.time()
            self.exception = e
            if isinstance(e, subprocess.CalledProcessError):
                self.traceback = (
                    f'subprocess.CalledProcessError: Command returned non-zero exit status {e.returncode}.\n\n'
                    f'{e.stderr}'
                )
            elif sys.version_info >= (3, 13):
                self.traceback = ''.join(traceback.format_exception_only(e, show_group=True)).rstrip()
            else:
                # The ``show_group`` keyword only exists from Python 3.13; passing it to an older interpreter would
                # raise TypeError from inside this handler and hide the job's real error.
                self.traceback = ''.join(traceback.format_exception_only(e)).rstrip()
            self.error_ignored = False
            self.tries += 1
            self.new_error_data = {
                # Fully qualified exception name; filter(None, ...) drops a missing/empty module name.
                'type': '.'.join(filter(None, [getattr(e, '__module__', None), e.__class__.__name__])),
                'message': str(e),
            }
            logger.info(
                f'Job {self.job.index_number}: Job ended with error (internal handling failed); incrementing '
                f'cumulative error runs to {self.tries}'
            )

        logger.debug(f'Job {self.job.index_number}: Processed as {self.added_data()}')
        logger.info(f'{self.job.get_indexed_location()} ended processing')
        return self

    def get_diff(
        self,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        differ: dict[str, Any] | None = None,
        differ_defaults: _ConfigDifferDefaults | None = None,
        tz: ZoneInfo | None = None,
    ) -> str:
        """Generates the job's diff and applies diff_filters to it (if any). Memoized.

        The result is cached per ``report_kind`` only: once a diff for a report kind has been generated, later calls
        return it regardless of the other arguments.

        :parameter report_kind: the kind of report that needs the differ.
        :parameter differ: the differ directive to use instead of self.job.differ (used when not empty).
        :parameter differ_defaults: the differ defaults, passed to DifferBase.normalize_differ.
        :parameter tz: The IANA tz_info name of the timezone to use for diff in the job's report (e.g. 'Etc/UTC').
        :returns: The job's diff.
        """
        if report_kind in self.generated_diff:
            return self.generated_diff[report_kind]

        if report_kind not in self.unfiltered_diff:
            differ_kind, subdiffer = DifferBase.normalize_differ(
                differ or self.job.differ,
                self.job.index_number,
                differ_defaults,
            )
            unfiltered_diff = DifferBase.process(differ_kind, subdiffer, self, report_kind, tz, self.unfiltered_diff)
            self.unfiltered_diff.update(unfiltered_diff)
        _generated_diff = self.unfiltered_diff[report_kind]
        if _generated_diff:
            # Apply any specified diff_filters
            _mime_type = 'text/plain'
            for filter_kind, subfilter in FilterBase.normalize_filter_list(
                self.job.diff_filters, self.job.index_number
            ):
                _generated_diff, _mime_type = FilterBase.process(  # type: ignore[assignment]
                    filter_kind, subfilter, self, _generated_diff, _mime_type
                )
        self.generated_diff[report_kind] = str(_generated_diff)

        return self.generated_diff[report_kind]

    def is_markdown(self) -> bool:
        """Returns whether the new data is in markdown (per its MIME type or the job's ``is_markdown`` attribute)."""
        return self.new_mime_type == 'text/markdown' or bool(self.job.is_markdown)
341

342

343
class Report:
    """The base class for reporting.

    Collects the JobState of every job that was run (together with its result verb) and hands them over to the
    reporters when the run is finished.
    """

    job_states: list[JobState]
    new_release_future: Future[str | bool] | None = None
    # NOTE(review): evaluated once, when the class body is executed, so durations computed in finish()/finish_one()
    # are measured from that moment rather than from the creation of the Report instance -- confirm this is intended.
    start: float = time.perf_counter()

    def __init__(self, urlwatch: Urlwatch) -> None:
        """

        :param urlwatch: The Urlwatch object with the program configuration information.
        """
        self.config: _Config = urlwatch.config_storage.config
        # One list per Report: a class-level list would be shared (and keep growing) across all instances.
        self.job_states = []
        self.tz = (
            ZoneInfo(self.config['report']['tz'])
            if 'report' in self.config and self.config['report']['tz'] is not None
            else None
        )

    def _result(
        self,
        verb: Verb,
        job_state: JobState,
    ) -> None:
        """Logs error and appends the verb to the job_state.

        :param verb: Description of the result of the job run. Can be one of
          • 'new': new job;
          • 'changed': valid data received, and it has changed;
          • 'changed,no_report': valid data received, and it has changed, but no report;
          • 'unchanged': valid data received, no changes;
          • 'error_ended': valid data received, no changes from the last data received before an error;
          • 'error': error, prior state was different (either data or different error);
          • 'repeated_error': error, same as before;
        or a custom message such as  'test'.  Ultimately called by job_runner.

        :param job_state: The JobState object with the information of the job run.
        """
        if job_state.exception is not None and not isinstance(job_state.exception, NotModifiedError):
            logger.info(
                f'Job {job_state.job.index_number}: Got exception while processing job {job_state.job}',
                exc_info=job_state.exception,
            )

        job_state.verb = verb
        self.job_states.append(job_state)

    def new(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'new'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('new', job_state)

    def changed(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed', job_state)

    def changed_no_report(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed,no_report'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed,no_report', job_state)

    def unchanged(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'unchanged'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('unchanged', job_state)

    def unchanged_from_error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error_ended'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error_ended', job_state)

    def error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error', job_state)

    def error_same_error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'repeated_error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('repeated_error', job_state)

    def custom(
        self,
        job_state: JobState,
        label: Verb,
    ) -> None:
        """Sets the verb of the job in job_state to a custom label. Called by
        :py:func:`UrlwatchCommand.check_test_reporter`.

        :param job_state: The JobState object with the information of the job run.
        :param label: The label to set the information of the job run to.
        """
        self._result(label, job_state)

    def get_filtered_job_states(self, job_states: list[JobState]) -> Iterator[JobState]:
        """Returns JobStates that have reportable changes per config['display'].  Called from :py:Class:`ReporterBase`.

        :param job_states: The list of JobState objects with the information of the job runs.
        :returns: An iterable of JobState objects that have reportable changes per config['display'].
        """

        def should_skip_job(self: Report, job_state: JobState) -> bool:
            """Identify jobs to be skipped."""
            # Skip states that are hidden by display config
            config_verbs: set[Verb] = {'new', 'unchanged', 'error'}
            if any(
                job_state.verb == verb and not self.config['display'][verb]  # type: ignore[typeddict-item]
                for verb in config_verbs
            ):
                return True
            # Skip compound states
            if job_state.verb == 'changed,no_report':
                return True
            # Skip repeated_error if suppress_repeated_errors directive in job
            if job_state.verb == 'repeated_error' and job_state.job.suppress_repeated_errors:
                return True
            # Skip empty diffs unless empty-diff is configured (the diff is only generated when actually needed)
            if (
                job_state.verb == 'changed'
                and not self.config['display']['empty-diff']
                and job_state.get_diff(tz=self.tz, differ_defaults=self.config['differ_defaults']) == ''
            ):
                return True

            return False

        for job_state in job_states:
            if not should_skip_job(self, job_state):
                yield job_state

    def finish(self, jobs_file: list[Path] | None = None) -> None:
        """Finish job run: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_all`.

        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        """
        end = time.perf_counter()
        duration = end - self.start

        ReporterBase.submit_all(self, self.job_states, duration, jobs_file)

    def finish_one(self, name: str, jobs_file: list[Path] | None = None, check_enabled: bool | None = True) -> None:
        """Finish job run of one: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_one`.  Used in testing.

        :param name: The name of the reporter to run.
        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        :param check_enabled: If True (default), run reports only if they are enabled in the configuration.
        """
        end = time.perf_counter()
        duration = end - self.start

        ReporterBase.submit_one(name, self, self.job_states, duration, jobs_file, check_enabled)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc