• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 13106214272

03 Feb 2025 04:10AM UTC coverage: 75.479% (+0.09%) from 75.393%
13106214272

push

github

mborsetti
Version 3.27.0rc0

1721 of 2606 branches covered (66.04%)

Branch coverage included in aggregate %.

4537 of 5685 relevant lines covered (79.81%)

6.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.3
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import difflib
8✔
8
import email.utils
8✔
9
import gc
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import re
8✔
15
import shutil
8✔
16
import sqlite3
8✔
17
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
18
import sys
8✔
19
import time
8✔
20
import traceback
8✔
21
from concurrent.futures import ThreadPoolExecutor
8✔
22
from contextlib import ExitStack
8✔
23
from datetime import datetime
8✔
24
from pathlib import Path
8✔
25
from typing import Iterable, Iterator, TYPE_CHECKING
8✔
26
from urllib.parse import unquote_plus
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
8✔
30
from webchanges.differs import DifferBase
8✔
31
from webchanges.filters import FilterBase
8✔
32
from webchanges.handler import JobState, Report
8✔
33
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
8✔
34
from webchanges.mailer import smtp_have_password, smtp_set_password, SMTPMailer
8✔
35
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
8✔
36
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
37

38
# HTTP client: prefer 'httpx'; fall back to 'requests'. At least one of the two is required.
try:
    import httpx
except ImportError:  # pragma: no cover
    httpx = None  # type: ignore[assignment]
    print("Required package 'httpx' not found; will attempt to run using 'requests'")
    try:
        import requests  # noqa: F401 imported only to verify the fallback HTTP client is available
    except ImportError as e:  # pragma: no cover
        # Fixed: the parenthetical was unbalanced ("(either 'httpx' or 'requests' is required...").
        raise RuntimeError(
            f"A Python HTTP client package (either 'httpx' or 'requests') is required to run {__project_name__}; "
            'neither can be imported.'
        ) from e
if httpx is not None:
    # h2 enables HTTP/2 support when using httpx; optional.
    try:
        import h2
    except ImportError:  # pragma: no cover
        h2 = None  # type: ignore[assignment]

# The optional packages below are set to None when not importable; callers test for None before use.
try:
    import apt
except ImportError:  # pragma: no cover
    apt = None  # type: ignore[assignment]

try:
    from pip._internal.metadata import get_default_environment
except ImportError:  # pragma: no cover
    get_default_environment = None  # type: ignore[assignment]

try:
    from playwright.sync_api import sync_playwright
except ImportError:  # pragma: no cover
    sync_playwright = None  # type: ignore[assignment]

try:
    import psutil
    from psutil._common import bytes2human
except ImportError:  # pragma: no cover
    psutil = None  # type: ignore[assignment]
    bytes2human = None  # type: ignore[assignment]
77

78
logger = logging.getLogger(__name__)
8✔
79

80
if TYPE_CHECKING:
81
    from webchanges.main import Urlwatch
82
    from webchanges.reporters import _ConfigReportersList
83
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
84

85

86
class UrlwatchCommand:
8✔
87
    """The class that runs the program after initialization and CLI arguments parsing."""
88

89
    def __init__(self, urlwatcher: Urlwatch) -> None:
8✔
90
        self.urlwatcher = urlwatcher
8✔
91
        self.urlwatch_config = urlwatcher.urlwatch_config
8✔
92

93
    @staticmethod
    def _exit(arg: str | int | None) -> None:
        """Log the exit status and terminate the program via sys.exit.

        :param arg: The exit status (or message) to pass through to sys.exit.
        """
        exit_status = arg
        logger.info(f'Exiting with exit code {exit_status}')
        sys.exit(exit_status)
97

98
    def jobs_from_joblist(self) -> Iterator[JobBase]:
        """Generate the jobs to process from the joblist entered in the CLI (or all jobs when none given).

        Jobs given on the command line are resolved by index or location via :meth:`_find_job`; disabled
        jobs are skipped, and each yielded job has the configuration defaults applied.

        :return: An iterator of JobBase objects with defaults applied.
        """
        if self.urlwatcher.urlwatch_config.joblist:
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
            enabled_jobs = {job for job in jobs if job.is_enabled()}
            # Fixed: the operands were swapped (len(enabled_jobs) - len(jobs)), which always produced a
            # count <= 0 and logged e.g. "(excluding -2 disabled)".
            disabled = len(jobs) - len(enabled_jobs)
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
            logger.debug(
                # Fixed pluralization: 's' was appended for any non-zero count, including exactly 1 job.
                f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) != 1 else ''}{disabled_str} as "
                f"specified in command line: {', '.join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}"
            )
        else:
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
            # Fixed: same swapped subtraction as above.
            disabled = len(self.urlwatcher.jobs) - len(enabled_jobs)
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
            logger.debug(f"Processing {len(enabled_jobs)} job{'s' if len(enabled_jobs) != 1 else ''}{disabled_str}")
        for job in enabled_jobs:
            yield job.with_defaults(self.urlwatcher.config_storage.config)
116

117
    def edit_hooks(self) -> int:
        """Edit hooks file.

        Each hooks file is copied to a sibling '*_edit*' file which is opened in the user's editor and then
        imported to validate it; only an edit that imports cleanly replaces the original file.

        :returns: 0 if edit is successful, 1 otherwise.
        """
        # Similar code to BaseTextualFileStorage.edit()
        for hooks_file in self.urlwatch_config.hooks_files:
            logger.debug(f'Edit file {hooks_file}')
            # Edit a copy (e.g. hooks.py -> hooks_edit.py) so a broken edit never clobbers the original.
            # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
            hooks_edit = hooks_file.parent.joinpath(hooks_file.stem + '_edit' + ''.join(hooks_file.suffixes))
            if hooks_file.exists():
                shutil.copy(hooks_file, hooks_edit)
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
            #         self.urlwatch_config.hooks_py_example):
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)

            while True:
                try:
                    edit_file(hooks_edit)
                    # Importing the edited copy is the validation step; any error triggers the retry prompt.
                    import_module_from_source('hooks', hooks_edit)
                    break  # stop if no exception on parser
                except SystemExit:
                    raise
                except Exception as e:
                    # Report the failure and offer to re-edit; declining discards the edit entirely.
                    print('Parsing failed:')
                    print('======')
                    print(e)
                    print('======')
                    print('')
                    print(f'The file {hooks_file} was NOT updated.')
                    user_input = input('Do you want to retry the same edit? (Y/n)')
                    if not user_input or user_input.lower()[0] == 'y':
                        continue
                    hooks_edit.unlink()
                    print('No changes have been saved.')
                    return 1

            if hooks_file.is_symlink():
                # Write through the symlink rather than replacing it, so the link itself is preserved.
                hooks_file.write_text(hooks_edit.read_text())
            else:
                hooks_edit.replace(hooks_file)
            hooks_edit.unlink(missing_ok=True)
            print(f'Saved edits in {hooks_file}')

        return 0
162

163
    @staticmethod
    def show_features() -> int:
        """
        Prints the "features", i.e. a list of supported job types, filters, differs and reporters.

        :return: 0.
        """
        docs_note = f'Please see full documentation at {__docs_url__}'
        # Each entry is either a literal string or a zero-argument callable evaluated right before
        # printing, preserving the original print/call ordering.
        sections = (
            docs_note,
            '',
            'Supported jobs:\n',
            JobBase.job_documentation,
            'Supported filters:\n',
            FilterBase.filter_documentation,
            '',
            'Supported differs:\n',
            DifferBase.differ_documentation,
            '',
            'Supported reporters:\n',
            ReporterBase.reporter_documentation,
            '',
            docs_note,
        )
        for section in sections:
            print(section() if callable(section) else section)

        return 0
186

187
    @staticmethod
    def show_detailed_versions() -> int:
        """
        Prints the detailed versions, including of dependencies.

        Reports the program, interpreter and SQLite versions; system information (when psutil is
        importable); installed PyPi dependencies and their requirements; the Playwright browser (when
        installed); and, on posix systems with python-apt, relevant dpkg packages.

        :return: 0.
        """

        def dependencies() -> list[str]:
            # Returns the names of this project's declared dependencies, read from pip metadata when
            # available; otherwise falls back to a hard-coded list of all possible dependencies.
            if get_default_environment is not None:
                env = get_default_environment()
                dist = None
                for dist in env.iter_all_distributions():
                    if dist.canonical_name == __project_name__:
                        break
                if dist and dist.canonical_name == __project_name__:
                    # Keep only the package name (first whitespace-delimited token) of each requirement.
                    return sorted(set(d.split()[0] for d in dist.metadata_dict['requires_dist']), key=str.lower)

            # default list of all possible dependencies
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
            return [
                'aioxmpp',
                'beautifulsoup4',
                'chump',
                'colorama',
                'cryptography',
                'cssbeautifier',
                'cssselect',
                'deepdiff',
                'h2',
                'html2text',
                'httpx',
                'jq',
                'jsbeautifier',
                'keyring',
                'lxml',
                'markdown2',
                'matrix_client',
                'msgpack',
                'pdftotext',
                'Pillow',
                'platformdirs',
                'playwright',
                'psutil',
                'pushbullet.py',
                'pypdf',
                'pytesseract',
                'pyyaml',
                'redis',
                'requests',
                'tzdata',
                'vobject',
            ]

        print('Software:')
        print(f'• {__project_name__}: {__version__}')
        print(
            f'• {platform.python_implementation()}: {platform.python_version()} '
            f'{platform.python_build()} {platform.python_compiler()}'
        )
        print(f'• SQLite: {sqlite3.sqlite_version}')

        # psutil is an optional dependency (bound to None at import time when not installed).
        if psutil:
            print()
            print('System:')
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
            print(f'• Processor: {platform.processor()}')
            print(f'• CPUs (logical): {psutil.cpu_count()}')
            try:
                virt_mem = psutil.virtual_memory().available
                print(
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
                )
            except psutil.Error as e:  # pragma: no cover
                print(f'• Free memory: Could not read information: {e}')
            print(
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
                f"({100 - psutil.disk_usage('/').percent:.1f}%)"
            )
            # Reads the private _max_workers attribute to report ThreadPoolExecutor's default worker count.
            executor = ThreadPoolExecutor()
            print(f'• --max-threads default: {executor._max_workers}')

        print()
        print('Installed PyPi dependencies:')
        for module_name in dependencies():
            try:
                mod = importlib.metadata.distribution(module_name)
            except ModuleNotFoundError:
                # importlib.metadata.PackageNotFoundError subclasses ModuleNotFoundError; skip if absent.
                continue
            print(f'• {module_name}: {mod.version}')
            # package requirements
            if mod.requires:
                for req_name in [i.split()[0] for i in mod.requires]:
                    try:
                        req = importlib.metadata.distribution(req_name)
                    except ModuleNotFoundError:
                        continue
                    print(f'  - {req_name}: {req.version}')

        # playwright
        if sync_playwright is not None:
            with sync_playwright() as p:
                browser = p.chromium.launch(channel='chrome')
                print()
                print('Playwright browser:')
                print(f'• Name: {browser.browser_type.name}')
                print(f'• Version: {browser.version}')
                if psutil:
                    # Open a page so the memory reading reflects a loaded browser.
                    browser.new_page()
                    try:
                        virt_mem = psutil.virtual_memory().available
                        print(
                            f'• Free memory with browser loaded: {bytes2human(virt_mem)} physical plus '
                            f'{bytes2human(psutil.swap_memory().free)} swap'
                        )
                    except psutil.Error:
                        pass

        # python-apt (posix only) is used to look up the dpkg packages backing certain PyPi modules.
        if os.name == 'posix' and apt:
            apt_cache = apt.Cache()

            def print_version(libs: list[str]) -> None:
                # Prints the installed version of each apt package found in the local cache.
                for lib in libs:
                    if lib in apt_cache:
                        if ver := apt_cache[lib].versions:
                            print(f'   - {ver[0].package}: {ver[0].version}')
                return None

            print()
            print('Installed dpkg dependencies:')
            # Each tuple pairs a PyPi module with the apt packages its native functionality depends on.
            for module, apt_dists in (
                ('jq', ['jq']),
                # https://github.com/jalan/pdftotext#os-dependencies
                ('pdftotext', ['libpoppler-cpp-dev']),
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
                (
                    'Pillow',
                    [
                        'libjpeg-dev',
                        'zlib-dev',
                        'zlib1g-dev',
                        'libtiff-dev',
                        'libfreetype-dev',
                        'littlecms-dev',
                        'libwebp-dev',
                        'tcl/tk-dev',
                        'openjpeg-dev',
                        'libimagequant-dev',
                        'libraqm-dev',
                        'libxcb-dev',
                        'libxcb1-dev',
                    ],
                ),
                ('playwright', ['google-chrome-stable']),
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
                ('pytesseract', ['tesseract-ocr']),
            ):
                try:
                    importlib.metadata.distribution(module)
                    print(f'• {module}')
                    print_version(apt_dists)
                except importlib.metadata.PackageNotFoundError:
                    pass
        return 0
352

353
    def list_jobs(self, regex: bool | str) -> None:
8✔
354
        """
355
        Lists the job and their respective _index_number.
356

357
        :return: None.
358
        """
359
        if isinstance(regex, str):
8!
360
            print(f"List of jobs matching the RegEx '{regex}':")
×
361
        else:
362
            print('List of jobs:')
8✔
363
        for job in self.urlwatcher.jobs:
8✔
364
            if self.urlwatch_config.verbose:
8✔
365
                job_desc = f'{job.index_number:3}: {job!r}'
8✔
366
            else:
367
                pretty_name = job.pretty_name()
8✔
368
                location = job.get_location()
8✔
369
                if pretty_name != location:
8!
370
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
8✔
371
                else:
372
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
373
            if isinstance(regex, bool) or re.findall(regex, job_desc):
8!
374
                print(job_desc)
8✔
375

376
        if len(self.urlwatch_config.jobs_files) > 1:
8✔
377
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
8!
378
        elif len(self.urlwatch_config.jobs_files) == 1:
8✔
379
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
380
        else:
381
            jobs_files = []
8✔
382
        print('\n   '.join(jobs_files))
8✔
383

384
    def _find_job(self, query: str | int) -> JobBase:
8✔
385
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
386
        (i.e. the url/user_visible_url or command).
387

388
        :param query: The query.
389
        :return: The matching JobBase.
390
        :raises IndexError: If job is not found.
391
        """
392
        if isinstance(query, int):
8✔
393
            index = query
8✔
394
        else:
395
            try:
8✔
396
                index = int(query)
8✔
397
            except ValueError:
8✔
398
                query = unquote_plus(query)
8✔
399
                try:
8✔
400
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
8✔
401
                except StopIteration:
8✔
402
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
8✔
403

404
        if index == 0:
8✔
405
            raise ValueError(f'Job index {index} out of range.')
8✔
406
        try:
8✔
407
            if index <= 0:
8✔
408
                return self.urlwatcher.jobs[index]
8✔
409
            else:
410
                return self.urlwatcher.jobs[index - 1]
8✔
411
        except IndexError as e:
8✔
412
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
8✔
413

414
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
8✔
415
        """
416
        Returns the job with defaults based on job_id, which could match an index or match a location
417
        (url/user_visible_url or command). Accepts negative numbers.
418

419
        :param query: The query.
420
        :return: The matching JobBase with defaults.
421
        :raises SystemExit: If job is not found.
422
        """
423
        job = self._find_job(query)
8✔
424
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
425

426
    def test_job(self, job_id: bool | str | int) -> None:
        """
        Tests the running of a single job outputting the filtered text to stdout. If job_id is True, don't run any
        jobs but load config, jobs and hook files to trigger any syntax errors.

        :param job_id: The job_id or True.

        :return: None.

        :raises Exception: The Exception when raised by a job. loading of hooks files, etc.
        """
        if job_id is True:  # Load to trigger any eventual syntax errors
            # Getting this far means config/jobs/hooks already loaded without raising; report success.
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
            # 'hooks' in sys.modules means a hooks file was imported, so the message needs a third clause.
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
            if len(self.urlwatch_config.jobs_files) == 1:
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
            else:
                message.append(
                    '\n   '.join(
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
                    )
                )
            if 'hooks' in sys.modules:
                message.append(f"\nand hooks file {sys.modules['hooks'].__file__}")
            print(f"{''.join(message)}.")
            return

        job = self._find_job_with_defaults(job_id)

        if isinstance(job, UrlJob):
            # Force re-retrieval of job, as we're testing filters
            job.ignore_cached = True

        # Add defaults, as if when run
        job = job.with_defaults(self.urlwatcher.config_storage.config)

        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
            job_state.process(headless=not self.urlwatch_config.no_headless)
            # duration = time.perf_counter() - start
            if self.urlwatch_config.test_reporter is None:
                self.urlwatch_config.test_reporter = 'stdout'  # default
            report = Report(self.urlwatcher)
            report.job_states = []  # required
            # NOTE(review): check_test_reporter is defined elsewhere in this class; presumably it returns a
            # non-zero errorlevel on reporter failure — confirm against its definition.
            errorlevel = self.check_test_reporter(
                job_state,
                label='error' if job_state.exception else 'new',
                report=report,
            )
            if errorlevel:
                self._exit(errorlevel)
        return

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
480

481
    def prepare_jobs(self) -> None:
8✔
482
        """
483
        Runs jobs that have no history to populate the snapshot database when they're newly added.
484
        """
485
        new_jobs = []
8✔
486
        for idx, job in enumerate(self.urlwatcher.jobs):
8✔
487
            has_history = bool(self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid()))
8✔
488
            if not has_history:
8!
489
                print(f'Adding new {job.get_indexed_location()}')
8✔
490
                new_jobs.append(idx + 1)
8✔
491
        if not new_jobs:
8!
492
            print('Found no new jobs to run.')
×
493
            return
×
494
        self.urlwatcher.urlwatch_config.joblist = new_jobs
8✔
495
        self.urlwatcher.run_jobs()
8✔
496
        self.urlwatcher.close()
8✔
497
        return
8✔
498

499
    def test_differ(self, arg_test_differ: list[str]) -> int:
        """
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or whatever reporter is
        selected with --test-reporter.

        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs]
        :return: 1 if error, 0 if successful.
        """
        report = Report(self.urlwatcher)
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
        if len(arg_test_differ) == 1:
            job_id = arg_test_differ[0]
            max_diffs = None
        elif len(arg_test_differ) == 2:
            job_id, max_diffs_str = arg_test_differ
            max_diffs = int(max_diffs_str)
        else:
            raise ValueError('--test-differ takes a maximum of two arguments')

        job = self._find_job_with_defaults(job_id)

        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())

        # Diffing requires at least two snapshots.
        num_snapshots = len(history_data)
        if num_snapshots == 0:
            print('This job has never been run before.')
            return 1
        elif num_snapshots < 2:
            print('Not enough historic data available (need at least 2 different snapshots).')
            return 1

        if job.compared_versions and job.compared_versions != 1:
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")

        # Default to diffing every consecutive pair of snapshots.
        max_diffs = max_diffs or num_snapshots - 1
        for i in range(max_diffs):
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
                # Snapshot i plays the role of "new" data; the older side depends on compared_versions.
                job_state.new_data = history_data[i].data
                job_state.new_timestamp = history_data[i].timestamp
                job_state.new_etag = history_data[i].etag
                job_state.new_mime_type = history_data[i].mime_type
                if not job.compared_versions or job.compared_versions == 1:
                    # Simple case: compare against the immediately preceding snapshot.
                    job_state.old_data = history_data[i + 1].data
                    job_state.old_timestamp = history_data[i + 1].timestamp
                    job_state.old_etag = history_data[i + 1].etag
                    job_state.old_mime_type = history_data[i + 1].mime_type
                else:
                    # compared_versions > 1: pick the closest match among the next compared_versions
                    # snapshots (keyed by data, so duplicate data collapses to one entry).
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
                    close_matches: list[str] = difflib.get_close_matches(
                        str(job_state.new_data), history_dic_snapshots.keys(), n=1  # type: ignore[arg-type]
                    )
                    if close_matches:
                        job_state.old_data = close_matches[0]
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type

                if self.urlwatch_config.test_reporter is None:
                    self.urlwatch_config.test_reporter = 'stdout'  # default
                report.job_states = []  # required
                # Labels use negative numbers to indicate snapshots counted back from the most recent.
                if job_state.new_data == job_state.old_data:
                    label = (
                        f'No change (snapshots {-i:2} AND {-(i + 1):2}) with '
                        f"'compared_versions: {job.compared_versions}'"
                    )
                    job_state.verb = 'changed,no_report'
                else:
                    label = f'Filtered diff (snapshots {-i:2} and {-(i + 1):2})'
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
                if errorlevel:
                    self._exit(errorlevel)

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)

        return 0
575

576
    def dump_history(self, job_id: str) -> int:
        """
        Displays the historical data stored in the snapshot database for a job.

        Each snapshot is printed with a header carrying its local-time timestamp, MIME type, ETag, and
        error-run number when present, followed by the stored data; a summary line closes the listing.

        :param job_id: The Job ID.
        :return: An argument to be used in sys.exit.
        """

        job = self._find_job_with_defaults(job_id)
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.get_guid())

        print(f'History for job {job.get_indexed_location()}:')
        print(f'(ID: {job.get_guid()})')
        total_failed = 0
        if history_data:
            print('=' * 50)
        for i, snapshot in enumerate(history_data):
            # Optional header fragments; empty strings when the corresponding field is absent.
            mime_type = f'; {snapshot.mime_type}' if snapshot.mime_type else ''
            etag = f'; ETag: {snapshot.etag}' if snapshot.etag else ''
            tries = f'; error run (number {snapshot.tries})' if snapshot.tries else ''
            # Bool addition: counts snapshots captured during an error run (tries > 0).
            total_failed += snapshot.tries > 0
            # Render the timestamp in the report's configured timezone, or the machine's local one.
            tz = self.urlwatcher.report.config['report']['tz']
            tzinfo = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
            dt = datetime.fromtimestamp(snapshot.timestamp, tzinfo)
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
            # Separator stretches to the header length, never shorter than 50 characters.
            sep_len = max(50, len(header))
            print(header)
            print('-' * sep_len)
            if snapshot.error_data:
                print(f"{snapshot.error_data['type']}: {snapshot.error_data['message']}")
                print()
                print('Last good data:')
            print(snapshot.data)
            print('=' * sep_len, '\n')

        # Summary, e.g. "Found 3 good snapshots and 1 error capture." with correct pluralization.
        print(
            f'Found {len(history_data) - total_failed}'
            + (' good' if total_failed else '')
            + ' snapshot'
            + ('s' if len(history_data) - total_failed != 1 else '')
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
            + '.'
        )

        return 0
621

622
    def list_error_jobs(self) -> int:
8✔
623
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
8✔
624
            print(f'Invalid reporter {self.urlwatch_config.errors}')
8✔
625
            return 1
8✔
626

627
        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
8✔
628
            """A generator that outputs error text for jobs who fail with an exception or yield no data.
629

630
            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
631
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
632
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
633
            """
634

635
            def job_runner(
8✔
636
                stack: ExitStack,
637
                jobs: Iterable[JobBase],
638
                max_workers: int | None = None,
639
            ) -> Iterator[str]:
640
                """
641
                Modified worker.job_runner that yields error text for jobs who fail with an exception or yield no data.
642

643
                :param stack: The context manager.
644
                :param jobs: The jobs to run.
645
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
646
                :return: error text for jobs who fail with an exception or yield no data.
647
                """
648
                executor = ThreadPoolExecutor(max_workers=max_workers)
8✔
649

650
                for job_state in executor.map(
8✔
651
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
652
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
653
                ):
654
                    if job_state.exception is None or isinstance(job_state.exception, NotModifiedError):
8✔
655
                        if (
8!
656
                            len(job_state.new_data.strip()) == 0
657
                            if hasattr(job_state, 'new_data')
658
                            else len(job_state.old_data.strip()) == 0
659
                        ):
660
                            if self.urlwatch_config.verbose:
×
661
                                yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
×
662
                            else:
663
                                pretty_name = job_state.job.pretty_name()
×
664
                                location = job_state.job.get_location()
×
665
                                if pretty_name != location:
×
666
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
×
667
                                else:
668
                                    yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
×
669
                    else:
670
                        pretty_name = job_state.job.pretty_name()
8✔
671
                        location = job_state.job.get_location()
8✔
672
                        if pretty_name != location:
8!
673
                            yield (
8✔
674
                                f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
675
                                f'({location})'
676
                            )
677
                        else:
678
                            yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name})'
×
679

680
            with ExitStack() as stack:
8✔
681
                # This code is from worker.run_jobs, modified to yield from job_runner.
682
                from webchanges.worker import get_virt_mem  # avoid circular imports
8✔
683

684
                # run non-BrowserJob jobs first
685
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
8!
686
                if jobs_to_run:
8!
687
                    logger.debug(
8✔
688
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
689
                        "Python's default max_workers."
690
                    )
691
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
8✔
692
                else:
693
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")
×
694

695
                # run BrowserJob jobs after
696
                jobs_to_run = [job for job in jobs if job.__is_browser__]
8!
697
                if jobs_to_run:
8!
698
                    gc.collect()
×
699
                    virt_mem = get_virt_mem()
×
700
                    if self.urlwatch_config.max_workers:
×
701
                        max_workers = self.urlwatch_config.max_workers
×
702
                    else:
703
                        max_workers = max(int(virt_mem / 200e6), 1)
×
704
                        max_workers = min(max_workers, os.cpu_count() or 1)
×
705
                    logger.debug(
×
706
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
707
                        f'{max_workers} max_workers.'
708
                    )
709
                    yield from job_runner(stack, jobs_to_run, max_workers)
×
710
                else:
711
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")
8✔
712

713
        start = time.perf_counter()
8✔
714
        if len(self.urlwatch_config.jobs_files) == 1:
8!
715
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
8✔
716
        else:
717
            jobs_files = ['in the concatenation of the jobs files'] + [
×
718
                f'• {file},' for file in self.urlwatch_config.jobs_files
719
            ]
720
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)'] + jobs_files)
8✔
721

722
        jobs = {
8!
723
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
724
        }
725
        if self.urlwatch_config.errors == 'stdout':
8!
726
            print(header)
8✔
727
            for line in error_jobs_lines(jobs):
8✔
728
                print(line)
8✔
729
            print('--')
8✔
730
            duration = time.perf_counter() - start
8✔
731
            print(f"Checked {len(jobs)} enabled job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
8✔
732

733
        else:
734
            message = '\n'.join(error_jobs_lines(jobs))
×
735
            if message:
×
736
                # create a dummy job state to run a reporter on
737
                job_state = JobState(
×
738
                    None,  # type: ignore[arg-type]
739
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
740
                )
741
                job_state.traceback = f'{header}\n{message}'
×
742
                duration = time.perf_counter() - start
×
743
                self.urlwatcher.report.config['footnote'] = (
×
744
                    f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}."
745
                )
746
                self.urlwatcher.report.config['report']['html']['footer'] = False
×
747
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
×
748
                self.urlwatcher.report.config['report']['text']['footer'] = False
×
749
                self.urlwatcher.report.error(job_state)
×
750
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
×
751
            else:
752
                print(header)
×
753
                print('--')
×
754
                duration = time.perf_counter() - start
×
755
                print('Found no errors')
×
756
                print(f"Checked {len(jobs)} job{'s' if len(jobs) else ''} for errors in {dur_text(duration)}.")
×
757

758
        return 0
8✔
759

760
    def delete_snapshot(self, job_id: str | int) -> int:
        """Delete the most recent snapshot of a job from the snapshot database.

        :param job_id: The index number or location identifying the job.
        :return: 0 if a snapshot was deleted, 1 if none was found for the job.
        """
        matched_job = self._find_job_with_defaults(job_id)

        # delete_latest returns a truthy value when a snapshot was actually removed.
        if self.urlwatcher.ssdb_storage.delete_latest(matched_job.get_guid()):
            print(f'Deleted last snapshot of {matched_job.get_indexed_location()}')
            return 0

        print(f'No snapshots found to be deleted for {matched_job.get_indexed_location()}')
        return 1
770

771
    def modify_urls(self) -> int:
        """Apply job-list modifications requested on the command line.

        Handles --delete (remove a job), --add (append a new job built from key=value pairs),
        and --change-location (rename a job's location and migrate its snapshot history).

        :return: 0 on success; 1 when a referenced job cannot be found, the new location is
            already taken, or no snapshots exist to migrate.
        """
        if self.urlwatch_config.delete is not None:
            job = self._find_job(self.urlwatch_config.delete)
            if job is None:
                print(f'Job not found: {self.urlwatch_config.delete}')
                return 1
            self.urlwatcher.jobs.remove(job)
            print(f'Removed {job}')
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.add is not None:
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
            pairs = [entry.split('=', 1) for entry in self.urlwatch_config.add.split(',')]
            filters = [value for key, value in pairs if key == 'filter']
            job_spec = {key: value for key, value in pairs if key != 'filter'}
            if filters:
                job_spec['filter'] = ','.join(filters)

            job = JobBase.unserialize(job_spec)
            print(f'Adding {job}')
            self.urlwatcher.jobs.append(job)
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.change_location is not None:
            old_id, new_loc = self.urlwatch_config.change_location
            # Ensure the user isn't overwriting an existing job with the change.
            if any(existing.get_location() == new_loc for existing in self.urlwatcher.jobs):
                print(
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
                    f'different value.\n'
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
                )
                return 1

            job = self._find_job(old_id)
            if job is None:
                print(f'Job not found: "{old_id}"')
                return 1

            # Update the job's location (which will also update the guid) and move any history in the database
            # over to the job's updated guid.
            old_loc = job.get_location()
            print(f'Moving location of "{old_loc}" to "{new_loc}"')
            old_guid = job.get_guid()
            if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
                print(f'No snapshots found for "{old_loc}"')
                return 1
            job.set_base_location(new_loc)
            num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.get_guid())
            if num_searched:
                print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}"')

            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
            if input(message).lower().startswith('y'):
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
            else:
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')

        return 0
831

832
    def edit_config(self) -> int:
        """Open the configuration file in an editor via its storage object.

        :return: The editor's result code.
        """
        return self.urlwatcher.config_storage.edit()
835

836
    def check_telegram_chats(self) -> None:
        """Query the Telegram Bot API and print the private chats visible to the configured bot, then exit.

        Exits with code 1 when the bot token is missing/invalid or no chats are found;
        exits with code 0 after printing a table of chat IDs and names.
        """
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']

        bot_token = config['bot_token']
        if not bot_token:
            print('You need to set up your bot token first (see documentation)')
            self._exit(1)

        # Prefer httpx (HTTP/2 when the h2 package is available); fall back to requests.
        if httpx:
            get_client = httpx.Client(http2=h2 is not None).get  # noqa: S113 Call to httpx without timeout
        else:
            get_client = requests.get  # type: ignore[assignment]

        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
        if not info['ok']:
            print(f"Error with token {bot_token}: {info['description']}")
            self._exit(1)

        chats = {}
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
        if 'result' in updates:
            for chat_info in updates['result']:
                chat = chat_info['message']['chat']
                if chat['type'] == 'private':
                    # BUGFIX: Telegram returns chat IDs as JSON integers; the column-width
                    # computation below calls len() on the keys, which raises TypeError on
                    # an int. Store the ID as a string.
                    chats[str(chat['id'])] = (
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
                    )

        if not chats:
            print(f"No chats found. Say hello to your bot at https://t.me/{info['result']['username']}")
            self._exit(1)

        # Render a two-column table sized to the widest chat ID and name.
        headers = ('Chat ID', 'Name')
        maxchat = max(len(headers[0]), max((len(k) for k in chats), default=0))
        maxname = max(len(headers[1]), max((len(v) for v in chats.values()), default=0))
        fmt = f'%-{maxchat}s  %s'
        print(fmt % headers)
        print(fmt % ('-' * maxchat, '-' * maxname))
        for chat_id, name in sorted(chats.items(), key=lambda kv: kv[1]):  # sort by display name
            print(fmt % (chat_id, name))
        print(f"\nChat up your bot here: https://t.me/{info['result']['username']}")

        self._exit(0)
879

880
    def check_test_reporter(
        self,
        job_state: JobState | None = None,
        label: str = 'test',
        report: Report | None = None,
    ) -> int:
        """
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').

        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
        of the configuration file.

        :param job_state: The JobState (Optional).
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report class to use for testing (Optional).
        :return: 0 if successful, 1 otherwise.
        """

        def make_pseudo_job_state(job_name: str, url: str, old: str, new: str) -> JobState:
            """Build a pseudo JobState (with old/new data pre-filled) for the reporter to run on."""
            pseudo_job = JobBase.unserialize({'name': job_name, 'url': url})

            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
            # testing; also no need to use it as context manager, since no processing is called on the job
            pseudo_state = JobState(None, pseudo_job)  # type: ignore[arg-type]
            pseudo_state.old_data = old
            pseudo_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
            pseudo_state.new_data = new
            pseudo_state.new_timestamp = time.time()
            return pseudo_state

        def attach_error(state: 'JobState', message: str) -> JobState:
            """Attach an exception and its formatted traceback to a JobState."""
            try:
                raise ValueError(message)
            except ValueError as e:
                state.exception = e
                state.traceback = state.job.format_error(e, traceback.format_exc())
            return state

        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(f'No such reporter: {reporter_name}')
            print(f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}\n')
            return 1

        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
            reporter_name  # type: ignore[literal-required]
        ]
        root_config = self.urlwatcher.config_storage.config
        if job_state:  # we want a full report
            cfg['enabled'] = True
            root_config['display'][label] = True  # type: ignore[literal-required]
            # Force full (non-minimal) output with details and footer in both text formats.
            for format_key in ('text', 'markdown'):
                root_config['report'][format_key]['details'] = True
                root_config['report'][format_key]['footer'] = True
                root_config['report'][format_key]['minimal'] = False
        if not cfg['enabled']:
            print(f'WARNING: Reporter being tested is not enabled: {reporter_name}')
            print('Will still attempt to test it, but this may not work')
            print(f'Use {__project_name__} --edit-config to configure reporters')
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)  # type: ignore[arg-type]
        else:
            # One pseudo-job per outcome verb so the reporter exercises every section.
            report.new(
                make_pseudo_job_state(
                    'Sample job that was newly added',
                    'https://example.com/new',
                    '',
                    '',
                )
            )
            report.changed(
                make_pseudo_job_state(
                    'Sample job where something changed',
                    'https://example.com/changed',
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
                )
            )
            report.unchanged(
                make_pseudo_job_state(
                    'Sample job where nothing changed',
                    'http://example.com/unchanged',
                    'Same Old, Same Old\n',
                    'Same Old, Same Old\n',
                )
            )
            report.error(
                attach_error(
                    make_pseudo_job_state(
                        'Sample job where an error was encountered',
                        'https://example.com/error',
                        '',
                        '',
                    ),
                    'The error message would appear here.',
                )
            )

        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
992

993
    def check_smtp_login(self) -> None:
        """Validate the SMTP email reporter configuration and attempt a server login, then exit.

        Exits with code 1 when the configuration is incomplete; otherwise offers to store the
        password (unless set insecurely in the config), performs a login attempt, and exits
        with code 0 on success.
        """
        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_config: _ConfigReportEmailSmtp = config['smtp']

        smtp_auth = smtp_config['auth']
        smtp_hostname = smtp_config['host']
        smtp_username = smtp_config['user'] or config['from']

        # Collect every configuration problem so the user sees them all before exiting.
        problems = []
        if not config['enabled']:
            problems.append('Please enable email reporting in the config first.')
        if config['method'] != 'smtp':
            problems.append('Please set the method to SMTP for the email reporter.')
        if not smtp_auth:
            problems.append('Authentication must be enabled for SMTP.')
        if not smtp_hostname:
            problems.append('Please configure the SMTP hostname in the config first.')
        if not smtp_username:
            problems.append('Please configure the SMTP user in the config first.')

        if problems:
            for problem in problems:
                print(problem)
            self._exit(1)

        insecure_password = smtp_config['insecure_password']
        if insecure_password:
            print('The SMTP password is set in the config file (key "insecure_password")')
        elif smtp_have_password(smtp_hostname, smtp_username):
            prompt = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
            if input(prompt).lower().startswith('y'):
                smtp_set_password(smtp_hostname, smtp_username)
            else:
                print('Password unchanged.')

        mailer = SMTPMailer(
            smtp_username,
            smtp_hostname,
            smtp_config['port'],
            smtp_config['starttls'],
            smtp_auth,
            insecure_password,
        )
        print('Trying to log into the SMTP server...')
        # Sending None exercises the connection/login path without an actual message.
        mailer.send(None)
        print('Successfully logged into SMTP server')

        self._exit(0)
1044

1045
    def check_xmpp_login(self) -> None:
        """Validate the XMPP reporter configuration and ensure the sender's password is stored, then exit.

        Exits with code 1 when the configuration is incomplete; exits with code 0 once a password
        is known to be available (in the config, already in the keyring and kept, or newly entered).
        """
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        success = True

        if not xmpp_config['enabled']:
            print('Please enable XMPP reporting in the config first.')
            success = False

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            print('Please configure the XMPP sender in the config first.')
            success = False

        if not xmpp_config['recipient']:
            print('Please configure the XMPP recipient in the config first.')
            success = False

        if not success:
            self._exit(1)

        if 'insecure_password' in xmpp_config:
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
            # BUGFIX: accept any answer starting with 'y' (e.g. 'yes'), consistent with
            # check_smtp_login; previously only the exact answer 'y' was accepted.
            if not input(message).lower().startswith('y'):
                print('Password unchanged.')
                self._exit(0)

        # All configuration checks passed (any failure above already called _exit(1)),
        # so the former redundant 'if success:' guard is not needed.
        xmpp_set_password(xmpp_sender)

        self._exit(0)
1080

1081
    @staticmethod
    def playwright_install_chrome() -> int:  # pragma: no cover
        """
        Install the Chrome browser executable by invoking the Playwright CLI driver directly,
        replicating what the playwright.___main__.main() entry point does.

        :return: Playwright's executable return code.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:  # pragma: no cover
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        env = os.environ.copy()
        env['PW_CLI_TARGET_LANG'] = 'python'
        cmd = [str(compute_driver_executable()), 'install', 'chrome']
        logger.info(f"Running playwright CLI: {' '.join(cmd)}")
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
        if result.returncode:
            print(result.stderr)
            return result.returncode
        if result.stdout:
            logger.info(f'Success! Output of Playwright CLI: {result.stdout}')
        return 0
1106

1107
    def handle_actions(self) -> None:
        """Handles the actions for command line arguments and exits.

        Each branch corresponds to one command-line option; the first matching option is
        executed and the process exits (via ``self._exit`` or an action that itself exits),
        so the order of the checks below defines the priority among multiple options.
        If no action option was given, the method returns and normal job-running proceeds.
        """
        # --list[-jobs]: print the job list.
        if self.urlwatch_config.list_jobs:
            self.list_jobs(self.urlwatch_config.list_jobs)
            self._exit(0)

        # --errors: run all jobs and report those erroring or returning no data.
        if self.urlwatch_config.errors:
            self._exit(self.list_error_jobs())

        # --test [JOB]: run a single job and show its output without saving state.
        if self.urlwatch_config.test_job:
            self.test_job(self.urlwatch_config.test_job)
            self._exit(0)

        # --prepare-jobs: run new jobs once to capture their initial snapshot.
        if self.urlwatch_config.prepare_jobs:
            self.prepare_jobs()
            self._exit(0)

        # --test-differ: replay a job's saved snapshots through its differ.
        if self.urlwatch_config.test_differ:
            self._exit(self.test_differ(self.urlwatch_config.test_differ))

        # --dump-history: print all saved snapshots of a job.
        if self.urlwatch_config.dump_history:
            self._exit(self.dump_history(self.urlwatch_config.dump_history))

        # --add / --delete / --change-location: modify the job list (see modify_urls).
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
            self._exit(self.modify_urls())

        # --test-reporter: exercise a reporter with pseudo-jobs.
        if self.urlwatch_config.test_reporter:
            self._exit(self.check_test_reporter())

        # --smtp-login / --telegram-chats / --xmpp-login: credential checks
        # (these methods call self._exit themselves).
        if self.urlwatch_config.smtp_login:
            self.check_smtp_login()

        if self.urlwatch_config.telegram_chats:
            self.check_telegram_chats()

        if self.urlwatch_config.xmpp_login:
            self.check_xmpp_login()

        # --edit / --edit-config / --edit-hooks: open the respective file in an editor.
        if self.urlwatch_config.edit:
            self._exit(self.urlwatcher.jobs_storage.edit())

        if self.urlwatch_config.edit_config:
            self._exit(self.edit_config())

        if self.urlwatch_config.edit_hooks:
            self._exit(self.edit_hooks())

        # --gc-database: drop snapshots of jobs no longer in the jobs list.
        if self.urlwatch_config.gc_database:
            self.urlwatcher.ssdb_storage.gc(
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        # --clean-database: keep only the newest snapshot(s) per job.
        if self.urlwatch_config.clean_database:
            self.urlwatcher.ssdb_storage.clean_ssdb(
                [job.get_guid() for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        # --rollback-database: restore the snapshot database to an earlier point in time
        # (interpreted in the report's configured timezone).
        if self.urlwatch_config.rollback_database:
            tz = self.urlwatcher.report.config['report']['tz']
            self.urlwatcher.ssdb_storage.rollback_cache(self.urlwatch_config.rollback_database, tz)
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        # --delete-snapshot: remove a job's most recent snapshot.
        if self.urlwatch_config.delete_snapshot:
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))

        # --features: list installed jobs/filters/reporters capabilities.
        if self.urlwatch_config.features:
            self._exit(self.show_features())

        # --detailed-versions: print version information for dependencies.
        if self.urlwatch_config.detailed_versions:
            self._exit(self.show_detailed_versions())
1182

1183
    def run(self) -> None:  # pragma: no cover
        """The main run logic.

        Wires the loaded configuration into the report, dispatches any command-line action
        (which exits the process if one was requested), and otherwise runs all jobs,
        closes the watcher's resources, and exits with code 0.
        """
        # Attach the loaded configuration (and CLI-supplied footnote) to the report object.
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote

        # Handle one-shot command-line actions; exits here if any action was requested.
        self.handle_actions()

        # No action requested: perform a normal run of all jobs.
        self.urlwatcher.run_jobs()

        # Release storage and other resources before exiting.
        self.urlwatcher.close()

        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc