• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 17710149774

14 Sep 2025 10:49AM UTC coverage: 71.376% (-3.1%) from 74.434%
17710149774

push

github

mborsetti
Version 3.31.1.post2

1383 of 2314 branches covered (59.77%)

Branch coverage included in aggregate %.

4614 of 6088 relevant lines covered (75.79%)

5.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.06
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import difflib
8✔
8
import email.utils
8✔
9
import gc
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import re
8✔
15
import shutil
8✔
16
import sqlite3
8✔
17
import subprocess
8✔
18
import sys
8✔
19
import time
8✔
20
import traceback
8✔
21
from concurrent.futures import ThreadPoolExecutor
8✔
22
from contextlib import ExitStack
8✔
23
from datetime import datetime, tzinfo
8✔
24
from pathlib import Path
8✔
25
from typing import TYPE_CHECKING, Iterable, Iterator
8✔
26
from urllib.parse import unquote_plus
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
8✔
30
from webchanges.handler import JobState, Report
8✔
31
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
8✔
32
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
33

34
# Prefer 'httpx' as the HTTP client; fall back to 'requests' when it is not installed.
try:
    import httpx
except ImportError:  # pragma: no cover
    httpx = None  # type: ignore[assignment]
    print("Required package 'httpx' not found; will attempt to run using 'requests'.")
    try:
        import requests
    except ImportError as e:  # pragma: no cover
        # Bug fix: the parenthetical was never closed in the original message.
        raise RuntimeError(
            f"A Python HTTP client package (either 'httpx' or 'requests') is required to run {__project_name__}; "
            'neither can be imported.'
        ) from e
if httpx is not None:
    # h2 enables HTTP/2 support in httpx; it is optional.
    try:
        import h2
    except ImportError:  # pragma: no cover
        h2 = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)
53

54
if TYPE_CHECKING:
55
    from webchanges.main import Urlwatch
56
    from webchanges.reporters import _ConfigReportersList
57
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
58

59

60
class UrlwatchCommand:
8✔
61
    """The class that runs the program after initialization and CLI arguments parsing."""
62

63
    def __init__(self, urlwatcher: Urlwatch) -> None:
8✔
64
        self.urlwatcher = urlwatcher
8✔
65
        self.urlwatch_config = urlwatcher.urlwatch_config
8✔
66

67
    @staticmethod
    def _exit(arg: str | int | None) -> None:
        """Log the exit code and terminate the program.

        :param arg: The exit status (or message) passed straight through to sys.exit().
        :raises SystemExit: Always.
        """
        # Lazy %-style arguments avoid formatting work when INFO logging is disabled.
        logger.info('Exiting with exit code %s', arg)
        sys.exit(arg)
71

72
    def jobs_from_joblist(self) -> Iterator[JobBase]:
        """Generates the jobs to process from the joblist entered in the CLI.

        If no joblist was given on the command line, all enabled jobs are processed.

        :return: An iterator of JobBase objects with configuration defaults applied.
        """
        if self.urlwatcher.urlwatch_config.joblist:
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
            enabled_jobs = {job for job in jobs if job.is_enabled()}
            # Bug fix: the excluded-jobs count had its operands reversed and was always <= 0,
            # producing log lines such as 'excluding -2 disabled'.
            disabled = len(jobs) - len(enabled_jobs)
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
            # Bug fix: pluralize only when the count differs from 1 (was '1 jobs').
            logger.debug(
                f'Processing {len(enabled_jobs)} job{"s" if len(enabled_jobs) != 1 else ""}{disabled_str} as '
                f'specified in command line: {", ".join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}'
            )
        else:
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
            disabled = len(self.urlwatcher.jobs) - len(enabled_jobs)
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
            logger.debug(f'Processing {len(enabled_jobs)} job{"s" if len(enabled_jobs) != 1 else ""}{disabled_str}')
        for job in enabled_jobs:
            # Apply the configuration's job defaults before handing the job over for processing.
            yield job.with_defaults(self.urlwatcher.config_storage.config)
90

91
    def edit_hooks(self) -> int:
        """Edit hooks file.

        Each hooks file is copied to a working file, opened in the user's editor, and re-imported to
        validate it; only a copy that imports cleanly is moved back over the original.

        :returns: 0 if edit is successful, 1 otherwise.
        """
        # Similar code to BaseTextualFileStorage.edit()
        for hooks_file in self.urlwatch_config.hooks_files:
            logger.debug(f'Edit file {hooks_file}')
            # Work on a side copy (e.g. hooks_edit.py) so a failed parse never clobbers the live file.
            # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
            hooks_edit = hooks_file.parent.joinpath(hooks_file.stem + '_edit' + ''.join(hooks_file.suffixes))
            if hooks_file.exists():
                shutil.copy(hooks_file, hooks_edit)
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
            #         self.urlwatch_config.hooks_py_example):
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)

            # Edit/validate loop: keep offering to re-edit until the file imports cleanly
            # or the user gives up.
            while True:
                try:
                    edit_file(hooks_edit)
                    # Importing the edited module is the validation step.
                    import_module_from_source('hooks', hooks_edit)
                    break  # stop if no exception on parser
                except SystemExit:
                    raise
                except Exception as e:
                    print('Parsing failed:')
                    print('======')
                    print(e)
                    print('======')
                    print('')
                    print(f'The file {hooks_file} was NOT updated.')
                    user_input = input('Do you want to retry the same edit? (Y/n)')
                    if not user_input or user_input.lower()[0] == 'y':
                        continue
                    # User declined: discard the working copy and report failure.
                    hooks_edit.unlink()
                    print('No changes have been saved.')
                    return 1

            if hooks_file.is_symlink():
                # Write through the symlink so the link itself is preserved.
                hooks_file.write_text(hooks_edit.read_text())
            else:
                hooks_edit.replace(hooks_file)
            # replace() already consumed the copy in the non-symlink case; missing_ok covers that.
            hooks_edit.unlink(missing_ok=True)
            print(f'Saved edits in {hooks_file}.')

        return 0
136

137
    @staticmethod
    def show_features() -> int:
        """
        Prints the "features", i.e. a list of job types, filters and reporters.

        :return: 0.
        """
        from webchanges.differs import DifferBase
        from webchanges.filters import FilterBase
        from webchanges.reporters import ReporterBase

        docs_line = f'Please see full documentation at {__docs_url__}.'
        # Assemble every section in order, then emit them with a single print call;
        # joining with '\n' reproduces the exact output of the individual prints.
        sections = (
            docs_line,
            '',
            'Supported jobs:\n',
            JobBase.job_documentation(),
            'Supported filters:\n',
            FilterBase.filter_documentation(),
            '',
            'Supported differs:\n',
            DifferBase.differ_documentation(),
            '',
            'Supported reporters:\n',
            ReporterBase.reporter_documentation(),
            '',
            docs_line,
        )
        print('\n'.join(str(section) for section in sections))

        return 0
164

165
    @staticmethod
    def show_detailed_versions() -> int:
        """
        Prints the detailed versions, including of dependencies.

        :return: 0.
        """

        def dependencies() -> list[str]:
            """Return the names of this project's dependencies, preferably from pip's metadata."""
            try:
                from pip._internal.metadata import get_default_environment

                env = get_default_environment()
                dist = None
                for dist in env.iter_all_distributions():
                    if dist.canonical_name == __project_name__:
                        break
                if dist and dist.canonical_name == __project_name__:
                    requires_dist = dist.metadata_dict.get('requires_dist', [])
                    # Keep only the bare package name, stripping version specifiers and markers.
                    dependencies = [re.split('[ <>=;#^[]', d)[0] for d in requires_dist]
                    dependencies.extend(('packaging', 'simplejson'))
                    return sorted(dependencies, key=str.lower)
            except ImportError:
                pass

            # default list of all possible dependencies
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
            return [
                'aioxmpp',
                'beautifulsoup4',
                'chump',
                'colorama',
                'cryptography',
                'cssbeautifier',
                'cssselect',
                'deepdiff',
                'h2',
                'html2text',
                'httpx',
                'jq',
                'jsbeautifier',
                'keyring',
                'lxml',
                'markdown2',
                'matrix_client',
                'msgpack',
                'packaging',
                'pdftotext',
                'Pillow',
                'platformdirs',
                'playwright',
                'psutil',
                'pushbullet.py',
                'pypdf',
                'pytesseract',
                'pyyaml',
                'redis',
                'requests',
                'simplejson',
                'tzdata',
                'vobject',
            ]

        print('Software:')
        print(f'• {__project_name__}: {__version__}')
        print(
            f'• {platform.python_implementation()}: {platform.python_version()} '
            f'{platform.python_build()} {platform.python_compiler()}'
        )
        print(f'• SQLite: {sqlite3.sqlite_version}')

        try:
            import psutil
            from psutil._common import bytes2human

            print()
            print('System:')
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
            print(f'• Processor: {platform.processor()}')
            print(f'• CPUs (logical): {psutil.cpu_count()}')
            try:
                virt_mem = psutil.virtual_memory().available
                print(
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
                )
            except psutil.Error as e:  # pragma: no cover
                print(f'• Free memory: Could not read information: {e}')
            print(
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
                f'({100 - psutil.disk_usage("/").percent:.1f}%)'
            )
            # Only created to discover the default worker count; no tasks are submitted.
            executor = ThreadPoolExecutor()
            print(f'• --max-threads default: {executor._max_workers}')
            executor.shutdown(wait=False)
        except ImportError:
            # Bug fix: bind the names so the playwright section below can safely test
            # 'if psutil:' instead of raising NameError when psutil is not installed.
            psutil = None  # type: ignore[assignment]
            bytes2human = None  # type: ignore[assignment]

        print()
        print('Relevant PyPi packages:')
        for module_name in dependencies():
            try:
                mod = importlib.metadata.distribution(module_name)
            except ModuleNotFoundError:
                continue
            print(f'• {module_name}: {mod.version}')
            # package requirements
            if mod.requires:
                for req_name in [i.split()[0] for i in mod.requires]:
                    try:
                        req = importlib.metadata.distribution(req_name)
                    except ModuleNotFoundError:
                        continue
                    print(f'  - {req_name}: {req.version}')

        # playwright
        try:
            from playwright.sync_api import Error as PlaywrightError
            from playwright.sync_api import sync_playwright

            with sync_playwright() as p:
                try:
                    browser = p.chromium.launch(channel='chrome')
                    print()
                    print('Playwright browser:')
                    print(f'• Name: {browser.browser_type.name}')
                    print(f'• Version: {browser.version}')
                    if psutil:
                        # Load a page so the memory figures reflect a browser in use.
                        browser.new_page()
                        try:
                            virt_mem = psutil.virtual_memory().available
                            print(
                                f'• Free memory with browser loaded: '
                                f'{bytes2human(virt_mem)} physical plus '
                                f'{bytes2human(psutil.swap_memory().free)} swap'
                            )
                        except psutil.Error:
                            pass
                except PlaywrightError as e:
                    print()
                    print('Playwright browser:')
                    print(f'• Error: {e}')
        except ImportError:
            pass

        if os.name == 'posix':
            try:
                # python-apt is only present on Debian-like systems.
                import apt

                apt_cache = apt.Cache()

                def print_version(libs: list[str]) -> None:
                    """Print the installed dpkg version of each library found in the apt cache."""
                    for lib in libs:
                        if lib in apt_cache:
                            if ver := apt_cache[lib].versions:
                                print(f'   - {ver[0].package}: {ver[0].version}')
                    return None

                print()
                print('Installed dpkg dependencies:')
                for module, apt_dists in (
                    ('jq', ['jq']),
                    # https://github.com/jalan/pdftotext#os-dependencies
                    ('pdftotext', ['libpoppler-cpp-dev']),
                    # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
                    (
                        'Pillow',
                        [
                            'libjpeg-dev',
                            'zlib-dev',
                            'zlib1g-dev',
                            'libtiff-dev',
                            'libfreetype-dev',
                            'littlecms-dev',
                            'libwebp-dev',
                            'tcl/tk-dev',
                            'openjpeg-dev',
                            'libimagequant-dev',
                            'libraqm-dev',
                            'libxcb-dev',
                            'libxcb1-dev',
                        ],
                    ),
                    ('playwright', ['google-chrome-stable']),
                    # https://tesseract-ocr.github.io/tessdoc/Installation.html
                    ('pytesseract', ['tesseract-ocr']),
                ):
                    try:
                        # Only report apt packages for PyPi modules that are actually installed.
                        importlib.metadata.distribution(module)
                        print(f'• {module}')
                        print_version(apt_dists)
                    except importlib.metadata.PackageNotFoundError:
                        pass
            except ImportError:
                pass
        return 0
360

361
    def list_jobs(self, regex: bool | str) -> None:
8✔
362
        """
363
        Lists the job and their respective _index_number.
364

365
        :return: None.
366
        """
367
        if isinstance(regex, str):
8!
368
            print(f"List of jobs matching the RegEx '{regex}':")
×
369
        else:
370
            print('List of jobs:')
8✔
371
        for job in self.urlwatcher.jobs:
8✔
372
            if self.urlwatch_config.verbose:
8✔
373
                job_desc = f'{job.index_number:3}: {job!r}'
8✔
374
            else:
375
                pretty_name = job.pretty_name()
8✔
376
                location = job.get_location()
8✔
377
                if pretty_name != location:
8!
378
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
8✔
379
                else:
380
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
381
            if isinstance(regex, bool) or re.findall(regex, job_desc):
8!
382
                print(job_desc)
8✔
383

384
        if len(self.urlwatch_config.jobs_files) > 1:
8✔
385
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
8✔
386
        elif len(self.urlwatch_config.jobs_files) == 1:
8✔
387
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
388
        else:
389
            jobs_files = []
8✔
390
        print('\n   '.join(jobs_files))
8✔
391

392
    def _find_job(self, query: str | int) -> JobBase:
8✔
393
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
394
        (i.e. the url/user_visible_url or command).
395

396
        :param query: The query.
397
        :return: The matching JobBase.
398
        :raises IndexError: If job is not found.
399
        """
400
        if isinstance(query, int):
8✔
401
            index = query
8✔
402
        else:
403
            try:
8✔
404
                index = int(query)
8✔
405
            except ValueError:
8✔
406
                query = unquote_plus(query)
8✔
407
                try:
8✔
408
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
8✔
409
                except StopIteration:
8✔
410
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
8✔
411

412
        if index == 0:
8✔
413
            raise ValueError(f'Job index {index} out of range.')
8✔
414
        try:
8✔
415
            if index <= 0:
8✔
416
                return self.urlwatcher.jobs[index]
8✔
417
            else:
418
                return self.urlwatcher.jobs[index - 1]
8✔
419
        except IndexError as e:
8✔
420
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
8✔
421

422
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
8✔
423
        """
424
        Returns the job with defaults based on job_id, which could match an index or match a location
425
        (url/user_visible_url or command). Accepts negative numbers.
426

427
        :param query: The query.
428
        :return: The matching JobBase with defaults.
429
        :raises SystemExit: If job is not found.
430
        """
431
        job = self._find_job(query)
8✔
432
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
433

434
    def test_job(self, job_id: bool | str | int) -> None:
        """
        Tests the running of a single job outputting the filtered text to --test-reporter (default is stdout). If
        job_id is True, don't run any jobs but load config, jobs and hook files to trigger any syntax errors.

        :param job_id: The job_id or True.

        :return: None.

        :raises Exception: The Exception when raised by a job. loading of hooks files, etc.
        """
        if job_id is True:  # Load to trigger any eventual syntax errors
            # Getting this far means the config/jobs/hooks files parsed without error;
            # assemble a human-readable confirmation listing what was checked.
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
            if len(self.urlwatch_config.jobs_files) == 1:
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
            else:
                message.append(
                    '\n   '.join(
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
                    )
                )
            # 'hooks' appears in sys.modules only if a hooks file was loaded.
            if 'hooks' in sys.modules:
                message.append(f'\nand hooks file {sys.modules["hooks"].__file__}')
            print(f'{"".join(message)}.')
            return

        job = self._find_job_with_defaults(job_id)

        if isinstance(job, UrlJob):
            # Force re-retrieval of job, as we're testing filters
            job.ignore_cached = True

        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
            # duration = time.perf_counter() - start
            job_state.process(headless=not self.urlwatch_config.no_headless)
            if job_state.job.name is None:
                job_state.job.name = ''
            # if job_state.job.note is None:
            #     job_state.job.note = ''
            # Build a diagnostic header (GUID, media type, ETag, any error) to prepend to the data.
            data_info = '\n'.join(
                filter(
                    None,
                    (
                        f'• [GUID: {job_state.job.guid}]',
                        f'• [Media type: {job_state.new_mime_type}]' if job_state.new_mime_type else None,
                        f'• [ETag: {job_state.new_etag}]' if job_state.new_etag else None,
                        f'\nERROR {job_state.new_error_data["type"]}: {job_state.new_error_data["message"]}'
                        if job_state.new_error_data
                        else None,
                    ),
                )
            )
            job_state.new_data = f'{data_info}\n\n{job_state.new_data!s}'
            if self.urlwatch_config.test_reporter is None:
                self.urlwatch_config.test_reporter = 'stdout'  # default
            report = Report(self.urlwatcher)
            report.job_states = []  # required
            errorlevel = self.check_test_reporter(
                job_state,
                label='test',
                report=report,
            )
            if errorlevel:
                self._exit(errorlevel)
        return

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
503

504
    def prepare_jobs(self) -> None:
8✔
505
        """
506
        Runs jobs that have no history to populate the snapshot database when they're newly added.
507
        """
508
        new_jobs = set()
8✔
509
        for idx, job in enumerate(self.urlwatcher.jobs):
8✔
510
            has_history = bool(self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid))
8✔
511
            if not has_history:
8!
512
                print(f'Running new {job.get_indexed_location()}.')
8✔
513
                new_jobs.add(idx + 1)
8✔
514
        if not new_jobs and not self.urlwatch_config.joblist:
8!
515
            print('Found no new jobs to run.')
×
516
            return
×
517
        self.urlwatcher.urlwatch_config.joblist = set(self.urlwatcher.urlwatch_config.joblist).union(new_jobs)
8✔
518
        self.urlwatcher.run_jobs()
8✔
519
        self.urlwatcher.close()
8✔
520
        return
8✔
521

522
    def test_differ(self, arg_test_differ: list[str]) -> int:
        """
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or the reporter selected
        with --test-reporter.

        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs]
        :return: 1 if error, 0 if successful.
        """
        report = Report(self.urlwatcher)
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
        if len(arg_test_differ) == 1:
            job_id = arg_test_differ[0]
            max_diffs = None
        elif len(arg_test_differ) == 2:
            job_id, max_diffs_str = arg_test_differ
            max_diffs = int(max_diffs_str)
        else:
            raise ValueError('--test-differ takes a maximum of two arguments')

        job = self._find_job_with_defaults(job_id)

        # Snapshots are ordered newest first; diffing pairs consecutive entries.
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)

        num_snapshots = len(history_data)
        if num_snapshots == 0:
            print('This job has never been run before.')
            return 1
        elif num_snapshots < 2:
            print('Not enough historic data available (need at least 2 different snapshots).')
            return 1

        if job.compared_versions and job.compared_versions != 1:
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")

        # Default: diff every consecutive pair of snapshots.
        max_diffs = max_diffs or num_snapshots - 1
        for i in range(max_diffs):
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
                # The newer snapshot of the pair becomes the "new" side of the diff.
                job_state.new_data = history_data[i].data
                job_state.new_timestamp = history_data[i].timestamp
                job_state.new_etag = history_data[i].etag
                job_state.new_mime_type = history_data[i].mime_type
                if not job.compared_versions or job.compared_versions == 1:
                    # Simple case: compare against the immediately older snapshot.
                    job_state.old_data = history_data[i + 1].data
                    job_state.old_timestamp = history_data[i + 1].timestamp
                    job_state.old_etag = history_data[i + 1].etag
                    job_state.old_mime_type = history_data[i + 1].mime_type
                else:
                    # compared_versions > 1: pick the closest match among the next
                    # 'compared_versions' older snapshots (keyed by their data).
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
                    close_matches: list[str] = difflib.get_close_matches(
                        str(job_state.new_data),
                        history_dic_snapshots.keys(),  # type: ignore[arg-type]
                        n=1,
                    )
                    if close_matches:
                        job_state.old_data = close_matches[0]
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type

                if self.urlwatch_config.test_reporter is None:
                    self.urlwatch_config.test_reporter = 'stdout'  # default
                report.job_states = []  # required
                if job_state.new_data == job_state.old_data:
                    # Negative labels (-0, -1, ...) count snapshots back from the newest.
                    label = (
                        f'No change (snapshots {-i:2} vs. {-(i + 1):2}) with '
                        f"'compared_versions: {job.compared_versions}'"
                    )
                    job_state.verb = 'changed,no_report'
                else:
                    label = f'Filtered diff (snapshots {-i:2} vs. {-(i + 1):2})'
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
                if errorlevel:
                    self._exit(errorlevel)

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)

        return 0
600

601
    def dump_history(self, job_id: str) -> int:
8✔
602
        """
603
        Displays the historical data stored in the snapshot database for a job.
604

605
        :param job_id: The Job ID.
606
        :return: An argument to be used in sys.exit.
607
        """
608

609
        job = self._find_job_with_defaults(job_id)
8✔
610
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
8✔
611

612
        title = f'History for {job.get_indexed_location()}'
8✔
613
        print(f'{title}\nGUID: {job.guid}')
8✔
614
        if history_data:
8✔
615
            print('=' * max(len(title), 46))
8✔
616
        total_failed = 0
8✔
617
        for i, snapshot in enumerate(history_data):
8✔
618
            mime_type = f' | Media type: {snapshot.mime_type}' if snapshot.mime_type else ''
8✔
619
            etag = f' | ETag: {snapshot.etag}' if snapshot.etag else ''
8✔
620
            tries = f' | Error run (number {snapshot.tries})' if snapshot.tries else ''
8✔
621
            total_failed += snapshot.tries > 0
8✔
622
            tz = self.urlwatcher.report.config['report']['tz']
8✔
623
            tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
8✔
624
            dt = datetime.fromtimestamp(snapshot.timestamp, tz_info)
8✔
625
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
8✔
626
            sep_len = max(50, len(header))
8✔
627
            print(header)
8✔
628
            print('-' * sep_len)
8✔
629
            if snapshot.error_data:
8!
630
                print(f'{snapshot.error_data.get("type")}: {snapshot.error_data.get("message")}')
×
631
                print()
×
632
                print('Last good data:')
×
633
            print(snapshot.data)
8✔
634
            print('=' * sep_len, '\n')
8✔
635

636
        print(
8✔
637
            f'Found {len(history_data) - total_failed}'
638
            + (' good' if total_failed else '')
639
            + ' snapshot'
640
            + ('s' if len(history_data) - total_failed != 1 else '')
641
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
642
            + '.'
643
        )
644

645
        return 0
8✔
646

647
    def list_error_jobs(self) -> int:
        """Runs all enabled jobs and reports those that error out or return no data.

        Output goes to stdout or, if ``--errors REPORTER`` names another reporter, through that
        reporter via a dummy job state.

        :return: A sys.exit code (0 for success, 1 if the reporter name is invalid).
        """
        from webchanges.reporters import ReporterBase

        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
            print(f'Invalid reporter {self.urlwatch_config.errors}.')
            return 1

        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
            """A generator that outputs error text for jobs who fail with an exception or yield no data.

            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
            """

            def job_runner(
                stack: ExitStack,
                jobs: Iterable[JobBase],
                max_workers: int | None = None,
            ) -> Iterator[str]:
                """
                Modified worker.job_runner that yields error text for jobs who fail with an exception or yield no data.

                :param stack: The context manager.
                :param jobs: The jobs to run.
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
                :return: error text for jobs who fail with an exception or yield no data.
                """
                # NOTE(review): the executor is not explicitly shut down; map() is fully consumed by the
                # loop below so all workers finish, but a `with` block would be tidier.
                executor = ThreadPoolExecutor(max_workers=max_workers)

                for job_state in executor.map(
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
                ):
                    # NotModifiedError means the site reported no change (conditional request): not an error.
                    if not isinstance(job_state.exception, NotModifiedError):
                        if job_state.exception is None:
                            # No exception: flag the job only if it produced empty data.
                            if (
                                len(job_state.new_data.strip()) == 0
                                if hasattr(job_state, 'new_data')
                                else len(job_state.old_data.strip()) == 0
                            ):
                                if self.urlwatch_config.verbose:
                                    yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
                                else:
                                    pretty_name = job_state.job.pretty_name()
                                    location = job_state.job.get_location()
                                    if pretty_name != location:
                                        yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
                                    else:
                                        yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
                        else:
                            pretty_name = job_state.job.pretty_name()
                            location = job_state.job.get_location()
                            if pretty_name != location:
                                yield (
                                    f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
                                    f'({location})'
                                )
                            else:
                                # Fixed: removed a stray ')' that used to trail pretty_name in this branch.
                                yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name}'

            with ExitStack() as stack:
                # This code is from worker.run_jobs, modified to yield from job_runner.
                from webchanges.worker import get_virt_mem  # avoid circular imports

                # run non-BrowserJob jobs first
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
                if jobs_to_run:
                    logger.debug(
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
                        "Python's default max_workers."
                    )
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
                else:
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")

                # run BrowserJob jobs after
                jobs_to_run = [job for job in jobs if job.__is_browser__]
                if jobs_to_run:
                    gc.collect()
                    virt_mem = get_virt_mem()
                    if self.urlwatch_config.max_workers:
                        max_workers = self.urlwatch_config.max_workers
                    else:
                        # Budget roughly 200 MB of virtual memory per Chrome worker, capped at CPU count.
                        max_workers = max(int(virt_mem / 200e6), 1)
                        max_workers = min(max_workers, os.cpu_count() or 1)
                    logger.debug(
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
                        f'{max_workers} max_workers.'
                    )
                    yield from job_runner(stack, jobs_to_run, max_workers)
                else:
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")

        start = time.perf_counter()

        # default max_workers (when not specified) to 1
        if self.urlwatch_config.max_workers is None:
            self.urlwatch_config.max_workers = 1

        if len(self.urlwatch_config.jobs_files) == 1:
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
        else:
            jobs_files = ['in the concatenation of the jobs files'] + [
                f'• {file},' for file in self.urlwatch_config.jobs_files
            ]
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)', *jobs_files])

        jobs = {
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
        }
        if self.urlwatch_config.errors == 'stdout':
            print(header)
            for line in error_jobs_lines(jobs):
                print(line)
            print('--')
            duration = time.perf_counter() - start
            # Fixed pluralization: 's' only when the count differs from 1 (was truthiness, i.e. "1 jobs").
            print(f'Checked {len(jobs)} enabled job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.')

        else:
            message = '\n'.join(error_jobs_lines(jobs))
            if message:
                # create a dummy job state to run a reporter on
                job_state = JobState(
                    None,  # type: ignore[arg-type]
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
                )
                job_state.traceback = f'{header}\n{message}'
                duration = time.perf_counter() - start
                self.urlwatcher.report.config['footnote'] = (
                    f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.'
                )
                self.urlwatcher.report.config['report']['html']['footer'] = False
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
                self.urlwatcher.report.config['report']['text']['footer'] = False
                self.urlwatcher.report.error(job_state)
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
            else:
                print(header)
                print('--')
                duration = time.perf_counter() - start
                print('Found no errors.')
                print(f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.')

        return 0
792

793
    def rollback_database(self, timespec: str) -> int:
        """Issues a warning, calls rollback() and prints out the result.

        :param timespec: A timespec that if numeric is interpreted as a Unix timestamp otherwise it's passed to
          dateutil.parser (if dateutil is installed) or datetime.fromisoformat to be converted into a date.

        :return: A sys.exit code (0 for success, 1 for failure)
        """

        def _convert_to_datetime(timespec: str, tz_info: ZoneInfo | tzinfo | None) -> datetime:
            """Converts inputted string to a datetime object, using dateutil if installed.

            :param timespec: The string.
            :param tz_info: The timezone.

            :return: The datetime object.
            """
            try:
                # EAFP: a purely numeric timespec is a Unix timestamp.
                timestamp = float(timespec)
                return datetime.fromtimestamp(timestamp, tz_info)
            except ValueError:
                try:
                    from dateutil import parser as dateutil_parser

                    # Components missing from the input default to "now" (truncated to the minute) in tz_info.
                    default_dt_with_tz = datetime.now(tz_info).replace(second=0, microsecond=0)
                    return dateutil_parser.parse(timespec, default=default_dt_with_tz)
                    # return dateutil_parser.parse(timespec)
                except ImportError:
                    # Fallback without dateutil: strict ISO-8601 parsing; attach tz_info when naive.
                    dt = datetime.fromisoformat(timespec)
                    if not dt.tzinfo:
                        dt = dt.replace(tzinfo=tz_info)
                    return dt

        tz = self.urlwatcher.report.config['report']['tz']
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        dt = _convert_to_datetime(timespec, tz_info)
        timestamp_date = email.utils.format_datetime(dt)
        # NOTE(review): rollback() is invoked here, *before* the interactive confirmation below, and then
        # again after it. If rollback() deletes snapshots immediately, data is removed before the user
        # consents -- confirm whether this first call is a dry-run/count-only operation.
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
        print(f'Rolling back database to {timestamp_date}.')
        # Only ask for confirmation when running interactively (stdin is a tty).
        if sys.__stdin__ and sys.__stdin__.isatty():
            print(
                f'WARNING: All {count} snapshots after this date/time (check timezone) will be deleted.\n'
                f'         ☠  This operation cannot be undone!\n'
                f'         We suggest you make a backup of the database file before proceeding:\n'
                f'         {self.urlwatch_config.ssdb_file}'
            )
            resp = input("         Please enter 'Y' to proceed: ")
            if not resp.upper().startswith('Y'):
                print('Quitting rollback. No snapshots have been deleted.')
                return 1
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
        if count:
            print(f'Deleted {count} snapshots taken after {timestamp_date}.')
            self.urlwatcher.ssdb_storage.close()
        else:
            print(f'No snapshots found after {timestamp_date}')
        return 0
850

851
    def delete_snapshot(self, job_id: str | int) -> int:
        """Deletes the most recent snapshot of a job, asking for confirmation when run interactively.

        :param job_id: The job's index number or location.
        :return: A sys.exit code (0 for success, 1 for failure).
        """
        job = self._find_job_with_defaults(job_id)
        history = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
        if not history:
            print(f'No snapshots found for {job.get_indexed_location()}.')
            return 1
        tz = self.urlwatcher.report.config['report']['tz']
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        # Only ask for confirmation when running interactively (stdin is a tty).
        if sys.__stdin__ and sys.__stdin__.isatty():
            print(f'WARNING: About to delete the latest snapshot of\n         {job.get_indexed_location()}:')
            for i, history_job in enumerate(history):
                # Build the timestamp as an aware datetime directly in the target timezone
                # (equivalent to the former fromtimestamp().astimezone() two-step, without the
                # intermediate naive datetime).
                print(
                    f'         {i + 1}. {"❌ " if i == 0 else "   "}'
                    f'{email.utils.format_datetime(datetime.fromtimestamp(history_job.timestamp, tz_info))}'
                    f'{"  ⬅  ABOUT TO BE DELETED!" if i == 0 else ""}'
                )
            print(
                f'         ☠  This operation cannot be undone!\n'
                f'         We suggest you make a backup of the database file before proceeding:\n'
                f'         {self.urlwatch_config.ssdb_file}'
            )
            resp = input("         Please enter 'Y' to proceed: ")
            if not resp.upper().startswith('Y'):
                print('Quitting. No snapshots have been deleted.')
                return 1
        count = self.urlwatcher.ssdb_storage.delete_latest(job.guid)
        if count:
            print(f'Deleted last snapshot of {job.get_indexed_location()}; {len(history) - 1} snapshots left.')
            return 0
        else:
            print(f'No snapshots found for {job.get_indexed_location()}.')
            return 1
883

884
    def modify_urls(self) -> int:
        """Applies the --delete, --add and --change-location command-line edits to the jobs file.

        :return: A sys.exit code (0 for success, 1 for failure).
        """
        if self.urlwatch_config.delete is not None:
            job = self._find_job(self.urlwatch_config.delete)
            if job is not None:
                # Only ask for confirmation when running interactively (stdin is a tty).
                if sys.__stdin__ and sys.__stdin__.isatty():
                    print(
                        # Fixed: added the missing newline after "remarks lost." so the warning
                        # lines no longer run together.
                        f'WARNING: About to permanently delete {job.get_indexed_location()}.\n'
                        '         Job file will be overwritten and all remarks lost.\n'
                        '         This operation cannot be undone!\n'
                    )
                    resp = input("         Please enter 'Y' to proceed: ")
                    if not resp.upper().startswith('Y'):
                        print(f'Quitting. Job {job.index_number} has not been deleted and job file is unmodified.')
                        return 1
                self.urlwatcher.jobs.remove(job)
                print(f'Removed {job}.')
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
            else:
                print(f'Job not found: {self.urlwatch_config.delete}.')
                return 1

        if self.urlwatch_config.add is not None:
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
            filters = [v for k, v in items if k == 'filter']
            d = {k: v for k, v in items if k != 'filter'}
            if filters:
                d['filter'] = ','.join(filters)

            job = JobBase.unserialize(d)
            print(f'Adding {job}.')
            self.urlwatcher.jobs.append(job)
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.change_location is not None:
            new_loc = self.urlwatch_config.change_location[1]
            # Ensure the user isn't overwriting an existing job with the change.
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
                print(
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
                    f'different value.\n'
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
                )
                return 1
            else:
                job = self._find_job(self.urlwatch_config.change_location[0])
                if job is not None:
                    # Update the job's location (which will also update the guid) and move any history in the database
                    # over to the job's updated guid.
                    old_loc = job.get_location()
                    print(f'Moving location of "{old_loc}" to "{new_loc}".')
                    old_guid = job.guid
                    if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
                        print(f'No snapshots found for "{old_loc}".')
                        return 1
                    job.set_base_location(new_loc)
                    num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.guid)
                    if num_searched:
                        print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}".')
                else:
                    print(f'Job not found: "{self.urlwatch_config.change_location[0]}".')
                    return 1
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
            if not input(message).lower().startswith('y'):
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
            else:
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        return 0
954

955
    def edit_config(self) -> int:
        """Open the configuration file in the editor and return the edit operation's exit status."""
        return self.urlwatcher.config_storage.edit()
958

959
    def check_telegram_chats(self) -> None:
        """Verifies the Telegram bot token and lists the private chats the bot can see.

        Calls self._exit with 1 on configuration or API errors; otherwise prints a table of
        chat IDs and names and calls self._exit with 0.
        """
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']

        bot_token = config['bot_token']
        if not bot_token:
            print('You need to set up your bot token first (see documentation).')
            self._exit(1)

        # Prefer httpx (with HTTP/2 when the h2 package is available); fall back to requests.
        if httpx:
            get_client = httpx.Client(http2=h2 is not None).get
        else:
            get_client = requests.get  # type: ignore[assignment]

        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
        if not info['ok']:
            print(f'Error with token {bot_token}: {info["description"]}.')
            self._exit(1)

        chats = {}
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
        if 'result' in updates:
            for chat_info in updates['result']:
                chat = chat_info['message']['chat']
                if chat['type'] == 'private':
                    chats[chat['id']] = (
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
                    )

        if not chats:
            print(f'No chats found. Say hello to your bot at https://t.me/{info["result"]["username"]}.')
            self._exit(1)

        headers = ('Chat ID', 'Name')
        # Fixed: Telegram chat ids are integers (Bot API), so len() needs str(); the original
        # len(k) raised TypeError as soon as a chat was found.
        maxchat = max(len(headers[0]), max((len(str(chat_id)) for chat_id in chats), default=0))
        maxname = max(len(headers[1]), max((len(name) for name in chats.values()), default=0))
        fmt = f'%-{maxchat}s  %s'
        print(fmt % headers)
        print(fmt % ('-' * maxchat, '-' * maxname))
        # Sort the table by chat name.
        for chat_id, name in sorted(chats.items(), key=lambda kv: kv[1]):
            print(fmt % (chat_id, name))
        print(f'\nChat up your bot here: https://t.me/{info["result"]["username"]}.')

        self._exit(0)
1002

1003
    def check_test_reporter(
        self,
        job_state: JobState | None = None,
        label: str = 'test',
        report: Report | None = None,
    ) -> int:
        """
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').

        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
        of the configuration file.

        :param job_state: The JobState (Optional).
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report class to use for testing (Optional).
        :return: 0 if successful, 1 otherwise.
        """
        from webchanges.reporters import ReporterBase

        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
            """Builds a pseudo-job for the reporter to run on."""
            job = JobBase.unserialize({'name': job_name, 'url': url})

            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
            # testing; also no need to use it as context manager, since no processing is called on the job
            job_state = JobState(None, job)  # type: ignore[arg-type]

            job_state.old_data = old
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
            job_state.new_data = new
            job_state.new_timestamp = time.time()

            return job_state

        def set_error(job_state: 'JobState', message: str) -> JobState:
            """Sets a job error message on a JobState."""
            # Raise and catch so that a real traceback is attached to the job state.
            try:
                raise ValueError(message)
            except ValueError as e:
                job_state.exception = e
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())

            return job_state

        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(
                f'No such reporter: {reporter_name}.\n'
                f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}.\n'
            )
            return 1

        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
            reporter_name  # type: ignore[literal-required]
        ]
        if job_state:  # we want a full report
            # Force-enable the reporter and full-detail output so the supplied job state is fully rendered.
            cfg['enabled'] = True
            self.urlwatcher.config_storage.config['display'][label] = True  # type: ignore[literal-required]
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['stdout']['color'] = False
        elif not cfg['enabled']:
            # Warn but proceed: temporarily enable the reporter for the duration of the test.
            print(
                f'WARNING: Reporter being tested is not enabled: {reporter_name}.\n'
                f'Will still attempt to test it, but this may not work.\n'
                f'Use {__project_name__} --edit-config to configure reporters.'
            )
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)  # type: ignore[arg-type]
        else:
            # No job state supplied: synthesize one pseudo-job per outcome (new/changed/unchanged/error).
            report.new(
                build_job(
                    'Sample job that was newly added',
                    'https://example.com/new',
                    '',
                    '',
                )
            )
            report.changed(
                build_job(
                    'Sample job where something changed',
                    'https://example.com/changed',
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
                )
            )
            report.unchanged(
                build_job(
                    'Sample job where nothing changed',
                    'http://example.com/unchanged',
                    'Same Old, Same Old\n',
                    'Same Old, Same Old\n',
                )
            )
            report.error(
                set_error(
                    build_job(
                        'Sample job where an error was encountered',
                        'https://example.com/error',
                        '',
                        '',
                    ),
                    'The error message would appear here.',
                )
            )

        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
1121

1122
    def check_smtp_login(self) -> None:
        """Validates the SMTP email reporter configuration and attempts an SMTP login.

        Prints a message for each missing or inconsistent configuration key and calls self._exit
        with 1 if any check fails; otherwise offers to store the password in the keyring (unless
        'insecure_password' is set in the config) and attempts to log into the SMTP server,
        calling self._exit with 0 on success.
        """
        from webchanges.mailer import SMTPMailer, smtp_have_password, smtp_set_password

        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_config: _ConfigReportEmailSmtp = config['smtp']

        success = True

        if not config['enabled']:
            print('Please enable email reporting in the config first.')
            success = False

        if config['method'] != 'smtp':
            print('Please set the method to SMTP for the email reporter.')
            success = False

        smtp_auth = smtp_config['auth']
        if not smtp_auth:
            print('Authentication must be enabled for SMTP.')
            success = False

        smtp_hostname = smtp_config['host']
        if not smtp_hostname:
            print('Please configure the SMTP hostname in the config first.')
            success = False

        # The 'from' address doubles as the SMTP user when no explicit user is configured.
        smtp_username = smtp_config['user'] or config['from']
        if not smtp_username:
            print('Please configure the SMTP user in the config first.')
            success = False

        if not success:
            self._exit(1)

        insecure_password = smtp_config['insecure_password']
        if insecure_password:
            print('The SMTP password is set in the config file (key "insecure_password").')
        elif smtp_have_password(smtp_hostname, smtp_username):
            message = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
            if not input(message).lower().startswith('y'):
                print('Password unchanged.')
            else:
                smtp_set_password(smtp_hostname, smtp_username)

        smtp_port = smtp_config['port']
        smtp_tls = smtp_config['starttls']

        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
        print('Trying to log into the SMTP server...')
        # NOTE(review): send(None) appears intended to perform the login handshake only (no message
        # is built here) -- confirm against SMTPMailer.send.
        mailer.send(None)
        print('Successfully logged into SMTP server.')

        self._exit(0)
1175

1176
    def check_xmpp_login(self) -> None:
        """Validates the XMPP reporter configuration and interactively stores the sender's password.

        Prints a message for each missing configuration key and calls self._exit with 1 if any
        check fails; otherwise prompts for (and saves) the XMPP password in the keyring unless
        'insecure_password' is already set in the config, then calls self._exit with 0.
        """
        from webchanges.reporters import xmpp_have_password, xmpp_set_password

        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        success = True

        if not xmpp_config['enabled']:
            print('Please enable XMPP reporting in the config first.')
            success = False

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            print('Please configure the XMPP sender in the config first.')
            success = False

        if not xmpp_config['recipient']:
            print('Please configure the XMPP recipient in the config first.')
            success = False

        if not success:
            self._exit(1)

        if 'insecure_password' in xmpp_config:
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
            # Consistency fix: accept any answer starting with 'y' (e.g. 'yes'), matching the
            # other interactive prompts in this module (previously required exactly 'y').
            if not input(message).lower().startswith('y'):
                print('Password unchanged.')
                self._exit(0)

        if success:
            xmpp_set_password(xmpp_sender)

        self._exit(0)
1213

1214
    @staticmethod
    def playwright_install_chrome() -> int:  # pragma: no cover
        """
        Replicates playwright.___main__.main() function, which is called by the playwright executable, in order to
        install the browser executable.

        :return: Playwright's executable return code.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:  # pragma: no cover
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        driver_executable = compute_driver_executable()
        env = os.environ.copy()
        # Tell the Playwright driver which language binding is calling it.
        env['PW_CLI_TARGET_LANG'] = 'python'
        cmd = [str(driver_executable), 'install', 'chrome']
        logger.info(f'Running playwright CLI: {" ".join(cmd)}')
        # List-form invocation (no shell); output is captured and surfaced below.
        completed_process = subprocess.run(cmd, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
        if completed_process.returncode:
            print(completed_process.stderr)
            return completed_process.returncode
        if completed_process.stdout:
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
        return 0

1240
    def handle_actions(self) -> None:
        """Handles the actions for command line arguments and exits.

        Each branch below ends in self._exit (directly or via the called helper), so at most one
        action runs per invocation; the order of the checks defines the actions' precedence.
        """
        # Listing / testing actions.
        if self.urlwatch_config.list_jobs:
            self.list_jobs(self.urlwatch_config.list_jobs)
            self._exit(0)

        if self.urlwatch_config.errors:
            self._exit(self.list_error_jobs())

        if self.urlwatch_config.test_job:
            self.test_job(self.urlwatch_config.test_job)
            self._exit(0)

        if self.urlwatch_config.prepare_jobs:
            self.prepare_jobs()
            self._exit(0)

        if self.urlwatch_config.test_differ:
            self._exit(self.test_differ(self.urlwatch_config.test_differ))

        if self.urlwatch_config.dump_history:
            self._exit(self.dump_history(self.urlwatch_config.dump_history))

        # Jobs-file modification actions.
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
            self._exit(self.modify_urls())

        # Reporter checks.
        if self.urlwatch_config.test_reporter:
            self._exit(self.check_test_reporter())

        if self.urlwatch_config.smtp_login:
            self.check_smtp_login()

        if self.urlwatch_config.telegram_chats:
            self.check_telegram_chats()

        if self.urlwatch_config.xmpp_login:
            self.check_xmpp_login()

        # Editor actions.
        if self.urlwatch_config.edit:
            self._exit(self.urlwatcher.jobs_storage.edit())

        if self.urlwatch_config.edit_config:
            self._exit(self.edit_config())

        if self.urlwatch_config.edit_hooks:
            self._exit(self.edit_hooks())

        # Database maintenance actions.
        if self.urlwatch_config.gc_database:
            self.urlwatcher.ssdb_storage.gc(
                [job.guid for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        if self.urlwatch_config.clean_database:
            self.urlwatcher.ssdb_storage.clean_ssdb(
                [job.guid for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        if self.urlwatch_config.rollback_database:
            exit_arg = self.rollback_database(self.urlwatch_config.rollback_database)
            self.urlwatcher.ssdb_storage.close()
            self._exit(exit_arg)

        if self.urlwatch_config.delete_snapshot:
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))

        # Informational actions.
        if self.urlwatch_config.features:
            self._exit(self.show_features())

        if self.urlwatch_config.detailed_versions:
            self._exit(self.show_detailed_versions())

1315
    def run(self) -> None:  # pragma: no cover
        """The main run logic."""
        # Propagate the loaded configuration (and any CLI-supplied footnote) to the report object.
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote

        # Command-line actions exit the process from within handle_actions; if none applies,
        # fall through to a normal job run.
        self.handle_actions()

        self.urlwatcher.run_jobs()

        self.urlwatcher.close()

        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc