mborsetti / webchanges, build 13100552072 ("Version 3.27.0b3", push via GitHub by mborsetti)

02 Feb 2025 04:30PM UTC coverage: 75.393% (-0.2%) from 75.597%

1712 of 2597 branches covered (65.92%); branch coverage is included in the aggregate %.
4517 of 5665 relevant lines covered (79.74%), 6.18 hits per line.

Source file: /webchanges/command.py (75.08% covered)
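
As noted in the summary, branch coverage is included in the aggregate percentage. A quick arithmetic sanity check (a sketch that assumes the aggregate simply pools covered lines and covered branches over relevant lines and branches) reproduces all three figures reported above:

# Counts copied from the coverage summary above.
covered_lines, relevant_lines = 4517, 5665
covered_branches, total_branches = 1712, 2597

line_pct = 100 * covered_lines / relevant_lines        # 79.74%
branch_pct = 100 * covered_branches / total_branches   # 65.92%
# Assumed aggregation: pool line and branch hits into a single ratio.
aggregate_pct = 100 * (covered_lines + covered_branches) / (relevant_lines + total_branches)
print(f'{line_pct:.2f}% {branch_pct:.2f}% {aggregate_pct:.3f}%')  # 79.74% 65.92% 75.393%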
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import difflib
8✔
8
import email.utils
8✔
9
import gc
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import re
8✔
15
import shutil
8✔
16
import sqlite3
8✔
17
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
18
import sys
8✔
19
import time
8✔
20
import traceback
8✔
21
from concurrent.futures import ThreadPoolExecutor
8✔
22
from contextlib import ExitStack
8✔
23
from datetime import datetime
8✔
24
from pathlib import Path
8✔
25
from typing import Iterable, Iterator, TYPE_CHECKING
8✔
26
from urllib.parse import unquote_plus
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
8✔
30
from webchanges.differs import DifferBase
8✔
31
from webchanges.filters import FilterBase
8✔
32
from webchanges.handler import JobState, Report
8✔
33
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
8✔
34
from webchanges.mailer import smtp_have_password, smtp_set_password, SMTPMailer
8✔
35
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
8✔
36
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
37

38
try:
8✔
39
    import httpx
8✔
40
except ImportError:  # pragma: no cover
41
    httpx = None  # type: ignore[assignment]
42
    print("Required package 'httpx' not found; will attempt to run using 'requests'")
43
    try:
44
        import requests
45
    except ImportError as e:  # pragma: no cover
46
        raise RuntimeError(
47
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
48
            'neither can be imported.'
49
        ) from e
50
if httpx is not None:
8!
51
    try:
8✔
52
        import h2
8✔
53
    except ImportError:  # pragma: no cover
54
        h2 = None  # type: ignore[assignment]
55

56
try:
8✔
57
    import apt
8✔
58
except ImportError:  # pragma: no cover
59
    apt = None  # type: ignore[assignment]
60

61
try:
8✔
62
    from pip._internal.metadata import get_default_environment
8✔
63
except ImportError:  # pragma: no cover
64
    get_default_environment = None  # type: ignore[assignment]
65

66
try:
8✔
67
    from playwright.sync_api import sync_playwright
8✔
68
except ImportError:  # pragma: no cover
69
    sync_playwright = None  # type: ignore[assignment]
70

71
try:
8✔
72
    import psutil
8✔
73
    from psutil._common import bytes2human
8✔
74
except ImportError:  # pragma: no cover
75
    psutil = None  # type: ignore[assignment]
76
    bytes2human = None  # type: ignore[assignment]
77

78
logger = logging.getLogger(__name__)
8✔
79

80
if TYPE_CHECKING:
81
    from webchanges.main import Urlwatch
82
    from webchanges.reporters import _ConfigReportersList
83
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
84

85

86
class UrlwatchCommand:
8✔
87
    """The class that runs the program after initialization and CLI arguments parsing."""
88

89
    def __init__(self, urlwatcher: Urlwatch) -> None:
8✔
90
        self.urlwatcher = urlwatcher
8✔
91
        self.urlwatch_config = urlwatcher.urlwatch_config
8✔
92

93
    @staticmethod
8✔
94
    def _exit(arg: str | int | None) -> None:
8✔
95
        logger.info(f'Exiting with exit code {arg}')
8✔
96
        sys.exit(arg)
8✔
97

98
    def jobs_from_joblist(self) -> Iterator[JobBase]:
8✔
99
        """Generates the jobs to process from the joblist entered in the CLI."""
100
        if self.urlwatcher.urlwatch_config.joblist:
8✔
101
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
8!
102
            enabled_jobs = {job for job in jobs if job.is_enabled()}
8!
103
            disabled = len(jobs) - len(enabled_jobs)
8✔
104
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
105
            logger.debug(
8✔
106
                f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str} as specified in "
107
                f"command line: {', '.join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}"
108
            )
109
        else:
110
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
8!
111
            disabled = len(self.urlwatcher.jobs) - len(enabled_jobs)
8✔
112
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
113
            logger.debug(f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) else ''}{disabled_str}")
8✔
114
        for job in enabled_jobs:
8✔
115
            yield job.with_defaults(self.urlwatcher.config_storage.config)
8✔
116

117
    def edit_hooks(self) -> int:
8✔
118
        """Edit hooks file.
119

120
        :returns: 0 if edit is successful, 1 otherwise.
121
        """
122
        # Similar code to BaseTextualFileStorage.edit()
123
        for hooks_file in self.urlwatch_config.hooks_files:
8✔
124
            logger.debug(f'Edit file {hooks_file}')
8✔
125
            # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
126
            hooks_edit = hooks_file.parent.joinpath(hooks_file.stem + '_edit' + ''.join(hooks_file.suffixes))
8✔
127
            if hooks_file.exists():
8!
128
                shutil.copy(hooks_file, hooks_edit)
8✔
129
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
130
            #         self.urlwatch_config.hooks_py_example):
131
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)
132

133
            while True:
8✔
134
                try:
8✔
135
                    edit_file(hooks_edit)
8✔
136
                    import_module_from_source('hooks', hooks_edit)
8✔
137
                    break  # stop if no exception on parser
8✔
138
                except SystemExit:
8!
139
                    raise
×
140
                except Exception as e:
8✔
141
                    print('Parsing failed:')
8✔
142
                    print('======')
8✔
143
                    print(e)
8✔
144
                    print('======')
8✔
145
                    print('')
8✔
146
                    print(f'The file {hooks_file} was NOT updated.')
8✔
147
                    user_input = input('Do you want to retry the same edit? (Y/n)')
8✔
148
                    if not user_input or user_input.lower()[0] == 'y':
×
149
                        continue
×
150
                    hooks_edit.unlink()
×
151
                    print('No changes have been saved.')
×
152
                    return 1
×
153

154
            if hooks_file.is_symlink():
8!
155
                hooks_file.write_text(hooks_edit.read_text())
×
156
            else:
157
                hooks_edit.replace(hooks_file)
8✔
158
            hooks_edit.unlink(missing_ok=True)
8✔
159
            print(f'Saved edits in {hooks_file}')
8✔
160

161
        return 0
8✔
162

163
    @staticmethod
8✔
164
    def show_features() -> int:
8✔
165
        """
166
        Prints the "features", i.e. a list of job types, filters and reporters.
167

168
        :return: 0.
169
        """
170
        print(f'Please see full documentation at {__docs_url__}')
8✔
171
        print()
8✔
172
        print('Supported jobs:\n')
8✔
173
        print(JobBase.job_documentation())
8✔
174
        print('Supported filters:\n')
8✔
175
        print(FilterBase.filter_documentation())
8✔
176
        print()
8✔
177
        print('Supported differs:\n')
8✔
178
        print(DifferBase.differ_documentation())
8✔
179
        print()
8✔
180
        print('Supported reporters:\n')
8✔
181
        print(ReporterBase.reporter_documentation())
8✔
182
        print()
8✔
183
        print(f'Please see full documentation at {__docs_url__}')
8✔
184

185
        return 0
8✔
186

187
    @staticmethod
8✔
188
    def show_detailed_versions() -> int:
8✔
189
        """
190
        Prints the detailed versions, including of dependencies.
191

192
        :return: 0.
193
        """
194

195
        def dependencies() -> list[str]:
8✔
196
            if get_default_environment is not None:
8!
197
                env = get_default_environment()
8✔
198
                dist = None
8✔
199
                for dist in env.iter_all_distributions():
8✔
200
                    if dist.canonical_name == __project_name__:
8!
201
                        break
×
202
                if dist and dist.canonical_name == __project_name__:
8!
203
                    return sorted(set(d.split()[0] for d in dist.metadata_dict['requires_dist']), key=str.lower)
×
204

205
            # default list of all possible dependencies
206
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
8✔
207
            return [
8✔
208
                'aioxmpp',
209
                'beautifulsoup4',
210
                'chump',
211
                'colorama',
212
                'cryptography',
213
                'cssbeautifier',
214
                'cssselect',
215
                'deepdiff',
216
                'h2',
217
                'html2text',
218
                'httpx',
219
                'jq',
220
                'jsbeautifier',
221
                'keyring',
222
                'lxml',
223
                'markdown2',
224
                'matrix_client',
225
                'msgpack',
226
                'pdftotext',
227
                'Pillow',
228
                'platformdirs',
229
                'playwright',
230
                'psutil',
231
                'pushbullet.py',
232
                'pypdf',
233
                'pytesseract',
234
                'pyyaml',
235
                'redis',
236
                'requests',
237
                'tzdata',
238
                'vobject',
239
            ]
240

241
        print('Software:')
8✔
242
        print(f'• {__project_name__}: {__version__}')
8✔
243
        print(
8✔
244
            f'• {platform.python_implementation()}: {platform.python_version()} '
245
            f'{platform.python_build()} {platform.python_compiler()}'
246
        )
247
        print(f'• SQLite: {sqlite3.sqlite_version}')
8✔
248

249
        if psutil:
8!
250
            print()
8✔
251
            print('System:')
8✔
252
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
8✔
253
            print(f'• Processor: {platform.processor()}')
8✔
254
            print(f'• CPUs (logical): {psutil.cpu_count()}')
8✔
255
            try:
8✔
256
                virt_mem = psutil.virtual_memory().available
8✔
257
                print(
8✔
258
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
259
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
260
                )
261
            except psutil.Error as e:  # pragma: no cover
262
                print(f'• Free memory: Could not read information: {e}')
263
            print(
8✔
264
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
265
                f"({100 - psutil.disk_usage('/').percent:.1f}%)"
266
            )
267
            executor = ThreadPoolExecutor()
8✔
268
            print(f'• --max-threads default: {executor._max_workers}')
8✔
269

270
        print()
8✔
271
        print('Installed PyPI dependencies:')
8✔
272
        for module_name in dependencies():
8✔
273
            try:
8✔
274
                mod = importlib.metadata.distribution(module_name)
8✔
275
            except ModuleNotFoundError:
8✔
276
                continue
8✔
277
            print(f'• {module_name}: {mod.version}')
8✔
278
            # package requirements
279
            if mod.requires:
8✔
280
                for req_name in [i.split()[0] for i in mod.requires]:
8!
281
                    try:
8✔
282
                        req = importlib.metadata.distribution(req_name)
8✔
283
                    except ModuleNotFoundError:
8✔
284
                        continue
8✔
285
                    print(f'  - {req_name}: {req.version}')
8✔
286

287
        # playwright
288
        if sync_playwright is not None:
8!
289
            with sync_playwright() as p:
8!
290
                browser = p.chromium.launch(channel='chrome')
×
291
                print()
×
292
                print('Playwright browser:')
×
293
                print(f'• Name: {browser.browser_type.name}')
×
294
                print(f'• Version: {browser.version}')
×
295
                if psutil:
×
296
                    browser.new_page()
×
297
                    try:
×
298
                        virt_mem = psutil.virtual_memory().available
×
299
                        print(
×
300
                            f'• Free memory with browser loaded: {bytes2human(virt_mem)} physical plus '
301
                            f'{bytes2human(psutil.swap_memory().free)} swap'
302
                        )
303
                    except psutil.Error:
×
304
                        pass
×
305

306
        if os.name == 'posix' and apt:
8!
307
            apt_cache = apt.Cache()
×
308

309
            def print_version(libs: list[str]) -> None:
×
310
                for lib in libs:
×
311
                    if lib in apt_cache:
×
312
                        if ver := apt_cache[lib].versions:
×
313
                            print(f'   - {ver[0].package}: {ver[0].version}')
×
314
                return None
×
315

316
            print()
×
317
            print('Installed dpkg dependencies:')
×
318
            for module, apt_dists in (
×
319
                ('jq', ['jq']),
320
                # https://github.com/jalan/pdftotext#os-dependencies
321
                ('pdftotext', ['libpoppler-cpp-dev']),
322
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
323
                (
324
                    'Pillow',
325
                    [
326
                        'libjpeg-dev',
327
                        'zlib-dev',
328
                        'zlib1g-dev',
329
                        'libtiff-dev',
330
                        'libfreetype-dev',
331
                        'littlecms-dev',
332
                        'libwebp-dev',
333
                        'tcl/tk-dev',
334
                        'openjpeg-dev',
335
                        'libimagequant-dev',
336
                        'libraqm-dev',
337
                        'libxcb-dev',
338
                        'libxcb1-dev',
339
                    ],
340
                ),
341
                ('playwright', ['google-chrome-stable']),
342
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
343
                ('pytesseract', ['tesseract-ocr']),
344
            ):
345
                try:
×
346
                    importlib.metadata.distribution(module)
×
347
                    print(f'• {module}')
×
348
                    print_version(apt_dists)
×
349
                except importlib.metadata.PackageNotFoundError:
×
350
                    pass
×
351
        return 0
8✔
352

353
    def list_jobs(self, regex: bool | str) -> None:
8✔
354
        """
355
        Lists the jobs and their respective _index_number.
356

357
        :return: None.
358
        """
359
        if isinstance(regex, str):
8!
360
            print(f"List of jobs matching the RegEx '{regex}':")
×
361
        else:
362
            print('List of jobs:')
8✔
363
        for job in self.urlwatcher.jobs:
8✔
364
            if self.urlwatch_config.verbose:
8✔
365
                job_desc = f'{job.index_number:3}: {job!r}'
8✔
366
            else:
367
                pretty_name = job.pretty_name()
8✔
368
                location = job.get_location()
8✔
369
                if pretty_name != location:
8!
370
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
8✔
371
                else:
372
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
373
            if isinstance(regex, bool) or re.findall(regex, job_desc):
8!
374
                print(job_desc)
8✔
375

376
        if len(self.urlwatch_config.jobs_files) > 1:
8✔
377
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
8!
378
        elif len(self.urlwatch_config.jobs_files) == 1:
8✔
379
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
380
        else:
381
            jobs_files = []
8✔
382
        print('\n   '.join(jobs_files))
8✔
383

384
    def _find_job(self, query: str | int) -> JobBase:
8✔
385
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
386
        (i.e. the url/user_visible_url or command).
387

388
        :param query: The query.
389
        :return: The matching JobBase.
390
        :raises ValueError: If the job is not found.
391
        """
392
        if isinstance(query, int):
8✔
393
            index = query
8✔
394
        else:
395
            try:
8✔
396
                index = int(query)
8✔
397
            except ValueError:
8✔
398
                query = unquote_plus(query)
8✔
399
                try:
8✔
400
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
8✔
401
                except StopIteration:
8✔
402
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
8✔
403

404
        if index == 0:
8✔
405
            raise ValueError(f'Job index {index} out of range.')
8✔
406
        try:
8✔
407
            if index <= 0:
8✔
408
                return self.urlwatcher.jobs[index]
8✔
409
            else:
410
                return self.urlwatcher.jobs[index - 1]
8✔
411
        except IndexError as e:
8✔
412
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
8✔
413

414
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
8✔
415
        """
416
        Returns the job with defaults based on job_id, which could match an index or match a location
417
        (url/user_visible_url or command). Accepts negative numbers.
418

419
        :param query: The query.
420
        :return: The matching JobBase with defaults.
421
        :raises ValueError: If the job is not found.
422
        """
423
        job = self._find_job(query)
8✔
424
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
425

426
    def test_job(self, job_id: bool | str | int) -> None:
8✔
427
        """
428
        Tests the running of a single job outputting the filtered text to stdout. If job_id is True, don't run any
429
        jobs but load config, jobs and hook files to trigger any syntax errors.
430

431
        :param job_id: The job_id or True.
432

433
        :return: None.
434

435
        :raises Exception: The Exception raised by a job, the loading of hooks files, etc.
436
        """
437
        if job_id is True:  # Load to trigger any eventual syntax errors
8✔
438
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
8✔
439
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
8✔
440
            if len(self.urlwatch_config.jobs_files) == 1:
8✔
441
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
8✔
442
            else:
443
                message.append(
8!
444
                    '\n   '.join(
445
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
446
                    )
447
                )
448
            if 'hooks' in sys.modules:
8!
449
                message.append(f"\nand hooks file {sys.modules['hooks'].__file__}")
8✔
450
            print(f"{''.join(message)}.")
8✔
451
            return
8✔
452

453
        job = self._find_job_with_defaults(job_id)
8✔
454

455
        if isinstance(job, UrlJob):
8!
456
            # Force re-retrieval of job, as we're testing filters
457
            job.ignore_cached = True
×
458

459
        # Add defaults, as if when run
460
        job = job.with_defaults(self.urlwatcher.config_storage.config)
8✔
461

462
        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
8✔
463
            job_state.process(headless=not self.urlwatch_config.no_headless)
8✔
464
            # duration = time.perf_counter() - start
465
            if self.urlwatch_config.test_reporter is None:
8✔
466
                self.urlwatch_config.test_reporter = 'stdout'  # default
8✔
467
            report = Report(self.urlwatcher)
8✔
468
            report.job_states = []  # required
8✔
469
            errorlevel = self.check_test_reporter(
8✔
470
                job_state,
471
                label='error' if job_state.exception else 'new',
472
                report=report,
473
            )
474
            if errorlevel:
8!
475
                self._exit(errorlevel)
×
476
        return
8✔
477

478
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
479
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
480

481
    def test_differ(self, arg_test_differ: list[str]) -> int:
8✔
482
        """
483
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or whatever reporter is
484
        selected with --test-reporter.
485

486
        :param arg_test_differ: A list containing either [job_id] or [job_id, max_diffs].
487
        :return: 1 if error, 0 if successful.
488
        """
489
        report = Report(self.urlwatcher)
8✔
490
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
8✔
491
        if len(arg_test_differ) == 1:
8✔
492
            job_id = arg_test_differ[0]
8✔
493
            max_diffs = None
8✔
494
        elif len(arg_test_differ) == 2:
8!
495
            job_id, max_diffs_str = arg_test_differ
8✔
496
            max_diffs = int(max_diffs_str)
8✔
497
        else:
498
            raise ValueError('--test-differ takes a maximum of two arguments')
×
499

500
        job = self._find_job_with_defaults(job_id)
8✔
501

502
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
8✔
503

504
        num_snapshots = len(history_data)
8✔
505
        if num_snapshots == 0:
8✔
506
            print('This job has never been run before.')
8✔
507
            return 1
8✔
508
        elif num_snapshots < 2:
8✔
509
            print('Not enough historic data available (need at least 2 different snapshots).')
8✔
510
            return 1
8✔
511

512
        if job.compared_versions and job.compared_versions != 1:
8!
513
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")
×
514

515
        max_diffs = max_diffs or num_snapshots - 1
8✔
516
        for i in range(max_diffs):
8✔
517
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
8✔
518
                job_state.new_data = history_data[i].data
8✔
519
                job_state.new_timestamp = history_data[i].timestamp
8✔
520
                job_state.new_etag = history_data[i].etag
8✔
521
                job_state.new_mime_type = history_data[i].mime_type
8✔
522
                if not job.compared_versions or job.compared_versions == 1:
8!
523
                    job_state.old_data = history_data[i + 1].data
8✔
524
                    job_state.old_timestamp = history_data[i + 1].timestamp
8✔
525
                    job_state.old_etag = history_data[i + 1].etag
8✔
526
                    job_state.old_mime_type = history_data[i + 1].mime_type
8✔
527
                else:
528
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
×
529
                    close_matches: list[str] = difflib.get_close_matches(
×
530
                        str(job_state.new_data), history_dic_snapshots.keys(), n=1  # type: ignore[arg-type]
531
                    )
532
                    if close_matches:
×
533
                        job_state.old_data = close_matches[0]
×
534
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
×
535
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
×
536
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type
×
537

538
                if self.urlwatch_config.test_reporter is None:
8✔
539
                    self.urlwatch_config.test_reporter = 'stdout'  # default
8✔
540
                report.job_states = []  # required
8✔
541
                if job_state.new_data == job_state.old_data:
8!
542
                    label = (
×
543
                        f'No change (snapshots {-i:2} AND {-(i + 1):2}) with '
544
                        f"'compared_versions: {job.compared_versions}'"
545
                    )
546
                    job_state.verb = 'changed,no_report'
×
547
                else:
548
                    label = f'Filtered diff (snapshots {-i:2} and {-(i + 1):2})'
8✔
549
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
8✔
550
                if errorlevel:
8!
551
                    self._exit(errorlevel)
×
552

553
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
554
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
555

556
        return 0
8✔
557

558
    def dump_history(self, job_id: str) -> int:
8✔
559
        """
560
        Displays the historical data stored in the snapshot database for a job.
561

562
        :param job_id: The Job ID.
563
        :return: An argument to be used in sys.exit.
564
        """
565

566
        job = self._find_job_with_defaults(job_id)
8✔
567
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())
8✔
568

569
        print(f'History for job {job.get_indexed_location()}:')
8✔
570
        print(f'(ID: {job.get_guid()})')
8✔
571
        total_failed = 0
8✔
572
        if history_data:
8✔
573
            print('=' * 50)
8✔
574
        for i, snapshot in enumerate(history_data):
8✔
575
            mime_type = f'; {snapshot.mime_type}' if snapshot.mime_type else ''
8✔
576
            etag = f'; ETag: {snapshot.etag}' if snapshot.etag else ''
8✔
577
            tries = f'; error run (number {snapshot.tries})' if snapshot.tries else ''
8✔
578
            total_failed += snapshot.tries > 0
8✔
579
            tz = self.urlwatcher.report.config['report']['tz']
8✔
580
            tzinfo = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
8✔
581
            dt = datetime.fromtimestamp(snapshot.timestamp, tzinfo)
8✔
582
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
8✔
583
            sep_len = max(50, len(header))
8✔
584
            print(header)
8✔
585
            print('-' * sep_len)
8✔
586
            if snapshot.error_data:
8!
587
                print(f"{snapshot.error_data['type']}: {snapshot.error_data['message']}")
×
588
                print()
×
589
                print('Last good data:')
×
590
            print(snapshot.data)
8✔
591
            print('=' * sep_len, '\n')
8✔
592

593
        print(
8✔
594
            f'Found {len(history_data) - total_failed}'
595
            + (' good' if total_failed else '')
596
            + ' snapshot'
597
            + ('s' if len(history_data) - total_failed != 1 else '')
598
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
599
            + '.'
600
        )
601

602
        return 0
8✔
603

604
    def list_error_jobs(self) -> int:
8✔
605
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
8✔
606
            print(f'Invalid reporter {self.urlwatch_config.errors}')
8✔
607
            return 1
8✔
608

609
        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
8✔
610
            """A generator that outputs error text for jobs who fail with an exception or yield no data.
611

612
            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
613
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
614
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
615
            """
616

617
            def job_runner(
8✔
618
                stack: ExitStack,
619
                jobs: Iterable[JobBase],
620
                max_workers: int | None = None,
621
            ) -> Iterator[str]:
622
                """
623
                Modified worker.job_runner that yields error text for jobs that fail with an exception or yield no data.
624

625
                :param stack: The context manager.
626
                :param jobs: The jobs to run.
627
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
628
                :return: Error text for jobs that fail with an exception or yield no data.
629
                """
630
                executor = ThreadPoolExecutor(max_workers=max_workers)
8✔
631

632
                for job_state in executor.map(
8✔
633
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
634
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
635
                ):
636
                    if job_state.exception is None or isinstance(job_state.exception, NotModifiedError):
8✔
637
                        if (
8!
638
                            len(job_state.new_data.strip()) == 0
639
                            if hasattr(job_state, 'new_data')
640
                            else len(job_state.old_data.strip()) == 0
641
                        ):
642
                            if self.urlwatch_config.verbose:
×
643
                                yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
×
644
                            else:
645
                                pretty_name = job_state.job.pretty_name()
×
646
                                location = job_state.job.get_location()
×
647
                                if pretty_name != location:
×
648
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
×
649
                                else:
650
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
×
651
                    else:
652
                        pretty_name = job_state.job.pretty_name()
8✔
653
                        location = job_state.job.get_location()
8✔
654
                        if pretty_name != location:
8!
655
                            yield (
8✔
656
                                f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
657
                                f'({location})'
658
                            )
659
                        else:
660
                            yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name}'
×
661

662
            with ExitStack() as stack:
8✔
663
                # This code is from worker.run_jobs, modified to yield from job_runner.
664
                from webchanges.worker import get_virt_mem  # avoid circular imports
8✔
665

666
                # run non-BrowserJob jobs first
667
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
8!
668
                if jobs_to_run:
8!
669
                    logger.debug(
8✔
670
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
671
                        "Python's default max_workers."
672
                    )
673
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
8✔
674
                else:
675
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")
×
676

677
                # run BrowserJob jobs after
678
                jobs_to_run = [job for job in jobs if job.__is_browser__]
8!
679
                if jobs_to_run:
8!
680
                    gc.collect()
×
681
                    virt_mem = get_virt_mem()
×
682
                    if self.urlwatch_config.max_workers:
×
683
                        max_workers = self.urlwatch_config.max_workers
×
684
                    else:
685
                        max_workers = max(int(virt_mem / 200e6), 1)
×
686
                        max_workers = min(max_workers, os.cpu_count() or 1)
×
687
                    logger.debug(
×
688
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
689
                        f'{max_workers} max_workers.'
690
                    )
691
                    yield from job_runner(stack, jobs_to_run, max_workers)
×
692
                else:
693
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")
8✔
694

695
        start = time.perf_counter()
8✔
696
        if len(self.urlwatch_config.jobs_files) == 1:
8!
697
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
8✔
698
        else:
699
            jobs_files = ['in the concatenation of the jobs files'] + [
×
700
                f'• {file},' for file in self.urlwatch_config.jobs_files
701
            ]
702
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)'] + jobs_files)
8✔
703

704
        jobs = {
8!
705
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
706
        }
707
        if self.urlwatch_config.errors == 'stdout':
8!
708
            print(header)
8✔
709
            for line in error_jobs_lines(jobs):
8✔
710
                print(line)
8✔
711
            print('--')
8✔
712
            duration = time.perf_counter() - start
8✔
713
            print(f"Checked {len(jobs)} enabled job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
8✔
714

715
        else:
716
            message = '\n'.join(error_jobs_lines(jobs))
×
717
            if message:
×
718
                # create a dummy job state to run a reporter on
719
                job_state = JobState(
×
720
                    None,  # type: ignore[arg-type]
721
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
722
                )
723
                job_state.traceback = f'{header}\n{message}'
×
724
                duration = time.perf_counter() - start
×
725
                self.urlwatcher.report.config['footnote'] = (
×
726
                    f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}."
727
                )
728
                self.urlwatcher.report.config['report']['html']['footer'] = False
×
729
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
×
730
                self.urlwatcher.report.config['report']['text']['footer'] = False
×
731
                self.urlwatcher.report.error(job_state)
×
732
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
×
733
            else:
734
                print(header)
×
735
                print('--')
×
736
                duration = time.perf_counter() - start
×
737
                print('Found no errors')
×
738
                print(f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
×
739

740
        return 0
8✔
741

742
    def delete_snapshot(self, job_id: str | int) -> int:
8✔
743
        job = self._find_job_with_defaults(job_id)
8✔
744

745
        deleted = self.urlwatcher.ssdb_storage.delete_latest(job.get_guid())
8✔
746
        if deleted:
8✔
747
            print(f'Deleted last snapshot of {job.get_indexed_location()}')
8✔
748
            return 0
8✔
749
        else:
750
            print(f'No snapshots found to be deleted for {job.get_indexed_location()}')
8✔
751
            return 1
8✔
752

753
    def modify_urls(self) -> int:
8✔
754
        if self.urlwatch_config.delete is not None:
8✔
755
            job = self._find_job(self.urlwatch_config.delete)
8✔
756
            if job is not None:
8!
757
                self.urlwatcher.jobs.remove(job)
8✔
758
                print(f'Removed {job}')
8✔
759
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
760
            else:
761
                print(f'Job not found: {self.urlwatch_config.delete}')
×
762
                return 1
×
763

764
        if self.urlwatch_config.add is not None:
8✔
765
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
766
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
8!
767
            filters = [v for k, v in items if k == 'filter']
8!
768
            items2 = [(k, v) for k, v in items if k != 'filter']
8!
769
            d = {k: v for k, v in items2}
8!
770
            if filters:
8!
771
                d['filter'] = ','.join(filters)
×
772

773
            job = JobBase.unserialize(d)
8✔
774
            print(f'Adding {job}')
8✔
775
            self.urlwatcher.jobs.append(job)
8✔
776
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
777

778
        if self.urlwatch_config.change_location is not None:
8✔
779
            new_loc = self.urlwatch_config.change_location[1]
8✔
780
            # Ensure the user isn't overwriting an existing job with the change.
781
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
8!
782
                print(
×
783
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
784
                    f'different value.\n'
785
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
786
                )
787
                return 1
×
788
            else:
789
                job = self._find_job(self.urlwatch_config.change_location[0])
8✔
790
                if job is not None:
8!
791
                    # Update the job's location (which will also update the guid) and move any history in the database
792
                    # over to the job's updated guid.
793
                    old_loc = job.get_location()
8✔
794
                    print(f'Moving location of "{old_loc}" to "{new_loc}"')
8✔
795
                    old_guid = job.get_guid()
8✔
796
                    if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
8✔
797
                        print(f'No snapshots found for "{old_loc}"')
8✔
798
                        return 1
8✔
799
                    job.set_base_location(new_loc)
8✔
800
                    num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.get_guid())
8✔
801
                    if num_searched:
8!
802
                        print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}"')
8✔
803
                else:
804
                    print(f'Job not found: "{self.urlwatch_config.change_location[0]}"')
×
805
                    return 1
×
806
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
8✔
807
            if not input(message).lower().startswith('y'):
8!
808
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
×
809
            else:
810
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
8✔
811

812
        return 0
8✔
813

814
    def edit_config(self) -> int:
8✔
815
        result = self.urlwatcher.config_storage.edit()
8✔
816
        return result
8✔
817

818
    def check_telegram_chats(self) -> None:
8✔
819
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']
8✔
820

821
        bot_token = config['bot_token']
8✔
822
        if not bot_token:
8✔
823
            print('You need to set up your bot token first (see documentation)')
8✔
824
            self._exit(1)
8✔
825

826
        if httpx:
8!
827
            get_client = httpx.Client(http2=h2 is not None).get  # noqa: S113 Call to httpx without timeout
8✔
828
        else:
829
            get_client = requests.get  # type: ignore[assignment]
×
830

831
        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
8✔
832
        if not info['ok']:
8!
833
            print(f"Error with token {bot_token}: {info['description']}")
8✔
834
            self._exit(1)
8✔
835

836
        chats = {}
×
837
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
×
838
        if 'result' in updates:
×
839
            for chat_info in updates['result']:
×
840
                chat = chat_info['message']['chat']
×
841
                if chat['type'] == 'private':
×
842
                    chats[str(chat['id'])] = (
×
843
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
844
                    )
845

846
        if not chats:
×
847
            print(f"No chats found. Say hello to your bot at https://t.me/{info['result']['username']}")
×
848
            self._exit(1)
×
849

850
        headers = ('Chat ID', 'Name')
×
851
        maxchat = max(len(headers[0]), max((len(k) for k, v in chats.items()), default=0))
×
852
        maxname = max(len(headers[1]), max((len(v) for k, v in chats.items()), default=0))
×
853
        fmt = f'%-{maxchat}s  %s'
×
854
        print(fmt % headers)
×
855
        print(fmt % ('-' * maxchat, '-' * maxname))
×
856
        for k, v in sorted(chats.items(), key=lambda kv: kv[1]):
×
857
            print(fmt % (k, v))
×
858
        print(f"\nChat up your bot here: https://t.me/{info['result']['username']}")
×
859

860
        self._exit(0)
×
861

862
    def check_test_reporter(
8✔
863
        self,
864
        job_state: JobState | None = None,
865
        label: str = 'test',
866
        report: Report | None = None,
867
    ) -> int:
868
        """
869
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').
870

871
        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
872
        of the configuration file.
873

874
        :param job_state: The JobState (Optional).
875
        :param label: The label to be used in the report; defaults to 'test'.
876
        :param report: A Report class to use for testing (Optional).
877
        :return: 0 if successful, 1 otherwise.
878
        """
879

880
        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
8✔
881
            """Builds a pseudo-job for the reporter to run on."""
882
            job = JobBase.unserialize({'name': job_name, 'url': url})
8✔
883

884
            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
885
            # testing; also no need to use it as context manager, since no processing is called on the job
886
            job_state = JobState(None, job)  # type: ignore[arg-type]
8✔
887

888
            job_state.old_data = old
8✔
889
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
8✔
890
            job_state.new_data = new
8✔
891
            job_state.new_timestamp = time.time()
8✔
892

893
            return job_state
8✔
894

895
        def set_error(job_state: 'JobState', message: str) -> JobState:
8✔
896
            """Sets a job error message on a JobState."""
897
            try:
8✔
898
                raise ValueError(message)
8✔
899
            except ValueError as e:
8✔
900
                job_state.exception = e
8✔
901
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())
8✔
902

903
            return job_state
8✔
904

905
        reporter_name = self.urlwatch_config.test_reporter
8✔
906
        if reporter_name not in ReporterBase.__subclasses__:
8✔
907
            print(f'No such reporter: {reporter_name}')
8✔
908
            print(f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}\n')
8✔
909
            return 1
8✔
910

911
        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
8✔
912
            reporter_name  # type: ignore[literal-required]
913
        ]
914
        if job_state:  # we want a full report
8✔
915
            cfg['enabled'] = True
8✔
916
            self.urlwatcher.config_storage.config['display'][label] = True  # type: ignore[literal-required]
8✔
917
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
8✔
918
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
8✔
919
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
8✔
920
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
8✔
921
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
8✔
922
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
8✔
923
        if not cfg['enabled']:
8✔
924
            print(f'WARNING: Reporter being tested is not enabled: {reporter_name}')
8✔
925
            print('Will still attempt to test it, but this may not work')
8✔
926
            print(f'Use {__project_name__} --edit-config to configure reporters')
8✔
927
            cfg['enabled'] = True
8✔
928

929
        if report is None:
8✔
930
            report = Report(self.urlwatcher)
8✔
931

932
        if job_state:
8✔
933
            report.custom(job_state, label)  # type: ignore[arg-type]
8✔
934
        else:
935
            report.new(
8✔
936
                build_job(
937
                    'Sample job that was newly added',
938
                    'https://example.com/new',
939
                    '',
940
                    '',
941
                )
942
            )
943
            report.changed(
8✔
944
                build_job(
945
                    'Sample job where something changed',
946
                    'https://example.com/changed',
947
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
948
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
949
                )
950
            )
951
            report.unchanged(
8✔
952
                build_job(
953
                    'Sample job where nothing changed',
954
                    'http://example.com/unchanged',
955
                    'Same Old, Same Old\n',
956
                    'Same Old, Same Old\n',
957
                )
958
            )
959
            report.error(
8✔
960
                set_error(
961
                    build_job(
962
                        'Sample job where an error was encountered',
963
                        'https://example.com/error',
964
                        '',
965
                        '',
966
                    ),
967
                    'The error message would appear here.',
968
                )
969
            )
970

971
        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)
8✔
972

973
        return 0
8✔
974

975
    def check_smtp_login(self) -> None:
8✔
976
        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
8✔
977
        smtp_config: _ConfigReportEmailSmtp = config['smtp']
8✔
978

979
        success = True
8✔
980

981
        if not config['enabled']:
8✔
982
            print('Please enable email reporting in the config first.')
8✔
983
            success = False
8✔
984

985
        if config['method'] != 'smtp':
8✔
986
            print('Please set the method to SMTP for the email reporter.')
8✔
987
            success = False
8✔
988

989
        smtp_auth = smtp_config['auth']
8✔
990
        if not smtp_auth:
8✔
991
            print('Authentication must be enabled for SMTP.')
8✔
992
            success = False
8✔
993

994
        smtp_hostname = smtp_config['host']
8✔
995
        if not smtp_hostname:
8✔
996
            print('Please configure the SMTP hostname in the config first.')
8✔
997
            success = False
8✔
998

999
        smtp_username = smtp_config['user'] or config['from']
8✔
1000
        if not smtp_username:
8✔
1001
            print('Please configure the SMTP user in the config first.')
8✔
1002
            success = False
8✔
1003

1004
        if not success:
8✔
1005
            self._exit(1)
8✔
1006

1007
        insecure_password = smtp_config['insecure_password']
2✔
1008
        if insecure_password:
2!
1009
            print('The SMTP password is set in the config file (key "insecure_password")')
2✔
1010
        elif smtp_have_password(smtp_hostname, smtp_username):
×
1011
            message = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
×
1012
            if not input(message).lower().startswith('y'):
×
1013
                print('Password unchanged.')
×
1014
            else:
1015
                smtp_set_password(smtp_hostname, smtp_username)
×
1016

1017
        smtp_port = smtp_config['port']
2✔
1018
        smtp_tls = smtp_config['starttls']
2✔
1019

1020
        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
2✔
1021
        print('Trying to log into the SMTP server...')
2✔
1022
        mailer.send(None)
2✔
1023
        print('Successfully logged into SMTP server')
×
1024

1025
        self._exit(0)
×
1026

1027
    def check_xmpp_login(self) -> None:
8✔
1028
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']
8✔
1029

1030
        success = True
8✔
1031

1032
        if not xmpp_config['enabled']:
8✔
1033
            print('Please enable XMPP reporting in the config first.')
8✔
1034
            success = False
8✔
1035

1036
        xmpp_sender = xmpp_config['sender']
8✔
1037
        if not xmpp_sender:
8✔
1038
            print('Please configure the XMPP sender in the config first.')
8✔
1039
            success = False
8✔
1040

1041
        if not xmpp_config['recipient']:
8✔
1042
            print('Please configure the XMPP recipient in the config first.')
8✔
1043
            success = False
8✔
1044

1045
        if not success:
8✔
1046
            self._exit(1)
8✔
1047

1048
        if 'insecure_password' in xmpp_config:
8!
1049
            print('The XMPP password is already set in the config (key "insecure_password").')
8✔
1050
            self._exit(0)
8✔
1051

1052
        if xmpp_have_password(xmpp_sender):
×
1053
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
×
1054
            if input(message).lower() != 'y':
×
1055
                print('Password unchanged.')
×
1056
                self._exit(0)
×
1057

1058
        if success:
×
1059
            xmpp_set_password(xmpp_sender)
×
1060

1061
        self._exit(0)
×
1062

1063
    @staticmethod
2✔
1064
    def playwright_install_chrome() -> int:  # pragma: no cover
1065
        """
1066
        Replicates the playwright.__main__.main() function, which is called by the playwright executable, in order to
1067
        install the browser executable.
1068

1069
        :return: Playwright's executable return code.
1070
        """
1071
        try:
1072
            from playwright._impl._driver import compute_driver_executable
1073
        except ImportError:  # pragma: no cover
1074
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None
1075

1076
        driver_executable = compute_driver_executable()
1077
        env = os.environ.copy()
1078
        env['PW_CLI_TARGET_LANG'] = 'python'
1079
        cmd = [str(driver_executable), 'install', 'chrome']
1080
        logger.info(f"Running playwright CLI: {' '.join(cmd)}")
1081
        completed_process = subprocess.run(cmd, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
1082
        if completed_process.returncode:
1083
            print(completed_process.stderr)
1084
            return completed_process.returncode
1085
        if completed_process.stdout:
1086
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
1087
        return 0
1088

1089
    def handle_actions(self) -> None:
8✔
1090
        """Handles the actions for command line arguments and exits."""
1091
        if self.urlwatch_config.list_jobs:
8✔
1092
            self.list_jobs(self.urlwatch_config.list_jobs)
8✔
1093
            self._exit(0)
8✔
1094

1095
        if self.urlwatch_config.errors:
8✔
1096
            self._exit(self.list_error_jobs())
8✔
1097

1098
        if self.urlwatch_config.test_job:
8✔
1099
            self.test_job(self.urlwatch_config.test_job)
8✔
1100
            self._exit(0)
8✔
1101

1102
        if self.urlwatch_config.test_differ:
8✔
1103
            self._exit(self.test_differ(self.urlwatch_config.test_differ))
8✔
1104

1105
        if self.urlwatch_config.dump_history:
8✔
1106
            self._exit(self.dump_history(self.urlwatch_config.dump_history))
8✔
1107

1108
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
8✔
1109
            self._exit(self.modify_urls())
8✔
1110

1111
        if self.urlwatch_config.test_reporter:
8✔
1112
            self._exit(self.check_test_reporter())
8✔
1113

1114
        if self.urlwatch_config.smtp_login:
8✔
1115
            self.check_smtp_login()
8✔
1116

1117
        if self.urlwatch_config.telegram_chats:
8✔
1118
            self.check_telegram_chats()
8✔
1119

1120
        if self.urlwatch_config.xmpp_login:
8✔
1121
            self.check_xmpp_login()
8✔
1122

1123
        if self.urlwatch_config.edit:
8✔
1124
            self._exit(self.urlwatcher.jobs_storage.edit())
8✔
1125

1126
        if self.urlwatch_config.edit_config:
8✔
1127
            self._exit(self.edit_config())
8✔
1128

1129
        if self.urlwatch_config.edit_hooks:
8✔
1130
            self._exit(self.edit_hooks())
8✔
1131

1132
        if self.urlwatch_config.gc_database:
8✔
1133
            self.urlwatcher.ssdb_storage.gc(
8!
1134
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
1135
            )
1136
            self.urlwatcher.ssdb_storage.close()
8✔
1137
            self._exit(0)
8✔
1138

1139
        if self.urlwatch_config.clean_database:
8✔
1140
            self.urlwatcher.ssdb_storage.clean_ssdb(
8!
1141
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
1142
            )
1143
            self.urlwatcher.ssdb_storage.close()
8✔
1144
            self._exit(0)
8✔
1145

1146
        if self.urlwatch_config.rollback_database:
8✔
1147
            tz = self.urlwatcher.report.config['report']['tz']
8✔
1148
            self.urlwatcher.ssdb_storage.rollback_cache(self.urlwatch_config.rollback_database, tz)
8✔
1149
            self.urlwatcher.ssdb_storage.close()
8✔
1150
            self._exit(0)
8✔
1151

1152
        if self.urlwatch_config.delete_snapshot:
8✔
1153
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))
8✔
1154

1155
        if self.urlwatch_config.features:
8✔
1156
            self._exit(self.show_features())
8✔
1157

1158
        if self.urlwatch_config.detailed_versions:
8!
1159
            self._exit(self.show_detailed_versions())
8✔
1160

1161
    def run(self) -> None:  # pragma: no cover
1162
        """The main run logic."""
1163
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
1164
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote
1165

1166
        self.handle_actions()
1167

1168
        self.urlwatcher.run_jobs()
1169

1170
        self.urlwatcher.close()
1171

1172
        self._exit(0)
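
For orientation, here is a minimal usage sketch based only on what this file shows (a constructor taking an Urlwatch instance, and run() as the entry point); the urlwatcher object is assumed to be built elsewhere in the package from the config, jobs and snapshot storages:

from webchanges.command import UrlwatchCommand

# Assumption: 'urlwatcher' is an already-initialized webchanges.main.Urlwatch instance.
command = UrlwatchCommand(urlwatcher)

# run() copies the configuration into the report, dispatches any CLI action
# (e.g. --errors, --test-differ, --edit-config), runs the jobs, closes the
# urlwatcher and exits via sys.exit().
command.run()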