• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 10371336087

13 Aug 2024 02:07PM UTC coverage: 77.832% (-0.2%) from 78.056%
10371336087

push

github

mborsetti
Version 3.25.0rc0

1751 of 2515 branches covered (69.62%)

Branch coverage included in aggregate %.

4446 of 5447 relevant lines covered (81.62%)

6.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.34
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import contextlib
8✔
8
import difflib
8✔
9
import email.utils
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import re
8✔
15
import shutil
8✔
16
import sqlite3
8✔
17
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
18
import sys
8✔
19
import time
8✔
20
import traceback
8✔
21
from concurrent.futures import ThreadPoolExecutor
8✔
22
from datetime import datetime
8✔
23
from pathlib import Path
8✔
24
from typing import Iterable, Iterator, Optional, TYPE_CHECKING, Union
8✔
25
from urllib.parse import unquote_plus
8✔
26
from zoneinfo import ZoneInfo
8✔
27

28
from webchanges import __docs_url__, __project_name__, __version__
8✔
29
from webchanges.differs import DifferBase
8✔
30
from webchanges.filters import FilterBase
8✔
31
from webchanges.handler import JobState, Report
8✔
32
from webchanges.jobs import BrowserJob, JobBase, NotModifiedError, UrlJob
8✔
33
from webchanges.mailer import smtp_have_password, smtp_set_password, SMTPMailer
8✔
34
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
8✔
35
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
36

37
try:
8✔
38
    import httpx
8✔
39
except ImportError:  # pragma: no cover
40
    httpx = None  # type: ignore[assignment]
41
    print("Required package 'httpx' not found; will attempt to run using 'requests'")
42
    try:
43
        import requests
44
    except ImportError as e:  # pragma: no cover
45
        raise RuntimeError(
46
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
47
            'neither can be imported.'
48
        ) from e
49
if httpx is not None:
8!
50
    try:
8✔
51
        import h2
8✔
52
    except ImportError:  # pragma: no cover
53
        h2 = None  # type: ignore[assignment]
54

55
try:
8✔
56
    import apt
8✔
57
except ImportError:  # pragma: no cover
58
    apt = None  # type: ignore[assignment]
59

60
try:
8✔
61
    from pip._internal.metadata import get_default_environment
8✔
62
except ImportError:  # pragma: no cover
63
    get_default_environment = None  # type: ignore[assignment]
64

65
try:
8✔
66
    from playwright.sync_api import sync_playwright
8✔
67
except ImportError:  # pragma: no cover
68
    sync_playwright = None  # type: ignore[assignment]
69

70
try:
8✔
71
    import psutil
8✔
72
    from psutil._common import bytes2human
8✔
73
except ImportError:  # pragma: no cover
74
    psutil = None  # type: ignore[assignment]
75
    bytes2human = None  # type: ignore[assignment]
76

77
logger = logging.getLogger(__name__)
8✔
78

79
if TYPE_CHECKING:
80
    from webchanges.main import Urlwatch
81
    from webchanges.reporters import _ConfigReportersList
82
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
83

84

85
class UrlwatchCommand:
8✔
86
    """The class that runs the program after initialization and CLI arguments parsing."""
87

88
    def __init__(self, urlwatcher: Urlwatch) -> None:
8✔
89
        self.urlwatcher = urlwatcher
8✔
90
        self.urlwatch_config = urlwatcher.urlwatch_config
8✔
91

92
    @staticmethod
8✔
93
    def _exit(arg: Union[str, int, None]) -> None:
8✔
94
        logger.info(f'Exiting with exit code {arg}')
8✔
95
        sys.exit(arg)
8✔
96

97
    def jobs_from_joblist(self) -> Iterator[JobBase]:
8✔
98
        """Generates the jobs to process from the joblist entered in the CLI."""
99
        if self.urlwatcher.urlwatch_config.joblist:
8✔
100
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
8✔
101
            enabled_jobs = {job for job in jobs if job.is_enabled()}
8✔
102
            disabled = len(enabled_jobs) - len(jobs)
8✔
103
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
104
            logger.debug(
8✔
105
                f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str} as specified in "
106
                f"command line: {', '.join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}"
107
            )
108
        else:
109
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
8✔
110
            disabled = len(enabled_jobs) - len(self.urlwatcher.jobs)
8✔
111
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
112
            logger.debug(f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str}")
8✔
113
        for job in enabled_jobs:
8✔
114
            yield job.with_defaults(self.urlwatcher.config_storage.config)
8✔
115

116
    def edit_hooks(self) -> int:
8✔
117
        """Edit hooks file.
118

119
        :returns: 0 if edit is successful, 1 otherwise.
120
        """
121
        # Similar code to BaseTextualFileStorage.edit()
122
        logger.debug(f'Edit file {self.urlwatch_config.hooks_file}')
8✔
123
        # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
124
        hooks_edit = self.urlwatch_config.hooks_file.parent.joinpath(
8✔
125
            self.urlwatch_config.hooks_file.stem + '_edit' + ''.join(self.urlwatch_config.hooks_file.suffixes)
126
        )
127
        if self.urlwatch_config.hooks_file.exists():
8!
128
            shutil.copy(self.urlwatch_config.hooks_file, hooks_edit)
8✔
129
        # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
130
        #         self.urlwatch_config.hooks_py_example):
131
        #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)
132

133
        while True:
6✔
134
            try:
8✔
135
                edit_file(hooks_edit)
8✔
136
                import_module_from_source('hooks', hooks_edit)
8✔
137
                break  # stop if no exception on parser
8✔
138
            except SystemExit:
8!
139
                raise
×
140
            except Exception as e:
8✔
141
                print('Parsing failed:')
8✔
142
                print('======')
8✔
143
                print(e)
8✔
144
                print('======')
8✔
145
                print('')
8✔
146
                print(f'The file {self.urlwatch_config.hooks_file} was NOT updated.')
8✔
147
                user_input = input('Do you want to retry the same edit? (Y/n)')
8✔
148
                if not user_input or user_input.lower()[0] == 'y':
×
149
                    continue
×
150
                hooks_edit.unlink()
×
151
                print('No changes have been saved.')
×
152
                return 1
2✔
153

154
        if self.urlwatch_config.hooks_file.is_symlink():
8!
155
            self.urlwatch_config.hooks_file.write_text(hooks_edit.read_text())
×
156
        else:
157
            hooks_edit.replace(self.urlwatch_config.hooks_file)
8✔
158
        hooks_edit.unlink(missing_ok=True)
8✔
159
        print(f'Saved edits in {self.urlwatch_config.hooks_file}')
8✔
160
        return 0
8✔
161

162
    @staticmethod
8✔
163
    def show_features() -> int:
8✔
164
        """
165
        Prints the "features", i.e. a list of job types, filters and reporters.
166

167
        :return: 0.
168
        """
169
        print(f'Please see full documentation at {__docs_url__}')
8✔
170
        print()
8✔
171
        print('Supported jobs:\n')
8✔
172
        print(JobBase.job_documentation())
8✔
173
        print('Supported filters:\n')
8✔
174
        print(FilterBase.filter_documentation())
8✔
175
        print()
8✔
176
        print('Supported differs:\n')
8✔
177
        print(DifferBase.differ_documentation())
8✔
178
        print()
8✔
179
        print('Supported reporters:\n')
8✔
180
        print(ReporterBase.reporter_documentation())
8✔
181
        print()
8✔
182
        print(f'Please see full documentation at {__docs_url__}')
8✔
183

184
        return 0
8✔
185

186
    @staticmethod
8✔
187
    def show_detailed_versions() -> int:
8✔
188
        """
189
        Prints the detailed versions, including of dependencies.
190

191
        :return: 0.
192
        """
193

194
        def dependencies() -> list[str]:
8✔
195
            if get_default_environment is not None:
8!
196
                env = get_default_environment()
8✔
197
                dist = None
8✔
198
                for dist in env.iter_all_distributions():
8✔
199
                    if dist.canonical_name == __project_name__:
8!
200
                        break
×
201
                if dist and dist.canonical_name == __project_name__:
8!
202
                    return sorted(set(d.split()[0] for d in dist.metadata_dict['requires_dist']), key=str.lower)
×
203

204
            # default list of all possible dependencies
205
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
8✔
206
            return [
8✔
207
                'aioxmpp',
208
                'beautifulsoup4',
209
                'chump',
210
                'colorama',
211
                'cryptography',
212
                'cssbeautifier',
213
                'cssselect',
214
                'deepdiff',
215
                'h2',
216
                'html2text',
217
                'httpx',
218
                'jq',
219
                'jsbeautifier',
220
                'keyring',
221
                'lxml',
222
                'markdown2',
223
                'matrix_client',
224
                'msgpack',
225
                'pdftotext',
226
                'Pillow',
227
                'platformdirs',
228
                'playwright',
229
                'psutil',
230
                'pushbullet.py',
231
                'pypdf',
232
                'pytesseract',
233
                'pyyaml',
234
                'redis',
235
                'requests',
236
                'tzdata',
237
                'vobject',
238
            ]
239

240
        print('Software:')
8✔
241
        print(f'• {__project_name__}: {__version__}')
8✔
242
        print(
8✔
243
            f'• {platform.python_implementation()}: {platform.python_version()} '
244
            f'{platform.python_build()} {platform.python_compiler()}'
245
        )
246
        print(f'• SQLite: {sqlite3.sqlite_version}')
8✔
247

248
        if psutil:
8!
249
            print()
8✔
250
            print('System:')
8✔
251
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
8✔
252
            print(f'• Processor: {platform.processor()}')
8✔
253
            print(f'• CPUs (logical): {psutil.cpu_count()}')
8✔
254
            try:
8✔
255
                virt_mem = psutil.virtual_memory().available
8✔
256
                print(
8✔
257
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
258
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
259
                )
260
            except psutil.Error as e:  # pragma: no cover
261
                print(f'• Free memory: Could not read information: {e}')
262
            print(
8✔
263
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
264
                f"({100 - psutil.disk_usage('/').percent:.1f}%)"
265
            )
266

267
        print()
8✔
268
        print('Installed PyPi dependencies:')
8✔
269
        for module_name in dependencies():
8✔
270
            try:
8✔
271
                mod = importlib.metadata.distribution(module_name)
8✔
272
            except ModuleNotFoundError:
8✔
273
                continue
8✔
274
            print(f'• {module_name}: {mod.version}')
8✔
275
            # package requirements
276
            if mod.requires:
8✔
277
                for req_name in [i.split()[0] for i in mod.requires]:
8✔
278
                    try:
8✔
279
                        req = importlib.metadata.distribution(req_name)
8✔
280
                    except ModuleNotFoundError:
8✔
281
                        continue
8✔
282
                    print(f'  - {req_name}: {req.version}')
8✔
283

284
        # playwright
285
        if sync_playwright is not None:
8!
286
            with sync_playwright() as p:
8!
287
                browser = p.chromium.launch(channel='chrome')
×
288
                print()
×
289
                print('Playwright browser:')
×
290
                print(f'• Name: {browser.browser_type.name}')
×
291
                print(f'• Version: {browser.version}')
×
292
                if psutil:
×
293
                    browser.new_page()
×
294
                    try:
×
295
                        virt_mem = psutil.virtual_memory().available
×
296
                        print(
×
297
                            f'• Free memory with browser loaded: {bytes2human(virt_mem)} physical plus '
298
                            f'{bytes2human(psutil.swap_memory().free)} swap'
299
                        )
300
                    except psutil.Error:
×
301
                        pass
×
302

303
        if os.name == 'posix' and apt:
8!
304
            apt_cache = apt.Cache()
×
305

306
            def print_version(libs: list[str]) -> None:
×
307
                for lib in libs:
×
308
                    if lib in apt_cache:
×
309
                        if ver := apt_cache[lib].versions:
×
310
                            print(f'   - {ver[0].package}: {ver[0].version}')
×
311
                return None
×
312

313
            print()
×
314
            print('Installed dpkg dependencies:')
×
315
            for module, apt_dists in (
×
316
                ('jq', ['jq']),
317
                # https://github.com/jalan/pdftotext#os-dependencies
318
                ('pdftotext', ['libpoppler-cpp-dev']),
319
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
320
                (
321
                    'Pillow',
322
                    [
323
                        'libjpeg-dev',
324
                        'zlib-dev',
325
                        'zlib1g-dev',
326
                        'libtiff-dev',
327
                        'libfreetype-dev',
328
                        'littlecms-dev',
329
                        'libwebp-dev',
330
                        'tcl/tk-dev',
331
                        'openjpeg-dev',
332
                        'libimagequant-dev',
333
                        'libraqm-dev',
334
                        'libxcb-dev',
335
                        'libxcb1-dev',
336
                    ],
337
                ),
338
                ('playwright', ['google-chrome-stable']),
339
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
340
                ('pytesseract', ['tesseract-ocr']),
341
            ):
342
                try:
×
343
                    importlib.metadata.distribution(module)
×
344
                    print(f'• {module}')
×
345
                    print_version(apt_dists)
×
346
                except importlib.metadata.PackageNotFoundError:
×
347
                    pass
×
348
        return 0
8✔
349

350
    def list_jobs(self, regex: Union[bool, str]) -> None:
8✔
351
        """
352
        Lists the job and their respective _index_number.
353

354
        :return: None.
355
        """
356
        if isinstance(regex, str):
8!
357
            print(f"List of jobs matching the RegEx '{regex}':")
×
358
        else:
359
            print('List of jobs:')
8✔
360
        for job in self.urlwatcher.jobs:
8✔
361
            if self.urlwatch_config.verbose:
8✔
362
                job_desc = f'{job.index_number:3}: {job!r}'
8✔
363
            else:
364
                pretty_name = job.pretty_name()
8✔
365
                location = job.get_location()
8✔
366
                if pretty_name != location:
8!
367
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
8✔
368
                else:
369
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
370
            if isinstance(regex, bool) or re.findall(regex, job_desc):
8!
371
                print(job_desc)
8✔
372

373
        if len(self.urlwatch_config.jobs_files) > 1:
8✔
374
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
8✔
375
        elif len(self.urlwatch_config.jobs_files) == 1:
8✔
376
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
377
        else:
378
            jobs_files = []
8✔
379
        print('\n   '.join(jobs_files))
8✔
380

381
    def _find_job(self, query: Union[str, int]) -> JobBase:
8✔
382
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
383
        (i.e. the url/user_visible_url or command).
384

385
        :param query: The query.
386
        :return: The matching JobBase.
387
        :raises IndexError: If job is not found.
388
        """
389
        if isinstance(query, int):
8✔
390
            index = query
8✔
391
        else:
392
            try:
8✔
393
                index = int(query)
8✔
394
            except ValueError:
8✔
395
                query = unquote_plus(query)
8✔
396
                try:
8✔
397
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
8✔
398
                except StopIteration:
8✔
399
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
8✔
400

401
        if index == 0:
8✔
402
            raise ValueError(f'Job index {index} out of range.')
8✔
403
        try:
8✔
404
            if index <= 0:
8✔
405
                return self.urlwatcher.jobs[index]
8✔
406
            else:
407
                return self.urlwatcher.jobs[index - 1]
8✔
408
        except IndexError as e:
8✔
409
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
8✔
410

411
    def _find_job_with_defaults(self, query: Union[str, int]) -> JobBase:
8✔
412
        """
413
        Returns the job with defaults based on job_id, which could match an index or match a location
414
        (url/user_visible_url or command). Accepts negative numbers.
415

416
        :param query: The query.
417
        :return: The matching JobBase with defaults.
418
        :raises SystemExit: If job is not found.
419
        """
420
        job = self._find_job(query)
8✔
421
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
422

423
    def test_job(self, job_id: Union[bool, str, int]) -> None:
8✔
424
        """
425
        Tests the running of a single job outputting the filtered text to stdout. If job_id is True, don't run any
426
        jobs as it's a test of loading config, jobs and hook files for syntax.
427

428
        :param job_id: The job_id or True.
429

430
        :return: None.
431

432
        :raises Exception: The Exception when raised by a job. loading of hooks files, etc.
433
        """
434
        if job_id is True:
8✔
435
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
8✔
436
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
8✔
437
            if len(self.urlwatch_config.jobs_files) == 1:
8✔
438
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
8✔
439
            else:
440
                message.append(
8✔
441
                    '\n   '.join(
442
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
443
                    )
444
                )
445
            if 'hooks' in sys.modules:
8!
446
                message.append(f"\nand hooks file {sys.modules['hooks'].__file__}")
8✔
447
            print(f"{''.join(message)}.")
8✔
448
            return
8✔
449

450
        job = self._find_job_with_defaults(job_id)
8✔
451
        start = time.perf_counter()
8✔
452

453
        if isinstance(job, UrlJob):
8!
454
            # Force re-retrieval of job, as we're testing filters
455
            job.ignore_cached = True
×
456

457
        # Add defaults, as if when run
458
        job = job.with_defaults(self.urlwatcher.config_storage.config)
8✔
459

460
        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
8✔
461
            job_state.process(headless=not self.urlwatch_config.no_headless)
8✔
462
            duration = time.perf_counter() - start
8✔
463
            if job_state.exception is not None:
8!
464
                raise job_state.exception from job_state.exception
×
465
            output = [
8✔
466
                job_state.job.pretty_name(),
467
                ('-' * len(job_state.job.pretty_name())),
468
            ]
469
            if job_state.job.note:
8!
470
                output.append(job_state.job.note)
×
471
            output.extend(
8✔
472
                [
473
                    '',
474
                    str(job_state.new_data),
475
                    '',
476
                    '--',
477
                    f'Job tested in {dur_text(duration)} with {__project_name__} {__version__}.',
478
                ]
479
            )
480
            print('\n'.join(output))
8✔
481
        return
8✔
482

483
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
484
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
485

486
    def test_differ(self, arg_test_differ: list[str]) -> int:
8✔
487
        """
488
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or whatever reporter is
489
        selected with --test-reporter.
490

491
        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs]
492
        :return: 1 if error, 0 if successful.
493
        """
494
        report = Report(self.urlwatcher)
8✔
495
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
8✔
496
        if len(arg_test_differ) == 1:
8✔
497
            job_id = arg_test_differ[0]
8✔
498
            max_diffs = None
8✔
499
        elif len(arg_test_differ) == 2:
8!
500
            job_id, max_diffs_str = arg_test_differ
8✔
501
            max_diffs = int(max_diffs_str)
8✔
502
        else:
503
            raise ValueError('--test-differ takes a maximum of two arguments')
×
504

505
        job = self._find_job_with_defaults(job_id)
8✔
506

507
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
8✔
508

509
        num_snapshots = len(history_data)
8✔
510
        if num_snapshots == 0:
8✔
511
            print('This job has never been run before.')
8✔
512
            return 1
8✔
513
        elif num_snapshots < 2:
8✔
514
            print('Not enough historic data available (need at least 2 different snapshots).')
8✔
515
            return 1
8✔
516

517
        if job.compared_versions and job.compared_versions != 1:
8!
518
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")
×
519

520
        max_diffs = max_diffs or num_snapshots - 1
8✔
521
        for i in range(max_diffs):
8✔
522
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
8✔
523
                job_state.new_data = history_data[i].data
8✔
524
                job_state.new_timestamp = history_data[i].timestamp
8✔
525
                job_state.new_etag = history_data[i].etag
8✔
526
                job_state.new_mime_type = history_data[i].mime_type
8✔
527
                if not job.compared_versions or job.compared_versions == 1:
8!
528
                    job_state.old_data = history_data[i + 1].data
8✔
529
                    job_state.old_timestamp = history_data[i + 1].timestamp
8✔
530
                    job_state.old_etag = history_data[i + 1].etag
8✔
531
                    job_state.old_mime_type = history_data[i + 1].mime_type
8✔
532
                else:
533
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
×
534
                    close_matches: list[str] = difflib.get_close_matches(
×
535
                        str(job_state.new_data), history_dic_snapshots.keys(), n=1  # type: ignore[arg-type]
536
                    )
537
                    if close_matches:
×
538
                        job_state.old_data = close_matches[0]
×
539
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
×
540
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
×
541
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type
×
542

543
                if self.urlwatch_config.test_reporter is None:
8✔
544
                    self.urlwatch_config.test_reporter = 'stdout'  # default
8✔
545
                report.job_states = []  # required
8✔
546
                if job_state.new_data == job_state.old_data:
8!
547
                    label = (
×
548
                        f'No change (snapshots {-i:2} AND {-(i + 1):2}) with '
549
                        f"'compared_versions: {job.compared_versions}'"
550
                    )
551
                    job_state.verb = 'changed,no_report'
×
552
                else:
553
                    label = f'Filtered diff (snapshots {-i:2} and {-(i + 1):2})'
8✔
554
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
8✔
555
                if errorlevel:
8!
556
                    self._exit(errorlevel)
×
557

558
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
559
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
560

561
        return 0
8✔
562

563
    def dump_history(self, job_id: str) -> int:
8✔
564
        """
565
        Displays the historical data stored in the snapshot database for a job.
566

567
        :param job_id: The Job ID.
568
        :return: An argument to be used in sys.exit.
569
        """
570

571
        job = self._find_job_with_defaults(job_id)
8✔
572
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
8✔
573

574
        print(f'History for job {job.get_indexed_location()}:')
8✔
575
        print(f'(ID: {job.get_guid()})')
8✔
576
        total_failed = 0
8✔
577
        if history_data:
8✔
578
            print('=' * 50)
8✔
579
        for i, snapshot in enumerate(history_data):
8✔
580
            mime_type = f'; {snapshot.mime_type}' if snapshot.mime_type else ''
8✔
581
            etag = f'; ETag: {snapshot.etag}' if snapshot.etag else ''
8✔
582
            tries = f'; error run (number {snapshot.tries})' if snapshot.tries else ''
8✔
583
            total_failed += snapshot.tries > 0
8✔
584
            tz = self.urlwatcher.report.config['report']['tz']
8✔
585
            tzinfo = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
8✔
586
            dt = datetime.fromtimestamp(snapshot.timestamp, tzinfo)
8✔
587
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
8✔
588
            sep_len = max(50, len(header))
8✔
589
            print(header)
8✔
590
            print('-' * sep_len)
8✔
591
            print(snapshot[0])
8✔
592
            print('=' * sep_len, '\n')
8✔
593

594
        print(
8✔
595
            f'Found {len(history_data) - total_failed}'
596
            + (' good' if total_failed else '')
597
            + ' snapshot'
598
            + ('s' if len(history_data) - total_failed != 1 else '')
599
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
600
            + '.'
601
        )
602

603
        return 0
8✔
604

605
    def list_error_jobs(self) -> int:
8✔
606
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
8✔
607
            print(f'Invalid reporter {self.urlwatch_config.errors}')
8✔
608
            return 1
8✔
609

610
        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
8✔
611
            """A generator that outputs error text for jobs who fail with an exception or yield no data.
612

613
            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
614
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
615
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
616
            """
617
            with contextlib.ExitStack() as stack:
8✔
618
                max_workers = min(32, os.cpu_count() or 1) if any(isinstance(job, BrowserJob) for job in jobs) else None
8✔
619
                logger.debug(f'Max_workers set to {max_workers}')
8✔
620
                executor = ThreadPoolExecutor(max_workers=max_workers)
8✔
621

622
                for job_state in executor.map(
8✔
623
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
624
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
625
                ):
626
                    if job_state.exception is None or isinstance(job_state.exception, NotModifiedError):
8✔
627
                        if (
8!
628
                            len(job_state.new_data.strip()) == 0
629
                            if hasattr(job_state, 'new_data')
630
                            else len(job_state.old_data.strip()) == 0
631
                        ):
632
                            if self.urlwatch_config.verbose:
×
633
                                yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
×
634
                            else:
635
                                pretty_name = job_state.job.pretty_name()
×
636
                                location = job_state.job.get_location()
×
637
                                if pretty_name != location:
×
638
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
×
639
                                else:
640
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
×
641
                    else:
642
                        pretty_name = job_state.job.pretty_name()
8✔
643
                        location = job_state.job.get_location()
8✔
644
                        if pretty_name != location:
8!
645
                            yield (
8✔
646
                                f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
647
                                f'({location})'
648
                            )
649
                        else:
650
                            yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name})'
×
651

652
        start = time.perf_counter()
8✔
653
        if len(self.urlwatch_config.jobs_files) == 1:
8!
654
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
8✔
655
        else:
656
            jobs_files = ['in the concatenation of the jobs files'] + [
×
657
                f'• {file},' for file in self.urlwatch_config.jobs_files
658
            ]
659
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)'] + jobs_files)
8✔
660

661
        jobs = {
8✔
662
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
663
        }
664
        if self.urlwatch_config.errors == 'stdout':
8!
665
            print(header)
8✔
666
            for line in error_jobs_lines(jobs):
8✔
667
                print(line)
8✔
668
            print('--')
8✔
669
            duration = time.perf_counter() - start
8✔
670
            print(f"Checked {len(jobs)} enabled job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
8✔
671

672
        else:
673
            message = '\n'.join(error_jobs_lines(jobs))
×
674
            if message:
×
675
                # create a dummy job state to run a reporter on
676
                job_state = JobState(
×
677
                    None,  # type: ignore[arg-type]
678
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
679
                )
680
                job_state.traceback = f'{header}\n{message}'
×
681
                duration = time.perf_counter() - start
×
682
                self.urlwatcher.report.config['footnote'] = (
×
683
                    f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}."
684
                )
685
                self.urlwatcher.report.config['report']['html']['footer'] = False
×
686
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
×
687
                self.urlwatcher.report.config['report']['text']['footer'] = False
×
688
                self.urlwatcher.report.error(job_state)
×
689
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
×
690
            else:
691
                print(header)
×
692
                print('--')
×
693
                duration = time.perf_counter() - start
×
694
                print('Found no errors')
×
695
                print(f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
×
696

697
        return 0
8✔
698

699
    def delete_snapshot(self, job_id: Union[str, int]) -> int:
8✔
700
        job = self._find_job_with_defaults(job_id)
8✔
701

702
        deleted = self.urlwatcher.ssdb_storage.delete_latest(job.get_guid())
8✔
703
        if deleted:
8✔
704
            print(f'Deleted last snapshot of {job.get_indexed_location()}')
8✔
705
            return 0
8✔
706
        else:
707
            print(f'No snapshots found to be deleted for {job.get_indexed_location()}')
8✔
708
            return 1
8✔
709

710
    def modify_urls(self) -> int:
8✔
711
        if self.urlwatch_config.delete is not None:
8✔
712
            job = self._find_job(self.urlwatch_config.delete)
8✔
713
            if job is not None:
8!
714
                self.urlwatcher.jobs.remove(job)
8✔
715
                print(f'Removed {job}')
8✔
716
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
717
            else:
718
                print(f'Job not found: {self.urlwatch_config.delete}')
×
719
                return 1
×
720

721
        if self.urlwatch_config.add is not None:
8✔
722
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
723
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
8✔
724
            filters = [v for k, v in items if k == 'filter']
8✔
725
            items2 = [(k, v) for k, v in items if k != 'filter']
8✔
726
            d = {k: v for k, v in items2}
8✔
727
            if filters:
8!
728
                d['filter'] = ','.join(filters)
×
729

730
            job = JobBase.unserialize(d)
8✔
731
            print(f'Adding {job}')
8✔
732
            self.urlwatcher.jobs.append(job)
8✔
733
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
734

735
        if self.urlwatch_config.change_location is not None:
8✔
736
            new_loc = self.urlwatch_config.change_location[1]
8✔
737
            # Ensure the user isn't overwriting an existing job with the change.
738
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
8!
739
                print(
×
740
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
741
                    f'different value.\n'
742
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
743
                )
744
                return 1
×
745
            else:
746
                job = self._find_job(self.urlwatch_config.change_location[0])
8✔
747
                if job is not None:
8!
748
                    # Update the job's location (which will also update the guid) and move any history in the database
749
                    # over to the job's updated guid.
750
                    old_loc = job.get_location()
8✔
751
                    print(f'Moving location of "{old_loc}" to "{new_loc}"')
8✔
752
                    old_guid = job.get_guid()
8✔
753
                    if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
8✔
754
                        print(f'No snapshots found for "{old_loc}"')
8✔
755
                        return 1
8✔
756
                    job.set_base_location(new_loc)
8✔
757
                    num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.get_guid())
8✔
758
                    if num_searched:
8!
759
                        print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}"')
8✔
760
                else:
761
                    print(f'Job not found: "{self.urlwatch_config.change_location[0]}"')
×
762
                    return 1
×
763
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
8✔
764
            if not input(message).lower().startswith('y'):
8!
765
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
×
766
            else:
767
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
768

769
        return 0
8✔
770

771
    def edit_config(self) -> int:
8✔
772
        result = self.urlwatcher.config_storage.edit()
8✔
773
        return result
8✔
774

775
    def check_telegram_chats(self) -> None:
8✔
776
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']
8✔
777

778
        bot_token = config['bot_token']
8✔
779
        if not bot_token:
8✔
780
            print('You need to set up your bot token first (see documentation)')
8✔
781
            self._exit(1)
8✔
782

783
        if httpx:
8!
784
            get_client = httpx.Client(http2=h2 is not None).get
8✔
785
        else:
786
            get_client = requests.get  # type: ignore[assignment]
×
787

788
        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
8✔
789
        if not info['ok']:
8!
790
            print(f"Error with token {bot_token}: {info['description']}")
8✔
791
            self._exit(1)
8✔
792

793
        chats = {}
×
794
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
×
795
        if 'result' in updates:
×
796
            for chat_info in updates['result']:
×
797
                chat = chat_info['message']['chat']
×
798
                if chat['type'] == 'private':
×
799
                    chats[chat['id']] = (
×
800
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
801
                    )
802

803
        if not chats:
×
804
            print(f"No chats found. Say hello to your bot at https://t.me/{info['result']['username']}")
×
805
            self._exit(1)
×
806

807
        headers = ('Chat ID', 'Name')
×
808
        maxchat = max(len(headers[0]), max((len(k) for k, v in chats.items()), default=0))
×
809
        maxname = max(len(headers[1]), max((len(v) for k, v in chats.items()), default=0))
×
810
        fmt = f'%-{maxchat}s  %s'
×
811
        print(fmt % headers)
×
812
        print(fmt % ('-' * maxchat, '-' * maxname))
×
813
        for k, v in sorted(chats.items(), key=lambda kv: kv[1]):
×
814
            print(fmt % (k, v))
×
815
        print(f"\nChat up your bot here: https://t.me/{info['result']['username']}")
×
816

817
        self._exit(0)
×
818

819
    def check_test_reporter(
8✔
820
        self,
821
        job_state: Optional[JobState] = None,
822
        label: str = 'test',  # type: ignore[assignment]
823
        report: Optional[Report] = None,
824
    ) -> int:
825
        """
826
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').
827

828
        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
829
        of the configuration file.
830

831
        :param job_state: The JobState (Optional).
832
        :param label: The label to be used in the report; defaults to 'test'.
833
        :param report: A Report class to use for testing (Optional).
834
        :return: 0 if successful, 1 otherwise.
835
        """
836

837
        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
8✔
838
            """Builds a pseudo-job for the reporter to run on."""
839
            job = JobBase.unserialize({'name': job_name, 'url': url})
8✔
840

841
            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
842
            # testing; also no need to use it as context manager, since no processing is called on the job
843
            job_state = JobState(None, job)  # type: ignore[arg-type]
8✔
844

845
            job_state.old_data = old
8✔
846
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
8✔
847
            job_state.new_data = new
8✔
848
            job_state.new_timestamp = time.time()
8✔
849

850
            return job_state
8✔
851

852
        def set_error(job_state: 'JobState', message: str) -> JobState:
8✔
853
            """Sets a job error message on a JobState."""
854
            try:
8✔
855
                raise ValueError(message)
8✔
856
            except ValueError as e:
8✔
857
                job_state.exception = e
8✔
858
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
8✔
859

860
            return job_state
8✔
861

862
        reporter_name = self.urlwatch_config.test_reporter
8✔
863
        if reporter_name not in ReporterBase.__subclasses__:
8✔
864
            print(f'No such reporter: {reporter_name}')
8✔
865
            print(f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}\n')
8✔
866
            return 1
8✔
867

868
        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
8✔
869
            reporter_name  # type: ignore[literal-required]
870
        ]
871
        if job_state:  # we want a full report
8✔
872
            cfg['enabled'] = True
8✔
873
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
8✔
874
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
8✔
875
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
8✔
876
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
8✔
877
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
8✔
878
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
8✔
879
        if not cfg['enabled']:
8✔
880
            print(f'WARNING: Reporter being tested is not enabled: {reporter_name}')
8✔
881
            print('Will still attempt to test it, but this may not work')
8✔
882
            print(f'Use {__project_name__} --edit-config to configure reporters')
8✔
883
            cfg['enabled'] = True
8✔
884

885
        if report is None:
8✔
886
            report = Report(self.urlwatcher)
8✔
887

888
        if job_state:
8✔
889
            report.custom(job_state, label)  # type: ignore[arg-type]
8✔
890
        else:
891
            report.new(
8✔
892
                build_job(
893
                    'Sample job that was newly added',
894
                    'https://example.com/new',
895
                    '',
896
                    '',
897
                )
898
            )
899
            report.changed(
8✔
900
                build_job(
901
                    'Sample job where something changed',
902
                    'https://example.com/changed',
903
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
904
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
905
                )
906
            )
907
            report.unchanged(
8✔
908
                build_job(
909
                    'Sample job where nothing changed',
910
                    'http://example.com/unchanged',
911
                    'Same Old, Same Old\n',
912
                    'Same Old, Same Old\n',
913
                )
914
            )
915
            report.error(
8✔
916
                set_error(
917
                    build_job(
918
                        'Sample job where an error was encountered',
919
                        'https://example.com/error',
920
                        '',
921
                        '',
922
                    ),
923
                    'The error message would appear here.',
924
                )
925
            )
926

927
        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)
8✔
928

929
        return 0
8✔
930

931
    def check_smtp_login(self) -> None:
8✔
932
        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
8✔
933
        smtp_config: _ConfigReportEmailSmtp = config['smtp']
8✔
934

935
        success = True
8✔
936

937
        if not config['enabled']:
8✔
938
            print('Please enable email reporting in the config first.')
8✔
939
            success = False
8✔
940

941
        if config['method'] != 'smtp':
8✔
942
            print('Please set the method to SMTP for the email reporter.')
8✔
943
            success = False
8✔
944

945
        smtp_auth = smtp_config['auth']
8✔
946
        if not smtp_auth:
8✔
947
            print('Authentication must be enabled for SMTP.')
8✔
948
            success = False
8✔
949

950
        smtp_hostname = smtp_config['host']
8✔
951
        if not smtp_hostname:
8✔
952
            print('Please configure the SMTP hostname in the config first.')
8✔
953
            success = False
8✔
954

955
        smtp_username = smtp_config['user'] or config['from']
8✔
956
        if not smtp_username:
8✔
957
            print('Please configure the SMTP user in the config first.')
8✔
958
            success = False
8✔
959

960
        if not success:
8✔
961
            self._exit(1)
8✔
962

963
        insecure_password = smtp_config['insecure_password']
2✔
964
        if insecure_password:
2!
965
            print('The SMTP password is set in the config file (key "insecure_password")')
2✔
966
        elif smtp_have_password(smtp_hostname, smtp_username):
×
967
            message = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
×
968
            if not input(message).lower().startswith('y'):
×
969
                print('Password unchanged.')
×
970
            else:
971
                smtp_set_password(smtp_hostname, smtp_username)
×
972

973
        smtp_port = smtp_config['port']
2✔
974
        smtp_tls = smtp_config['starttls']
2✔
975

976
        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
2✔
977
        print('Trying to log into the SMTP server...')
2✔
978
        mailer.send(None)
2✔
979
        print('Successfully logged into SMTP server')
×
980

981
        self._exit(0)
×
982

983
    def check_xmpp_login(self) -> None:
8✔
984
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']
8✔
985

986
        success = True
8✔
987

988
        if not xmpp_config['enabled']:
8✔
989
            print('Please enable XMPP reporting in the config first.')
8✔
990
            success = False
8✔
991

992
        xmpp_sender = xmpp_config['sender']
8✔
993
        if not xmpp_sender:
8✔
994
            print('Please configure the XMPP sender in the config first.')
8✔
995
            success = False
8✔
996

997
        if not xmpp_config['recipient']:
8✔
998
            print('Please configure the XMPP recipient in the config first.')
8✔
999
            success = False
8✔
1000

1001
        if not success:
8✔
1002
            self._exit(1)
8✔
1003

1004
        if 'insecure_password' in xmpp_config:
8!
1005
            print('The XMPP password is already set in the config (key "insecure_password").')
8✔
1006
            self._exit(0)
8✔
1007

1008
        if xmpp_have_password(xmpp_sender):
×
1009
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
×
1010
            if input(message).lower() != 'y':
×
1011
                print('Password unchanged.')
×
1012
                self._exit(0)
×
1013

1014
        if success:
×
1015
            xmpp_set_password(xmpp_sender)
×
1016

1017
        self._exit(0)
×
1018

1019
    @staticmethod
1020
    def playwright_install_chrome() -> int:  # pragma: no cover
1021
        """
1022
        Replicates playwright.___main__.main() function, which is called by the playwright executable, in order to
1023
        install the browser executable.
1024

1025
        :return: Playwright's executable return code.
1026
        """
1027
        try:
1028
            from playwright._impl._driver import compute_driver_executable
1029
        except ImportError:  # pragma: no cover
1030
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None
1031

1032
        driver_executable = compute_driver_executable()
1033
        env = os.environ.copy()
1034
        env['PW_CLI_TARGET_LANG'] = 'python'
1035
        cmd = [str(driver_executable), 'install', 'chrome']
1036
        logger.info(f"Running playwright CLI: {' '.join(cmd)}")
1037
        completed_process = subprocess.run(cmd, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
1038
        if completed_process.returncode:
1039
            print(completed_process.stderr)
1040
            return completed_process.returncode
1041
        if completed_process.stdout:
1042
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
1043
        return 0
1044

1045
    def handle_actions(self) -> None:
8✔
1046
        """Handles the actions for command line arguments and exits."""
1047
        if self.urlwatch_config.list_jobs:
8✔
1048
            self.list_jobs(self.urlwatch_config.list_jobs)
8✔
1049
            self._exit(0)
8✔
1050

1051
        if self.urlwatch_config.errors:
8✔
1052
            self._exit(self.list_error_jobs())
8✔
1053

1054
        if self.urlwatch_config.test_job:
8✔
1055
            self.test_job(self.urlwatch_config.test_job)
8✔
1056
            self._exit(0)
8✔
1057

1058
        if self.urlwatch_config.test_differ:
8✔
1059
            self._exit(self.test_differ(self.urlwatch_config.test_differ))
8✔
1060

1061
        if self.urlwatch_config.dump_history:
8✔
1062
            self._exit(self.dump_history(self.urlwatch_config.dump_history))
8✔
1063

1064
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
8✔
1065
            self._exit(self.modify_urls())
8✔
1066

1067
        if self.urlwatch_config.test_reporter:
8✔
1068
            self._exit(self.check_test_reporter())
8✔
1069

1070
        if self.urlwatch_config.smtp_login:
8✔
1071
            self.check_smtp_login()
8✔
1072

1073
        if self.urlwatch_config.telegram_chats:
8✔
1074
            self.check_telegram_chats()
8✔
1075

1076
        if self.urlwatch_config.xmpp_login:
8✔
1077
            self.check_xmpp_login()
8✔
1078

1079
        if self.urlwatch_config.edit:
8✔
1080
            self._exit(self.urlwatcher.jobs_storage.edit())
8✔
1081

1082
        if self.urlwatch_config.edit_config:
8✔
1083
            self._exit(self.edit_config())
8✔
1084

1085
        if self.urlwatch_config.edit_hooks:
8✔
1086
            self._exit(self.edit_hooks())
8✔
1087

1088
        if self.urlwatch_config.gc_database:
8✔
1089
            self.urlwatcher.ssdb_storage.gc(
8✔
1090
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
1091
            )
1092
            self.urlwatcher.ssdb_storage.close()
8✔
1093
            self._exit(0)
8✔
1094

1095
        if self.urlwatch_config.clean_database:
8✔
1096
            self.urlwatcher.ssdb_storage.clean_ssdb(
8✔
1097
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
1098
            )
1099
            self.urlwatcher.ssdb_storage.close()
8✔
1100
            self._exit(0)
8✔
1101

1102
        if self.urlwatch_config.rollback_database:
8✔
1103
            tz = self.urlwatcher.report.config['report']['tz']
8✔
1104
            self.urlwatcher.ssdb_storage.rollback_cache(self.urlwatch_config.rollback_database, tz)
8✔
1105
            self.urlwatcher.ssdb_storage.close()
8✔
1106
            self._exit(0)
8✔
1107

1108
        if self.urlwatch_config.delete_snapshot:
8✔
1109
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))
8✔
1110

1111
        if self.urlwatch_config.features:
8✔
1112
            self._exit(self.show_features())
8✔
1113

1114
        if self.urlwatch_config.detailed_versions:
8!
1115
            self._exit(self.show_detailed_versions())
8✔
1116

1117
    def run(self) -> None:  # pragma: no cover
1118
        """The main run logic."""
1119
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
1120
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote
1121

1122
        self.handle_actions()
1123

1124
        self.urlwatcher.run_jobs()
1125

1126
        self.urlwatcher.close()
1127

1128
        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc