• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 10371336087

13 Aug 2024 02:07PM UTC coverage: 77.832% (-0.2%) from 78.056%
10371336087

push

github

mborsetti
Version 3.25.0rc0

1751 of 2515 branches covered (69.62%)

Branch coverage included in aggregate %.

4446 of 5447 relevant lines covered (81.62%)

6.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/webchanges/handler.py
1
"""Handles the running of jobs and, afterward, of the reports."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import logging
8✔
8
import os
8✔
9
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
10
import time
8✔
11
import traceback
8✔
12
from concurrent.futures import Future
8✔
13
from pathlib import Path
8✔
14
from types import TracebackType
8✔
15
from typing import Any, ContextManager, Iterator, Literal, NamedTuple, Optional, TYPE_CHECKING, Union
8✔
16

17
from webchanges.differs import DifferBase
8✔
18
from webchanges.filters import FilterBase
8✔
19
from webchanges.jobs import NotModifiedError
8✔
20
from webchanges.reporters import ReporterBase
8✔
21

22
# https://stackoverflow.com/questions/39740632
23
if TYPE_CHECKING:
24
    from webchanges.jobs import JobBase
25
    from webchanges.main import Urlwatch
26
    from webchanges.storage import _Config, SsdbStorage
27

28
logger = logging.getLogger(__name__)
8✔
29

30

31
class Snapshot(NamedTuple):
    """One stored snapshot of a job's data, as an immutable named tuple.

    Fields, in positional order (the order matters: instances are unpacked positionally elsewhere in this module):

    * 0: data: str | bytes
    * 1: timestamp: float
    * 2: tries: int
    * 3: etag: str
    * 4: mime_type: str
    """

    # The snapshot's content.
    data: Union[str, bytes]
    # Time associated with the snapshot, as a float (presumably seconds since the epoch -- confirm against storage).
    timestamp: float
    # Counter of consecutive runs that ended in error.
    tries: int
    # ETag saved alongside the data.
    etag: str
    # MIME type of ``data`` (e.g. 'text/plain').
    mime_type: str
46

47

48
class JobState(ContextManager):
    """The JobState class, which contains run information about a job.

    An instance is bound to one job and one snapshot database. It loads the previously saved snapshot, runs the job
    (retrieval plus filters), records any exception, and can produce (and memoize) the diff for each report kind.
    It is also a context manager; see :py:meth:`__exit__` for the exception translation done on exit.
    """

    _http_client_used: Optional[str] = None
    error_ignored: Union[bool, str]
    exception: Optional[Exception] = None
    generated_diff: dict[Literal['text', 'markdown', 'html'], str]
    history_dic_snapshots: dict[Union[str, bytes], Snapshot] = {}
    new_data: Union[str, bytes]
    new_etag: str
    new_mime_type: str = ''
    new_timestamp: float
    old_snapshot = Snapshot(
        data='',
        timestamp=1605147837.511478,  # initialized to the first release of webchanges!
        tries=0,
        etag='',
        mime_type='text/plain',
    )
    old_data: Union[str, bytes] = ''
    old_etag: str = ''
    old_mime_type: str = 'text/plain'
    old_timestamp: float = 1605147837.511478  # initialized to the first release of webchanges!
    traceback: str
    tries: int = 0  # if >1, an error; value is the consecutive number of runs leading to an error
    unfiltered_diff: dict[Literal['text', 'markdown', 'html'], str] = {}
    verb: Literal['new', 'changed', 'changed,no_report', 'unchanged', 'error']

    def __init__(self, snapshots_db: SsdbStorage, job: JobBase) -> None:
        """
        Initializes the class

        :param snapshots_db: The SsdbStorage object with the snapshot database methods.
        :param job: A JobBase object with the job information.
        """
        self.snapshots_db = snapshots_db
        self.job = job

        # Per-instance caches; these shadow the class-level defaults so diffs are never shared between jobs.
        self.generated_diff = {}
        self.unfiltered_diff = {}

    def __enter__(self) -> 'JobState':
        """Context manager invoked on entry to the body of a with statement.

        The legacy call to the job's ``main_thread_enter()`` no longer does anything and has been dropped, so this
        simply hands back the instance.

        :returns: Class object.
        """
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Context manager invoked on exit from the body of a with statement.

        The legacy call to the job's ``main_thread_exit()`` no longer does anything and has been dropped. What
        remains is a translation of two exception types raised inside the ``with`` body; any other exception
        propagates unchanged because None is returned.

        :param exc_type: Type of the exception raised in the ``with`` body, if any.
        :param exc_value: The exception raised in the ``with`` body, if any.
        :param tb: Traceback of that exception, if any.
        :returns: None.
        :raises subprocess.SubprocessError: If the body raised subprocess.CalledProcessError (carries its stderr).
        :raises OSError: If the body raised FileNotFoundError.
        """
        if isinstance(exc_value, subprocess.CalledProcessError):
            # Chain explicitly so the original error (command, return code) stays attached as __cause__.
            raise subprocess.SubprocessError(exc_value.stderr) from exc_value
        if isinstance(exc_value, FileNotFoundError):
            raise OSError(exc_value) from exc_value
        return None

    def added_data(self) -> dict[str, Optional[Union[bool, str, bytes, Exception, float]]]:
        """Returns a dict with the data added in the processing of the job.

        Only attributes that are actually set (on the instance or as a class default) are included.
        """
        attrs = ('error_ignored', 'exception', 'new_data', 'new_etag', 'new_timestamp')
        return {attr: getattr(self, attr) for attr in attrs if hasattr(self, attr)}

    def load(self) -> None:
        """Loads from the database the last snapshot(s) for the job.

        Sets ``old_snapshot`` and the individual ``old_*`` / ``tries`` attributes; when the job asks for more than
        one compared version, also fills ``history_dic_snapshots`` (keyed by snapshot data).
        """
        guid = self.job.get_guid()
        self.old_snapshot = self.snapshots_db.load(guid)
        # TODO: remove these
        (
            self.old_data,
            self.old_timestamp,
            self.tries,
            self.old_etag,
            self.old_mime_type,
        ) = self.old_snapshot
        if self.job.compared_versions and self.job.compared_versions > 1:
            self.history_dic_snapshots = {
                s.data: s for s in self.snapshots_db.get_history_snapshots(guid, self.job.compared_versions)
            }

    def save(self, use_old_data: bool = False) -> None:
        """Saves new data retrieved by the job into the snapshot database.

        :param use_old_data: Whether old data (and ETag) should be used (e.g. due to error, leading to new data or
           data being an error message instead of the relevant data).
        """
        if use_old_data:
            self.new_data = self.old_data
            self.new_etag = self.old_etag
            self.new_mime_type = self.old_mime_type

        new_snapshot = Snapshot(
            data=self.new_data,
            timestamp=self.new_timestamp,
            tries=self.tries,
            etag=self.new_etag,
            mime_type=self.new_mime_type,
        )
        self.snapshots_db.save(guid=self.job.get_guid(), snapshot=new_snapshot)
        logger.info(f'Job {self.job.index_number}: Saved new data to database')

    def delete_latest(self, temporary: bool = True) -> None:
        """Removes the last instance in the snapshot database.

        :param temporary: Passed through unchanged to the snapshot database's ``delete_latest``.
        """
        self.snapshots_db.delete_latest(guid=self.job.get_guid(), temporary=temporary)

    def process(self, headless: bool = True) -> JobState:
        """Processes the job: loads it (i.e. runs it) and handles Exceptions (errors).

        Exceptions are never propagated (except under PyCharm, see below): they are stored in ``exception`` /
        ``traceback`` and, unless ignored or a NotModifiedError, counted in ``tries``.

        :param headless: Passed through unchanged to the job's ``retrieve``.
        :returns: a JobState object containing information of the job run.
        """
        logger.info(f'{self.job.get_indexed_location()} started processing ({type(self.job).__name__})')
        logger.debug(f'Job {self.job.index_number}: {self.job}')

        if self.exception:
            # An exception was recorded before processing started; there is nothing to run.
            self.new_timestamp = time.time()
            logger.info(f'{self.job.get_indexed_location()} ended processing due to exception: {self.exception}')
            return self

        try:
            try:
                self.load()

                self.new_timestamp = time.time()
                data, self.new_etag, mime_type = self.job.retrieve(self, headless)
                logger.debug(
                    f'Job {self.job.index_number}: Retrieved data '
                    f'{dict(data=data, etag=self.new_etag, mime_type=mime_type)}'
                )

                # Apply automatic filters first
                filtered_data, mime_type = FilterBase.auto_process(self, data, mime_type)

                # Apply any specified filters
                for filter_kind, subfilter in FilterBase.normalize_filter_list(self.job.filter, self.job.index_number):
                    filtered_data, mime_type = FilterBase.process(
                        filter_kind, subfilter, self, filtered_data, mime_type
                    )

                self.new_data = filtered_data
                self.new_mime_type = mime_type

            except Exception as e:
                # Job has a chance to format and ignore its error
                self.new_timestamp = time.time()
                self.exception = e
                self.traceback = self.job.format_error(e, traceback.format_exc())
                self.error_ignored = self.job.ignore_error(e)
                if not (self.error_ignored or isinstance(e, NotModifiedError)):
                    self.tries += 1
                    logger.info(
                        f'Job {self.job.index_number}: Job ended with error; incrementing cumulative error runs to '
                        f'{self.tries}'
                    )
        except Exception as e:
            # Job failed its chance to handle error (i.e. the handler above itself raised)
            if os.getenv('PYCHARM_HOSTED'):
                # Let the IDE's debugger see the original exception.
                raise
            self.exception = e
            self.traceback = self.job.format_error(e, traceback.format_exc())
            self.error_ignored = False
            if not isinstance(e, NotModifiedError):
                self.tries += 1
                logger.info(
                    f'Job {self.job.index_number}: Job ended with error (internal handling failed); incrementing '
                    f'cumulative error runs to {self.tries}'
                )

        logger.debug(f'Job {self.job.index_number}: Processed as {self.added_data()}')
        logger.info(f'{self.job.get_indexed_location()} ended processing')
        return self

    def get_diff(
        self,
        report_kind: Literal['text', 'markdown', 'html'] = 'text',
        differ: Optional[dict[str, Any]] = None,
        tz: Optional[str] = None,
    ) -> str:
        """Generates the job's diff and applies diff_filters to it (if any). Memoized.

        Memoization is per ``report_kind`` only: once a diff for a report kind has been generated, later calls
        return it regardless of the ``differ`` and ``tz`` arguments.

        :parameter report_kind: the kind of report that needs the differ.
        :parameter differ: the differ directive to use instead of self.job.differ.
        :parameter tz: The IANA tz_info name of the timezone to use for diff in the job's report (e.g. 'Etc/UTC').
        :returns: The job's diff.
        """
        # The previous guards compared the cache to a fresh ``{}`` with ``is`` / ``is not``; identity against a new
        # literal is constant, so only the membership tests ever mattered. They are now written directly.
        if report_kind in self.generated_diff:
            return self.generated_diff[report_kind]

        if report_kind not in self.unfiltered_diff:
            differ_kind, subdiffer = DifferBase.normalize_differ(differ or self.job.differ, self.job.index_number)
            unfiltered_diff = DifferBase.process(differ_kind, subdiffer, self, report_kind, tz, self.unfiltered_diff)
            self.unfiltered_diff.update(unfiltered_diff)
        _generated_diff = self.unfiltered_diff[report_kind]
        if _generated_diff:
            # Apply any specified diff_filters (an empty diff is stored as-is, unfiltered)
            _mime_type = 'text/plain'
            for filter_kind, subfilter in FilterBase.normalize_filter_list(self.job.diff_filter, self.job.index_number):
                _generated_diff, _mime_type = FilterBase.process(  # type: ignore[assignment]
                    filter_kind, subfilter, self, _generated_diff, _mime_type
                )
        self.generated_diff[report_kind] = _generated_diff

        return self.generated_diff[report_kind]

    def is_markdown(self) -> bool:
        """Returns whether the new data is in markdown.

        True when the new MIME type is 'text/markdown' or the job's own ``is_markdown`` attribute is truthy.
        """
        return self.new_mime_type == 'text/markdown' or bool(self.job.is_markdown)
279

280

281
class Report:
    """The base class for reporting.

    Collects the JobState of every job run (tagged with a verb such as 'new' or 'changed') and hands them to the
    reporters when the run is finished.
    """

    # Class-level default kept for backward compatibility; __init__ gives every instance its own list.
    job_states: list[JobState] = []
    new_release_future: Optional[Future[Union[str, bool]]] = None
    # NOTE: evaluated once, when the class body is executed (i.e. at module import), not per instance; durations
    # reported by finish()/finish_one() are therefore measured from that moment.
    start: float = time.perf_counter()

    def __init__(self, urlwatch: Urlwatch) -> None:
        """

        :param urlwatch: The Urlwatch object with the program configuration information.
        """
        self.config: _Config = urlwatch.config_storage.config
        # _result() appends to this list; binding a fresh one here prevents job states from leaking between
        # Report instances through the shared class-level list.
        self.job_states = []

    def _result(
        self,
        verb: Literal['new', 'changed', 'changed,no_report', 'unchanged', 'error'],
        job_state: JobState,
    ) -> None:
        """Logs error and appends the verb to the job_state.

        :param verb: Description of the result of the job run. Can be one of 'new', 'changed', 'changed,no_report',
        'unchanged', 'error', which have a meaning, or a custom message such as 'test'.
        :param job_state: The JobState object with the information of the job run.
        """
        # job_state.exception holds an exception *instance*, so the exclusion of NotModifiedError has to be an
        # isinstance() check (an identity comparison against the class can never match).
        if job_state.exception is not None and not isinstance(job_state.exception, NotModifiedError):
            logger.info(
                f'Job {job_state.job.index_number}: Got exception while processing job {job_state.job}',
                exc_info=job_state.exception,
            )

        job_state.verb = verb
        self.job_states.append(job_state)

    def new(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'new'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('new', job_state)

    def changed(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed', job_state)

    def changed_no_report(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'changed,no_report'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('changed,no_report', job_state)

    def unchanged(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'unchanged'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('unchanged', job_state)

    def error(self, job_state: JobState) -> None:
        """Sets the verb of the job in job_state to 'error'. Called by :py:func:`run_jobs` and tests.

        :param job_state: The JobState object with the information of the job run.
        """
        self._result('error', job_state)

    def custom(
        self,
        job_state: JobState,
        label: Literal['new', 'changed', 'changed,no_report', 'unchanged', 'error'],
    ) -> None:
        """Sets the verb of the job in job_state to a custom label. Called by
        :py:func:`UrlwatchCommand.check_test_reporter`.

        :param job_state: The JobState object with the information of the job run.
        :param label: The label to set the information of the job run to.
        """
        self._result(label, job_state)

    def get_filtered_job_states(self, job_states: list[JobState]) -> Iterator[JobState]:
        """Returns JobStates that have reportable changes per config['display'].  Called from :py:Class:`ReporterBase`.

        A job state is dropped when its verb is 'unchanged', 'new' or 'error' and that verb is switched off in
        config['display']; when its verb is 'changed,no_report'; or when it is 'changed' with an empty diff while
        config['display']['empty-diff'] is off.

        :param job_states: The list of JobState objects with the information of the job runs.
        :returns: An iterable of JobState objects that have reportable changes per config['display'].
        """
        for job_state in job_states:
            if (
                not any(
                    job_state.verb == verb and not self.config['display'][verb]  # type: ignore[literal-required]
                    for verb in {'unchanged', 'new', 'error'}
                )
                and job_state.verb != 'changed,no_report'
            ):
                if (
                    job_state.verb == 'changed'
                    and not self.config['display']['empty-diff']
                    and job_state.get_diff() == ''
                ):
                    continue

                yield job_state

    def finish(self, jobs_file: Optional[list[Path]] = None) -> None:
        """Finish job run: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_all`.

        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        """
        end = time.perf_counter()
        duration = end - self.start

        ReporterBase.submit_all(self, self.job_states, duration, jobs_file)

    def finish_one(
        self, name: str, jobs_file: Optional[list[Path]] = None, check_enabled: Optional[bool] = True
    ) -> None:
        """Finish job run of one: determine its duration and generate reports by submitting job_states to
        :py:Class:`ReporterBase` :py:func:`submit_one`.  Used in testing.

        :param name: The name of the reporter to run.
        :param jobs_file: The path to the file containing the list of jobs (optional, used in footers).
        :param check_enabled: If True (default), run reports only if they are enabled in the configuration.
        """
        end = time.perf_counter()
        duration = end - self.start

        ReporterBase.submit_one(name, self, self.job_states, duration, jobs_file, check_enabled)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc