• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 14020917399

23 Mar 2025 04:35PM UTC coverage: 75.4% (-0.05%) from 75.448%
14020917399

push

github

mborsetti
Version 3.29.0rc2

1739 of 2632 branches covered (66.07%)

Branch coverage included in aggregate %.

4575 of 5742 relevant lines covered (79.68%)

6.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.78
/webchanges/handler.py
1
"""Handles the running of jobs and, afterward, of the reports."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import logging
8✔
8
import sys
8✔
9
import time
8✔
10
import traceback
8✔
11
from concurrent.futures import Future
8✔
12
from pathlib import Path
8✔
13
from types import TracebackType
8✔
14
from typing import Any, ContextManager, Iterator, Literal, NamedTuple, TYPE_CHECKING, TypedDict
8✔
15
from zoneinfo import ZoneInfo
8✔
16

17
from webchanges.differs import DifferBase
8✔
18
from webchanges.filters import FilterBase
8✔
19
from webchanges.jobs import NotModifiedError
8✔
20
from webchanges.reporters import ReporterBase
8✔
21

22
# https://stackoverflow.com/questions/39740632
23
if TYPE_CHECKING:
24
    from webchanges.jobs import JobBase
25
    from webchanges.main import Urlwatch
26
    from webchanges.storage import _Config, SsdbStorage
27

28
logger = logging.getLogger(__name__)
8✔
29

30

31
class Snapshot(NamedTuple):
    """A single stored snapshot of a job's data, as a named tuple.

    Fields, by position:

    * 0: data: str | bytes
    * 1: timestamp: float
    * 2: tries: int
    * 3: etag: str
    * 4: mime_type: str
    * 5: error_data: ErrorData
    """

    data: str | bytes
    timestamp: float
    tries: int
    etag: str
    mime_type: str
    error_data: ErrorData
8✔
48

49

50
# Outcome of a job run, as recorded in JobState.verb:
#   'new'                new job
#   'changed'            valid data received, and it has changed
#   'changed,no_report'  valid data received, and it has changed, but filtered diff yields no report
#   'unchanged'          valid data received, no changes
#   'error_ended'        valid data received, no changes from the last data received before an error
#   'error'              error, prior state was different (either data or different error)
#   'repeated_error'     error, same as before
Verb = Literal['new', 'changed', 'changed,no_report', 'unchanged', 'error_ended', 'error', 'repeated_error']

# Description of an error as kept in a snapshot; both keys are optional (total=False).
ErrorData = TypedDict(
    'ErrorData',
    {
        'type': str,
        'message': str,
    },
    total=False,
)
8✔
60

61

62
class JobState(ContextManager):
    """The JobState class, which contains run information about a job.

    Holds the last stored snapshot (the ``old_*`` attributes), the outcome of the current run (the ``new_*``
    attributes, ``exception``, ``traceback``, ``tries``) and the memoized diffs generated for reporting.
    """

    _http_client_used: str | None = None
    error_ignored: bool | str
    exception: Exception | None = None
    generated_diff: dict[Literal['text', 'markdown', 'html'], str]
    history_dic_snapshots: dict[str | bytes, Snapshot]
    new_data: str | bytes = ''
    new_error_data: ErrorData = {}  # class-level default; __init__ gives each instance its own dict
    new_etag: str
    new_mime_type: str = ''
    new_timestamp: float
    old_snapshot = Snapshot(
        data='',
        timestamp=1605147837.511478,  # initialized to the first release of webchanges!
        tries=0,
        etag='',
        mime_type='text/plain',
        error_data={},
    )
    old_data: str | bytes = ''
    old_error_data: ErrorData = {}  # class-level default; __init__ gives each instance its own dict
    old_etag: str = ''
    old_mime_type: str = 'text/plain'
    old_timestamp: float = 1605147837.511478  # initialized to the first release of webchanges!
    traceback: str
    tries: int = 0  # if >1, an error; value is the consecutive number of runs leading to an error
    unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str]
    verb: Verb

    def __init__(self, snapshots_db: SsdbStorage, job: JobBase) -> None:
        """
        Initializes the class

        :param snapshots_db: The SsdbStorage object with the snapshot database methods.
        :param job: A JobBase object with the job information.
        """
        self.snapshots_db = snapshots_db
        self.job = job

        self.generated_diff = {}
        self.unfiltered_diff = {}
        self.history_dic_snapshots = {}
        # Per-instance dicts so that an in-place mutation can never leak into other JobState objects through the
        # shared class-level defaults.
        self.new_error_data = {}
        self.old_error_data = {}

    def __enter__(self) -> JobState:
        """Context manager invoked on entry to the body of a with statement. Currently performs no work.

        :returns: Class object.
        """
        # Below is legacy code that now does nothing, so it's being skipped
        # try:
        #     self.job.main_thread_enter()
        # except Exception as e:
        #     logger.info(f'Job {self.job.index_number}: Exception while creating resources for job', exc_info=True)
        #     self.exception = e
        #     self.traceback = self.job.format_error(e, traceback.format_exc())

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> bool | None:
        """Context manager invoked on exit from the body of a with statement. Currently performs no work and never
        suppresses an exception raised inside the with block.

        :param exc_type: The type of the exception raised in the with block, if any.
        :param exc_value: The exception raised in the with block, if any.
        :param tb: The traceback of the exception raised in the with block, if any.
        :returns: None.
        """
        # Below is legacy code that now does nothing, so it's being skipped
        # try:
        #     self.job.main_thread_exit()
        # except Exception:
        #     # We don't want exceptions from releasing resources to override job run results
        #     logger.warning(f'Job {self.index_number}: Exception while releasing resources for job', exc_info=True)
        # if isinstance(exc_value, subprocess.CalledProcessError):
        #     raise subprocess.SubprocessError(exc_value.stderr)
        # elif isinstance(exc_value, FileNotFoundError):
        #     raise OSError(exc_value)
        return None

    @staticmethod
    def debugger_attached() -> bool:
        """Checks if the code is currently running within an external debugger (e.g. IDE).

        The detection is currently disabled, so this always reports that no debugger is attached.

        :returns: True if an external debugger is attached, False otherwise (at present, always False).
        """
        # Disabled check, kept for reference:
        # return sys.breakpointhook.__module__ != 'sys'
        return False

    def added_data(self) -> dict[str, bool | str | Exception | float | None]:
        """Returns a dict with the data added in the processing of the job.

        Only the attributes that have actually been set (on the instance or as a class default) are included.
        """
        attrs = ('error_ignored', 'exception', 'new_data', 'new_etag', 'new_timestamp')
        return {attr: getattr(self, attr) for attr in attrs if hasattr(self, attr)}

    def load(self) -> None:
        """Loads from the database the last snapshot(s) for the job.

        Populates ``old_snapshot`` and the individual ``old_*`` / ``tries`` attributes and, when the job compares
        against more than one version, ``history_dic_snapshots`` (keyed by snapshot data).
        """
        guid = self.job.get_guid()
        self.old_snapshot = self.snapshots_db.load(guid)
        # TODO: remove these
        (
            self.old_data,
            self.old_timestamp,
            self.tries,
            self.old_etag,
            self.old_mime_type,
            self.old_error_data,
        ) = self.old_snapshot
        if self.job.compared_versions and self.job.compared_versions > 1:
            self.history_dic_snapshots = {
                s.data: s for s in self.snapshots_db.get_history_snapshots(guid, self.job.compared_versions)
            }

    def save(self) -> None:
        """Saves new data retrieved by the job into the snapshot database.

        If the run recorded an error (``new_error_data`` is not empty), the old data, ETag and MIME type are saved
        again together with the new timestamp, tries and error data; otherwise the new data is saved.
        """
        if self.new_error_data:  # have encountered an exception, so save the old data
            new_snapshot = Snapshot(
                data=self.old_data,
                timestamp=self.new_timestamp,
                tries=self.tries,
                etag=self.old_etag,
                mime_type=self.old_mime_type,
                error_data=self.new_error_data,
            )
        else:
            new_snapshot = Snapshot(
                data=self.new_data,
                timestamp=self.new_timestamp,
                tries=self.tries,
                etag=self.new_etag,
                mime_type=self.new_mime_type,
                error_data=self.new_error_data,
            )
        self.snapshots_db.save(guid=self.job.get_guid(), snapshot=new_snapshot)
        logger.info(f'Job {self.job.index_number}: Saved new data to database')

    def delete_latest(self, temporary: bool = True) -> None:
        """Removes the last instance in the snapshot database.

        :param temporary: Passed through unchanged to the snapshot database's ``delete_latest``.
        """
        self.snapshots_db.delete_latest(guid=self.job.get_guid(), temporary=temporary)

    def process(self, headless: bool = True) -> JobState:
        """Processes the job: loads it (i.e. runs it) and handles Exceptions (errors).

        :param headless: Passed through unchanged to the job's ``retrieve`` method.
        :returns: a JobState object containing information of the job run.
        """
        logger.info(f'{self.job.get_indexed_location()} started processing ({type(self.job).__name__})')
        logger.debug(f'Job {self.job.index_number}: {self.job}')

        # An exception recorded before processing started: register it as the error and do not run the job
        if self.exception and not isinstance(self.exception, NotModifiedError):
            self.new_timestamp = time.time()
            self.new_error_data = {
                'type': type(self.exception).__name__,
                'message': str(self.exception),
            }
            logger.info(f'{self.job.get_indexed_location()} ended processing due to exception: {self.exception}')
            return self

        try:
            try:
                self.load()

                self.new_timestamp = time.time()
                data, self.new_etag, mime_type = self.job.retrieve(self, headless)
                logger.debug(
                    f'Job {self.job.index_number}: Retrieved data '
                    f'{dict(data=data, etag=self.new_etag, mime_type=mime_type)}'
                )

            except Exception as e:
                # Job has a chance to format and ignore its error
                if self.debugger_attached():
                    logger.warning('Running in a debugger: raising the exception instead of processing it')
                    raise
                self.new_timestamp = time.time()
                self.exception = e
                self.traceback = self.job.format_error(e, traceback.format_exc())
                self.error_ignored = self.job.ignore_error(e)
                if not (self.error_ignored or isinstance(e, NotModifiedError)):
                    self.tries += 1
                    self.new_error_data = {
                        'type': e.__class__.__name__,
                        'message': str(e),
                    }
                    logger.info(
                        f'Job {self.job.index_number}: Job ended with error; incrementing cumulative error runs to '
                        f'{self.tries}'
                    )

            else:
                # Apply automatic filters first
                filtered_data, mime_type = FilterBase.auto_process(self, data, mime_type)

                # Apply any specified filters
                for filter_kind, subfilter in FilterBase.normalize_filter_list(self.job.filters, self.job.index_number):
                    filtered_data, mime_type = FilterBase.process(
                        filter_kind, subfilter, self, filtered_data, mime_type
                    )

                self.new_data = filtered_data
                self.new_mime_type = mime_type

        except Exception as e:
            # Processing error or job failed its chance to handle error
            if self.debugger_attached():
                logger.warning('Running in a debugger: raising the exception instead of processing it')
                raise
            self.new_timestamp = time.time()
            self.exception = e
            # 'show_group' only exists from Python 3.13; passing it to an older interpreter would raise a TypeError
            # from inside this handler and hide the error we are trying to record.
            if sys.version_info >= (3, 13):
                formatted = traceback.format_exception_only(e, show_group=True)
            else:
                formatted = traceback.format_exception_only(e)
            self.traceback = ''.join(formatted).rstrip()
            self.error_ignored = False
            self.tries += 1
            self.new_error_data = {
                'type': '.'.join(filter(None, [getattr(e, '__module__', None), e.__class__.__name__])),
                'message': str(e),
            }
            logger.info(
                f'Job {self.job.index_number}: Job ended with error (internal handling failed); incrementing '
                f'cumulative error runs to {self.tries}'
            )

        logger.debug(f'Job {self.job.index_number}: Processed as {self.added_data()}')
        logger.info(f'{self.job.get_indexed_location()} ended processing')
        return self

    def get_diff(
        self,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        differ: dict[str, Any] | None = None,
        tz: ZoneInfo | None = None,
        config: _Config | None = None,
    ) -> str:
        """Generates the job's diff and applies diff_filters to it (if any). Memoized per report_kind.

        :parameter report_kind: the kind of report that needs the differ.
        :parameter differ: the name of the differ to override self.job.differ.
        :parameter tz: The IANA tz_info name of the timezone to use for diff in the job's report (e.g. 'Etc/UTC').
        :parameter config: The configuration, passed through to DifferBase.normalize_differ.
        :returns: The job's diff.
        """
        if report_kind in self.generated_diff:
            return self.generated_diff[report_kind]

        if report_kind not in self.unfiltered_diff:
            differ_kind, subdiffer = DifferBase.normalize_differ(
                differ or self.job.differ,
                self.job.index_number,
                config,
            )
            unfiltered_diff = DifferBase.process(differ_kind, subdiffer, self, report_kind, tz, self.unfiltered_diff)
            self.unfiltered_diff.update(unfiltered_diff)
        _generated_diff = self.unfiltered_diff[report_kind]
        if _generated_diff:
            # Apply any specified diff_filters
            _mime_type = 'text/plain'
            for filter_kind, subfilter in FilterBase.normalize_filter_list(
                self.job.diff_filters, self.job.index_number
            ):
                _generated_diff, _mime_type = FilterBase.process(  # type: ignore[assignment]
                    filter_kind, subfilter, self, _generated_diff, _mime_type
                )
        self.generated_diff[report_kind] = str(_generated_diff)

        return self.generated_diff[report_kind]

    def is_markdown(self) -> bool:
        """Returns whether the new data is in markdown (per its MIME type or the job's is_markdown attribute)."""
        return self.new_mime_type == 'text/markdown' or bool(self.job.is_markdown)
8✔
335

336

337
class Report:
    """The base class for reporting.

    Collects the JobState of every job run (tagged with a verb describing its outcome) and submits them to the
    reporters when the run is finished.
    """

    # Class-level declaration kept for backward compatibility; __init__ gives every instance its own list.
    job_states: list[JobState] = []
    new_release_future: Future[str | bool] | None = None
    # NOTE: evaluated once, when this class body is executed, so run durations are measured from that moment.
    start: float = time.perf_counter()

    def __init__(self, urlwatch: Urlwatch) -> None:
        """

        :param urlwatch: The Urlwatch object with the program configuration information.
        """
        self.config: _Config = urlwatch.config_storage.config
        self.tz = (
            ZoneInfo(self.config['report']['tz'])
            if 'report' in self.config and self.config['report']['tz'] is not None
            else None
        )
        # A fresh list per instance: appending to the class-level list would make all Report objects share (and
        # keep accumulating) each other's job states.
        self.job_states = []

    def _result(
        self,
        verb: Verb,
        job_state: JobState,
    ) -> None:
        """Logs error and appends the verb to the job_state.

        :param verb: Description of the result of the job run. Can be one of
          • 'new': new job;
          • 'changed': valid data received, and it has changed;
          • 'changed,no_report': valid data received, and it has changed, but no report;
          • 'unchanged': valid data received, no changes;
          • 'error_ended': valid data received, no changes from the last data received before an error;
          • 'error': error, prior state was different (either data or different error);
          • 'repeated_error': error, same as before;
        or a custom message such as  'test'.  Ultimately called by job_runner.

        :param job_state: The JobState object with the information of the job run.
        """
        if job_state.exception is not None and not isinstance(job_state.exception, NotModifiedError):
            logger.info(
                f'Job {job_state.job.index_number}: Got exception while processing job {job_state.job}',
                exc_info=job_state.exception,
            )

        job_state.verb = verb
        self.job_states.append(job_state)

    def new(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'new'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('new', job_state)

    def changed(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed', job_state)

    def changed_no_report(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed,no_report'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed,no_report', job_state)

    def unchanged(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'unchanged'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('unchanged', job_state)

    def unchanged_from_error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error_ended'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error_ended', job_state)

    def error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error', job_state)

    def error_same_error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'repeated_error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('repeated_error', job_state)

    def custom(
        self,
        job_state: JobState,
        label: Verb,
    ) -> None:
        """Sets the verb of the job in job_state to a custom label. Called by
        :py:func:`UrlwatchCommand.check_test_reporter`.

        :param job_state: The JobState object with the information of the job run.
        :param label: The label to set the information of the job run to.
        """
        self._result(label, job_state)

    def get_filtered_job_states(self, job_states: list[JobState]) -> Iterator[JobState]:
        """Returns JobStates that have reportable changes per config['display'].  Called from :py:Class:`ReporterBase`.

        :param job_states: The list of JobState objects with the information of the job runs.
        :returns: An iterable of JobState objects that have reportable changes per config['display'].
        """

        def should_skip_job(self: Report, job_state: JobState) -> bool:
            """Identify jobs to be skipped."""
            # Skip states that are hidden by display config
            config_verbs: set[Verb] = {'new', 'unchanged', 'error'}
            if any(
                job_state.verb == verb and not self.config['display'][verb]  # type: ignore[typeddict-item]
                for verb in config_verbs
            ):
                return True
            # Skip compound states
            if job_state.verb == 'changed,no_report':
                return True
            # Skip repeated_error if suppress_repeated_errors directive in job
            if job_state.verb == 'repeated_error' and job_state.job.suppress_repeated_errors:
                return True
            # Skip empty diffs unless empty-diff is configured
            if (
                job_state.verb == 'changed'
                and not self.config['display']['empty-diff']
                and job_state.get_diff(tz=self.tz, config=self.config) == ''
            ):
                return True

            return False

        for job_state in job_states:
            if not should_skip_job(self, job_state):
                yield job_state

    def finish(self, jobs_file: list[Path] | None = None) -> None:
        """Finish job run: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_all`.

        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        """
        end = time.perf_counter()
        duration = end - self.start

        ReporterBase.submit_all(self, self.job_states, duration, jobs_file)

    def finish_one(self, name: str, jobs_file: list[Path] | None = None, check_enabled: bool | None = True) -> None:
        """Finish job run of one: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_one`.  Used in testing.

        :param name: The name of the reporter to run.
        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        :param check_enabled: If True (default), run reports only if they are enabled in the configuration.
        """
        end = time.perf_counter()
        duration = end - self.start

        ReporterBase.submit_one(name, self, self.job_states, duration, jobs_file, check_enabled)
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc