• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 11226808835

08 Oct 2024 01:21AM UTC coverage: 77.666% (-0.1%) from 77.792%
11226808835

push

github

mborsetti
Version 3.26.0rc0

1751 of 2524 branches covered (69.37%)

Branch coverage included in aggregate %.

4477 of 5495 relevant lines covered (81.47%)

4.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.47
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
6✔
6

7
import contextlib
6✔
8
import difflib
6✔
9
import email.utils
6✔
10
import importlib.metadata
6✔
11
import logging
6✔
12
import os
6✔
13
import platform
6✔
14
import re
6✔
15
import shutil
6✔
16
import sqlite3
6✔
17
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
6✔
18
import sys
6✔
19
import time
6✔
20
import traceback
6✔
21
from concurrent.futures import ThreadPoolExecutor
6✔
22
from datetime import datetime
6✔
23
from pathlib import Path
6✔
24
from typing import Iterable, Iterator, TYPE_CHECKING
6✔
25
from urllib.parse import unquote_plus
6✔
26
from zoneinfo import ZoneInfo
6✔
27

28
from webchanges import __docs_url__, __project_name__, __version__
6✔
29
from webchanges.differs import DifferBase
6✔
30
from webchanges.filters import FilterBase
6✔
31
from webchanges.handler import JobState, Report
6✔
32
from webchanges.jobs import BrowserJob, JobBase, NotModifiedError, UrlJob
6✔
33
from webchanges.mailer import smtp_have_password, smtp_set_password, SMTPMailer
6✔
34
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
6✔
35
from webchanges.util import dur_text, edit_file, import_module_from_source
6✔
36

37
try:
6✔
38
    import httpx
6✔
39
except ImportError:  # pragma: no cover
40
    httpx = None  # type: ignore[assignment]
41
    print("Required package 'httpx' not found; will attempt to run using 'requests'")
42
    try:
43
        import requests
44
    except ImportError as e:  # pragma: no cover
45
        raise RuntimeError(
46
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
47
            'neither can be imported.'
48
        ) from e
49
if httpx is not None:
6!
50
    try:
6✔
51
        import h2
6✔
52
    except ImportError:  # pragma: no cover
53
        h2 = None  # type: ignore[assignment]
54

55
try:
6✔
56
    import apt
6✔
57
except ImportError:  # pragma: no cover
58
    apt = None  # type: ignore[assignment]
59

60
try:
6✔
61
    from pip._internal.metadata import get_default_environment
6✔
62
except ImportError:  # pragma: no cover
63
    get_default_environment = None  # type: ignore[assignment]
64

65
try:
6✔
66
    from playwright.sync_api import sync_playwright
6✔
67
except ImportError:  # pragma: no cover
68
    sync_playwright = None  # type: ignore[assignment]
69

70
try:
6✔
71
    import psutil
6✔
72
    from psutil._common import bytes2human
6✔
73
except ImportError:  # pragma: no cover
74
    psutil = None  # type: ignore[assignment]
75
    bytes2human = None  # type: ignore[assignment]
76

77
logger = logging.getLogger(__name__)
6✔
78

79
if TYPE_CHECKING:
80
    from webchanges.main import Urlwatch
81
    from webchanges.reporters import _ConfigReportersList
82
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
83

84

85
class UrlwatchCommand:
6✔
86
    """The class that runs the program after initialization and CLI arguments parsing."""
87

88
    def __init__(self, urlwatcher: Urlwatch) -> None:
6✔
89
        self.urlwatcher = urlwatcher
6✔
90
        self.urlwatch_config = urlwatcher.urlwatch_config
6✔
91

92
    @staticmethod
6✔
93
    def _exit(arg: str | int | None) -> None:
6✔
94
        logger.info(f'Exiting with exit code {arg}')
6✔
95
        sys.exit(arg)
6✔
96

97
    def jobs_from_joblist(self) -> Iterator[JobBase]:
6✔
98
        """Generates the jobs to process from the joblist entered in the CLI."""
99
        if self.urlwatcher.urlwatch_config.joblist:
6✔
100
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
6✔
101
            enabled_jobs = {job for job in jobs if job.is_enabled()}
6✔
102
            disabled = len(enabled_jobs) - len(jobs)
6✔
103
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
6✔
104
            logger.debug(
6✔
105
                f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str} as specified in "
106
                f"command line: {', '.join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}"
107
            )
108
        else:
109
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
6✔
110
            disabled = len(enabled_jobs) - len(self.urlwatcher.jobs)
6✔
111
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
6✔
112
            logger.debug(f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str}")
6✔
113
        for job in enabled_jobs:
6✔
114
            yield job.with_defaults(self.urlwatcher.config_storage.config)
6✔
115

116
    def edit_hooks(self) -> int:
6✔
117
        """Edit hooks file.
118

119
        :returns: 0 if edit is successful, 1 otherwise.
120
        """
121
        # Similar code to BaseTextualFileStorage.edit()
122
        for hooks_file in self.urlwatch_config.hooks_files:
6✔
123
            logger.debug(f'Edit file {hooks_file}')
6✔
124
            # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
125
            hooks_edit = hooks_file.parent.joinpath(hooks_file.stem + '_edit' + ''.join(hooks_file.suffixes))
6✔
126
            if hooks_file.exists():
6!
127
                shutil.copy(hooks_file, hooks_edit)
6✔
128
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
129
            #         self.urlwatch_config.hooks_py_example):
130
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)
131

132
            while True:
6✔
133
                try:
6✔
134
                    edit_file(hooks_edit)
6✔
135
                    import_module_from_source('hooks', hooks_edit)
6✔
136
                    break  # stop if no exception on parser
6✔
137
                except SystemExit:
6!
138
                    raise
×
139
                except Exception as e:
6✔
140
                    print('Parsing failed:')
6✔
141
                    print('======')
6✔
142
                    print(e)
6✔
143
                    print('======')
6✔
144
                    print('')
6✔
145
                    print(f'The file {hooks_file} was NOT updated.')
6✔
146
                    user_input = input('Do you want to retry the same edit? (Y/n)')
6✔
147
                    if not user_input or user_input.lower()[0] == 'y':
×
148
                        continue
×
149
                    hooks_edit.unlink()
×
150
                    print('No changes have been saved.')
×
151
                    return 1
×
152

153
            if hooks_file.is_symlink():
6!
154
                hooks_file.write_text(hooks_edit.read_text())
×
155
            else:
156
                hooks_edit.replace(hooks_file)
6✔
157
            hooks_edit.unlink(missing_ok=True)
6✔
158
            print(f'Saved edits in {hooks_file}')
6✔
159

160
        return 0
6✔
161

162
    @staticmethod
6✔
163
    def show_features() -> int:
6✔
164
        """
165
        Prints the "features", i.e. a list of job types, filters and reporters.
166

167
        :return: 0.
168
        """
169
        print(f'Please see full documentation at {__docs_url__}')
6✔
170
        print()
6✔
171
        print('Supported jobs:\n')
6✔
172
        print(JobBase.job_documentation())
6✔
173
        print('Supported filters:\n')
6✔
174
        print(FilterBase.filter_documentation())
6✔
175
        print()
6✔
176
        print('Supported differs:\n')
6✔
177
        print(DifferBase.differ_documentation())
6✔
178
        print()
6✔
179
        print('Supported reporters:\n')
6✔
180
        print(ReporterBase.reporter_documentation())
6✔
181
        print()
6✔
182
        print(f'Please see full documentation at {__docs_url__}')
6✔
183

184
        return 0
6✔
185

186
    @staticmethod
6✔
187
    def show_detailed_versions() -> int:
6✔
188
        """
189
        Prints the detailed versions, including of dependencies.
190

191
        :return: 0.
192
        """
193

194
        def dependencies() -> list[str]:
6✔
195
            if get_default_environment is not None:
6!
196
                env = get_default_environment()
6✔
197
                dist = None
6✔
198
                for dist in env.iter_all_distributions():
6✔
199
                    if dist.canonical_name == __project_name__:
6!
200
                        break
×
201
                if dist and dist.canonical_name == __project_name__:
6!
202
                    return sorted(set(d.split()[0] for d in dist.metadata_dict['requires_dist']), key=str.lower)
×
203

204
            # default list of all possible dependencies
205
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
6✔
206
            return [
6✔
207
                'aioxmpp',
208
                'beautifulsoup4',
209
                'chump',
210
                'colorama',
211
                'cryptography',
212
                'cssbeautifier',
213
                'cssselect',
214
                'deepdiff',
215
                'h2',
216
                'html2text',
217
                'httpx',
218
                'jq',
219
                'jsbeautifier',
220
                'keyring',
221
                'lxml',
222
                'markdown2',
223
                'matrix_client',
224
                'msgpack',
225
                'pdftotext',
226
                'Pillow',
227
                'platformdirs',
228
                'playwright',
229
                'psutil',
230
                'pushbullet.py',
231
                'pypdf',
232
                'pytesseract',
233
                'pyyaml',
234
                'redis',
235
                'requests',
236
                'tzdata',
237
                'vobject',
238
            ]
239

240
        print('Software:')
6✔
241
        print(f'• {__project_name__}: {__version__}')
6✔
242
        print(
6✔
243
            f'• {platform.python_implementation()}: {platform.python_version()} '
244
            f'{platform.python_build()} {platform.python_compiler()}'
245
        )
246
        print(f'• SQLite: {sqlite3.sqlite_version}')
6✔
247

248
        if psutil:
6!
249
            print()
6✔
250
            print('System:')
6✔
251
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
6✔
252
            print(f'• Processor: {platform.processor()}')
6✔
253
            print(f'• CPUs (logical): {psutil.cpu_count()}')
6✔
254
            try:
6✔
255
                virt_mem = psutil.virtual_memory().available
6✔
256
                print(
6✔
257
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
258
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
259
                )
260
            except psutil.Error as e:  # pragma: no cover
261
                print(f'• Free memory: Could not read information: {e}')
262
            print(
6✔
263
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
264
                f"({100 - psutil.disk_usage('/').percent:.1f}%)"
265
            )
266
            executor = ThreadPoolExecutor()
6✔
267
            print(f'• --max-threads default: {executor._max_workers}')
6✔
268

269
        print()
6✔
270
        print('Installed PyPi dependencies:')
6✔
271
        for module_name in dependencies():
6✔
272
            try:
6✔
273
                mod = importlib.metadata.distribution(module_name)
6✔
274
            except ModuleNotFoundError:
6✔
275
                continue
6✔
276
            print(f'• {module_name}: {mod.version}')
6✔
277
            # package requirements
278
            if mod.requires:
6✔
279
                for req_name in [i.split()[0] for i in mod.requires]:
6✔
280
                    try:
6✔
281
                        req = importlib.metadata.distribution(req_name)
6✔
282
                    except ModuleNotFoundError:
6✔
283
                        continue
6✔
284
                    print(f'  - {req_name}: {req.version}')
6✔
285

286
        # playwright
287
        if sync_playwright is not None:
6!
288
            with sync_playwright() as p:
6!
289
                browser = p.chromium.launch(channel='chrome')
×
290
                print()
×
291
                print('Playwright browser:')
×
292
                print(f'• Name: {browser.browser_type.name}')
×
293
                print(f'• Version: {browser.version}')
×
294
                if psutil:
×
295
                    browser.new_page()
×
296
                    try:
×
297
                        virt_mem = psutil.virtual_memory().available
×
298
                        print(
×
299
                            f'• Free memory with browser loaded: {bytes2human(virt_mem)} physical plus '
300
                            f'{bytes2human(psutil.swap_memory().free)} swap'
301
                        )
302
                    except psutil.Error:
×
303
                        pass
×
304

305
        if os.name == 'posix' and apt:
6!
306
            apt_cache = apt.Cache()
×
307

308
            def print_version(libs: list[str]) -> None:
×
309
                for lib in libs:
×
310
                    if lib in apt_cache:
×
311
                        if ver := apt_cache[lib].versions:
×
312
                            print(f'   - {ver[0].package}: {ver[0].version}')
×
313
                return None
×
314

315
            print()
×
316
            print('Installed dpkg dependencies:')
×
317
            for module, apt_dists in (
×
318
                ('jq', ['jq']),
319
                # https://github.com/jalan/pdftotext#os-dependencies
320
                ('pdftotext', ['libpoppler-cpp-dev']),
321
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
322
                (
323
                    'Pillow',
324
                    [
325
                        'libjpeg-dev',
326
                        'zlib-dev',
327
                        'zlib1g-dev',
328
                        'libtiff-dev',
329
                        'libfreetype-dev',
330
                        'littlecms-dev',
331
                        'libwebp-dev',
332
                        'tcl/tk-dev',
333
                        'openjpeg-dev',
334
                        'libimagequant-dev',
335
                        'libraqm-dev',
336
                        'libxcb-dev',
337
                        'libxcb1-dev',
338
                    ],
339
                ),
340
                ('playwright', ['google-chrome-stable']),
341
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
342
                ('pytesseract', ['tesseract-ocr']),
343
            ):
344
                try:
×
345
                    importlib.metadata.distribution(module)
×
346
                    print(f'• {module}')
×
347
                    print_version(apt_dists)
×
348
                except importlib.metadata.PackageNotFoundError:
×
349
                    pass
×
350
        return 0
6✔
351

352
    def list_jobs(self, regex: bool | str) -> None:
6✔
353
        """
354
        Lists the job and their respective _index_number.
355

356
        :return: None.
357
        """
358
        if isinstance(regex, str):
6!
359
            print(f"List of jobs matching the RegEx '{regex}':")
×
360
        else:
361
            print('List of jobs:')
6✔
362
        for job in self.urlwatcher.jobs:
6✔
363
            if self.urlwatch_config.verbose:
6✔
364
                job_desc = f'{job.index_number:3}: {job!r}'
6✔
365
            else:
366
                pretty_name = job.pretty_name()
6✔
367
                location = job.get_location()
6✔
368
                if pretty_name != location:
6!
369
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
6✔
370
                else:
371
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
372
            if isinstance(regex, bool) or re.findall(regex, job_desc):
6!
373
                print(job_desc)
6✔
374

375
        if len(self.urlwatch_config.jobs_files) > 1:
6✔
376
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
6✔
377
        elif len(self.urlwatch_config.jobs_files) == 1:
6✔
378
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
6✔
379
        else:
380
            jobs_files = []
6✔
381
        print('\n   '.join(jobs_files))
6✔
382

383
    def _find_job(self, query: str | int) -> JobBase:
6✔
384
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
385
        (i.e. the url/user_visible_url or command).
386

387
        :param query: The query.
388
        :return: The matching JobBase.
389
        :raises IndexError: If job is not found.
390
        """
391
        if isinstance(query, int):
6✔
392
            index = query
6✔
393
        else:
394
            try:
6✔
395
                index = int(query)
6✔
396
            except ValueError:
6✔
397
                query = unquote_plus(query)
6✔
398
                try:
6✔
399
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
6✔
400
                except StopIteration:
6✔
401
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
6✔
402

403
        if index == 0:
6✔
404
            raise ValueError(f'Job index {index} out of range.')
6✔
405
        try:
6✔
406
            if index <= 0:
6✔
407
                return self.urlwatcher.jobs[index]
6✔
408
            else:
409
                return self.urlwatcher.jobs[index - 1]
6✔
410
        except IndexError as e:
6✔
411
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
6✔
412

413
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
6✔
414
        """
415
        Returns the job with defaults based on job_id, which could match an index or match a location
416
        (url/user_visible_url or command). Accepts negative numbers.
417

418
        :param query: The query.
419
        :return: The matching JobBase with defaults.
420
        :raises SystemExit: If job is not found.
421
        """
422
        job = self._find_job(query)
6✔
423
        return job.with_defaults(self.urlwatcher.config_storage.config)
6✔
424

425
    def test_job(self, job_id: bool | str | int) -> None:
6✔
426
        """
427
        Tests the running of a single job outputting the filtered text to stdout. If job_id is True, don't run any
428
        jobs as it's a test of loading config, jobs and hook files for syntax.
429

430
        :param job_id: The job_id or True.
431

432
        :return: None.
433

434
        :raises Exception: The Exception when raised by a job. loading of hooks files, etc.
435
        """
436
        if job_id is True:
6✔
437
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
6✔
438
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
6✔
439
            if len(self.urlwatch_config.jobs_files) == 1:
6✔
440
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
6✔
441
            else:
442
                message.append(
6✔
443
                    '\n   '.join(
444
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
445
                    )
446
                )
447
            if 'hooks' in sys.modules:
6!
448
                message.append(f"\nand hooks file {sys.modules['hooks'].__file__}")
6✔
449
            print(f"{''.join(message)}.")
6✔
450
            return
6✔
451

452
        job = self._find_job_with_defaults(job_id)
6✔
453
        start = time.perf_counter()
6✔
454

455
        if isinstance(job, UrlJob):
6!
456
            # Force re-retrieval of job, as we're testing filters
457
            job.ignore_cached = True
×
458

459
        # Add defaults, as if when run
460
        job = job.with_defaults(self.urlwatcher.config_storage.config)
6✔
461

462
        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
6✔
463
            job_state.process(headless=not self.urlwatch_config.no_headless)
6✔
464
            duration = time.perf_counter() - start
6✔
465
            if job_state.exception is not None:
6!
466
                raise job_state.exception from job_state.exception
×
467
            output = [
6✔
468
                job_state.job.pretty_name(),
469
                ('-' * len(job_state.job.pretty_name())),
470
            ]
471
            if job_state.job.note:
6!
472
                output.append(job_state.job.note)
×
473
            output.extend(
6✔
474
                [
475
                    '',
476
                    str(job_state.new_data),
477
                    '',
478
                    '--',
479
                    f'Job tested in {dur_text(duration)} with {__project_name__} {__version__}.',
480
                ]
481
            )
482
            print('\n'.join(output))
6✔
483
        return
6✔
484

485
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
486
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
487

488
    def test_differ(self, arg_test_differ: list[str]) -> int:
6✔
489
        """
490
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or whatever reporter is
491
        selected with --test-reporter.
492

493
        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs]
494
        :return: 1 if error, 0 if successful.
495
        """
496
        report = Report(self.urlwatcher)
6✔
497
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
6✔
498
        if len(arg_test_differ) == 1:
6✔
499
            job_id = arg_test_differ[0]
6✔
500
            max_diffs = None
6✔
501
        elif len(arg_test_differ) == 2:
6!
502
            job_id, max_diffs_str = arg_test_differ
6✔
503
            max_diffs = int(max_diffs_str)
6✔
504
        else:
505
            raise ValueError('--test-differ takes a maximum of two arguments')
×
506

507
        job = self._find_job_with_defaults(job_id)
6✔
508

509
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
6✔
510

511
        num_snapshots = len(history_data)
6✔
512
        if num_snapshots == 0:
6✔
513
            print('This job has never been run before.')
6✔
514
            return 1
6✔
515
        elif num_snapshots < 2:
6✔
516
            print('Not enough historic data available (need at least 2 different snapshots).')
6✔
517
            return 1
6✔
518

519
        if job.compared_versions and job.compared_versions != 1:
6!
520
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")
×
521

522
        max_diffs = max_diffs or num_snapshots - 1
6✔
523
        for i in range(max_diffs):
6✔
524
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
6✔
525
                job_state.new_data = history_data[i].data
6✔
526
                job_state.new_timestamp = history_data[i].timestamp
6✔
527
                job_state.new_etag = history_data[i].etag
6✔
528
                job_state.new_mime_type = history_data[i].mime_type
6✔
529
                if not job.compared_versions or job.compared_versions == 1:
6!
530
                    job_state.old_data = history_data[i + 1].data
6✔
531
                    job_state.old_timestamp = history_data[i + 1].timestamp
6✔
532
                    job_state.old_etag = history_data[i + 1].etag
6✔
533
                    job_state.old_mime_type = history_data[i + 1].mime_type
6✔
534
                else:
535
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
×
536
                    close_matches: list[str] = difflib.get_close_matches(
×
537
                        str(job_state.new_data), history_dic_snapshots.keys(), n=1  # type: ignore[arg-type]
538
                    )
539
                    if close_matches:
×
540
                        job_state.old_data = close_matches[0]
×
541
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
×
542
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
×
543
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type
×
544

545
                if self.urlwatch_config.test_reporter is None:
6✔
546
                    self.urlwatch_config.test_reporter = 'stdout'  # default
6✔
547
                report.job_states = []  # required
6✔
548
                if job_state.new_data == job_state.old_data:
6!
549
                    label = (
×
550
                        f'No change (snapshots {-i:2} AND {-(i + 1):2}) with '
551
                        f"'compared_versions: {job.compared_versions}'"
552
                    )
553
                    job_state.verb = 'changed,no_report'
×
554
                else:
555
                    label = f'Filtered diff (snapshots {-i:2} and {-(i + 1):2})'
6✔
556
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
6✔
557
                if errorlevel:
6!
558
                    self._exit(errorlevel)
×
559

560
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
561
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
562

563
        return 0
6✔
564

565
    def dump_history(self, job_id: str) -> int:
6✔
566
        """
567
        Displays the historical data stored in the snapshot database for a job.
568

569
        :param job_id: The Job ID.
570
        :return: An argument to be used in sys.exit.
571
        """
572

573
        job = self._find_job_with_defaults(job_id)
6✔
574
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
6✔
575

576
        print(f'History for job {job.get_indexed_location()}:')
6✔
577
        print(f'(ID: {job.get_guid()})')
6✔
578
        total_failed = 0
6✔
579
        if history_data:
6✔
580
            print('=' * 50)
6✔
581
        for i, snapshot in enumerate(history_data):
6✔
582
            mime_type = f'; {snapshot.mime_type}' if snapshot.mime_type else ''
6✔
583
            etag = f'; ETag: {snapshot.etag}' if snapshot.etag else ''
6✔
584
            tries = f'; error run (number {snapshot.tries})' if snapshot.tries else ''
6✔
585
            total_failed += snapshot.tries > 0
6✔
586
            tz = self.urlwatcher.report.config['report']['tz']
6✔
587
            tzinfo = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
6✔
588
            dt = datetime.fromtimestamp(snapshot.timestamp, tzinfo)
6✔
589
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
6✔
590
            sep_len = max(50, len(header))
6✔
591
            print(header)
6✔
592
            print('-' * sep_len)
6✔
593
            print(snapshot[0])
6✔
594
            print('=' * sep_len, '\n')
6✔
595

596
        print(
6✔
597
            f'Found {len(history_data) - total_failed}'
598
            + (' good' if total_failed else '')
599
            + ' snapshot'
600
            + ('s' if len(history_data) - total_failed != 1 else '')
601
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
602
            + '.'
603
        )
604

605
        return 0
6✔
606

607
    def list_error_jobs(self) -> int:
6✔
608
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
6✔
609
            print(f'Invalid reporter {self.urlwatch_config.errors}')
6✔
610
            return 1
6✔
611

612
        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
6✔
613
            """A generator that outputs error text for jobs who fail with an exception or yield no data.
614

615
            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
616
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
617
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
618
            """
619
            with contextlib.ExitStack() as stack:
6✔
620
                max_workers = min(32, os.cpu_count() or 1) if any(isinstance(job, BrowserJob) for job in jobs) else None
6✔
621
                logger.debug(f'Max_workers set to {max_workers}')
6✔
622
                executor = ThreadPoolExecutor(max_workers=max_workers)
6✔
623

624
                for job_state in executor.map(
6✔
625
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
626
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
627
                ):
628
                    if job_state.exception is None or isinstance(job_state.exception, NotModifiedError):
6✔
629
                        if (
6!
630
                            len(job_state.new_data.strip()) == 0
631
                            if hasattr(job_state, 'new_data')
632
                            else len(job_state.old_data.strip()) == 0
633
                        ):
634
                            if self.urlwatch_config.verbose:
×
635
                                yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
×
636
                            else:
637
                                pretty_name = job_state.job.pretty_name()
×
638
                                location = job_state.job.get_location()
×
639
                                if pretty_name != location:
×
640
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
×
641
                                else:
642
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
×
643
                    else:
644
                        pretty_name = job_state.job.pretty_name()
6✔
645
                        location = job_state.job.get_location()
6✔
646
                        if pretty_name != location:
6!
647
                            yield (
6✔
648
                                f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
649
                                f'({location})'
650
                            )
651
                        else:
652
                            yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name})'
×
653

654
        start = time.perf_counter()
6✔
655
        if len(self.urlwatch_config.jobs_files) == 1:
6!
656
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
6✔
657
        else:
658
            jobs_files = ['in the concatenation of the jobs files'] + [
×
659
                f'• {file},' for file in self.urlwatch_config.jobs_files
660
            ]
661
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)'] + jobs_files)
6✔
662

663
        jobs = {
6✔
664
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
665
        }
666
        if self.urlwatch_config.errors == 'stdout':
6!
667
            print(header)
6✔
668
            for line in error_jobs_lines(jobs):
6✔
669
                print(line)
6✔
670
            print('--')
6✔
671
            duration = time.perf_counter() - start
6✔
672
            print(f"Checked {len(jobs)} enabled job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
6✔
673

674
        else:
675
            message = '\n'.join(error_jobs_lines(jobs))
×
676
            if message:
×
677
                # create a dummy job state to run a reporter on
678
                job_state = JobState(
×
679
                    None,  # type: ignore[arg-type]
680
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
681
                )
682
                job_state.traceback = f'{header}\n{message}'
×
683
                duration = time.perf_counter() - start
×
684
                self.urlwatcher.report.config['footnote'] = (
×
685
                    f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}."
686
                )
687
                self.urlwatcher.report.config['report']['html']['footer'] = False
×
688
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
×
689
                self.urlwatcher.report.config['report']['text']['footer'] = False
×
690
                self.urlwatcher.report.error(job_state)
×
691
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
×
692
            else:
693
                print(header)
×
694
                print('--')
×
695
                duration = time.perf_counter() - start
×
696
                print('Found no errors')
×
697
                print(f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
×
698

699
        return 0
6✔
700

701
    def delete_snapshot(self, job_id: str | int) -> int:
6✔
702
        job = self._find_job_with_defaults(job_id)
6✔
703

704
        deleted = self.urlwatcher.ssdb_storage.delete_latest(job.get_guid())
6✔
705
        if deleted:
6✔
706
            print(f'Deleted last snapshot of {job.get_indexed_location()}')
6✔
707
            return 0
6✔
708
        else:
709
            print(f'No snapshots found to be deleted for {job.get_indexed_location()}')
6✔
710
            return 1
6✔
711

712
    def modify_urls(self) -> int:
6✔
713
        if self.urlwatch_config.delete is not None:
6✔
714
            job = self._find_job(self.urlwatch_config.delete)
6✔
715
            if job is not None:
6!
716
                self.urlwatcher.jobs.remove(job)
6✔
717
                print(f'Removed {job}')
6✔
718
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
6✔
719
            else:
720
                print(f'Job not found: {self.urlwatch_config.delete}')
×
721
                return 1
×
722

723
        if self.urlwatch_config.add is not None:
6✔
724
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
725
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
6✔
726
            filters = [v for k, v in items if k == 'filter']
6✔
727
            items2 = [(k, v) for k, v in items if k != 'filter']
6✔
728
            d = {k: v for k, v in items2}
6✔
729
            if filters:
6!
730
                d['filter'] = ','.join(filters)
×
731

732
            job = JobBase.unserialize(d)
6✔
733
            print(f'Adding {job}')
6✔
734
            self.urlwatcher.jobs.append(job)
6✔
735
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
6✔
736

737
        if self.urlwatch_config.change_location is not None:
6✔
738
            new_loc = self.urlwatch_config.change_location[1]
6✔
739
            # Ensure the user isn't overwriting an existing job with the change.
740
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
6!
741
                print(
×
742
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
743
                    f'different value.\n'
744
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
745
                )
746
                return 1
×
747
            else:
748
                job = self._find_job(self.urlwatch_config.change_location[0])
6✔
749
                if job is not None:
6!
750
                    # Update the job's location (which will also update the guid) and move any history in the database
751
                    # over to the job's updated guid.
752
                    old_loc = job.get_location()
6✔
753
                    print(f'Moving location of "{old_loc}" to "{new_loc}"')
6✔
754
                    old_guid = job.get_guid()
6✔
755
                    if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
6✔
756
                        print(f'No snapshots found for "{old_loc}"')
6✔
757
                        return 1
6✔
758
                    job.set_base_location(new_loc)
6✔
759
                    num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.get_guid())
6✔
760
                    if num_searched:
6!
761
                        print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}"')
6✔
762
                else:
763
                    print(f'Job not found: "{self.urlwatch_config.change_location[0]}"')
×
764
                    return 1
×
765
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
6✔
766
            if not input(message).lower().startswith('y'):
6!
767
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
×
768
            else:
769
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
6✔
770

771
        return 0
6✔
772

773
    def edit_config(self) -> int:
6✔
774
        result = self.urlwatcher.config_storage.edit()
6✔
775
        return result
6✔
776

777
    def check_telegram_chats(self) -> None:
6✔
778
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']
6✔
779

780
        bot_token = config['bot_token']
6✔
781
        if not bot_token:
6✔
782
            print('You need to set up your bot token first (see documentation)')
6✔
783
            self._exit(1)
6✔
784

785
        if httpx:
6!
786
            get_client = httpx.Client(http2=h2 is not None).get  # noqa: S113 Call to httpx without timeout
6✔
787
        else:
788
            get_client = requests.get  # type: ignore[assignment]
×
789

790
        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
6✔
791
        if not info['ok']:
6!
792
            print(f"Error with token {bot_token}: {info['description']}")
6✔
793
            self._exit(1)
6✔
794

795
        chats = {}
×
796
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
×
797
        if 'result' in updates:
×
798
            for chat_info in updates['result']:
×
799
                chat = chat_info['message']['chat']
×
800
                if chat['type'] == 'private':
×
801
                    chats[chat['id']] = (
×
802
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
803
                    )
804

805
        if not chats:
×
806
            print(f"No chats found. Say hello to your bot at https://t.me/{info['result']['username']}")
×
807
            self._exit(1)
×
808

809
        headers = ('Chat ID', 'Name')
×
810
        maxchat = max(len(headers[0]), max((len(k) for k, v in chats.items()), default=0))
×
811
        maxname = max(len(headers[1]), max((len(v) for k, v in chats.items()), default=0))
×
812
        fmt = f'%-{maxchat}s  %s'
×
813
        print(fmt % headers)
×
814
        print(fmt % ('-' * maxchat, '-' * maxname))
×
815
        for k, v in sorted(chats.items(), key=lambda kv: kv[1]):
×
816
            print(fmt % (k, v))
×
817
        print(f"\nChat up your bot here: https://t.me/{info['result']['username']}")
×
818

819
        self._exit(0)
×
820

821
    def check_test_reporter(
6✔
822
        self,
823
        job_state: JobState | None = None,
824
        label: str = 'test',  # type: ignore[assignment]
825
        report: Report | None = None,
826
    ) -> int:
827
        """
828
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').
829

830
        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
831
        of the configuration file.
832

833
        :param job_state: The JobState (Optional).
834
        :param label: The label to be used in the report; defaults to 'test'.
835
        :param report: A Report class to use for testing (Optional).
836
        :return: 0 if successful, 1 otherwise.
837
        """
838

839
        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
6✔
840
            """Builds a pseudo-job for the reporter to run on."""
841
            job = JobBase.unserialize({'name': job_name, 'url': url})
6✔
842

843
            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
844
            # testing; also no need to use it as context manager, since no processing is called on the job
845
            job_state = JobState(None, job)  # type: ignore[arg-type]
6✔
846

847
            job_state.old_data = old
6✔
848
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
6✔
849
            job_state.new_data = new
6✔
850
            job_state.new_timestamp = time.time()
6✔
851

852
            return job_state
6✔
853

854
        def set_error(job_state: 'JobState', message: str) -> JobState:
6✔
855
            """Sets a job error message on a JobState."""
856
            try:
6✔
857
                raise ValueError(message)
6✔
858
            except ValueError as e:
6✔
859
                job_state.exception = e
6✔
860
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
6✔
861

862
            return job_state
6✔
863

864
        reporter_name = self.urlwatch_config.test_reporter
6✔
865
        if reporter_name not in ReporterBase.__subclasses__:
6✔
866
            print(f'No such reporter: {reporter_name}')
6✔
867
            print(f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}\n')
6✔
868
            return 1
6✔
869

870
        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
6✔
871
            reporter_name  # type: ignore[literal-required]
872
        ]
873
        if job_state:  # we want a full report
6✔
874
            cfg['enabled'] = True
6✔
875
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
6✔
876
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
6✔
877
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
6✔
878
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
6✔
879
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
6✔
880
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
6✔
881
        if not cfg['enabled']:
6✔
882
            print(f'WARNING: Reporter being tested is not enabled: {reporter_name}')
6✔
883
            print('Will still attempt to test it, but this may not work')
6✔
884
            print(f'Use {__project_name__} --edit-config to configure reporters')
6✔
885
            cfg['enabled'] = True
6✔
886

887
        if report is None:
6✔
888
            report = Report(self.urlwatcher)
6✔
889

890
        if job_state:
6✔
891
            report.custom(job_state, label)  # type: ignore[arg-type]
6✔
892
        else:
893
            report.new(
6✔
894
                build_job(
895
                    'Sample job that was newly added',
896
                    'https://example.com/new',
897
                    '',
898
                    '',
899
                )
900
            )
901
            report.changed(
6✔
902
                build_job(
903
                    'Sample job where something changed',
904
                    'https://example.com/changed',
905
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
906
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
907
                )
908
            )
909
            report.unchanged(
6✔
910
                build_job(
911
                    'Sample job where nothing changed',
912
                    'http://example.com/unchanged',
913
                    'Same Old, Same Old\n',
914
                    'Same Old, Same Old\n',
915
                )
916
            )
917
            report.error(
6✔
918
                set_error(
919
                    build_job(
920
                        'Sample job where an error was encountered',
921
                        'https://example.com/error',
922
                        '',
923
                        '',
924
                    ),
925
                    'The error message would appear here.',
926
                )
927
            )
928

929
        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)
6✔
930

931
        return 0
6✔
932

933
    def check_smtp_login(self) -> None:
6✔
934
        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
6✔
935
        smtp_config: _ConfigReportEmailSmtp = config['smtp']
6✔
936

937
        success = True
6✔
938

939
        if not config['enabled']:
6✔
940
            print('Please enable email reporting in the config first.')
6✔
941
            success = False
6✔
942

943
        if config['method'] != 'smtp':
6✔
944
            print('Please set the method to SMTP for the email reporter.')
6✔
945
            success = False
6✔
946

947
        smtp_auth = smtp_config['auth']
6✔
948
        if not smtp_auth:
6✔
949
            print('Authentication must be enabled for SMTP.')
6✔
950
            success = False
6✔
951

952
        smtp_hostname = smtp_config['host']
6✔
953
        if not smtp_hostname:
6✔
954
            print('Please configure the SMTP hostname in the config first.')
6✔
955
            success = False
6✔
956

957
        smtp_username = smtp_config['user'] or config['from']
6✔
958
        if not smtp_username:
6✔
959
            print('Please configure the SMTP user in the config first.')
6✔
960
            success = False
6✔
961

962
        if not success:
6✔
963
            self._exit(1)
6✔
964

965
        insecure_password = smtp_config['insecure_password']
2✔
966
        if insecure_password:
2!
967
            print('The SMTP password is set in the config file (key "insecure_password")')
2✔
968
        elif smtp_have_password(smtp_hostname, smtp_username):
×
969
            message = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
×
970
            if not input(message).lower().startswith('y'):
×
971
                print('Password unchanged.')
×
972
            else:
973
                smtp_set_password(smtp_hostname, smtp_username)
×
974

975
        smtp_port = smtp_config['port']
2✔
976
        smtp_tls = smtp_config['starttls']
2✔
977

978
        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
2✔
979
        print('Trying to log into the SMTP server...')
2✔
980
        mailer.send(None)
2✔
981
        print('Successfully logged into SMTP server')
×
982

983
        self._exit(0)
×
984

985
    def check_xmpp_login(self) -> None:
6✔
986
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']
6✔
987

988
        success = True
6✔
989

990
        if not xmpp_config['enabled']:
6✔
991
            print('Please enable XMPP reporting in the config first.')
6✔
992
            success = False
6✔
993

994
        xmpp_sender = xmpp_config['sender']
6✔
995
        if not xmpp_sender:
6✔
996
            print('Please configure the XMPP sender in the config first.')
6✔
997
            success = False
6✔
998

999
        if not xmpp_config['recipient']:
6✔
1000
            print('Please configure the XMPP recipient in the config first.')
6✔
1001
            success = False
6✔
1002

1003
        if not success:
6✔
1004
            self._exit(1)
6✔
1005

1006
        if 'insecure_password' in xmpp_config:
6!
1007
            print('The XMPP password is already set in the config (key "insecure_password").')
6✔
1008
            self._exit(0)
6✔
1009

1010
        if xmpp_have_password(xmpp_sender):
×
1011
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
×
1012
            if input(message).lower() != 'y':
×
1013
                print('Password unchanged.')
×
1014
                self._exit(0)
×
1015

1016
        if success:
×
1017
            xmpp_set_password(xmpp_sender)
×
1018

1019
        self._exit(0)
×
1020

1021
    @staticmethod
1022
    def playwright_install_chrome() -> int:  # pragma: no cover
1023
        """
1024
        Replicates playwright.___main__.main() function, which is called by the playwright executable, in order to
1025
        install the browser executable.
1026

1027
        :return: Playwright's executable return code.
1028
        """
1029
        try:
1030
            from playwright._impl._driver import compute_driver_executable
1031
        except ImportError:  # pragma: no cover
1032
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None
1033

1034
        driver_executable = compute_driver_executable()
1035
        env = os.environ.copy()
1036
        env['PW_CLI_TARGET_LANG'] = 'python'
1037
        cmd = [str(driver_executable), 'install', 'chrome']
1038
        logger.info(f"Running playwright CLI: {' '.join(cmd)}")
1039
        completed_process = subprocess.run(cmd, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
1040
        if completed_process.returncode:
1041
            print(completed_process.stderr)
1042
            return completed_process.returncode
1043
        if completed_process.stdout:
1044
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
1045
        return 0
1046

1047
    def handle_actions(self) -> None:
6✔
1048
        """Handles the actions for command line arguments and exits."""
1049
        if self.urlwatch_config.list_jobs:
6✔
1050
            self.list_jobs(self.urlwatch_config.list_jobs)
6✔
1051
            self._exit(0)
6✔
1052

1053
        if self.urlwatch_config.errors:
6✔
1054
            self._exit(self.list_error_jobs())
6✔
1055

1056
        if self.urlwatch_config.test_job:
6✔
1057
            self.test_job(self.urlwatch_config.test_job)
6✔
1058
            self._exit(0)
6✔
1059

1060
        if self.urlwatch_config.test_differ:
6✔
1061
            self._exit(self.test_differ(self.urlwatch_config.test_differ))
6✔
1062

1063
        if self.urlwatch_config.dump_history:
6✔
1064
            self._exit(self.dump_history(self.urlwatch_config.dump_history))
6✔
1065

1066
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
6✔
1067
            self._exit(self.modify_urls())
6✔
1068

1069
        if self.urlwatch_config.test_reporter:
6✔
1070
            self._exit(self.check_test_reporter())
6✔
1071

1072
        if self.urlwatch_config.smtp_login:
6✔
1073
            self.check_smtp_login()
6✔
1074

1075
        if self.urlwatch_config.telegram_chats:
6✔
1076
            self.check_telegram_chats()
6✔
1077

1078
        if self.urlwatch_config.xmpp_login:
6✔
1079
            self.check_xmpp_login()
6✔
1080

1081
        if self.urlwatch_config.edit:
6✔
1082
            self._exit(self.urlwatcher.jobs_storage.edit())
6✔
1083

1084
        if self.urlwatch_config.edit_config:
6✔
1085
            self._exit(self.edit_config())
6✔
1086

1087
        if self.urlwatch_config.edit_hooks:
6✔
1088
            self._exit(self.edit_hooks())
6✔
1089

1090
        if self.urlwatch_config.gc_database:
6✔
1091
            self.urlwatcher.ssdb_storage.gc(
6✔
1092
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
1093
            )
1094
            self.urlwatcher.ssdb_storage.close()
6✔
1095
            self._exit(0)
6✔
1096

1097
        if self.urlwatch_config.clean_database:
6✔
1098
            self.urlwatcher.ssdb_storage.clean_ssdb(
6✔
1099
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
1100
            )
1101
            self.urlwatcher.ssdb_storage.close()
6✔
1102
            self._exit(0)
6✔
1103

1104
        if self.urlwatch_config.rollback_database:
6✔
1105
            tz = self.urlwatcher.report.config['report']['tz']
6✔
1106
            self.urlwatcher.ssdb_storage.rollback_cache(self.urlwatch_config.rollback_database, tz)
6✔
1107
            self.urlwatcher.ssdb_storage.close()
6✔
1108
            self._exit(0)
6✔
1109

1110
        if self.urlwatch_config.delete_snapshot:
6✔
1111
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))
6✔
1112

1113
        if self.urlwatch_config.features:
6✔
1114
            self._exit(self.show_features())
6✔
1115

1116
        if self.urlwatch_config.detailed_versions:
6!
1117
            self._exit(self.show_detailed_versions())
6✔
1118

1119
    def run(self) -> None:  # pragma: no cover
1120
        """The main run logic."""
1121
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
1122
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote
1123

1124
        self.handle_actions()
1125

1126
        self.urlwatcher.run_jobs()
1127

1128
        self.urlwatcher.close()
1129

1130
        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc