mborsetti / webchanges, build 11992535834 (first build)

24 Nov 2024 02:51AM UTC, push via github, mborsetti, Version 3.27.0b2

Aggregate coverage: 75.597% (branch coverage is included in the aggregate %)
• 4505 of 5637 relevant lines covered (79.92%)
• 1703 of 2575 branches covered (66.14%)
• 29 of 40 new or added lines in 6 files covered (72.5%)
• 6.19 hits per line

Source file: /webchanges/command.py (74.87% covered)
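
The aggregate percentage combines line and branch coverage. A minimal arithmetic
sketch using only the figures reported for this build (the exact Coveralls formula is
assumed, but it reproduces the number shown above):

    covered = 4505 + 1703   # relevant lines hit + branches covered
    total = 5637 + 2575     # relevant lines + total branches
    print(f'{100 * covered / total:.3f}%')  # -> 75.597%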
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import difflib
8✔
8
import email.utils
8✔
9
import gc
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import re
8✔
15
import shutil
8✔
16
import sqlite3
8✔
17
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
18
import sys
8✔
19
import time
8✔
20
import traceback
8✔
21
from concurrent.futures import ThreadPoolExecutor
8✔
22
from contextlib import ExitStack
8✔
23
from datetime import datetime
8✔
24
from pathlib import Path
8✔
25
from typing import Iterable, Iterator, TYPE_CHECKING
8✔
26
from urllib.parse import unquote_plus
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
8✔
30
from webchanges.differs import DifferBase
8✔
31
from webchanges.filters import FilterBase
8✔
32
from webchanges.handler import JobState, Report
8✔
33
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
8✔
34
from webchanges.mailer import smtp_have_password, smtp_set_password, SMTPMailer
8✔
35
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
8✔
36
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
37

38
try:
8✔
39
    import httpx
8✔
40
except ImportError:  # pragma: no cover
41
    httpx = None  # type: ignore[assignment]
42
    print("Required package 'httpx' not found; will attempt to run using 'requests'")
43
    try:
44
        import requests
45
    except ImportError as e:  # pragma: no cover
46
        raise RuntimeError(
47
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
48
            'neither can be imported.'
49
        ) from e
50
if httpx is not None:
8!
51
    try:
8✔
52
        import h2
8✔
53
    except ImportError:  # pragma: no cover
54
        h2 = None  # type: ignore[assignment]
55

56
try:
8✔
57
    import apt
8✔
58
except ImportError:  # pragma: no cover
59
    apt = None  # type: ignore[assignment]
60

61
try:
8✔
62
    from pip._internal.metadata import get_default_environment
8✔
63
except ImportError:  # pragma: no cover
64
    get_default_environment = None  # type: ignore[assignment]
65

66
try:
8✔
67
    from playwright.sync_api import sync_playwright
8✔
68
except ImportError:  # pragma: no cover
69
    sync_playwright = None  # type: ignore[assignment]
70

71
try:
8✔
72
    import psutil
8✔
73
    from psutil._common import bytes2human
8✔
74
except ImportError:  # pragma: no cover
75
    psutil = None  # type: ignore[assignment]
76
    bytes2human = None  # type: ignore[assignment]
77

78
logger = logging.getLogger(__name__)
8✔
79

80
if TYPE_CHECKING:
81
    from webchanges.main import Urlwatch
82
    from webchanges.reporters import _ConfigReportersList
83
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
84

85

86
class UrlwatchCommand:
8✔
87
    """The class that runs the program after initialization and CLI arguments parsing."""
88

89
    def __init__(self, urlwatcher: Urlwatch) -> None:
8✔
90
        self.urlwatcher = urlwatcher
8✔
91
        self.urlwatch_config = urlwatcher.urlwatch_config
8✔
92

93
    @staticmethod
8✔
94
    def _exit(arg: str | int | None) -> None:
8✔
95
        logger.info(f'Exiting with exit code {arg}')
8✔
96
        sys.exit(arg)
8✔
97

98
    def jobs_from_joblist(self) -> Iterator[JobBase]:
8✔
99
        """Generates the jobs to process from the joblist entered in the CLI."""
100
        if self.urlwatcher.urlwatch_config.joblist:
8✔
101
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
8!
102
            enabled_jobs = {job for job in jobs if job.is_enabled()}
8!
103
            disabled = len(jobs) - len(enabled_jobs)
8✔
104
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
105
            logger.debug(
8✔
106
                f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str} as specified in "
107
                f"command line: {', '.join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}"
108
            )
109
        else:
110
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
8!
111
            disabled = len(self.urlwatcher.jobs) - len(enabled_jobs)
8✔
112
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
113
            logger.debug(f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str}")
8✔
114
        for job in enabled_jobs:
8✔
115
            yield job.with_defaults(self.urlwatcher.config_storage.config)
8✔
116

117
    def edit_hooks(self) -> int:
8✔
118
        """Edit hooks file.
119

120
        :returns: 0 if edit is successful, 1 otherwise.
121
        """
122
        # Similar code to BaseTextualFileStorage.edit()
123
        for hooks_file in self.urlwatch_config.hooks_files:
8✔
124
            logger.debug(f'Edit file {hooks_file}')
8✔
125
            # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
126
            hooks_edit = hooks_file.parent.joinpath(hooks_file.stem + '_edit' + ''.join(hooks_file.suffixes))
8✔
127
            if hooks_file.exists():
8!
128
                shutil.copy(hooks_file, hooks_edit)
8✔
129
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
130
            #         self.urlwatch_config.hooks_py_example):
131
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)
132

133
            while True:
8✔
134
                try:
8✔
135
                    edit_file(hooks_edit)
8✔
136
                    import_module_from_source('hooks', hooks_edit)
8✔
137
                    break  # stop if no exception on parser
8✔
138
                except SystemExit:
8!
139
                    raise
×
140
                except Exception as e:
8✔
141
                    print('Parsing failed:')
8✔
142
                    print('======')
8✔
143
                    print(e)
8✔
144
                    print('======')
8✔
145
                    print('')
8✔
146
                    print(f'The file {hooks_file} was NOT updated.')
8✔
147
                    user_input = input('Do you want to retry the same edit? (Y/n)')
8✔
148
                    if not user_input or user_input.lower()[0] == 'y':
×
149
                        continue
×
150
                    hooks_edit.unlink()
×
151
                    print('No changes have been saved.')
×
152
                    return 1
×
153

154
            if hooks_file.is_symlink():
8!
155
                hooks_file.write_text(hooks_edit.read_text())
×
156
            else:
157
                hooks_edit.replace(hooks_file)
8✔
158
            hooks_edit.unlink(missing_ok=True)
8✔
159
            print(f'Saved edits in {hooks_file}')
8✔
160

161
        return 0
8✔
162

163
    @staticmethod
8✔
164
    def show_features() -> int:
8✔
165
        """
166
        Prints the "features", i.e. a list of job types, filters and reporters.
167

168
        :return: 0.
169
        """
170
        print(f'Please see full documentation at {__docs_url__}')
8✔
171
        print()
8✔
172
        print('Supported jobs:\n')
8✔
173
        print(JobBase.job_documentation())
8✔
174
        print('Supported filters:\n')
8✔
175
        print(FilterBase.filter_documentation())
8✔
176
        print()
8✔
177
        print('Supported differs:\n')
8✔
178
        print(DifferBase.differ_documentation())
8✔
179
        print()
8✔
180
        print('Supported reporters:\n')
8✔
181
        print(ReporterBase.reporter_documentation())
8✔
182
        print()
8✔
183
        print(f'Please see full documentation at {__docs_url__}')
8✔
184

185
        return 0
8✔
186

187
    @staticmethod
8✔
188
    def show_detailed_versions() -> int:
8✔
189
        """
190
        Prints the detailed versions, including of dependencies.
191

192
        :return: 0.
193
        """
194

195
        def dependencies() -> list[str]:
8✔
196
            if get_default_environment is not None:
8!
197
                env = get_default_environment()
8✔
198
                dist = None
8✔
199
                for dist in env.iter_all_distributions():
8✔
200
                    if dist.canonical_name == __project_name__:
8!
201
                        break
×
202
                if dist and dist.canonical_name == __project_name__:
8!
203
                    return sorted(set(d.split()[0] for d in dist.metadata_dict['requires_dist']), key=str.lower)
×
204

205
            # default list of all possible dependencies
206
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
8✔
207
            return [
8✔
208
                'aioxmpp',
209
                'beautifulsoup4',
210
                'chump',
211
                'colorama',
212
                'cryptography',
213
                'cssbeautifier',
214
                'cssselect',
215
                'deepdiff',
216
                'h2',
217
                'html2text',
218
                'httpx',
219
                'jq',
220
                'jsbeautifier',
221
                'keyring',
222
                'lxml',
223
                'markdown2',
224
                'matrix_client',
225
                'msgpack',
226
                'pdftotext',
227
                'Pillow',
228
                'platformdirs',
229
                'playwright',
230
                'psutil',
231
                'pushbullet.py',
232
                'pypdf',
233
                'pytesseract',
234
                'pyyaml',
235
                'redis',
236
                'requests',
237
                'tzdata',
238
                'vobject',
239
            ]
240

241
        print('Software:')
8✔
242
        print(f'• {__project_name__}: {__version__}')
8✔
243
        print(
8✔
244
            f'• {platform.python_implementation()}: {platform.python_version()} '
245
            f'{platform.python_build()} {platform.python_compiler()}'
246
        )
247
        print(f'• SQLite: {sqlite3.sqlite_version}')
8✔
248

249
        if psutil:
8!
250
            print()
8✔
251
            print('System:')
8✔
252
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
8✔
253
            print(f'• Processor: {platform.processor()}')
8✔
254
            print(f'• CPUs (logical): {psutil.cpu_count()}')
8✔
255
            try:
8✔
256
                virt_mem = psutil.virtual_memory().available
8✔
257
                print(
8✔
258
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
259
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
260
                )
261
            except psutil.Error as e:  # pragma: no cover
262
                print(f'• Free memory: Could not read information: {e}')
263
            print(
8✔
264
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
265
                f"({100 - psutil.disk_usage('/').percent:.1f}%)"
266
            )
267
            executor = ThreadPoolExecutor()
8✔
268
            print(f'• --max-threads default: {executor._max_workers}')
8✔
269

270
        print()
8✔
271
        print('Installed PyPI dependencies:')
8✔
272
        for module_name in dependencies():
8✔
273
            try:
8✔
274
                mod = importlib.metadata.distribution(module_name)
8✔
275
            except ModuleNotFoundError:
8✔
276
                continue
8✔
277
            print(f'• {module_name}: {mod.version}')
8✔
278
            # package requirements
279
            if mod.requires:
8✔
280
                for req_name in [i.split()[0] for i in mod.requires]:
8!
281
                    try:
8✔
282
                        req = importlib.metadata.distribution(req_name)
8✔
283
                    except ModuleNotFoundError:
8✔
284
                        continue
8✔
285
                    print(f'  - {req_name}: {req.version}')
8✔
286

287
        # playwright
288
        if sync_playwright is not None:
8!
289
            with sync_playwright() as p:
8!
290
                browser = p.chromium.launch(channel='chrome')
×
291
                print()
×
292
                print('Playwright browser:')
×
293
                print(f'• Name: {browser.browser_type.name}')
×
294
                print(f'• Version: {browser.version}')
×
295
                if psutil:
×
296
                    browser.new_page()
×
297
                    try:
×
298
                        virt_mem = psutil.virtual_memory().available
×
299
                        print(
×
300
                            f'• Free memory with browser loaded: {bytes2human(virt_mem)} physical plus '
301
                            f'{bytes2human(psutil.swap_memory().free)} swap'
302
                        )
303
                    except psutil.Error:
×
304
                        pass
×
305

306
        if os.name == 'posix' and apt:
8!
307
            apt_cache = apt.Cache()
×
308

309
            def print_version(libs: list[str]) -> None:
×
310
                for lib in libs:
×
311
                    if lib in apt_cache:
×
312
                        if ver := apt_cache[lib].versions:
×
313
                            print(f'   - {ver[0].package}: {ver[0].version}')
×
314
                return None
×
315

316
            print()
×
317
            print('Installed dpkg dependencies:')
×
318
            for module, apt_dists in (
×
319
                ('jq', ['jq']),
320
                # https://github.com/jalan/pdftotext#os-dependencies
321
                ('pdftotext', ['libpoppler-cpp-dev']),
322
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
323
                (
324
                    'Pillow',
325
                    [
326
                        'libjpeg-dev',
327
                        'zlib-dev',
328
                        'zlib1g-dev',
329
                        'libtiff-dev',
330
                        'libfreetype-dev',
331
                        'littlecms-dev',
332
                        'libwebp-dev',
333
                        'tcl/tk-dev',
334
                        'openjpeg-dev',
335
                        'libimagequant-dev',
336
                        'libraqm-dev',
337
                        'libxcb-dev',
338
                        'libxcb1-dev',
339
                    ],
340
                ),
341
                ('playwright', ['google-chrome-stable']),
342
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
343
                ('pytesseract', ['tesseract-ocr']),
344
            ):
345
                try:
×
346
                    importlib.metadata.distribution(module)
×
347
                    print(f'• {module}')
×
348
                    print_version(apt_dists)
×
349
                except importlib.metadata.PackageNotFoundError:
×
350
                    pass
×
351
        return 0
8✔
352

353
    def list_jobs(self, regex: bool | str) -> None:
8✔
354
        """
355
        Lists the jobs and their respective _index_number.
356

357
        :return: None.
358
        """
359
        if isinstance(regex, str):
8!
360
            print(f"List of jobs matching the RegEx '{regex}':")
×
361
        else:
362
            print('List of jobs:')
8✔
363
        for job in self.urlwatcher.jobs:
8✔
364
            if self.urlwatch_config.verbose:
8✔
365
                job_desc = f'{job.index_number:3}: {job!r}'
8✔
366
            else:
367
                pretty_name = job.pretty_name()
8✔
368
                location = job.get_location()
8✔
369
                if pretty_name != location:
8!
370
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
8✔
371
                else:
372
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
373
            if isinstance(regex, bool) or re.findall(regex, job_desc):
8!
374
                print(job_desc)
8✔
375

376
        if len(self.urlwatch_config.jobs_files) > 1:
8✔
377
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
8!
378
        elif len(self.urlwatch_config.jobs_files) == 1:
8✔
379
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
380
        else:
381
            jobs_files = []
8✔
382
        print('\n   '.join(jobs_files))
8✔
383

384
    def _find_job(self, query: str | int) -> JobBase:
8✔
385
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
386
        (i.e. the url/user_visible_url or command).
387

388
        :param query: The query.
389
        :return: The matching JobBase.
390
        :raises ValueError: If the job is not found.
391
        """
392
        if isinstance(query, int):
8✔
393
            index = query
8✔
394
        else:
395
            try:
8✔
396
                index = int(query)
8✔
397
            except ValueError:
8✔
398
                query = unquote_plus(query)
8✔
399
                try:
8✔
400
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
8✔
401
                except StopIteration:
8✔
402
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
8✔
403

404
        if index == 0:
8✔
405
            raise ValueError(f'Job index {index} out of range.')
8✔
406
        try:
8✔
407
            if index <= 0:
8✔
408
                return self.urlwatcher.jobs[index]
8✔
409
            else:
410
                return self.urlwatcher.jobs[index - 1]
8✔
411
        except IndexError as e:
8✔
412
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
8✔
413

414
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
8✔
415
        """
416
        Returns the job with defaults based on job_id, which could match an index or match a location
417
        (url/user_visible_url or command). Accepts negative numbers.
418

419
        :param query: The query.
420
        :return: The matching JobBase with defaults.
421
        :raises ValueError: If the job is not found.
422
        """
423
        job = self._find_job(query)
8✔
424
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
425

426
    def test_job(self, job_id: bool | str | int) -> None:
8✔
427
        """
428
        Tests the running of a single job, outputting the filtered text to stdout. If job_id is True, don't run any
429
        jobs but load config, jobs and hook files to trigger any syntax errors.
430

431
        :param job_id: The job_id or True.
432

433
        :return: None.
434

435
        :raises Exception: The Exception when raised by a job, the loading of hooks files, etc.
436
        """
437
        if job_id is True:  # Load to trigger any eventual syntax errors
8✔
438
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
8✔
439
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
8✔
440
            if len(self.urlwatch_config.jobs_files) == 1:
8✔
441
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
8✔
442
            else:
443
                message.append(
8!
444
                    '\n   '.join(
445
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
446
                    )
447
                )
448
            if 'hooks' in sys.modules:
8!
449
                message.append(f"\nand hooks file {sys.modules['hooks'].__file__}")
8✔
450
            print(f"{''.join(message)}.")
8✔
451
            return
8✔
452

453
        job = self._find_job_with_defaults(job_id)
8✔
454
        start = time.perf_counter()
8✔
455

456
        if isinstance(job, UrlJob):
8!
457
            # Force re-retrieval of job, as we're testing filters
458
            job.ignore_cached = True
×
459

460
        # Add defaults, as if when run
461
        job = job.with_defaults(self.urlwatcher.config_storage.config)
8✔
462

463
        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
8✔
464
            job_state.process(headless=not self.urlwatch_config.no_headless)
8✔
465
            duration = time.perf_counter() - start
8✔
466
            if job_state.exception is not None:
8!
467
                raise job_state.exception from job_state.exception
×
468
            output = [
8✔
469
                job_state.job.pretty_name(),
470
                ('-' * len(job_state.job.pretty_name())),
471
            ]
472
            if job_state.job.note:
8!
473
                output.append(job_state.job.note)
×
474
            output.extend(
8✔
475
                [
476
                    '',
477
                    str(job_state.new_data),
478
                    '',
479
                    '--',
480
                    f'Job tested in {dur_text(duration)} with {__project_name__} {__version__}.',
481
                ]
482
            )
483
            print('\n'.join(output))
8✔
484
        return
8✔
485

486
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
487
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
488

489
    def test_differ(self, arg_test_differ: list[str]) -> int:
8✔
490
        """
491
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or whatever reporter is
492
        selected with --test-reporter.
493

494
        :param arg_test_differ: A list containing [job_id] or [job_id, max_diffs].
495
        :return: 1 if error, 0 if successful.
496
        """
497
        report = Report(self.urlwatcher)
8✔
498
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
8✔
499
        if len(arg_test_differ) == 1:
8✔
500
            job_id = arg_test_differ[0]
8✔
501
            max_diffs = None
8✔
502
        elif len(arg_test_differ) == 2:
8!
503
            job_id, max_diffs_str = arg_test_differ
8✔
504
            max_diffs = int(max_diffs_str)
8✔
505
        else:
506
            raise ValueError('--test-differ takes a maximum of two arguments')
×
507

508
        job = self._find_job_with_defaults(job_id)
8✔
509

510
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
8✔
511

512
        num_snapshots = len(history_data)
8✔
513
        if num_snapshots == 0:
8✔
514
            print('This job has never been run before.')
8✔
515
            return 1
8✔
516
        elif num_snapshots < 2:
8✔
517
            print('Not enough historic data available (need at least 2 different snapshots).')
8✔
518
            return 1
8✔
519

520
        if job.compared_versions and job.compared_versions != 1:
8!
521
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")
×
522

523
        max_diffs = max_diffs or num_snapshots - 1
8✔
524
        for i in range(max_diffs):
8✔
525
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
8✔
526
                job_state.new_data = history_data[i].data
8✔
527
                job_state.new_timestamp = history_data[i].timestamp
8✔
528
                job_state.new_etag = history_data[i].etag
8✔
529
                job_state.new_mime_type = history_data[i].mime_type
8✔
530
                if not job.compared_versions or job.compared_versions == 1:
8!
531
                    job_state.old_data = history_data[i + 1].data
8✔
532
                    job_state.old_timestamp = history_data[i + 1].timestamp
8✔
533
                    job_state.old_etag = history_data[i + 1].etag
8✔
534
                    job_state.old_mime_type = history_data[i + 1].mime_type
8✔
535
                else:
536
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
×
537
                    close_matches: list[str] = difflib.get_close_matches(
×
538
                        str(job_state.new_data), history_dic_snapshots.keys(), n=1  # type: ignore[arg-type]
539
                    )
540
                    if close_matches:
×
541
                        job_state.old_data = close_matches[0]
×
542
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
×
543
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
×
544
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type
×
545

546
                if self.urlwatch_config.test_reporter is None:
8✔
547
                    self.urlwatch_config.test_reporter = 'stdout'  # default
8✔
548
                report.job_states = []  # required
8✔
549
                if job_state.new_data == job_state.old_data:
8!
550
                    label = (
×
551
                        f'No change (snapshots {-i:2} AND {-(i + 1):2}) with '
552
                        f"'compared_versions: {job.compared_versions}'"
553
                    )
554
                    job_state.verb = 'changed,no_report'
×
555
                else:
556
                    label = f'Filtered diff (snapshots {-i:2} and {-(i + 1):2})'
8✔
557
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
8✔
558
                if errorlevel:
8!
559
                    self._exit(errorlevel)
×
560

561
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
562
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
563

564
        return 0
8✔
565

566
    def dump_history(self, job_id: str) -> int:
8✔
567
        """
568
        Displays the historical data stored in the snapshot database for a job.
569

570
        :param job_id: The Job ID.
571
        :return: An argument to be used in sys.exit.
572
        """
573

574
        job = self._find_job_with_defaults(job_id)
8✔
575
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
8✔
576

577
        print(f'History for job {job.get_indexed_location()}:')
8✔
578
        print(f'(ID: {job.get_guid()})')
8✔
579
        total_failed = 0
8✔
580
        if history_data:
8✔
581
            print('=' * 50)
8✔
582
        for i, snapshot in enumerate(history_data):
8✔
583
            mime_type = f'; {snapshot.mime_type}' if snapshot.mime_type else ''
8✔
584
            etag = f'; ETag: {snapshot.etag}' if snapshot.etag else ''
8✔
585
            tries = f'; error run (number {snapshot.tries})' if snapshot.tries else ''
8✔
586
            total_failed += snapshot.tries > 0
8✔
587
            tz = self.urlwatcher.report.config['report']['tz']
8✔
588
            tzinfo = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
8✔
589
            dt = datetime.fromtimestamp(snapshot.timestamp, tzinfo)
8✔
590
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
8✔
591
            sep_len = max(50, len(header))
8✔
592
            print(header)
8✔
593
            print('-' * sep_len)
8✔
594
            if snapshot.error_data:
8!
595
                print(f"{snapshot.error_data['type']}: {snapshot.error_data['message']}")
×
596
                print()
×
597
                print('Last good data:')
×
598
            print(snapshot.data)
8✔
599
            print('=' * sep_len, '\n')
8✔
600

601
        print(
8✔
602
            f'Found {len(history_data) - total_failed}'
603
            + (' good' if total_failed else '')
604
            + ' snapshot'
605
            + ('s' if len(history_data) - total_failed != 1 else '')
606
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
607
            + '.'
608
        )
609

610
        return 0
8✔
611

612
    def list_error_jobs(self) -> int:
8✔
613
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
8✔
614
            print(f'Invalid reporter {self.urlwatch_config.errors}')
8✔
615
            return 1
8✔
616

617
        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
8✔
618
            """A generator that outputs error text for jobs who fail with an exception or yield no data.
619

620
            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
621
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
622
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
623
            """
624

625
            def job_runner(
8✔
626
                stack: ExitStack,
627
                jobs: Iterable[JobBase],
628
                max_workers: int | None = None,
629
            ) -> Iterator[str]:
630
                """
631
                Modified worker.job_runner that yields error text for jobs that fail with an exception or yield no data.
632

633
                :param stack: The context manager.
634
                :param jobs: The jobs to run.
635
                :param max_workers: The maximum number of workers for ThreadPoolExecutor.
636
                :return: Error text for jobs that fail with an exception or yield no data.
637
                """
638
                executor = ThreadPoolExecutor(max_workers=max_workers)
8✔
639

640
                for job_state in executor.map(
8✔
641
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
642
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
643
                ):
644
                    if job_state.exception is None or isinstance(job_state.exception, NotModifiedError):
8✔
645
                        if (
8!
646
                            len(job_state.new_data.strip()) == 0
647
                            if hasattr(job_state, 'new_data')
648
                            else len(job_state.old_data.strip()) == 0
649
                        ):
650
                            if self.urlwatch_config.verbose:
×
651
                                yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
×
652
                            else:
653
                                pretty_name = job_state.job.pretty_name()
×
654
                                location = job_state.job.get_location()
×
655
                                if pretty_name != location:
×
656
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
×
657
                                else:
658
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
×
659
                    else:
660
                        pretty_name = job_state.job.pretty_name()
8✔
661
                        location = job_state.job.get_location()
8✔
662
                        if pretty_name != location:
8!
663
                            yield (
8✔
664
                                f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
665
                                f'({location})'
666
                            )
667
                        else:
668
                            yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name}'
×
669

670
            with ExitStack() as stack:
8✔
671
                # This code is from worker.run_jobs, modified to yield from job_runner.
672
                from webchanges.worker import get_virt_mem  # avoid circular imports
8✔
673

674
                # run non-BrowserJob jobs first
675
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
8!
676
                if jobs_to_run:
8!
677
                    logger.debug(
8✔
678
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
679
                        "Python's default max_workers."
680
                    )
681
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
8✔
682
                else:
NEW
683
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")
×
684

685
                # run BrowserJob jobs after
686
                jobs_to_run = [job for job in jobs if job.__is_browser__]
8!
687
                if jobs_to_run:
8!
NEW
688
                    gc.collect()
×
NEW
689
                    virt_mem = get_virt_mem()
×
NEW
690
                    if self.urlwatch_config.max_workers:
×
NEW
691
                        max_workers = self.urlwatch_config.max_workers
×
692
                    else:
NEW
693
                        max_workers = max(int(virt_mem / 200e6), 1)
×
NEW
694
                        max_workers = min(max_workers, os.cpu_count() or 1)
×
NEW
695
                    logger.debug(
×
696
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
697
                        f'{max_workers} max_workers.'
698
                    )
NEW
699
                    yield from job_runner(stack, jobs_to_run, max_workers)
×
700
                else:
701
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")
8✔
702

703
        start = time.perf_counter()
8✔
704
        if len(self.urlwatch_config.jobs_files) == 1:
8!
705
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
8✔
706
        else:
707
            jobs_files = ['in the concatenation of the jobs files'] + [
×
708
                f'• {file},' for file in self.urlwatch_config.jobs_files
709
            ]
710
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)'] + jobs_files)
8✔
711

712
        jobs = {
8!
713
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
714
        }
715
        if self.urlwatch_config.errors == 'stdout':
8!
716
            print(header)
8✔
717
            for line in error_jobs_lines(jobs):
8✔
718
                print(line)
8✔
719
            print('--')
8✔
720
            duration = time.perf_counter() - start
8✔
721
            print(f"Checked {len(jobs)} enabled job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
8✔
722

723
        else:
724
            message = '\n'.join(error_jobs_lines(jobs))
×
725
            if message:
×
726
                # create a dummy job state to run a reporter on
727
                job_state = JobState(
×
728
                    None,  # type: ignore[arg-type]
729
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
730
                )
731
                job_state.traceback = f'{header}\n{message}'
×
732
                duration = time.perf_counter() - start
×
733
                self.urlwatcher.report.config['footnote'] = (
×
734
                    f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}."
735
                )
736
                self.urlwatcher.report.config['report']['html']['footer'] = False
×
737
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
×
738
                self.urlwatcher.report.config['report']['text']['footer'] = False
×
739
                self.urlwatcher.report.error(job_state)
×
740
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
×
741
            else:
742
                print(header)
×
743
                print('--')
×
744
                duration = time.perf_counter() - start
×
745
                print('Found no errors')
×
746
                print(f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
×
747

748
        return 0
8✔
749

750
    def delete_snapshot(self, job_id: str | int) -> int:
8✔
751
        job = self._find_job_with_defaults(job_id)
8✔
752

753
        deleted = self.urlwatcher.ssdb_storage.delete_latest(job.get_guid())
8✔
754
        if deleted:
8✔
755
            print(f'Deleted last snapshot of {job.get_indexed_location()}')
8✔
756
            return 0
8✔
757
        else:
758
            print(f'No snapshots found to be deleted for {job.get_indexed_location()}')
8✔
759
            return 1
8✔
760

761
    def modify_urls(self) -> int:
8✔
762
        if self.urlwatch_config.delete is not None:
8✔
763
            job = self._find_job(self.urlwatch_config.delete)
8✔
764
            if job is not None:
8!
765
                self.urlwatcher.jobs.remove(job)
8✔
766
                print(f'Removed {job}')
8✔
767
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
768
            else:
769
                print(f'Job not found: {self.urlwatch_config.delete}')
×
770
                return 1
×
771

772
        if self.urlwatch_config.add is not None:
8✔
773
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
774
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
8!
775
            filters = [v for k, v in items if k == 'filter']
8!
776
            items2 = [(k, v) for k, v in items if k != 'filter']
8!
777
            d = {k: v for k, v in items2}
8!
778
            if filters:
8!
779
                d['filter'] = ','.join(filters)
×
780

781
            job = JobBase.unserialize(d)
8✔
782
            print(f'Adding {job}')
8✔
783
            self.urlwatcher.jobs.append(job)
8✔
784
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
785

786
        if self.urlwatch_config.change_location is not None:
8✔
787
            new_loc = self.urlwatch_config.change_location[1]
8✔
788
            # Ensure the user isn't overwriting an existing job with the change.
789
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
8!
790
                print(
×
791
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
792
                    f'different value.\n'
793
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
794
                )
795
                return 1
×
796
            else:
797
                job = self._find_job(self.urlwatch_config.change_location[0])
8✔
798
                if job is not None:
8!
799
                    # Update the job's location (which will also update the guid) and move any history in the database
800
                    # over to the job's updated guid.
801
                    old_loc = job.get_location()
8✔
802
                    print(f'Moving location of "{old_loc}" to "{new_loc}"')
8✔
803
                    old_guid = job.get_guid()
8✔
804
                    if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
8✔
805
                        print(f'No snapshots found for "{old_loc}"')
8✔
806
                        return 1
8✔
807
                    job.set_base_location(new_loc)
8✔
808
                    num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.get_guid())
8✔
809
                    if num_searched:
8!
810
                        print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}"')
8✔
811
                else:
812
                    print(f'Job not found: "{self.urlwatch_config.change_location[0]}"')
×
813
                    return 1
×
814
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
8✔
815
            if not input(message).lower().startswith('y'):
8!
816
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
×
817
            else:
818
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
819

820
        return 0
8✔
821

822
    def edit_config(self) -> int:
8✔
823
        result = self.urlwatcher.config_storage.edit()
8✔
824
        return result
8✔
825

826
    def check_telegram_chats(self) -> None:
8✔
827
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']
8✔
828

829
        bot_token = config['bot_token']
8✔
830
        if not bot_token:
8✔
831
            print('You need to set up your bot token first (see documentation)')
8✔
832
            self._exit(1)
8✔
833

834
        if httpx:
8!
835
            get_client = httpx.Client(http2=h2 is not None).get  # noqa: S113 Call to httpx without timeout
8✔
836
        else:
837
            get_client = requests.get  # type: ignore[assignment]
×
838

839
        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
8✔
840
        if not info['ok']:
8!
841
            print(f"Error with token {bot_token}: {info['description']}")
8✔
842
            self._exit(1)
8✔
843

844
        chats = {}
×
845
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
×
846
        if 'result' in updates:
×
847
            for chat_info in updates['result']:
×
848
                chat = chat_info['message']['chat']
×
849
                if chat['type'] == 'private':
×
850
                    chats[chat['id']] = (
×
851
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
852
                    )
853

854
        if not chats:
×
855
            print(f"No chats found. Say hello to your bot at https://t.me/{info['result']['username']}")
×
856
            self._exit(1)
×
857

858
        headers = ('Chat ID', 'Name')
×
859
        maxchat = max(len(headers[0]), max((len(str(k)) for k, v in chats.items()), default=0))
×
860
        maxname = max(len(headers[1]), max((len(v) for k, v in chats.items()), default=0))
×
861
        fmt = f'%-{maxchat}s  %s'
×
862
        print(fmt % headers)
×
863
        print(fmt % ('-' * maxchat, '-' * maxname))
×
864
        for k, v in sorted(chats.items(), key=lambda kv: kv[1]):
×
865
            print(fmt % (k, v))
×
866
        print(f"\nChat up your bot here: https://t.me/{info['result']['username']}")
×
867

868
        self._exit(0)
×
869

870
    def check_test_reporter(
8✔
871
        self,
872
        job_state: JobState | None = None,
873
        label: str = 'test',  # type: ignore[assignment]
874
        report: Report | None = None,
875
    ) -> int:
876
        """
877
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').
878

879
        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
880
        of the configuration file.
881

882
        :param job_state: The JobState (Optional).
883
        :param label: The label to be used in the report; defaults to 'test'.
884
        :param report: A Report class to use for testing (Optional).
885
        :return: 0 if successful, 1 otherwise.
886
        """
887

888
        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
8✔
889
            """Builds a pseudo-job for the reporter to run on."""
890
            job = JobBase.unserialize({'name': job_name, 'url': url})
8✔
891

892
            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
893
            # testing; also no need to use it as context manager, since no processing is called on the job
894
            job_state = JobState(None, job)  # type: ignore[arg-type]
8✔
895

896
            job_state.old_data = old
8✔
897
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
8✔
898
            job_state.new_data = new
8✔
899
            job_state.new_timestamp = time.time()
8✔
900

901
            return job_state
8✔
902

903
        def set_error(job_state: 'JobState', message: str) -> JobState:
8✔
904
            """Sets a job error message on a JobState."""
905
            try:
8✔
906
                raise ValueError(message)
8✔
907
            except ValueError as e:
8✔
908
                job_state.exception = e
8✔
909
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
8✔
910

911
            return job_state
8✔
912

913
        reporter_name = self.urlwatch_config.test_reporter
8✔
914
        if reporter_name not in ReporterBase.__subclasses__:
8✔
915
            print(f'No such reporter: {reporter_name}')
8✔
916
            print(f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}\n')
8✔
917
            return 1
8✔
918

919
        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
8✔
920
            reporter_name  # type: ignore[literal-required]
921
        ]
922
        if job_state:  # we want a full report
8✔
923
            cfg['enabled'] = True
8✔
924
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
8✔
925
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
8✔
926
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
8✔
927
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
8✔
928
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
8✔
929
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
8✔
930
        if not cfg['enabled']:
8✔
931
            print(f'WARNING: Reporter being tested is not enabled: {reporter_name}')
8✔
932
            print('Will still attempt to test it, but this may not work')
8✔
933
            print(f'Use {__project_name__} --edit-config to configure reporters')
8✔
934
            cfg['enabled'] = True
8✔
935

936
        if report is None:
8✔
937
            report = Report(self.urlwatcher)
8✔
938

939
        if job_state:
8✔
940
            report.custom(job_state, label)  # type: ignore[arg-type]
8✔
941
        else:
942
            report.new(
8✔
943
                build_job(
944
                    'Sample job that was newly added',
945
                    'https://example.com/new',
946
                    '',
947
                    '',
948
                )
949
            )
950
            report.changed(
8✔
951
                build_job(
952
                    'Sample job where something changed',
953
                    'https://example.com/changed',
954
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
955
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
956
                )
957
            )
958
            report.unchanged(
8✔
959
                build_job(
960
                    'Sample job where nothing changed',
961
                    'http://example.com/unchanged',
962
                    'Same Old, Same Old\n',
963
                    'Same Old, Same Old\n',
964
                )
965
            )
966
            report.error(
8✔
967
                set_error(
968
                    build_job(
969
                        'Sample job where an error was encountered',
970
                        'https://example.com/error',
971
                        '',
972
                        '',
973
                    ),
974
                    'The error message would appear here.',
975
                )
976
            )
977

978
        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)
8✔
979

980
        return 0
8✔
981

982
    def check_smtp_login(self) -> None:
8✔
983
        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
8✔
984
        smtp_config: _ConfigReportEmailSmtp = config['smtp']
8✔
985

986
        success = True
8✔
987

988
        if not config['enabled']:
8✔
989
            print('Please enable email reporting in the config first.')
8✔
990
            success = False
8✔
991

992
        if config['method'] != 'smtp':
8✔
993
            print('Please set the method to SMTP for the email reporter.')
8✔
994
            success = False
8✔
995

996
        smtp_auth = smtp_config['auth']
8✔
997
        if not smtp_auth:
8✔
998
            print('Authentication must be enabled for SMTP.')
8✔
999
            success = False
8✔
1000

1001
        smtp_hostname = smtp_config['host']
8✔
1002
        if not smtp_hostname:
8✔
1003
            print('Please configure the SMTP hostname in the config first.')
8✔
1004
            success = False
8✔
1005

1006
        smtp_username = smtp_config['user'] or config['from']
8✔
1007
        if not smtp_username:
8✔
1008
            print('Please configure the SMTP user in the config first.')
8✔
1009
            success = False
8✔
1010

1011
        if not success:
8✔
1012
            self._exit(1)
8✔
1013

1014
        insecure_password = smtp_config['insecure_password']
2✔
1015
        if insecure_password:
2!
1016
            print('The SMTP password is set in the config file (key "insecure_password")')
2✔
1017
        elif smtp_have_password(smtp_hostname, smtp_username):
×
1018
            message = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
×
1019
            if not input(message).lower().startswith('y'):
×
1020
                print('Password unchanged.')
×
1021
            else:
1022
                smtp_set_password(smtp_hostname, smtp_username)
×
1023

1024
        smtp_port = smtp_config['port']
2✔
1025
        smtp_tls = smtp_config['starttls']
2✔
1026

1027
        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
2✔
1028
        print('Trying to log into the SMTP server...')
2✔
1029
        mailer.send(None)
2✔
1030
        print('Successfully logged into SMTP server')
×
1031

1032
        self._exit(0)
×
1033

1034
    def check_xmpp_login(self) -> None:
8✔
1035
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']
8✔
1036

1037
        success = True
8✔
1038

1039
        if not xmpp_config['enabled']:
8✔
1040
            print('Please enable XMPP reporting in the config first.')
8✔
1041
            success = False
8✔
1042

1043
        xmpp_sender = xmpp_config['sender']
8✔
1044
        if not xmpp_sender:
8✔
1045
            print('Please configure the XMPP sender in the config first.')
8✔
1046
            success = False
8✔
1047

1048
        if not xmpp_config['recipient']:
8✔
1049
            print('Please configure the XMPP recipient in the config first.')
8✔
1050
            success = False
8✔
1051

1052
        if not success:
8✔
1053
            self._exit(1)
8✔
1054

1055
        if 'insecure_password' in xmpp_config:
8!
1056
            print('The XMPP password is already set in the config (key "insecure_password").')
8✔
1057
            self._exit(0)
8✔
1058

1059
        if xmpp_have_password(xmpp_sender):
×
1060
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
×
1061
            if input(message).lower() != 'y':
×
1062
                print('Password unchanged.')
×
1063
                self._exit(0)
×
1064

1065
        if success:
×
1066
            xmpp_set_password(xmpp_sender)
×
1067

1068
        self._exit(0)
×
1069

1070
    @staticmethod
2✔
1071
    def playwright_install_chrome() -> int:  # pragma: no cover
1072
        """
1073
        Replicates the playwright.__main__.main() function, which is called by the playwright executable, in order to
1074
        install the browser executable.
1075

1076
        :return: Playwright's executable return code.
1077
        """
1078
        try:
1079
            from playwright._impl._driver import compute_driver_executable
1080
        except ImportError:  # pragma: no cover
1081
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None
1082

1083
        driver_executable = compute_driver_executable()
1084
        env = os.environ.copy()
1085
        env['PW_CLI_TARGET_LANG'] = 'python'
1086
        cmd = [str(driver_executable), 'install', 'chrome']
1087
        logger.info(f"Running playwright CLI: {' '.join(cmd)}")
1088
        completed_process = subprocess.run(cmd, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
1089
        if completed_process.returncode:
1090
            print(completed_process.stderr)
1091
            return completed_process.returncode
1092
        if completed_process.stdout:
1093
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
1094
        return 0
1095

1096
    def handle_actions(self) -> None:
8✔
1097
        """Handles the actions for command line arguments and exits."""
1098
        if self.urlwatch_config.list_jobs:
8✔
1099
            self.list_jobs(self.urlwatch_config.list_jobs)
8✔
1100
            self._exit(0)
8✔
1101

1102
        if self.urlwatch_config.errors:
8✔
1103
            self._exit(self.list_error_jobs())
8✔
1104

1105
        if self.urlwatch_config.test_job:
8✔
1106
            self.test_job(self.urlwatch_config.test_job)
8✔
1107
            self._exit(0)
8✔
1108

1109
        if self.urlwatch_config.test_differ:
8✔
1110
            self._exit(self.test_differ(self.urlwatch_config.test_differ))
8✔
1111

1112
        if self.urlwatch_config.dump_history:
8✔
1113
            self._exit(self.dump_history(self.urlwatch_config.dump_history))
8✔
1114

1115
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
8✔
1116
            self._exit(self.modify_urls())
8✔
1117

1118
        if self.urlwatch_config.test_reporter:
8✔
1119
            self._exit(self.check_test_reporter())
8✔
1120

1121
        if self.urlwatch_config.smtp_login:
8✔
1122
            self.check_smtp_login()
8✔
1123

1124
        if self.urlwatch_config.telegram_chats:
8✔
1125
            self.check_telegram_chats()
8✔
1126

1127
        if self.urlwatch_config.xmpp_login:
8✔
1128
            self.check_xmpp_login()
8✔
1129

1130
        if self.urlwatch_config.edit:
8✔
1131
            self._exit(self.urlwatcher.jobs_storage.edit())
8✔
1132

1133
        if self.urlwatch_config.edit_config:
8✔
1134
            self._exit(self.edit_config())
8✔
1135

1136
        if self.urlwatch_config.edit_hooks:
8✔
1137
            self._exit(self.edit_hooks())
8✔
1138

1139
        if self.urlwatch_config.gc_database:
8✔
1140
            self.urlwatcher.ssdb_storage.gc(
8!
1141
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
1142
            )
1143
            self.urlwatcher.ssdb_storage.close()
8✔
1144
            self._exit(0)
8✔
1145

1146
        if self.urlwatch_config.clean_database:
8✔
1147
            self.urlwatcher.ssdb_storage.clean_ssdb(
8!
1148
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
1149
            )
1150
            self.urlwatcher.ssdb_storage.close()
8✔
1151
            self._exit(0)
8✔
1152

1153
        if self.urlwatch_config.rollback_database:
8✔
1154
            tz = self.urlwatcher.report.config['report']['tz']
8✔
1155
            self.urlwatcher.ssdb_storage.rollback_cache(self.urlwatch_config.rollback_database, tz)
8✔
1156
            self.urlwatcher.ssdb_storage.close()
8✔
1157
            self._exit(0)
8✔
1158

1159
        if self.urlwatch_config.delete_snapshot:
8✔
1160
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))
8✔
1161

1162
        if self.urlwatch_config.features:
8✔
1163
            self._exit(self.show_features())
8✔
1164

1165
        if self.urlwatch_config.detailed_versions:
8!
1166
            self._exit(self.show_detailed_versions())
8✔
1167

1168
    def run(self) -> None:  # pragma: no cover
1169
        """The main run logic."""
1170
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
1171
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote
1172

1173
        self.handle_actions()
1174

1175
        self.urlwatcher.run_jobs()
1176

1177
        self.urlwatcher.close()
1178

1179
        self._exit(0)