• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 21545658727

31 Jan 2026 02:04PM UTC coverage: 73.318% (-0.3%) from 73.637%
21545658727

push

github

mborsetti
Version 3.33.0

1404 of 2258 branches covered (62.18%)

Branch coverage included in aggregate %.

1 of 9 new or added lines in 2 files covered. (11.11%)

792 existing lines in 7 files now uncovered.

4710 of 6081 relevant lines covered (77.45%)

11.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.82
/webchanges/handler.py
1
"""Handles the running of jobs and, afterward, of the reports."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4

5
from __future__ import annotations
15✔
6

7
import logging
15✔
8
import os
15✔
9
import subprocess
15✔
10
import sys
15✔
11
import time
15✔
12
import traceback
15✔
13
from typing import TYPE_CHECKING, Any, ContextManager, Iterator, Literal, NamedTuple, Self, TypedDict
15✔
14
from zoneinfo import ZoneInfo
15✔
15

16
from webchanges.differs import DifferBase, ReportKind
15✔
17
from webchanges.filters import FilterBase
15✔
18
from webchanges.jobs import NotModifiedError
15✔
19
from webchanges.reporters import ReporterBase
15✔
20

21
# https://stackoverflow.com/questions/39740632
22
if TYPE_CHECKING:
23
    from concurrent.futures import Future
24
    from pathlib import Path
25
    from types import TracebackType
26

27
    from webchanges.jobs import JobBase
28
    from webchanges.main import Urlwatch
29
    from webchanges.storage import SsdbStorage, _Config, _ConfigDifferDefaults
30

31
logger = logging.getLogger(__name__)
15✔
32

33

34
class Snapshot(NamedTuple):
    """Named tuple holding one stored snapshot of a job.

    Fields, by tuple position:

    * 0: data: str | bytes
    * 1: timestamp: float
    * 2: tries: int
    * 3: etag: str
    * 4: mime_type: str
    * 5: error_data: ErrorData
    """

    data: str | bytes
    timestamp: float
    tries: int
    etag: str
    mime_type: str
    error_data: ErrorData
15✔
51

52

53
# The outcome of a job run, as recorded on a JobState and used to decide what gets reported.
Verb = Literal[
    # -- the job has no prior data
    'new',
    # -- valid data received and it differs from the stored data
    'changed',
    # -- as 'changed', but the filtered diff yields nothing to report
    'changed,no_report',
    # -- valid data received and it is the same as the stored data
    'unchanged',
    # -- valid data received, unchanged from the last data received before an error
    'error_ended',
    # -- error, and the prior state was different (either data or a different error)
    'error',
    # -- error, same as the one before
    'repeated_error',
]

# Description of an error saved with a snapshot; both keys are optional (total=False).
ErrorData = TypedDict(
    'ErrorData',
    {
        'type': str,
        'message': str,
    },
    total=False,
)
15✔
63

64

65
class JobState(ContextManager):
15✔
66
    """The JobState class, which contains run information about a job."""
67

68
    _http_client_used: str | None = None
15✔
69
    error_ignored: bool
15✔
70
    exception: Exception | None = None
15✔
71
    generated_diff: dict[ReportKind, str]
15✔
72
    history_dic_snapshots: dict[str | bytes, Snapshot]
15✔
73
    new_data: str | bytes = ''
15✔
74
    new_error_data: ErrorData = {}
15✔
75
    new_etag: str = ''
15✔
76
    new_mime_type: str = ''
15✔
77
    new_timestamp: float
15✔
78
    old_snapshot = Snapshot(
15✔
79
        data='',
80
        timestamp=1605147837.511478,  # initialized to the first release of webchanges!
81
        tries=0,
82
        etag='',
83
        mime_type='text/plain',
84
        error_data={},
85
    )
86
    old_data: str | bytes = ''
15✔
87
    old_error_data: ErrorData = {}
15✔
88
    old_etag: str = ''
15✔
89
    old_mime_type: str = 'text/plain'
15✔
90
    old_timestamp: float = 1605147837.511478  # initialized to the first release of webchanges!
15✔
91
    traceback: str
15✔
92
    tries: int = 0  # if >1, an error; value is the consecutive number of runs leading to an error
15✔
93
    unfiltered_diff: dict[ReportKind, str]
15✔
94
    verb: Verb
15✔
95

96
    def __init__(self, snapshots_db: SsdbStorage, job: JobBase) -> None:
        """Binds the state to its job and snapshot database and creates its empty per-run containers.

        :param snapshots_db: The SsdbStorage object with the snapshot database methods.
        :param job: A JobBase object with the job information.
        """
        self.snapshots_db, self.job = snapshots_db, job

        # These are only annotated at class level, so every instance is given its own dict here.
        self.generated_diff, self.unfiltered_diff, self.history_dic_snapshots = {}, {}, {}
15✔
108

109
    def __enter__(self) -> Self:
        """Context manager hook invoked on entry to the body of a ``with`` statement.

        Nothing is set up here: the legacy call to the job's ``main_thread_enter()`` method (and the recording of any
        exception it raised into ``self.exception`` / ``self.traceback``) has been retired, so the method only hands
        back the object to be bound by ``with ... as``.

        :returns: This JobState object.
        """
        return self
15✔
124

125
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> bool | None:
        """Context manager hook invoked on exit from the body of a ``with`` statement.

        Nothing is released here: the legacy call to the job's ``main_thread_exit()`` method and the re-raising of
        ``subprocess.CalledProcessError`` / ``FileNotFoundError`` as other exception types have been retired.

        :param exc_type: The type of the exception raised in the ``with`` body, if any (unused).
        :param exc_value: The exception raised in the ``with`` body, if any (unused).
        :param tb: The traceback of the exception raised in the ``with`` body, if any (unused).
        :returns: None, so an exception raised in the ``with`` body is never suppressed.
        """
        return None
15✔
147

148
    @staticmethod
    def debugging_session() -> bool:
        """Reports whether the code appears to run under an external debugger (e.g. an IDE) outside of pytest.

        The breakpoint hook living in a module other than ``sys`` is taken as the sign that a debugger replaced it;
        the presence of ``PYTEST_CURRENT_TEST`` in the environment is taken as the sign of a pytest session.

        :returns: True if the breakpoint hook has been replaced and it's not a pytest session, False otherwise.
        """
        hook_replaced = sys.breakpointhook.__module__ != 'sys'
        if not hook_replaced:
            return False
        return 'PYTEST_CURRENT_TEST' not in os.environ
15✔
156

157
    def added_data(self) -> dict[str, bool | str | Exception | float | None]:
        """Returns a dict with the data added in the processing of the job.

        Only attributes that are currently set are included, as some of them are merely annotated on the class and
        have no value until the job has been processed.
        """
        added: dict[str, bool | str | Exception | float | None] = {}
        for name in ('error_ignored', 'exception', 'new_data', 'new_etag', 'new_timestamp'):
            if hasattr(self, name):
                added[name] = getattr(self, name)
        return added
15✔
161

162
    def load(self) -> None:
        """Loads from the database the last snapshot for the job and, when the job compares against more than one
        version, the history snapshots too (keyed by their data).
        """
        guid = self.job.guid
        self.old_snapshot = self.snapshots_db.load(guid)

        # TODO: Remove these (they duplicate the fields of self.old_snapshot)
        data, timestamp, tries, etag, mime_type, error_data = self.old_snapshot
        self.old_data = data
        self.old_timestamp = timestamp
        self.tries = tries
        self.old_etag = etag
        self.old_mime_type = mime_type
        self.old_error_data = error_data

        versions = self.job.compared_versions
        if versions and versions > 1:
            history = self.snapshots_db.get_history_snapshots(guid, versions)
            self.history_dic_snapshots = {snapshot.data: snapshot for snapshot in history}
179

180
    def save(self) -> None:
        """Saves the outcome of the job run into the snapshot database.

        If the run produced error data, the snapshot keeps the old data, ETag and MIME type (only the timestamp, tries
        and error data are new); otherwise it holds the newly retrieved ones.
        """
        if self.new_error_data:  # have encountered an exception, so carry the old data forward
            data, etag, mime_type = self.old_data, self.old_etag, self.old_mime_type
        else:
            data, etag, mime_type = self.new_data, self.new_etag, self.new_mime_type

        new_snapshot = Snapshot(
            data=data,
            timestamp=self.new_timestamp,
            tries=self.tries,
            etag=etag,
            mime_type=mime_type,
            error_data=self.new_error_data,
        )
        self.snapshots_db.save(guid=self.job.guid, snapshot=new_snapshot)
        logger.info(f'Job {self.job.index_number}: Saved new data to database')
15✔
202

203
    def delete_latest(self, temporary: bool = True) -> None:
        """Asks the snapshot database to remove the job's latest snapshot.

        :param temporary: Forwarded unchanged to the database's ``delete_latest`` method.
        """
        storage = self.snapshots_db
        storage.delete_latest(guid=self.job.guid, temporary=temporary)
15✔
206

207
    def process(self, headless: bool = True) -> JobState:
        """Processes the job: retrieves its data, filters it, and records any Exception (error) raised on the way.

        If an exception other than NotModifiedError is already stored in ``self.exception``, only the timestamp and
        the error data are recorded. Otherwise the old snapshot is loaded, the job's data is retrieved and passed
        through the automatic filters and then the job's own filters. A NotModifiedError is stored as the exception;
        for any other exception the job decides whether to ignore it and, if not, the traceback text, the error data
        and the incremented ``tries`` counter are recorded.

        :param headless: Forwarded to the job's ``retrieve`` method.
        :returns: a JobState object containing information of the job run (this same object).
        :raises Exception: Whatever the job run raised, but only when running in a debugging session.
        """
        logger.info(f'{self.job.get_indexed_location()} started processing ({type(self.job).__name__})')
        logger.debug(f'Job {self.job.index_number}: {self.job}')

        pending = self.exception
        if pending and not isinstance(pending, NotModifiedError):
            # An error was recorded before the job could even run: report it without retrieving anything.
            self.new_timestamp = time.time()
            self.new_error_data = {'type': type(self.exception).__name__, 'message': str(self.exception)}
            logger.info(f'{self.job.get_indexed_location()} ended processing due to exception: {self.exception}')
            return self

        try:
            self.load()

            self.new_timestamp = time.time()
            raw_data, self.new_etag, mime_type = self.job.retrieve(self, headless)
            logger.debug(
                f'Job {self.job.index_number}: Retrieved data={raw_data!r} | etag={self.new_etag} | '
                f'mime_type={mime_type}'
            )

            # The automatic filters run before the ones specified in the job
            filtered_data, mime_type = FilterBase.auto_process(self, raw_data, mime_type)
            job_filters = FilterBase.normalize_filter_list(self.job.filters, self.job.index_number)
            for filter_kind, subfilter in job_filters:
                filtered_data, mime_type = FilterBase.process(filter_kind, subfilter, self, filtered_data, mime_type)

            self.new_data = filtered_data
            self.new_mime_type = mime_type

        except NotModifiedError as exc:
            # HTTP 304 response has been received
            self.exception = exc
            self.error_ignored = False
        except Exception as exc:
            # The job run failed; the job gets a chance to ignore the error and to format it
            if self.debugging_session():
                logger.warning('Running in a debugging session: raising the exception instead of processing it')
                raise
            self.exception = exc
            self.error_ignored = self.job.ignore_error(exc)
            if not self.error_ignored:
                self.new_timestamp = time.time()
                # How much detail goes into the traceback text depends on where the error comes from
                if self.job.__class__.__module__ == 'hooks':
                    logger.info('Job is from hooks.py: including full traceback in error message')
                    details = ''.join(traceback.format_exception(exc)).rstrip()
                elif isinstance(exc, subprocess.CalledProcessError):
                    details = (
                        f'subprocess.CalledProcessError: Command returned non-zero exit status {exc.returncode}.\n\n'
                        + '\n'.join(stream for stream in (exc.stderr, exc.stdout) if stream)
                    )
                else:
                    # Any other exception is formatted by the job itself
                    details = self.job.format_error(exc, traceback.format_exc())
                self.traceback = details

                self.tries += 1
                name_parts = (getattr(exc, '__module__', None), exc.__class__.__name__)
                self.new_error_data = {
                    'type': '.'.join(part for part in name_parts if part),
                    'message': str(exc),
                }
                logger.info(
                    f'Job {self.job.index_number}: Job ended with an error; incrementing cumulative error runs to '
                    f'{self.tries}'
                )

        logger.debug(f'Job {self.job.index_number}: Processed as {self.added_data()}')
        logger.info(f'{self.job.get_indexed_location()} ended processing')
        return self
15✔
280

281
    def get_diff(
        self,
        report_kind: ReportKind = 'plain',
        differ: dict[str, Any] | None = None,
        differ_defaults: _ConfigDifferDefaults | None = None,
        tz: ZoneInfo | None = None,
    ) -> str:
        """Generates the job's diff and applies diff_filters to it (if any). Memoized per report kind.

        :parameter report_kind: the kind of report that needs the differ.
        :parameter differ: the differ to use in place of self.job.differ (used only if truthy).
        :parameter differ_defaults: the differ defaults, handed to DifferBase.normalize_differ.
        :parameter tz: The timezone to use for diff in the job's report, handed to DifferBase.process.
        :returns: The job's diff.
        """
        if report_kind in self.generated_diff:
            return self.generated_diff[report_kind]

        if report_kind not in self.unfiltered_diff:
            # Not yet diffed for this kind of report: run the differ and keep everything it produced
            differ_kind, subdiffer = DifferBase.normalize_differ(
                differ or self.job.differ,
                self.job.index_number,
                differ_defaults,
            )
            fresh_diffs = DifferBase.process(differ_kind, subdiffer, self, report_kind, tz, self.unfiltered_diff)
            self.unfiltered_diff.update(fresh_diffs)

        diff = self.unfiltered_diff[report_kind]
        if diff:
            # An empty diff is left alone; otherwise the job's diff_filters are applied in order
            diff_mime_type = 'text/plain'
            diff_filters = FilterBase.normalize_filter_list(self.job.diff_filters, self.job.index_number)
            for filter_kind, subfilter in diff_filters:
                diff, diff_mime_type = FilterBase.process(filter_kind, subfilter, self, diff, diff_mime_type)

        diff_text = str(diff)
        self.generated_diff[report_kind] = diff_text
        return diff_text
15✔
319

320
    def is_markdown(self) -> bool:
        """Returns whether the new data is in markdown, going by either its MIME type or the job's is_markdown."""
        if self.new_mime_type == 'text/markdown':
            return True
        return bool(self.job.is_markdown)
15✔
323

324

325
class Report:
    """The base class for reporting: collects the JobState of each job run and submits them to the reporters."""

    # Class-level fallback only: __init__ gives every instance its own list.
    job_states: list[JobState] = []
    new_release_future: Future[str | bool] | None = None
    # Evaluated once, when the class is created; finish() and finish_one() measure the duration from this value.
    start: float = time.perf_counter()

    def __init__(self, urlwatch: Urlwatch) -> None:
        """Reads the configuration and starts with an empty list of job states.

        :param urlwatch: The Urlwatch object with the program configuration information.
        """
        self.config: _Config = urlwatch.config_storage.config
        # Without this assignment _result() would append to the class-level list, which is shared by all Report
        # objects, so results recorded through one Report would show up in every other one.
        self.job_states = []
        tz_name = self.config['report']['tz'] if 'report' in self.config else None
        self.tz = ZoneInfo(tz_name) if tz_name is not None else None

    def _result(
        self,
        verb: Verb,
        job_state: JobState,
    ) -> None:
        """Logs the job's exception (if any), sets the verb on the job_state and stores it for reporting.

        :param verb: Description of the result of the job run. Can be one of
          • 'new': new job;
          • 'changed': valid data received, and it has changed;
          • 'changed,no_report': valid data received, and it has changed, but no report;
          • 'unchanged': valid data received, no changes;
          • 'error_ended': valid data received, no changes from the last data received before an error;
          • 'error': error, prior state was different (either data or different error);
          • 'repeated_error': error, same as before;
        or a custom message such as 'test'.  Ultimately called by job_runner.

        :param job_state: The JobState object with the information of the job run.
        """
        exception = job_state.exception
        if exception is not None:
            logger.info(
                f'Job {job_state.job.index_number}: Got exception while processing job {job_state.job}',
                exc_info=exception,
            )

        job_state.verb = verb
        self.job_states.append(job_state)

    def new(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'new'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('new', job_state)

    def changed(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed', job_state)

    def changed_no_report(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed,no_report'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed,no_report', job_state)

    def unchanged(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'unchanged'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('unchanged', job_state)

    def unchanged_from_error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error_ended'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error_ended', job_state)

    def error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error', job_state)

    def error_same_error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'repeated_error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('repeated_error', job_state)

    def custom(
        self,
        job_state: JobState,
        label: Verb,
    ) -> None:
        """Sets the verb of the job in job_state to a custom label. Called by
        :py:func:`UrlwatchCommand.check_test_reporter`.

        :param job_state: The JobState object with the information of the job run.
        :param label: The label to set the information of the job run to.
        """
        self._result(label, job_state)

    def get_filtered_job_states(self, job_states: list[JobState]) -> Iterator[JobState]:
        """Yields JobStates that have reportable changes per config['display'].  Called from :py:Class:`ReporterBase`.

        :param job_states: The list of JobState objects with the information of the job runs.
        :returns: An iterable of JobState objects that have reportable changes per config['display'].
        """
        # Verbs that config['display'] can switch off
        hideable_verbs: tuple[Verb, ...] = ('new', 'unchanged', 'error')

        for job_state in job_states:
            verb = job_state.verb
            # Skip states that are hidden by display config
            if verb in hideable_verbs and not self.config['display'][verb]:
                continue
            # Skip compound states
            if verb == 'changed,no_report':
                continue
            # Skip repeated_error if suppress_repeated_errors directive in job
            if verb == 'repeated_error' and job_state.job.suppress_repeated_errors:
                continue
            # Skip empty diffs unless empty-diff is configured (the diff is only generated if it has to be checked)
            if (
                verb == 'changed'
                and not self.config['display']['empty-diff']
                and job_state.get_diff(tz=self.tz, differ_defaults=self.config['differ_defaults']) == ''
            ):
                continue
            yield job_state

    def finish(self, jobs_file: list[Path]) -> None:
        """Finish job run: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_all`.

        :param jobs_file: The path to the file containing the list of jobs (used in footers).
        """
        duration = time.perf_counter() - self.start

        ReporterBase.submit_all(self, self.job_states, duration, jobs_file)

    def finish_one(self, name: str, jobs_file: list[Path] | None = None, check_enabled: bool | None = True) -> None:
        """Finish job run of one: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_one`.  Used in testing.

        :param name: The name of the reporter to run.
        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        :param check_enabled: If True (default), run reports only if they are enabled in the configuration.
        """
        duration = time.perf_counter() - self.start
        files: list[Path] = [] if jobs_file is None else jobs_file

        ReporterBase.submit_one(name, self, self.job_states, duration, files, check_enabled)
15✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc