• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 24920572770

25 Apr 2026 02:35AM UTC coverage: 72.339% (-0.6%) from 72.893%
24920572770

push

github

mborsetti
Version 3.35.0rc0

1454 of 2420 branches covered (60.08%)

Branch coverage included in aggregate %.

129 of 209 new or added lines in 7 files covered. (61.72%)

62 existing lines in 1 file now uncovered.

5126 of 6676 relevant lines covered (76.78%)

11.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.25
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4

5
from __future__ import annotations
15✔
6

7
import difflib
15✔
8
import email.utils
15✔
9
import gc
15✔
10
import importlib.metadata
15✔
11
import logging
15✔
12
import os
15✔
13
import platform
15✔
14
import re
15✔
15
import shutil
15✔
16
import sqlite3
15✔
17
import subprocess
15✔
18
import sys
15✔
19
import time
15✔
20
import traceback
15✔
21
from concurrent.futures import ThreadPoolExecutor
15✔
22
from contextlib import ExitStack
15✔
23
from datetime import datetime, tzinfo
15✔
24
from pathlib import Path
15✔
25
from typing import TYPE_CHECKING, Iterable, Iterator
15✔
26
from urllib.parse import unquote_plus
15✔
27
from zoneinfo import ZoneInfo
15✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
15✔
30
from webchanges.handler import JobState, Report
15✔
31
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
15✔
32
from webchanges.util import dur_text, edit_file, import_module_from_source
15✔
33

34
# httpx is the preferred HTTP client; if missing, fall back to 'requests', and fail
# hard only when neither client can be imported.
try:
    import httpx
except ImportError:  # pragma: no cover
    httpx = None  # ty:ignore[invalid-assignment]
    print("Required package 'httpx' not found; will attempt to run using 'requests'.")
    try:
        import requests
    except ImportError as e:  # pragma: no cover
        raise RuntimeError(
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
            'neither can be imported.'
        ) from e
if httpx is not None:
    # h2 provides HTTP/2 support for httpx; it is optional, so its absence is tolerated.
    try:
        import h2
    except ImportError:  # pragma: no cover
        h2 = None  # ty:ignore[invalid-assignment]

if TYPE_CHECKING:
    # Imported only for type annotations (kept out of runtime to avoid import cycles).
    from webchanges.main import Urlwatch
    from webchanges.reporters._base import _ConfigReportersList
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp

logger = logging.getLogger(__name__)
58

59

60
class UrlwatchCommand:
15✔
61
    """The class that runs the program after initialization and CLI arguments parsing."""
62

63
    def __init__(self, urlwatcher: Urlwatch) -> None:
15✔
64
        self.urlwatcher = urlwatcher
15✔
65
        self.urlwatch_config = urlwatcher.urlwatch_config
15✔
66

67
    def _exit(self, arg: str | int | None) -> None:
        """Terminate the program after closing the snapshot database.

        :param arg: The exit status (or message) handed through to sys.exit().
        """
        logger.info(f'Exiting with exit code {arg}')
        # Close the snapshot database cleanly before the interpreter goes away.
        self.urlwatcher.ssdb_storage.close()
        sys.exit(arg)
72

73
    def jobs_from_joblist(self) -> Iterator[JobBase]:
        """Generate the jobs to process from the joblist entered in the CLI.

        If no joblist was given on the command line, all enabled jobs from the jobs file(s) are processed.

        :return: An iterator of JobBase objects with configuration defaults applied.
        """
        if self.urlwatcher.urlwatch_config.joblist:
            jobs = [self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist]
            enabled_jobs = [job for job in jobs if job.is_enabled()]
            # Number of jobs skipped because they are disabled. Fixed: the original computed
            # len(enabled_jobs) - len(jobs), which is always <= 0 since enabled_jobs is a subset
            # of jobs, so the log read e.g. "(excluding -2 disabled)".
            disabled = len(jobs) - len(enabled_jobs)
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
            # Pluralization fixed: use the count, not the list's truthiness (one job is "1 job").
            logger.debug(
                f'Processing {len(enabled_jobs)} job{"s" if len(enabled_jobs) != 1 else ""}{disabled_str} as '
                f'specified in command line: {", ".join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}'
            )
        else:
            enabled_jobs = [job for job in self.urlwatcher.jobs if job.is_enabled()]
            # Same operand-order fix as above.
            disabled = len(self.urlwatcher.jobs) - len(enabled_jobs)
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
            logger.debug(f'Processing {len(enabled_jobs)} job{"s" if len(enabled_jobs) != 1 else ""}{disabled_str}')
        for job in enabled_jobs:
            # Apply configuration defaults before handing each job to the caller.
            yield job.with_defaults(self.urlwatcher.config_storage.config)
91

92
    def edit_hooks(self) -> int:
        """Edit hooks file.

        Each hooks file is copied to a scratch '<stem>_edit' sibling, opened in the user's editor, and re-imported
        to surface parse/runtime errors before the live file is overwritten, so a failed edit never corrupts it.

        :returns: 0 if edit is successful, 1 otherwise.
        """
        # Similar code to BaseTextualFileStorage.edit()
        for hooks_file in self.urlwatch_config.hooks_files:
            logger.debug(f'Edit file {hooks_file}')
            # Edit a scratch copy (e.g. hooks_edit.py) rather than the live hooks file.
            hooks_edit = hooks_file.with_stem(hooks_file.stem + '_edit')
            if hooks_file.exists():
                shutil.copy(hooks_file, hooks_edit)
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
            #         self.urlwatch_config.hooks_py_example):
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)

            # Re-open the editor until the edited file imports cleanly or the user gives up.
            while True:
                try:
                    edit_file(hooks_edit)
                    # Importing the edited module is what triggers syntax/import errors.
                    import_module_from_source('hooks', hooks_edit)
                    break  # stop if no exception on parser
                except SystemExit:
                    raise
                except Exception as e:  # noqa: BLE001 Do not catch blind exception: `Exception`
                    print('Parsing failed:')
                    print('======')
                    print(e)
                    print('======')
                    print()
                    print(f'The file {hooks_file} was NOT updated.')
                    # Empty input defaults to retrying the edit.
                    user_input = input('Do you want to retry the same edit? (Y/n)')
                    if not user_input or user_input.lower()[0] == 'y':
                        continue
                    # User declined to retry: discard the scratch copy and report failure.
                    hooks_edit.unlink()
                    print('No changes have been saved.')
                    return 1

            if hooks_file.is_symlink():
                # Preserve the symlink by writing through it instead of replacing it.
                hooks_file.write_text(hooks_edit.read_text())
            else:
                hooks_edit.replace(hooks_file)
            # replace() already removed the scratch file in the non-symlink case; missing_ok covers that.
            hooks_edit.unlink(missing_ok=True)
            print(f'Saved edits in {hooks_file}.')

        return 0
136

137
    @staticmethod
    def show_features() -> int:
        """Print the "features": the supported job types, filters, differs and reporters.

        :return: 0.
        """
        # Local imports keep these heavier modules out of the common startup path.
        from webchanges.differs import DifferBase
        from webchanges.filters import FilterBase
        from webchanges.reporters import ReporterBase

        docs_line = f'Please see full documentation at {__docs_url__}.'
        print(docs_line)
        print()
        print('Supported jobs:\n')
        print(JobBase.job_documentation())
        print('Supported filters:\n')
        print(FilterBase.filter_documentation())
        print()
        print('Supported differs:\n')
        print(DifferBase.differ_documentation())
        print()
        print('Supported reporters:\n')
        print(ReporterBase.reporter_documentation())
        print()
        print(docs_line)

        return 0
163

164
    @staticmethod
    def show_detailed_versions() -> int:
        """Prints the detailed versions, including of dependencies.

        :return: 0.
        """

        def dependencies() -> list[str]:
            """Return the names of this project's dependencies.

            Prefers the actual requirements recorded by pip for the installed distribution; falls back to a
            hard-coded list of all possible dependencies when pip's internals are unavailable.
            """
            try:
                # pip internal API; guarded by the ImportError handler below since it is not a stable interface.
                from pip._internal.metadata import get_default_environment

                env = get_default_environment()
                dist = None
                for dist in env.iter_all_distributions():
                    if dist.canonical_name == __project_name__:
                        break
                if dist and dist.canonical_name == __project_name__:
                    requires_dist = dist.metadata_dict.get('requires_dist', [])
                    # Strip version specifiers/markers, keeping only the bare package name.
                    dependencies = [re.split('[ <>=;#^[]', d)[0] for d in requires_dist]
                    dependencies.extend(('packaging', 'simplejson'))
                    return sorted(dependencies, key=str.lower)
            except ImportError:
                pass

            # default list of all possible dependencies
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
            return [
                'aioxmpp',
                'beautifulsoup4',
                'chump',
                'colorama',
                'cryptography',
                'cssbeautifier',
                'cssselect',
                'curl_cffi',
                'deepdiff',
                'h2',
                'html2text',
                'httpx',
                'jq',
                'jsbeautifier',
                'keyring',
                'lxml',
                'markdown2',
                'matrix_client',
                'msgpack',
                'packaging',
                'pdftotext',
                'Pillow',
                'platformdirs',
                'playwright',
                'psutil',
                'pushbullet.py',
                'pypdf',
                'pytesseract',
                'pyyaml',
                'redis',
                'requests',
                'simplejson',
                'tzdata',
                'vobject',
            ]

        print('Software:')
        print(f'• {__project_name__}: {__version__}')
        print(
            f'• {platform.python_implementation()}: {platform.python_version()} '
            f'{platform.python_build()} {platform.python_compiler()}'
        )
        print(f'• SQLite: {sqlite3.sqlite_version}')

        # System information is only available when the optional psutil package is installed.
        try:
            import psutil
            from psutil._common import bytes2human

            print()
            print('System:')
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
            print(f'• Processor: {platform.processor()}')
            print(f'• CPUs (logical): {psutil.cpu_count()}')
            try:
                virt_mem = psutil.virtual_memory().available
                print(
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
                )
            except psutil.Error as e:  # pragma: no cover
                print(f'• Free memory: Could not read information: {e}')
            print(
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
                f'({100 - psutil.disk_usage("/").percent:.1f}%)'
            )
            # Reads a private attribute to report ThreadPoolExecutor's platform-dependent default.
            executor = ThreadPoolExecutor()
            print(f'• --max-threads default: {executor._max_workers}')
        except ImportError:
            pass

        print()
        print('Relevant PyPi packages:')
        for module_name in dependencies():
            try:
                mod = importlib.metadata.distribution(module_name)
            except ModuleNotFoundError:
                # Dependency not installed; skip it silently.
                continue
            print(f'• {module_name}: {mod.version}')
            # package requirements
            if mod.requires:
                for req_name in [i.split()[0] for i in mod.requires]:
                    try:
                        req = importlib.metadata.distribution(req_name)
                    except ModuleNotFoundError:
                        continue
                    print(f'  - {req_name}: {req.version}')

        # playwright
        try:
            from playwright.sync_api import Error as PlaywrightError
            from playwright.sync_api import sync_playwright

            with sync_playwright() as p:
                try:
                    print()
                    print('Playwright browser:')
                    browser = p.chromium.launch(channel='chrome')
                    print(f'• Name: {browser.browser_type.name}')
                    print(f'• Version: {browser.version}')
                    print(f'• Executable: {browser.browser_type.executable_path}')
                    # NOTE(review): relies on psutil bound by the earlier try block; if that import
                    # failed, this would raise NameError rather than skip — confirm intended.
                    if psutil:
                        browser.new_page()
                        try:
                            virt_mem = psutil.virtual_memory().available
                            print(
                                f'• Free memory with browser loaded: '
                                f'{bytes2human(virt_mem)} physical plus '
                                f'{bytes2human(psutil.swap_memory().free)} swap'
                            )
                        except psutil.Error:
                            pass
                except PlaywrightError as e:
                    print()
                    print('Playwright browser:')
                    print(f'• Error: {e}')
        except ImportError:
            pass

        if os.name == 'posix':
            print()
            print('Installed dpkg dependencies:')
            try:
                # python3-apt; available only on Debian-like systems.
                import apt

                apt_cache = apt.Cache()

                def print_version(libs: list[str]) -> None:
                    # Print the installed version of each apt package that exists in the cache.
                    for lib in libs:
                        if lib in apt_cache and apt_cache[lib].versions:
                            ver = apt_cache[lib].versions
                            print(f'   - {ver[0].package}: {ver[0].version}')

                installed_packages = {dist.metadata['Name'] for dist in importlib.metadata.distributions()}
                # Map each Python package to the apt packages it depends on at the OS level.
                for module, apt_dists in (
                    ('jq', ['jq']),
                    # https://github.com/jalan/pdftotext#os-dependencies
                    ('pdftotext', ['libpoppler-cpp-dev']),
                    # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
                    (
                        'Pillow',
                        [
                            'libjpeg-dev',
                            'zlib-dev',
                            'zlib1g-dev',
                            'libtiff-dev',
                            'libfreetype-dev',
                            'littlecms-dev',
                            'libwebp-dev',
                            'tcl/tk-dev',
                            'openjpeg-dev',
                            'libimagequant-dev',
                            'libraqm-dev',
                            'libxcb-dev',
                            'libxcb1-dev',
                        ],
                    ),
                    ('playwright', ['google-chrome-stable']),
                    # https://tesseract-ocr.github.io/tessdoc/Installation.html
                    ('pytesseract', ['tesseract-ocr']),
                ):
                    if module in installed_packages:
                        importlib.metadata.distribution(module)
                        print(f'• {module}')
                        print_version(apt_dists)
            except ImportError:
                print('Dependencies cannot be printed as python3-apt is not installed.')
                print("Run 'sudo apt-get install python3-apt' to install.")
        print()
        return 0
360

361
    def list_jobs(self, regex: bool | str) -> None:
15✔
362
        """Lists the job and their respective _index_number.
363

364
        :return: None.
365
        """
366
        if isinstance(regex, str):
15!
367
            print(f"List of jobs matching the RegEx '{regex}':")
×
368
        else:
369
            print('List of jobs:')
15✔
370
        for job in self.urlwatcher.jobs:
15✔
371
            if self.urlwatch_config.verbose:
15✔
372
                job_desc = f'{job.index_number:3}: {job!r}'
15✔
373
            else:
374
                pretty_name = job.pretty_name()
15✔
375
                location = job.get_location()
15✔
376
                if pretty_name != location:
15!
377
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
15✔
378
                else:
379
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
380
            if isinstance(regex, bool) or re.findall(regex, job_desc):
15!
381
                print(job_desc)
15✔
382

383
        if len(self.urlwatch_config.jobs_files) > 1:
15✔
384
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
15✔
385
        elif len(self.urlwatch_config.jobs_files) == 1:
15✔
386
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
15✔
387
        else:
388
            jobs_files = []
15✔
389
        print('\n   '.join(jobs_files))
15✔
390

391
    def _find_job(self, query: str | int) -> JobBase:
15✔
392
        """Finds the job based on a query.
393

394
        It is matched to the job index (also negative) or a job location (i.e. the url/user_visible_url or command).
395

396
        :param query: The query.
397
        :return: The matching JobBase.
398
        :raises IndexError: If job is not found.
399
        """
400
        if isinstance(query, int):
15✔
401
            index = query
15✔
402
        else:
403
            try:
15✔
404
                index = int(query)
15✔
405
            except ValueError:
15✔
406
                query = unquote_plus(query)
15✔
407
                try:
15✔
408
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
15✔
409
                except StopIteration:
15✔
410
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
15✔
411

412
        if index == 0:
15✔
413
            raise ValueError(f'Job index {index} out of range.')
15✔
414
        try:
15✔
415
            if index <= 0:
15✔
416
                return self.urlwatcher.jobs[index]
15✔
417
            return self.urlwatcher.jobs[index - 1]
15✔
418
        except IndexError as e:
15✔
419
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
15✔
420

421
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
15✔
422
        """Returns the job with defaults based on job_id.
423

424
        This could match an index or a location (url/user_visible_url or command). Accepts negative numbers.
425

426
        :param query: The query.
427
        :return: The matching JobBase with defaults.
428
        :raises SystemExit: If job is not found.
429
        """
430
        job = self._find_job(query)
15✔
431
        return job.with_defaults(self.urlwatcher.config_storage.config)
15✔
432

433
    def test_job(self, job_id: bool | str | int) -> None:
        """Tests the running of a single job outputting the filtered text to --test-reporter (default is stdout).

        If job_id is True, don't run any jobs but load config, jobs and hook files to trigger any syntax errors.

        :param job_id: The job_id or True.

        :return: None.

        :raises Exception: The Exception when raised by a job. loading of hooks files, etc.
        """
        if job_id is True:  # Load to trigger any eventual syntax errors
            # Build a single confirmation message listing all files that loaded cleanly.
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
            # Joining word differs depending on whether a hooks file clause will follow.
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
            if len(self.urlwatch_config.jobs_files) == 1:
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
            else:
                message.append(
                    '\n   '.join(
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
                    )
                )
            if 'hooks' in sys.modules:
                message.append(f'\nand hooks file {sys.modules["hooks"].__file__}')
            print(f'{"".join(message)}.')
            return

        job = self._find_job_with_defaults(job_id)

        if isinstance(job, UrlJob):
            # Force re-retrieval of job, as we're testing filters
            job.ignore_cached = True

        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
            # duration = time.perf_counter() - start
            job_state.process(headless=not self.urlwatch_config.no_headless)
            if job_state.job.name is None:
                job_state.job.name = ''
            # if job_state.job.note is None:
            #     job_state.job.note = ''
            # Prepend diagnostic information (GUID, media type, ETag, any error) to the retrieved data;
            # filter(None, ...) drops the entries that do not apply.
            data_info = '\n'.join(
                filter(
                    None,
                    (
                        f'• [GUID: {job_state.job.guid}]',
                        f'• [Media type: {job_state.new_mime_type}]' if job_state.new_mime_type else None,
                        f'• [ETag: {job_state.new_etag}]' if job_state.new_etag else None,
                        f'\nERROR {job_state.new_error_data["type"]}: {job_state.new_error_data["message"]}'
                        if job_state.new_error_data
                        else None,
                    ),
                )
            )
            job_state.new_data = f'{data_info}\n\n{job_state.new_data!s}'
            if self.urlwatch_config.test_reporter is None:
                self.urlwatch_config.test_reporter = 'stdout'  # default
            report = Report(self.urlwatcher)
            report.job_states = []  # required
            errorlevel = self.check_test_reporter(
                job_state,
                label='test',
                report=report,
            )
            if errorlevel:
                self._exit(errorlevel)
        return

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
502

503
    def prepare_jobs(self) -> None:
15✔
504
        """Runs jobs that have no history to populate the snapshot database when they're newly added."""
505
        new_jobs = []
15✔
506
        for idx, job in enumerate(self.urlwatcher.jobs):
15✔
507
            has_history = bool(self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid))
15✔
508
            if not has_history:
15!
509
                print(f'Running new {job.get_indexed_location()}.')
15✔
510
                new_jobs.append(idx + 1)
15✔
511
        if not new_jobs and not self.urlwatch_config.joblist:
15!
512
            print('Found no new jobs to run.')
×
513
            return
×
514
        self.urlwatcher.urlwatch_config.joblist = list(self.urlwatcher.urlwatch_config.joblist) + new_jobs
15✔
515
        self.urlwatcher.run_jobs()
15✔
516
        if self.urlwatcher.report.config['display']['new']:
15!
UNCOV
517
            self.urlwatcher.report.finish(self.urlwatcher.jobs_storage.filename)
×
518
        return
15✔
519

520
    def test_differ(self, arg_test_differ: list[str]) -> int:
        """Runs diffs for a job on all the saved snapshots.

        Outputs the result to stdout or the reporter selected with --test-reporter.

        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs]
        :return: 1 if error, 0 if successful.
        :raises ValueError: If more than two arguments are passed.
        """
        report = Report(self.urlwatcher)
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
        if len(arg_test_differ) == 1:
            job_id = arg_test_differ[0]
            max_diffs = None  # diff every consecutive snapshot pair
        elif len(arg_test_differ) == 2:
            job_id, max_diffs_str = arg_test_differ
            max_diffs = int(max_diffs_str)
        else:
            raise ValueError('--test-differ takes a maximum of two arguments')

        job = self._find_job_with_defaults(job_id)

        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)

        # Need at least two snapshots to produce a diff.
        num_snapshots = len(history_data)
        if num_snapshots == 0:
            print('This job has never been run before.')
            return 1
        if num_snapshots < 2:
            print('Not enough historic data available (need at least 2 different snapshots).')
            return 1

        if job.compared_versions and job.compared_versions != 1:
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")

        max_diffs = max_diffs or num_snapshots - 1
        # history_data[0] is assumed to be the newest snapshot; pair i with i+1 (older) — TODO confirm ordering.
        for i in range(max_diffs):
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
                job_state.new_data = history_data[i].data
                job_state.new_timestamp = history_data[i].timestamp
                job_state.new_etag = history_data[i].etag
                job_state.new_mime_type = history_data[i].mime_type
                if not job.compared_versions or job.compared_versions == 1:
                    # Simple case: diff against the immediately older snapshot.
                    job_state.old_data = history_data[i + 1].data
                    job_state.old_timestamp = history_data[i + 1].timestamp
                    job_state.old_etag = history_data[i + 1].etag
                    job_state.old_mime_type = history_data[i + 1].mime_type
                else:
                    # compared_versions > 1: diff against the closest match among the next N older snapshots.
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
                    close_matches: list[str] = difflib.get_close_matches(
                        str(job_state.new_data),
                        history_dic_snapshots.keys(),
                        n=1,
                    )  # ty:ignore[invalid-assignment]
                    if close_matches:
                        job_state.old_data = close_matches[0]
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type

                if self.urlwatch_config.test_reporter is None:
                    self.urlwatch_config.test_reporter = 'stdout'  # default
                report.job_states = []  # required
                if job_state.new_data == job_state.old_data:
                    # Identical data: label it and suppress the report for this pair.
                    label = (
                        f'No change (snapshots {-i:2} vs. {-(i + 1):2}) with '
                        f"'compared_versions: {job.compared_versions}'"
                    )
                    job_state.verb = 'changed,no_report'
                else:
                    label = f'Filtered diff (snapshots {-i:2} vs. {-(i + 1):2})'
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
                if errorlevel:
                    self._exit(errorlevel)

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)

        return 0
598

599
    def dump_history(self, job_id: str) -> int:
        """Displays the historical data stored in the snapshot database for a job.

        :param job_id: The Job ID.
        :return: An argument to be used in sys.exit.
        """
        try:
            job = self._find_job_with_defaults(job_id)
        except ValueError:
            # Not a current job: treat job_id as a URL and look up the database by its computed GUID.
            print(f"No Job found matching '{job_id}'. Searching database using calculated GUID.")
            job = JobBase.unserialize({'url': job_id})

        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)

        title = f'History for {job.get_indexed_location()}'
        print(f'{title}\nGUID: {job.guid}')
        if history_data:
            print('=' * max(len(title), 46))
        total_failed = 0
        for i, snapshot in enumerate(history_data):
            # Build the optional header fields; each is empty when the snapshot lacks that attribute.
            mime_type = f' | Media type: {snapshot.mime_type}' if snapshot.mime_type else ''
            etag = f' | ETag: {snapshot.etag}' if snapshot.etag else ''
            tries = f' | Error run (number {snapshot.tries})' if snapshot.tries else ''
            # Counts snapshots captured during an error run (bool adds as 0/1).
            total_failed += snapshot.tries > 0
            # Use the report's configured timezone, falling back to the machine's local timezone.
            tz = self.urlwatcher.report.config['report']['tz']
            tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
            dt = datetime.fromtimestamp(snapshot.timestamp, tz_info)
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
            sep_len = max(50, len(header))
            print(header)
            print('-' * sep_len)
            if snapshot.error_data:
                print(f'{snapshot.error_data.get("type")}: {snapshot.error_data.get("message")}')
                print()
                print('Last good data:')
            print(snapshot.data)
            print('=' * sep_len, '\n')

        # Summary line, e.g. "Found 3 good snapshots and 1 error capture."
        print(
            f'Found {len(history_data) - total_failed}'
            + (' good' if total_failed else '')
            + ' snapshot'
            + ('s' if len(history_data) - total_failed != 1 else '')
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
            + '.'
        )

        return 0
647

648
    def list_error_jobs(self) -> int:
        """Runs all enabled jobs and reports the ones that fail with an exception or return no data.

        The report is printed to stdout or, when ``--errors REPORTER`` names a different reporter, dispatched
        through that reporter.

        :return: 0 on success, 1 if the reporter name given on the command line is invalid.
        """
        from webchanges.reporters import ReporterBase

        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
            print(f'Invalid reporter {self.urlwatch_config.errors}.')
            return 1

        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
            """A generator that outputs error text for jobs who fail with an exception or yield no data.

            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
            """

            def job_runner(
                stack: ExitStack,
                jobs: Iterable[JobBase],
                max_workers: int | None = None,
            ) -> Iterator[str]:
                """Modified worker.job_runner.

                Yields error text for jobs who fail with an exception or return no data.

                :param stack: The context manager.
                :param jobs: The jobs to run.
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
                :return: error text for jobs who fail with an exception or return no data.
                """
                # Register the executor on the caller's ExitStack so its threads are joined when the stack
                # exits (previously the executor was never shut down explicitly).
                executor = stack.enter_context(ThreadPoolExecutor(max_workers=max_workers))

                job_state: JobState
                for job_state in executor.map(
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
                ):
                    # NotModifiedError means the site reported no change to a conditional request; not an error.
                    if not isinstance(job_state.exception, NotModifiedError):
                        if job_state.exception is None:
                            # No exception raised: flag the job if it produced only whitespace (or no) data.
                            if (
                                len(job_state.new_data.strip()) == 0
                                if hasattr(job_state, 'new_data')
                                else len(job_state.old_data.strip()) == 0
                            ):
                                if self.urlwatch_config.verbose:
                                    yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
                                else:
                                    pretty_name = job_state.job.pretty_name()
                                    location = job_state.job.get_location()
                                    if pretty_name != location:
                                        yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
                                    else:
                                        yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
                        else:
                            pretty_name = job_state.job.pretty_name()
                            location = job_state.job.get_location()
                            if pretty_name != location:
                                yield (
                                    f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
                                    f'({location})'
                                )
                            else:
                                # Fixed: removed a stray ')' that was appended after the name in this branch.
                                yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name}'

            with ExitStack() as stack:
                # This code is from worker.run_jobs, modified to yield from job_runner.
                from webchanges.worker import get_virt_mem_mib  # avoid circular imports

                # run non-BrowserJob jobs first
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
                if jobs_to_run:
                    logger.debug(
                        "Running jobs that do not require a browser (without 'use_browser') in parallel with "
                        "Python's default max_workers."
                    )
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
                else:
                    logger.debug("Found no jobs that do not require a browser (i.e. without 'use_browser').")

                # run BrowserJob jobs after
                jobs_to_run = [job for job in jobs if job.__is_browser__]
                if jobs_to_run:
                    gc.collect()
                    virt_mem = get_virt_mem_mib()  # in MiB
                    virt_mem = virt_mem * 0.85  # reserve 15% for misc. overhead
                    if self.urlwatch_config.max_workers:
                        max_workers = self.urlwatch_config.max_workers
                    else:
                        # Size the pool by available memory (~800 MiB per browser), capped at the CPU count.
                        max_workers = max(int(virt_mem / 800), 1)
                        max_workers = min(max_workers, os.cpu_count() or 1)
                    logger.debug(
                        f"Running jobs that require a browser (i.e. with 'use_browser') in parallel with "
                        f'{max_workers} max_workers.'
                    )
                    yield from job_runner(stack, jobs_to_run, max_workers)
                else:
                    logger.debug("Found no jobs that require a browser (i.e. with 'use_browser').")

        start = time.perf_counter()

        # default max_workers (when not specified) to 1
        if self.urlwatch_config.max_workers is None:
            self.urlwatch_config.max_workers = 1

        if len(self.urlwatch_config.jobs_files) == 1:
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
        else:
            jobs_files = ['in the concatenation of the jobs files'] + [
                f'• {file},' for file in self.urlwatch_config.jobs_files
            ]
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)', *jobs_files])

        jobs = {
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
        }
        if self.urlwatch_config.errors == 'stdout':
            print(header)
            for line in error_jobs_lines(jobs):
                print(line)
            print('--')
            duration = time.perf_counter() - start
            # Pluralize on the count, not on non-emptiness (a single job previously printed '1 jobs').
            print(f'Checked {len(jobs)} enabled job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.')

        else:
            message = '\n'.join(error_jobs_lines(jobs))
            if message:
                # create a dummy job state to run a reporter on
                job_state = JobState(
                    None,  # ty:ignore[invalid-argument-type]
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
                )
                job_state.traceback = f'{header}\n{message}'
                duration = time.perf_counter() - start
                self.urlwatcher.report.config['footnote'] = (
                    f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.'
                )
                self.urlwatcher.report.config['report']['html']['footer'] = False
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
                self.urlwatcher.report.config['report']['text']['footer'] = False
                self.urlwatcher.report.error(job_state)
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
            else:
                print(header)
                print('--')
                duration = time.perf_counter() - start
                print('Found no errors.')
                print(f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.')

        return 0
    def rollback_database(self, timespec: str) -> int:
        """Issues a warning, asks for confirmation (when interactive), rolls back the database, and prints the result.

        :param timespec: A timespec that if numeric is interpreted as a Unix timestamp otherwise it's passed to
          dateutil.parser (if dateutil is installed) or datetime.fromisoformat to be converted into a date.

        :return: A sys.exit code (0 for success, 1 for failure)
        """

        def _convert_to_datetime(timespec: str, tz_info: ZoneInfo | tzinfo | None) -> datetime:
            """Converts inputted string to a datetime object, using dateutil if installed.

            :param timespec: The string.
            :param tz_info: The timezone.

            :return: The datetime object.
            :raises ValueError: If the string cannot be parsed into a date/time.
            """
            # --- 1. Try parsing as a numeric timestamp ---
            # This is the fastest check and should come first.
            if timespec.isnumeric() or (timespec.startswith('-') and timespec[1:].isnumeric()):
                try:
                    timestamp = float(timespec)
                    return datetime.fromtimestamp(timestamp, tz=tz_info)
                except (ValueError, TypeError):
                    # Pass to the next method if it's not a valid float (e.g., "123a")
                    pass

            # --- 2. Try parsing as ISO 8601 format ---
            # datetime.fromisoformat is very efficient for standard formats.
            try:
                dt = datetime.fromisoformat(timespec)
                # If the parsed datetime is naive (no timezone), apply the provided one.
                if dt.tzinfo is None:
                    return dt.replace(tzinfo=tz_info)
                return dt
            except ValueError:
                # Pass to the next method if it's not a valid ISO string.
                pass

            # --- 3. Try parsing with the flexible but slower dateutil library ---
            try:
                from dateutil import parser as dateutil_parser

                try:
                    # Set a default datetime to provide context and timezone for ambiguous strings like "Sunday at 4pm".
                    # NOTE(review): dateutil's `default` only fills in *missing* components; it does not resolve
                    # relative phrases such as "Sunday at 4pm" -- confirm intended behavior.
                    default_dt_with_tz = datetime.now(tz_info).replace(second=0, microsecond=0)
                    return dateutil_parser.parse(timespec, default=default_dt_with_tz)
                except (ValueError, OverflowError):
                    # Pass to the next method if dateutil cannot parse.
                    pass
            except ImportError:
                # Pass to the next method if dateutil is not installed.
                pass

            # --- 4. If all parsing attempts fail ---
            raise ValueError(f'Cannot parse "{timespec}" into a date/time.')

        tz = self.urlwatcher.report.config['report']['tz']
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        dt = _convert_to_datetime(timespec, tz_info)
        timestamp_date = email.utils.format_datetime(dt)
        print(f'Rolling back database to {timestamp_date}.')
        if sys.__stdin__ and sys.__stdin__.isatty():
            # Ask for confirmation BEFORE touching the database. (A previous version called rollback() once
            # before prompting, which deleted the snapshots before the user had consented and caused the
            # post-confirmation rollback() call to always report that nothing was found.)
            print(
                f'WARNING: All snapshots after this date/time (check timezone) will be deleted.\n'
                f'         ☠  This operation cannot be undone!\n'
                f'         We suggest you make a backup of the database file before proceeding:\n'
                f'         {self.urlwatch_config.ssdb_file}'
            )
            resp = input("         Please enter 'Y' to proceed: ")
            if not resp.upper().startswith('Y'):
                print('Quitting rollback. No snapshots have been deleted.')
                return 1
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
        if count:
            print(f'Deleted {count} snapshots taken after {timestamp_date}.')
        else:
            print(f'No snapshots found after {timestamp_date}')
        return 0
    def delete_snapshot(self, job_id: str | int) -> int:
        """Deletes the most recent snapshot of a job from the snapshot database, asking for confirmation when
        running interactively.

        :param job_id: The job's identifier as accepted by _find_job_with_defaults (e.g. index number or location).
        :return: A sys.exit code (0 if a snapshot was deleted, 1 otherwise).
        """
        job = self._find_job_with_defaults(job_id)
        history = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
        if not history:
            print(f'No snapshots found for {job.get_indexed_location()}.')
            return 1
        # Display timestamps in the configured report timezone, falling back to the machine's local timezone.
        tz = self.urlwatcher.report.config['report']['tz']
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        # Only prompt for confirmation when attached to an interactive terminal (i.e. not when scripted).
        if sys.__stdin__ and sys.__stdin__.isatty():
            print(f'WARNING: About to delete the latest snapshot of\n         {job.get_indexed_location()}:')
            # List all snapshots, marking the newest one (index 0) as the deletion target.
            for i, history_job in enumerate(history):
                print(
                    f'         {i + 1}. {"❌ " if i == 0 else "   "}'
                    f'{email.utils.format_datetime(datetime.fromtimestamp(history_job.timestamp).astimezone(tz_info))}'
                    f'{"  ⬅  ABOUT TO BE DELETED!" if i == 0 else ""}'
                )
            print(
                f'         ☠  This operation cannot be undone!\n'
                f'         We suggest you make a backup of the database file before proceeding:\n'
                f'         {self.urlwatch_config.ssdb_file}'
            )
            resp = input("         Please enter 'Y' to proceed: ")
            if not resp.upper().startswith('Y'):
                print('Quitting. No snapshots have been deleted.')
                return 1
        count = self.urlwatcher.ssdb_storage.delete_latest(job.guid)
        if count:
            print(f'Deleted last snapshot of {job.get_indexed_location()}; {len(history) - 1} snapshots left.')
            return 0
        # delete_latest reported nothing deleted (e.g. the snapshot disappeared between listing and deletion).
        print(f'No snapshots found for {job.get_indexed_location()}.')
        return 1
    def modify_urls(self) -> int:
        """Applies the --delete, --add and --change-location command line edits to the jobs list and saves the
        jobs file.

        :return: A sys.exit code (0 for success, 1 for failure).
        """
        if self.urlwatch_config.delete is not None:
            job = self._find_job(self.urlwatch_config.delete)
            if job is not None:
                # Only prompt for confirmation when attached to an interactive terminal.
                if sys.__stdin__ and sys.__stdin__.isatty():
                    # Fixed: added the missing newline after 'all remarks lost.' so the warning does not run
                    # two sentences together on one line.
                    print(
                        f'WARNING: About to permanently delete {job.get_indexed_location()}.\n'
                        '         Job file will be overwritten and all remarks lost.\n'
                        '         This operation cannot be undone!\n'
                    )
                    resp = input("         Please enter 'Y' to proceed: ")
                    if not resp.upper().startswith('Y'):
                        print(f'Quitting. Job {job.index_number} has not been deleted and job file is unmodified.')
                        return 1
                self.urlwatcher.jobs.remove(job)
                print(f'Removed {job}.')
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
            else:
                print(f'Job not found: {self.urlwatch_config.delete}.')
                return 1

        if self.urlwatch_config.add is not None:
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
            filters = [v for k, v in items if k == 'filter']
            items2 = [(k, v) for k, v in items if k != 'filter']
            d = dict(items2)
            if filters:
                d['filter'] = ','.join(filters)

            job = JobBase.unserialize(d)
            print(f'Adding {job}.')
            self.urlwatcher.jobs.append(job)
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.change_location is not None:
            new_loc = self.urlwatch_config.change_location[1]
            # Ensure the user isn't overwriting an existing job with the change.
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
                print(
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
                    f'different value.\n'
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
                )
                return 1
            job = self._find_job(self.urlwatch_config.change_location[0])
            if job is not None:
                # Update the job's location (which will also update the guid) and move any history in the database
                # over to the job's updated guid.
                old_loc = job.get_location()
                print(f'Moving location of "{old_loc}" to "{new_loc}".')
                old_guid = job.guid
                if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
                    print(f'No snapshots found for "{old_loc}".')
                    return 1
                job.set_base_location(new_loc)
                num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.guid)
                if num_searched:
                    print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}".')
            else:
                print(f'Job not found: "{self.urlwatch_config.change_location[0]}".')
                return 1
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
            if not input(message).lower().startswith('y'):
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
            else:
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        return 0
    def edit_config(self) -> int:
        """Opens the configuration file for editing by delegating to the config storage.

        :return: The value returned by config_storage.edit() (presumably an exit code -- confirm against
          the storage implementation).
        """
        return self.urlwatcher.config_storage.edit()
    def check_telegram_chats(self) -> None:
        """Verifies the Telegram bot token and lists the chat IDs of private chats that have messaged the bot,
        then exits.

        Exits with 1 when the token is missing/invalid or no chats are found, 0 otherwise.
        """
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']

        bot_token = config['bot_token']
        if not bot_token:
            print('You need to set up your bot token first (see documentation).')
            self._exit(1)

        with httpx.Client(http2=h2 is not None) if httpx else requests.Session() as http_client:
            info = http_client.get(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
            if not info['ok']:
                print(f'Error with token {bot_token}: {info["description"]}.')
                self._exit(1)

            chats = {}
            updates = http_client.get(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
        if 'result' in updates:
            for chat_info in updates['result']:
                chat = chat_info['message']['chat']
                # Only private chats are listed; groups/channels are skipped.
                if chat['type'] == 'private':
                    chats[chat['id']] = (
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
                    )

        if not chats:
            print(f'No chats found. Say hello to your bot at https://t.me/{info["result"]["username"]}.')
            self._exit(1)

        headers = ('Chat ID', 'Name')
        # Chat IDs are integers in the Telegram Bot API, so they must be stringified before measuring their
        # width (the previous len(k) on an int raised TypeError).
        maxchat = max(len(headers[0]), max((len(str(chat_id)) for chat_id in chats), default=0))
        maxname = max(len(headers[1]), max((len(name) for name in chats.values()), default=0))
        fmt = f'%-{maxchat}s  %s'
        print(fmt % headers)
        print(fmt % ('-' * maxchat, '-' * maxname))
        # Sort the table by the user's display name.
        for chat_id, name in sorted(chats.items(), key=lambda kv: kv[1]):
            print(fmt % (chat_id, name))
        print(f'\nChat up your bot here: https://t.me/{info["result"]["username"]}.')

        self._exit(0)
    def check_test_reporter(
        self,
        job_state: JobState | None = None,
        label: str = 'test',
        report: Report | None = None,
    ) -> int:
        """Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').

        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
        of the configuration file.

        :param job_state: The JobState (Optional). When given, a full report of just this state is produced
          under `label` instead of the four sample pseudo-jobs.
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report class to use for testing (Optional); a fresh one is created when omitted.
        :return: 0 if successful, 1 otherwise.
        """
        from webchanges.reporters import ReporterBase

        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
            """Builds a pseudo-job for the reporter to run on."""
            job = JobBase.unserialize({'name': job_name, 'url': url})

            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
            # testing; also no need to use it as context manager, since no processing is called on the job
            job_state = JobState(None, job)  # ty:ignore[invalid-argument-type]

            job_state.old_data = old
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
            job_state.new_data = new
            job_state.new_timestamp = time.time()

            return job_state

        def set_error(job_state: 'JobState', message: str) -> JobState:
            """Sets a job error message on a JobState."""
            # Raise-and-catch so that a realistic traceback is attached to the pseudo-job.
            try:
                raise ValueError(message)
            except ValueError as e:
                job_state.exception = e
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())

            return job_state

        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(
                f'No such reporter: {reporter_name}.\n'
                f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}.\n'
            )
            return 1

        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][reporter_name]  # ty:ignore[invalid-key]
        if job_state:  # we want a full report
            # Force-enable the reporter and the requested display label, and switch off 'minimal' output so
            # the full details and footer are rendered for all formats.
            cfg['enabled'] = True
            self.urlwatcher.config_storage.config['display'][label] = True  # ty:ignore[invalid-key]
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['stdout']['color'] = False
        elif not cfg['enabled']:
            # Warn but still try: the reporter is temporarily enabled for this test run only.
            print(
                f'WARNING: Reporter being tested is not enabled: {reporter_name}.\n'
                f'Will still attempt to test it, but this may not work.\n'
                f'Use {__project_name__} --edit-config to configure reporters.'
            )
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)  # ty:ignore[invalid-argument-type]
        else:
            # Feed the reporter one pseudo-job per verb so its rendering of each outcome can be inspected.
            report.new(
                build_job(
                    'Sample job that was newly added',
                    'https://example.com/new',
                    '',
                    '',
                )
            )
            report.changed(
                build_job(
                    'Sample job where something changed',
                    'https://example.com/changed',
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
                )
            )
            report.unchanged(
                build_job(
                    'Sample job where nothing changed',
                    'http://example.com/unchanged',
                    'Same Old, Same Old\n',
                    'Same Old, Same Old\n',
                )
            )
            report.error(
                set_error(
                    build_job(
                        'Sample job where an error was encountered',
                        'https://example.com/error',
                        '',
                        '',
                    ),
                    'The error message would appear here.',
                )
            )

        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
    def check_smtp_login(self) -> None:
        """Validates the SMTP e-mail reporter configuration, optionally stores the password in the keyring, and
        attempts a test login, then exits.

        Exits with 1 when the configuration is incomplete; otherwise attempts the login and exits with 0.
        """
        from webchanges.mailer import SMTPMailer, smtp_have_password, smtp_set_password

        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_config: _ConfigReportEmailSmtp = config['smtp']

        smtp_auth = smtp_config['auth']
        smtp_hostname = smtp_config['host']
        smtp_username = smtp_config['user'] or config['from']

        # Data-driven validation: report every problem found before exiting.
        checks = (
            (bool(config['enabled']), 'Please enable email reporting in the config first.'),
            (config['method'] == 'smtp', 'Please set the method to SMTP for the email reporter.'),
            (bool(smtp_auth), 'Authentication must be enabled for SMTP.'),
            (bool(smtp_hostname), 'Please configure the SMTP hostname in the config first.'),
            (bool(smtp_username), 'Please configure the SMTP user in the config first.'),
        )
        success = True
        for ok, complaint in checks:
            if not ok:
                print(complaint)
                success = False

        if not success:
            self._exit(1)

        insecure_password = smtp_config['insecure_password']
        if insecure_password:
            # A plain-text password in the config file takes precedence; nothing to store in the keyring.
            print('The SMTP password is set in the config file (key "insecure_password").')
        elif smtp_have_password(smtp_hostname, smtp_username):
            prompt = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
            if input(prompt).lower().startswith('y'):
                smtp_set_password(smtp_hostname, smtp_username)
            else:
                print('Password unchanged.')

        mailer = SMTPMailer(
            smtp_username,
            smtp_hostname,
            smtp_config['port'],
            smtp_config['starttls'],
            smtp_auth,
            insecure_password,
        )
        print('Trying to log into the SMTP server...')
        mailer.send(None)
        print('Successfully logged into SMTP server.')

        self._exit(0)
    def check_xmpp_login(self) -> None:
        """Validates the XMPP reporter configuration and stores the XMPP password in the keyring, then exits.

        Exits with 1 when the configuration is incomplete, 0 otherwise.
        """
        from webchanges.reporters import xmpp_have_password, xmpp_set_password

        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        # Validate the configuration, reporting every problem found before exiting.
        success = True

        if not xmpp_config['enabled']:
            print('Please enable XMPP reporting in the config first.')
            success = False

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            print('Please configure the XMPP sender in the config first.')
            success = False

        if not xmpp_config['recipient']:
            print('Please configure the XMPP recipient in the config first.')
            success = False

        if not success:
            self._exit(1)

        if 'insecure_password' in xmpp_config:
            # A plain-text password in the config file takes precedence; nothing to store in the keyring.
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            # NOTE(review): unlike the SMTP check, this accepts only exactly 'y'/'Y', not any word starting
            # with it -- confirm whether that inconsistency is intentional.
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
            if input(message).lower() != 'y':
                print('Password unchanged.')
                self._exit(0)

        # NOTE(review): success is always True here (the failure path called self._exit(1) above), so this
        # guard appears redundant -- confirm that _exit() always raises SystemExit.
        if success:
            xmpp_set_password(xmpp_sender)

        self._exit(0)
    @staticmethod
    def playwright_install_chrome() -> int:  # pragma: no cover
        """Installs the Chrome browser executable by invoking Playwright's driver CLI directly, mirroring what
        playwright.___main__.main() does when the playwright executable is run.

        :return: Playwright's executable return code.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:  # pragma: no cover
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        # Build the CLI invocation and environment; PW_CLI_TARGET_LANG makes the driver behave as the Python CLI.
        cli_env = os.environ.copy()
        cli_env['PW_CLI_TARGET_LANG'] = 'python'
        cli_cmd = [str(compute_driver_executable()), 'install', 'chrome']
        logger.info(f'Running playwright CLI: {" ".join(cli_cmd)}')
        result = subprocess.run(cli_cmd, check=False, env=cli_env, capture_output=True, text=True)  # noqa: S603 subprocess call
        if result.returncode:
            print(result.stderr)
            return result.returncode
        if result.stdout:
            logger.info(f'Success! Output of Playwright CLI: {result.stdout}')
        return 0
    def handle_actions(self) -> None:
        """Handles the actions for command line arguments and exits.

        Actions are checked in a fixed order; the first matching one runs and the
        program exits (either here or inside the called handler).
        """
        args = self.urlwatch_config

        if args.list_jobs:
            self.list_jobs(args.list_jobs)
            self._exit(0)

        if args.errors:
            self._exit(self.list_error_jobs())

        if args.test_job:
            self.test_job(args.test_job)
            self._exit(0)

        if args.prepare_jobs:
            self.prepare_jobs()
            self._exit(0)

        if args.test_differ:
            self._exit(self.test_differ(args.test_differ))

        if args.dump_history:
            self._exit(self.dump_history(args.dump_history))

        if args.add or args.delete or args.change_location:
            self._exit(self.modify_urls())

        if args.test_reporter:
            self._exit(self.check_test_reporter())

        # The three checks below exit from within the called method.
        if args.smtp_login:
            self.check_smtp_login()

        if args.telegram_chats:
            self.check_telegram_chats()

        if args.xmpp_login:
            self.check_xmpp_login()

        if args.edit:
            self._exit(self.urlwatcher.jobs_storage.edit())

        if args.edit_config:
            self._exit(self.edit_config())

        if args.edit_hooks:
            self._exit(self.edit_hooks())

        if args.gc_database:
            known_guids = [job.guid for job in self.urlwatcher.jobs]
            self.urlwatcher.ssdb_storage.gc(known_guids, args.gc_database)
            self._exit(0)

        if args.clean_database:
            known_guids = [job.guid for job in self.urlwatcher.jobs]
            self.urlwatcher.ssdb_storage.clean_ssdb(known_guids, args.clean_database)
            self._exit(0)

        if args.rollback_database:
            self._exit(self.rollback_database(args.rollback_database))

        if args.delete_snapshot:
            self._exit(self.delete_snapshot(args.delete_snapshot))

        if args.features:
            self._exit(self.show_features())

        if args.detailed_versions:
            self._exit(self.show_detailed_versions())

1328
    def run(self) -> None:  # pragma: no cover
        """The main run logic: propagate config, handle one-shot actions, then run all jobs and exit."""
        # Hand the active configuration (plus the optional footnote) to the report object.
        report = self.urlwatcher.report
        report.config = self.urlwatcher.config_storage.config
        report.config['footnote'] = self.urlwatch_config.footnote

        # Any matched command line action exits from within handle_actions().
        self.handle_actions()

        self.urlwatcher.run_jobs()
        self.urlwatcher.close()

        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc