
mborsetti / webchanges, build 21545658727 (push via github, by mborsetti)

31 Jan 2026 02:04PM UTC coverage: 73.318% (-0.3%) from 73.637%
Version 3.33.0

1404 of 2258 branches covered (62.18%)

Branch coverage included in aggregate %.

1 of 9 new or added lines in 2 files covered. (11.11%)

792 existing lines in 7 files now uncovered.

4710 of 6081 relevant lines covered (77.45%)

11.06 hits per line

Source File

/webchanges/command.py: 77.34% covered
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4

5
from __future__ import annotations
15✔
6

7
import difflib
15✔
8
import email.utils
15✔
9
import gc
15✔
10
import importlib.metadata
15✔
11
import logging
15✔
12
import os
15✔
13
import platform
15✔
14
import re
15✔
15
import shutil
15✔
16
import sqlite3
15✔
17
import subprocess
15✔
18
import sys
15✔
19
import time
15✔
20
import traceback
15✔
21
from concurrent.futures import ThreadPoolExecutor
15✔
22
from contextlib import ExitStack
15✔
23
from datetime import datetime, tzinfo
15✔
24
from pathlib import Path
15✔
25
from typing import TYPE_CHECKING, Iterable, Iterator
15✔
26
from urllib.parse import unquote_plus
15✔
27
from zoneinfo import ZoneInfo
15✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
15✔
30
from webchanges.handler import JobState, Report
15✔
31
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
15✔
32
from webchanges.util import dur_text, edit_file, import_module_from_source
15✔
33

34
try:
15✔
35
    import httpx
15✔
36
except ImportError:  # pragma: no cover
37
    httpx = None  # type: ignore[assignment]
38
    print("Required package 'httpx' not found; will attempt to run using 'requests'.")
39
    try:
40
        import requests
41
    except ImportError as e:  # pragma: no cover
42
        raise RuntimeError(
43
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
44
            'neither can be imported.'
45
        ) from e
46
if httpx is not None:
15✔
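    # h2 provides optional HTTP/2 support for httpx; its absence is tolerated here and checked
    # later where the client is built (e.g. httpx.Client(http2=h2 is not None) in check_telegram_chats).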
47
    try:
15✔
48
        import h2
15✔
49
    except ImportError:  # pragma: no cover
50
        h2 = None  # type: ignore[assignment]
51

52
if TYPE_CHECKING:
53
    from webchanges.main import Urlwatch
54
    from webchanges.reporters import _ConfigReportersList
55
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
56

57
logger = logging.getLogger(__name__)
15✔
58

59

60
class UrlwatchCommand:
15✔
61
    """The class that runs the program after initialization and CLI arguments parsing."""
62

63
    def __init__(self, urlwatcher: Urlwatch) -> None:
15✔
64
        self.urlwatcher = urlwatcher
15✔
65
        self.urlwatch_config = urlwatcher.urlwatch_config
15✔
66

67
    @staticmethod
15✔
68
    def _exit(arg: str | int | None) -> None:
15✔
69
        logger.info(f'Exiting with exit code {arg}')
15✔
70
        sys.exit(arg)
15✔
71

72
    def jobs_from_joblist(self) -> Iterator[JobBase]:
15✔
73
        """Generates the jobs to process from the joblist entered in the CLI."""
74
        if self.urlwatcher.urlwatch_config.joblist:
15✔
75
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
15✔
76
            enabled_jobs = {job for job in jobs if job.is_enabled()}
15✔
77
            disabled = len(jobs) - len(enabled_jobs)
15✔
78
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
15✔
79
            logger.debug(
15✔
80
                f'Processing {len(enabled_jobs)} job{"s" if enabled_jobs else ""}{disabled_str} as specified in '
81
                f'command line: {", ".join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}'
82
            )
83
        else:
84
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
15✔
85
            disabled = len(self.urlwatcher.jobs) - len(enabled_jobs)
15✔
86
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
15✔
87
            logger.debug(f'Processing {len(enabled_jobs)} job{"s" if enabled_jobs else ""}{disabled_str}')
15✔
88
        for job in enabled_jobs:
15✔
89
            yield job.with_defaults(self.urlwatcher.config_storage.config)
15✔
90

91
    def edit_hooks(self) -> int:
15✔
92
        """Edit hooks file.
93

94
        :returns: 0 if edit is successful, 1 otherwise.
95
        """
96
        # Similar code to BaseTextualFileStorage.edit()
97
        for hooks_file in self.urlwatch_config.hooks_files:
15✔
98
            logger.debug(f'Edit file {hooks_file}')
15✔
99
            hooks_edit = hooks_file.with_stem(hooks_file.stem + '_edit')
15✔
100
            if hooks_file.exists():
15!
101
                shutil.copy(hooks_file, hooks_edit)
15✔
102
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
103
            #         self.urlwatch_config.hooks_py_example):
104
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)
105

106
            while True:
15✔
107
                try:
15✔
108
                    edit_file(hooks_edit)
15✔
109
                    import_module_from_source('hooks', hooks_edit)
15✔
110
                    break  # stop if no exception on parser
15✔
111
                except SystemExit:
15✔
112
                    raise
×
113
                except Exception as e:  # noqa: BLE001 Do not catch blind exception: `Exception`
15✔
114
                    print('Parsing failed:')
15✔
115
                    print('======')
15✔
116
                    print(e)
15✔
117
                    print('======')
15✔
118
                    print()
15✔
119
                    print(f'The file {hooks_file} was NOT updated.')
15✔
120
                    user_input = input('Do you want to retry the same edit? (Y/n)')
15✔
121
                    if not user_input or user_input.lower()[0] == 'y':
×
122
                        continue
×
123
                    hooks_edit.unlink()
×
124
                    print('No changes have been saved.')
×
125
                    return 1
×
126

127
            if hooks_file.is_symlink():
15!
128
                hooks_file.write_text(hooks_edit.read_text())
×
129
            else:
130
                hooks_edit.replace(hooks_file)
15✔
131
            hooks_edit.unlink(missing_ok=True)
15✔
132
            print(f'Saved edits in {hooks_file}.')
15✔
133

134
        return 0
15✔
135

136
    @staticmethod
15✔
137
    def show_features() -> int:
15✔
138
        """Prints the "features", i.e. a list of job types, filters and reporters.
139

140
        :return: 0.
141
        """
142
        from webchanges.differs import DifferBase
15✔
143
        from webchanges.filters import FilterBase
15✔
144
        from webchanges.reporters import ReporterBase
15✔
145

146
        print(f'Please see full documentation at {__docs_url__}.')
15✔
147
        print()
15✔
148
        print('Supported jobs:\n')
15✔
149
        print(JobBase.job_documentation())
15✔
150
        print('Supported filters:\n')
15✔
151
        print(FilterBase.filter_documentation())
15✔
152
        print()
15✔
153
        print('Supported differs:\n')
15✔
154
        print(DifferBase.differ_documentation())
15✔
155
        print()
15✔
156
        print('Supported reporters:\n')
15✔
157
        print(ReporterBase.reporter_documentation())
15✔
158
        print()
15✔
159
        print(f'Please see full documentation at {__docs_url__}.')
15✔
160

161
        return 0
15✔
162

163
    @staticmethod
15✔
164
    def show_detailed_versions() -> int:
15✔
165
        """Prints the detailed versions, including of dependencies.
166

167
        :return: 0.
168
        """
169

170
        def dependencies() -> list[str]:
15✔
171
            try:
15✔
172
                from pip._internal.metadata import get_default_environment
15✔
173

174
                env = get_default_environment()
15✔
175
                dist = None
15✔
176
                for dist in env.iter_all_distributions():
15✔
177
                    if dist.canonical_name == __project_name__:
15!
178
                        break
×
179
                if dist and dist.canonical_name == __project_name__:
15!
180
                    requires_dist = dist.metadata_dict.get('requires_dist', [])
×
181
                    dependencies = [re.split('[ <>=;#^[]', d)[0] for d in requires_dist]
×
182
                    dependencies.extend(('packaging', 'simplejson'))
×
183
                    return sorted(dependencies, key=str.lower)
×
184
            except ImportError:
185
                pass
186

187
            # default list of all possible dependencies
188
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
15✔
189
            return [
15✔
190
                'aioxmpp',
191
                'beautifulsoup4',
192
                'chump',
193
                'colorama',
194
                'cryptography',
195
                'cssbeautifier',
196
                'cssselect',
197
                'deepdiff',
198
                'h2',
199
                'html2text',
200
                'httpx',
201
                'jq',
202
                'jsbeautifier',
203
                'keyring',
204
                'lxml',
205
                'markdown2',
206
                'matrix_client',
207
                'msgpack',
208
                'packaging',
209
                'pdftotext',
210
                'Pillow',
211
                'platformdirs',
212
                'playwright',
213
                'psutil',
214
                'pushbullet.py',
215
                'pypdf',
216
                'pytesseract',
217
                'pyyaml',
218
                'redis',
219
                'requests',
220
                'simplejson',
221
                'tzdata',
222
                'vobject',
223
            ]
224

225
        print('Software:')
15✔
226
        print(f'• {__project_name__}: {__version__}')
15✔
227
        print(
15✔
228
            f'• {platform.python_implementation()}: {platform.python_version()} '
229
            f'{platform.python_build()} {platform.python_compiler()}'
230
        )
231
        print(f'• SQLite: {sqlite3.sqlite_version}')
15✔
232

233
        try:
15✔
234
            import psutil
15✔
235
            from psutil._common import bytes2human
15✔
236

237
            print()
15✔
238
            print('System:')
15✔
239
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
15✔
240
            print(f'• Processor: {platform.processor()}')
15✔
241
            print(f'• CPUs (logical): {psutil.cpu_count()}')
15✔
242
            try:
15✔
243
                virt_mem = psutil.virtual_memory().available
15✔
244
                print(
15✔
245
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
246
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
247
                )
248
            except psutil.Error as e:  # pragma: no cover
249
                print(f'• Free memory: Could not read information: {e}')
250
            print(
15✔
251
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
252
                f'({100 - psutil.disk_usage("/").percent:.1f}%)'
253
            )
254
            executor = ThreadPoolExecutor()
15✔
255
            print(f'• --max-threads default: {executor._max_workers}')
15✔
256
        except ImportError:
257
            pass
258

259
        print()
15✔
260
        print('Relevant PyPi packages:')
15✔
261
        for module_name in dependencies():
15✔
262
            try:
15✔
263
                mod = importlib.metadata.distribution(module_name)
15✔
264
            except ModuleNotFoundError:
15✔
265
                continue
15✔
266
            print(f'• {module_name}: {mod.version}')
15✔
267
            # package requirements
268
            if mod.requires:
15✔
269
                for req_name in [i.split()[0] for i in mod.requires]:
15✔
270
                    try:
15✔
271
                        req = importlib.metadata.distribution(req_name)
15✔
272
                    except ModuleNotFoundError:
15✔
273
                        continue
15✔
274
                    print(f'  - {req_name}: {req.version}')
15✔
275

276
        # playwright
277
        try:
15✔
278
            from playwright.sync_api import Error as PlaywrightError
15✔
279
            from playwright.sync_api import sync_playwright
12✔
280

281
            with sync_playwright() as p:
12✔
282
                try:
3✔
283
                    print()
3✔
284
                    print('Playwright browser:')
3✔
285
                    browser = p.chromium.launch(channel='chrome')
3✔
286
                    print(f'• Name: {browser.browser_type.name}')
3✔
287
                    print(f'• Version: {browser.version}')
3✔
288
                    print(f'• Executable: {browser.browser_type.executable_path}')
3✔
289
                    if psutil:
3!
290
                        browser.new_page()
3✔
291
                        try:
3✔
292
                            virt_mem = psutil.virtual_memory().available
3✔
293
                            print(
3✔
294
                                f'• Free memory with browser loaded: '
295
                                f'{bytes2human(virt_mem)} physical plus '
296
                                f'{bytes2human(psutil.swap_memory().free)} swap'
297
                            )
298
                        except psutil.Error:
×
299
                            pass
×
300
                except PlaywrightError as e:
×
301
                    print()
×
302
                    print('Playwright browser:')
×
UNCOV
303
                    print(f'• Error: {e}')
×
304
        except ImportError:
305
            pass
306

307
        if os.name == 'posix':
15✔
308
            print()
10✔
309
            print('Installed dpkg dependencies:')
10✔
310
            try:
10✔
311
                import apt  # ty:ignore[unresolved-import]
10✔
312

313
                apt_cache = apt.Cache()
×
314

315
                def print_version(libs: list[str]) -> None:
×
316
                    for lib in libs:
×
UNCOV
317
                        if lib in apt_cache and apt_cache[lib].versions:
×
318
                            ver = apt_cache[lib].versions
×
319
                            print(f'   - {ver[0].package}: {ver[0].version}')
×
320

321
                installed_packages = {dist.metadata['Name'] for dist in importlib.metadata.distributions()}
×
UNCOV
322
                for module, apt_dists in (
×
323
                    ('jq', ['jq']),
324
                    # https://github.com/jalan/pdftotext#os-dependencies
325
                    ('pdftotext', ['libpoppler-cpp-dev']),
326
                    # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
327
                    (
328
                        'Pillow',
329
                        [
330
                            'libjpeg-dev',
331
                            'zlib-dev',
332
                            'zlib1g-dev',
333
                            'libtiff-dev',
334
                            'libfreetype-dev',
335
                            'littlecms-dev',
336
                            'libwebp-dev',
337
                            'tcl/tk-dev',
338
                            'openjpeg-dev',
339
                            'libimagequant-dev',
340
                            'libraqm-dev',
341
                            'libxcb-dev',
342
                            'libxcb1-dev',
343
                        ],
344
                    ),
345
                    ('playwright', ['google-chrome-stable']),
346
                    # https://tesseract-ocr.github.io/tessdoc/Installation.html
347
                    ('pytesseract', ['tesseract-ocr']),
348
                ):
349
                    if module in installed_packages:
×
350
                        importlib.metadata.distribution(module)
×
351
                        print(f'• {module}')
×
UNCOV
352
                        print_version(apt_dists)
×
353
            except ImportError:
354
                print('Dependencies cannot be printed as python3-apt is not installed.')
355
                print("Run 'sudo apt-get install python3-apt' to install.")
356
        print()
15✔
357
        return 0
15✔
358

359
    def list_jobs(self, regex: bool | str) -> None:
15✔
360
        """Lists the job and their respective _index_number.
361

362
        :return: None.
363
        """
364
        if isinstance(regex, str):
15!
UNCOV
365
            print(f"List of jobs matching the RegEx '{regex}':")
×
366
        else:
367
            print('List of jobs:')
15✔
368
        for job in self.urlwatcher.jobs:
15✔
369
            if self.urlwatch_config.verbose:
15✔
370
                job_desc = f'{job.index_number:3}: {job!r}'
15✔
371
            else:
372
                pretty_name = job.pretty_name()
15✔
373
                location = job.get_location()
15✔
374
                if pretty_name != location:
15!
375
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
15✔
376
                else:
UNCOV
377
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
378
            if isinstance(regex, bool) or re.findall(regex, job_desc):
15!
379
                print(job_desc)
15✔
380

381
        if len(self.urlwatch_config.jobs_files) > 1:
15✔
382
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
15✔
383
        elif len(self.urlwatch_config.jobs_files) == 1:
15✔
384
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
15✔
385
        else:
386
            jobs_files = []
15✔
387
        print('\n   '.join(jobs_files))
15✔
388

389
    def _find_job(self, query: str | int) -> JobBase:
15✔
390
        """Finds the job based on a query.
391

392
        It is matched to the job index (also negative) or a job location (i.e. the url/user_visible_url or command).
393

394
        :param query: The query.
395
        :return: The matching JobBase.
396
        :raises ValueError: If the job is not found.
397
        """
398
        if isinstance(query, int):
15✔
399
            index = query
15✔
400
        else:
401
            try:
15✔
402
                index = int(query)
15✔
403
            except ValueError:
15✔
404
                query = unquote_plus(query)
15✔
405
                try:
15✔
406
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
15✔
407
                except StopIteration:
15✔
408
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
15✔
409

410
        if index == 0:
15✔
411
            raise ValueError(f'Job index {index} out of range.')
15✔
412
        try:
15✔
413
            if index <= 0:
15✔
414
                return self.urlwatcher.jobs[index]
15✔
415
            return self.urlwatcher.jobs[index - 1]
15✔
416
        except IndexError as e:
15✔
417
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
15✔
418

419
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
15✔
420
        """Returns the job with defaults based on job_id.
421

422
        This could match an index or a location (url/user_visible_url or command). Accepts negative numbers.
423

424
        :param query: The query.
425
        :return: The matching JobBase with defaults.
426
        :raises ValueError: If the job is not found.
427
        """
428
        job = self._find_job(query)
15✔
429
        return job.with_defaults(self.urlwatcher.config_storage.config)
15✔
430

431
    def test_job(self, job_id: bool | str | int) -> None:
15✔
432
        """Tests the running of a single job outputting the filtered text to --test-reporter (default is stdout).
433

434
        If job_id is True, don't run any jobs but load config, jobs and hook files to trigger any syntax errors.
435

436
        :param job_id: The job_id or True.
437

438
        :return: None.
439

440
        :raises Exception: The Exception raised by a job, the loading of hooks files, etc.
441
        """
442
        if job_id is True:  # Load to trigger any eventual syntax errors
15✔
443
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
15✔
444
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
15✔
445
            if len(self.urlwatch_config.jobs_files) == 1:
15✔
446
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
15✔
447
            else:
448
                message.append(
15✔
449
                    '\n   '.join(
450
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
451
                    )
452
                )
453
            if 'hooks' in sys.modules:
15!
454
                message.append(f'\nand hooks file {sys.modules["hooks"].__file__}')
15✔
455
            print(f'{"".join(message)}.')
15✔
456
            return
15✔
457

458
        job = self._find_job_with_defaults(job_id)
15✔
459

460
        if isinstance(job, UrlJob):
15!
461
            # Force re-retrieval of job, as we're testing filters
UNCOV
462
            job.ignore_cached = True
×
463

464
        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
15✔
465
            # duration = time.perf_counter() - start
466
            job_state.process(headless=not self.urlwatch_config.no_headless)
15✔
467
            if job_state.job.name is None:
15!
UNCOV
468
                job_state.job.name = ''
×
469
            # if job_state.job.note is None:
470
            #     job_state.job.note = ''
471
            data_info = '\n'.join(
15✔
472
                filter(
473
                    None,
474
                    (
475
                        f'• [GUID: {job_state.job.guid}]',
476
                        f'• [Media type: {job_state.new_mime_type}]' if job_state.new_mime_type else None,
477
                        f'• [ETag: {job_state.new_etag}]' if job_state.new_etag else None,
478
                        f'\nERROR {job_state.new_error_data["type"]}: {job_state.new_error_data["message"]}'
479
                        if job_state.new_error_data
480
                        else None,
481
                    ),
482
                )
483
            )
484
            job_state.new_data = f'{data_info}\n\n{job_state.new_data!s}'
15✔
485
            if self.urlwatch_config.test_reporter is None:
15✔
486
                self.urlwatch_config.test_reporter = 'stdout'  # default
15✔
487
            report = Report(self.urlwatcher)
15✔
488
            report.job_states = []  # required
15✔
489
            errorlevel = self.check_test_reporter(
15✔
490
                job_state,
491
                label='test',
492
                report=report,
493
            )
494
            if errorlevel:
15!
UNCOV
495
                self._exit(errorlevel)
×
496
        return
15✔
497

498
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
499
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
500

501
    def prepare_jobs(self) -> None:
15✔
502
        """Runs jobs that have no history to populate the snapshot database when they're newly added."""
503
        new_jobs = set()
15✔
504
        for idx, job in enumerate(self.urlwatcher.jobs):
15✔
505
            has_history = bool(self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid))
15✔
506
            if not has_history:
15!
507
                print(f'Running new {job.get_indexed_location()}.')
15✔
508
                new_jobs.add(idx + 1)
15✔
509
        if not new_jobs and not self.urlwatch_config.joblist:
15!
UNCOV
510
            print('Found no new jobs to run.')
×
UNCOV
511
            return
×
512
        self.urlwatcher.urlwatch_config.joblist = set(self.urlwatcher.urlwatch_config.joblist).union(new_jobs)
15✔
513
        self.urlwatcher.run_jobs()
15✔
514
        self.urlwatcher.close()
15✔
515
        return
15✔
516

517
    def test_differ(self, arg_test_differ: list[str]) -> int:
15✔
518
        """Runs diffs for a job on all the saved snapshots.
519

520
        Outputs the result to stdout or to the reporter selected with --test-reporter.
521

522
        :param arg_test_differ: A list containing [job_id] or [job_id, max_diffs].
523
        :return: 1 if error, 0 if successful.
524
        """
525
        report = Report(self.urlwatcher)
15✔
526
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
15✔
527
        if len(arg_test_differ) == 1:
15✔
528
            job_id = arg_test_differ[0]
15✔
529
            max_diffs = None
15✔
530
        elif len(arg_test_differ) == 2:
15!
531
            job_id, max_diffs_str = arg_test_differ
15✔
532
            max_diffs = int(max_diffs_str)
15✔
533
        else:
UNCOV
534
            raise ValueError('--test-differ takes a maximum of two arguments')
×
535

536
        job = self._find_job_with_defaults(job_id)
15✔
537

538
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
15✔
539

540
        num_snapshots = len(history_data)
15✔
541
        if num_snapshots == 0:
15✔
542
            print('This job has never been run before.')
15✔
543
            return 1
15✔
544
        if num_snapshots < 2:
15✔
545
            print('Not enough historic data available (need at least 2 different snapshots).')
15✔
546
            return 1
15✔
547

548
        if job.compared_versions and job.compared_versions != 1:
15!
UNCOV
549
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")
×
550

551
        max_diffs = max_diffs or num_snapshots - 1
15✔
552
        for i in range(max_diffs):
15✔
553
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
15✔
554
                job_state.new_data = history_data[i].data
15✔
555
                job_state.new_timestamp = history_data[i].timestamp
15✔
556
                job_state.new_etag = history_data[i].etag
15✔
557
                job_state.new_mime_type = history_data[i].mime_type
15✔
558
                if not job.compared_versions or job.compared_versions == 1:
15!
559
                    job_state.old_data = history_data[i + 1].data
15✔
560
                    job_state.old_timestamp = history_data[i + 1].timestamp
15✔
561
                    job_state.old_etag = history_data[i + 1].etag
15✔
562
                    job_state.old_mime_type = history_data[i + 1].mime_type
15✔
563
                else:
UNCOV
564
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
×
UNCOV
565
                    close_matches: list[str] = difflib.get_close_matches(
×
566
                        str(job_state.new_data),
567
                        history_dic_snapshots.keys(),
568
                        n=1,
569
                    )  # ty:ignore[no-matching-overload]
570
                    if close_matches:
×
571
                        job_state.old_data = close_matches[0]
×
572
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
×
UNCOV
573
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
×
UNCOV
574
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type
×
575

576
                if self.urlwatch_config.test_reporter is None:
15✔
577
                    self.urlwatch_config.test_reporter = 'stdout'  # default
15✔
578
                report.job_states = []  # required
15✔
579
                if job_state.new_data == job_state.old_data:
15!
UNCOV
580
                    label = (
×
581
                        f'No change (snapshots {-i:2} vs. {-(i + 1):2}) with '
582
                        f"'compared_versions: {job.compared_versions}'"
583
                    )
UNCOV
584
                    job_state.verb = 'changed,no_report'
×
585
                else:
586
                    label = f'Filtered diff (snapshots {-i:2} vs. {-(i + 1):2})'
15✔
587
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
15✔
588
                if errorlevel:
15!
UNCOV
589
                    self._exit(errorlevel)
×
590

591
        # We do not save the job state or job on purpose here, since we are possibly modifying the job
592
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
593

594
        return 0
15✔
595

596
    def dump_history(self, job_id: str) -> int:
15✔
597
        """Displays the historical data stored in the snapshot database for a job.
598

599
        :param job_id: The Job ID.
600
        :return: An argument to be used in sys.exit.
601
        """
602
        try:
15✔
603
            job = self._find_job_with_defaults(job_id)
15✔
UNCOV
604
        except ValueError:
×
UNCOV
605
            print(f"No Job found matching '{job_id}'. Searching database using calculated GUID.")
×
UNCOV
606
            job = JobBase.unserialize({'url': job_id})
×
607

608
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
15✔
609

610
        title = f'History for {job.get_indexed_location()}'
15✔
611
        print(f'{title}\nGUID: {job.guid}')
15✔
612
        if history_data:
15✔
613
            print('=' * max(len(title), 46))
15✔
614
        total_failed = 0
15✔
615
        for i, snapshot in enumerate(history_data):
15✔
616
            mime_type = f' | Media type: {snapshot.mime_type}' if snapshot.mime_type else ''
15✔
617
            etag = f' | ETag: {snapshot.etag}' if snapshot.etag else ''
15✔
618
            tries = f' | Error run (number {snapshot.tries})' if snapshot.tries else ''
15✔
619
            total_failed += snapshot.tries > 0
15✔
620
            tz = self.urlwatcher.report.config['report']['tz']
15✔
621
            tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
15✔
622
            dt = datetime.fromtimestamp(snapshot.timestamp, tz_info)
15✔
623
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
15✔
624
            sep_len = max(50, len(header))
15✔
625
            print(header)
15✔
626
            print('-' * sep_len)
15✔
627
            if snapshot.error_data:
15!
UNCOV
628
                print(f'{snapshot.error_data.get("type")}: {snapshot.error_data.get("message")}')
×
UNCOV
629
                print()
×
UNCOV
630
                print('Last good data:')
×
631
            print(snapshot.data)
15✔
632
            print('=' * sep_len, '\n')
15✔
633

634
        print(
15✔
635
            f'Found {len(history_data) - total_failed}'
636
            + (' good' if total_failed else '')
637
            + ' snapshot'
638
            + ('s' if len(history_data) - total_failed != 1 else '')
639
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
640
            + '.'
641
        )
642

643
        return 0
15✔
644

645
    def list_error_jobs(self) -> int:
15✔
646
        from webchanges.reporters import ReporterBase
15✔
647

648
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
15✔
649
            print(f'Invalid reporter {self.urlwatch_config.errors}.')
15✔
650
            return 1
15✔
651

652
        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
15✔
653
            """A generator that outputs error text for jobs who fail with an exception or yield no data.
654

655
            Do not use it to test newly modified jobs, since it makes conditional requests to the websites (i.e. uses
656
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
657
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
658
            """
659

660
            def job_runner(
15✔
661
                stack: ExitStack,
662
                jobs: Iterable[JobBase],
663
                max_workers: int | None = None,
664
            ) -> Iterator[str]:
665
                """Modified worker.job_runner.
666

667
                Yields error text for jobs that fail with an exception or return no data.
668

669
                :param stack: The context manager.
670
                :param jobs: The jobs to run.
671
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
672
                :return: Error text for jobs that fail with an exception or return no data.
673
                """
674
                executor = ThreadPoolExecutor(max_workers=max_workers)
15✔
675

676
                for job_state in executor.map(
15✔
677
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
678
                    (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
679
                ):
680
                    if not isinstance(job_state.exception, NotModifiedError):
15!
681
                        if job_state.exception is None:
15✔
682
                            if (
15!
683
                                len(job_state.new_data.strip()) == 0
684
                                if hasattr(job_state, 'new_data')
685
                                else len(job_state.old_data.strip()) == 0
686
                            ):
UNCOV
687
                                if self.urlwatch_config.verbose:
×
688
                                    yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
×
689
                                else:
UNCOV
690
                                    pretty_name = job_state.job.pretty_name()
×
UNCOV
691
                                    location = job_state.job.get_location()
×
UNCOV
692
                                    if pretty_name != location:
×
UNCOV
693
                                        yield f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
×
694
                                    else:
UNCOV
695
                                        yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
×
696
                        else:
697
                            pretty_name = job_state.job.pretty_name()
15✔
698
                            location = job_state.job.get_location()
15✔
699
                            if pretty_name != location:
15!
700
                                yield (
15✔
701
                                    f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name} '
702
                                    f'({location})'
703
                                )
704
                            else:
UNCOV
705
                                yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name})'
×
706

707
            with ExitStack() as stack:
15✔
708
                # This code is from worker.run_jobs, modified to yield from job_runner.
709
                from webchanges.worker import get_virt_mem  # avoid circular imports
15✔
710

711
                # run non-BrowserJob jobs first
712
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
15✔
713
                if jobs_to_run:
15!
714
                    logger.debug(
15✔
715
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
716
                        "Python's default max_workers."
717
                    )
718
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
15✔
719
                else:
720
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")
×
721

722
                # run BrowserJob jobs after
723
                jobs_to_run = [job for job in jobs if job.__is_browser__]
15✔
724
                if jobs_to_run:
15!
725
                    gc.collect()
×
UNCOV
726
                    virt_mem = get_virt_mem()
×
UNCOV
727
                    if self.urlwatch_config.max_workers:
×
UNCOV
728
                        max_workers = self.urlwatch_config.max_workers
×
729
                    else:
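                        # Heuristic used below: roughly one browser worker per 200 MB of free virtual
                        # memory, capped at the number of logical CPUs.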
UNCOV
730
                        max_workers = max(int(virt_mem / 200e6), 1)
×
UNCOV
731
                        max_workers = min(max_workers, os.cpu_count() or 1)
×
UNCOV
732
                    logger.debug(
×
733
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
734
                        f'{max_workers} max_workers.'
735
                    )
UNCOV
736
                    yield from job_runner(stack, jobs_to_run, max_workers)
×
737
                else:
738
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")
15✔
739

740
        start = time.perf_counter()
15✔
741

742
        # default max_workers (when not specified) to 1
743
        if self.urlwatch_config.max_workers is None:
15!
744
            self.urlwatch_config.max_workers = 1
15✔
745

746
        if len(self.urlwatch_config.jobs_files) == 1:
15!
747
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
15✔
748
        else:
UNCOV
749
            jobs_files = ['in the concatenation of the jobs files'] + [
×
750
                f'• {file},' for file in self.urlwatch_config.jobs_files
751
            ]
752
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)', *jobs_files])
15✔
753

754
        jobs = {
15✔
755
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
756
        }
757
        if self.urlwatch_config.errors == 'stdout':
15!
758
            print(header)
15✔
759
            for line in error_jobs_lines(jobs):
15✔
760
                print(line)
15✔
761
            print('--')
15✔
762
            duration = time.perf_counter() - start
15✔
763
            print(f'Checked {len(jobs)} enabled job{"s" if jobs else ""} for errors in {dur_text(duration)}.')
15✔
764

765
        else:
766
            message = '\n'.join(error_jobs_lines(jobs))
×
767
            if message:
×
768
                # create a dummy job state to run a reporter on
UNCOV
769
                job_state = JobState(
×
770
                    None,  # type: ignore[arg-type]
771
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
772
                )
773
                job_state.traceback = f'{header}\n{message}'
×
774
                duration = time.perf_counter() - start
×
775
                self.urlwatcher.report.config['footnote'] = (
×
776
                    f'Checked {len(jobs)} job{"s" if jobs else ""} for errors in {dur_text(duration)}.'
777
                )
778
                self.urlwatcher.report.config['report']['html']['footer'] = False
×
779
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
×
780
                self.urlwatcher.report.config['report']['text']['footer'] = False
×
781
                self.urlwatcher.report.error(job_state)
×
UNCOV
782
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
×
783
            else:
UNCOV
784
                print(header)
×
UNCOV
785
                print('--')
×
UNCOV
786
                duration = time.perf_counter() - start
×
UNCOV
787
                print('Found no errors.')
×
UNCOV
788
                print(f'Checked {len(jobs)} job{"s" if jobs else ""} for errors in {dur_text(duration)}.')
×
789

790
        return 0
15✔
791

792
    def rollback_database(self, timespec: str) -> int:
15✔
793
        """Issues a warning, calls rollback() and prints out the result.
794

795
        :param timespec: A timespec that, if numeric, is interpreted as a Unix timestamp; otherwise it's passed to
796
          dateutil.parser (if dateutil is installed) or datetime.fromisoformat to be converted into a date.
797

798
        :return: A sys.exit code (0 for success, 1 for failure).
799
        """
800

801
        def _convert_to_datetime(timespec: str, tz_info: ZoneInfo | tzinfo | None) -> datetime:
15✔
802
            """Converts inputted string to a datetime object, using dateutil if installed.
803

804
            :param timespec: The string.
805
            :param tz_info: The timezone.
806

807
            :return: The datetime object.
808
            """
809
            # --- 1. Try parsing as a numeric timestamp ---
810
            # This is the fastest check and should come first.
811
            if timespec.isnumeric() or (timespec.startswith('-') and timespec[1:].isnumeric()):
15✔
812
                try:
15✔
813
                    timestamp = float(timespec)
15✔
814
                    return datetime.fromtimestamp(timestamp, tz=tz_info)
15✔
UNCOV
815
                except (ValueError, TypeError):
×
816
                    # Pass to the next method if it's not a valid float (e.g., "123a")
817
                    pass
×
818

819
            # --- 2. Try parsing as ISO 8601 format ---
820
            # datetime.fromisoformat is very efficient for standard formats.
821
            try:
15✔
822
                dt = datetime.fromisoformat(timespec)
15✔
823
                # If the parsed datetime is naive (no timezone), apply the provided one.
UNCOV
824
                if dt.tzinfo is None:
×
UNCOV
825
                    return dt.replace(tzinfo=tz_info)
×
UNCOV
826
                return dt
×
827
            except ValueError:
15✔
828
                # Pass to the next method if it's not a valid ISO string.
829
                pass
15✔
830

831
            # --- 3. Try parsing with the flexible but slower dateutil library ---
832
            try:
15✔
833
                from dateutil import parser as dateutil_parser
15✔
834

835
                try:
15✔
836
                    # Set a default datetime to provide context and timezone for ambiguous strings like "Sunday at 4pm".
837
                    default_dt_with_tz = datetime.now(tz_info).replace(second=0, microsecond=0)
15✔
838
                    return dateutil_parser.parse(timespec, default=default_dt_with_tz)  # bug
15✔
839
                except (ValueError, OverflowError):
15✔
840
                    # Pass to the next method if dateutil cannot parse.
841
                    pass
15✔
842
            except ImportError:
843
                # Pass to the next method if dateutil is not installed.
844
                pass
845

846
            # --- 4. If all parsing attempts fail ---
847
            raise ValueError(f'Cannot parse "{timespec}" into a date/time.')
15✔
848

849
        tz = self.urlwatcher.report.config['report']['tz']
15✔
850
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
15✔
851
        dt = _convert_to_datetime(timespec, tz_info)
15✔
852
        timestamp_date = email.utils.format_datetime(dt)
15✔
853
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
15✔
854
        print(f'Rolling back database to {timestamp_date}.')
15✔
855
        if sys.__stdin__ and sys.__stdin__.isatty():
15✔
856
            print(
5✔
857
                f'WARNING: All {count} snapshots after this date/time (check timezone) will be deleted.\n'
858
                f'         ☠  This operation cannot be undone!\n'
859
                f'         We suggest you make a backup of the database file before proceeding:\n'
860
                f'         {self.urlwatch_config.ssdb_file}'
861
            )
862
            resp = input("         Please enter 'Y' to proceed: ")
5✔
863
            if not resp.upper().startswith('Y'):
5!
UNCOV
864
                print('Quitting rollback. No snapshots have been deleted.')
×
UNCOV
865
                return 1
×
866
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
15✔
867
        if count:
15!
UNCOV
868
            print(f'Deleted {count} snapshots taken after {timestamp_date}.')
×
UNCOV
869
            self.urlwatcher.ssdb_storage.close()
×
870
        else:
871
            print(f'No snapshots found after {timestamp_date}')
15✔
872
        return 0
15✔
873

874
    def delete_snapshot(self, job_id: str | int) -> int:
15✔
875
        job = self._find_job_with_defaults(job_id)
15✔
876
        history = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
15✔
877
        if not history:
15✔
878
            print(f'No snapshots found for {job.get_indexed_location()}.')
15✔
879
            return 1
15✔
880
        tz = self.urlwatcher.report.config['report']['tz']
15✔
881
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
15✔
882
        if sys.__stdin__ and sys.__stdin__.isatty():
15✔
883
            print(f'WARNING: About to delete the latest snapshot of\n         {job.get_indexed_location()}:')
5✔
884
            for i, history_job in enumerate(history):
5✔
885
                print(
5✔
886
                    f'         {i + 1}. {"❌ " if i == 0 else "   "}'
887
                    f'{email.utils.format_datetime(datetime.fromtimestamp(history_job.timestamp).astimezone(tz_info))}'
888
                    f'{"  ⬅  ABOUT TO BE DELETED!" if i == 0 else ""}'
889
                )
890
            print(
5✔
891
                f'         ☠  This operation cannot be undone!\n'
892
                f'         We suggest you make a backup of the database file before proceeding:\n'
893
                f'         {self.urlwatch_config.ssdb_file}'
894
            )
895
            resp = input("         Please enter 'Y' to proceed: ")
5✔
896
            if not resp.upper().startswith('Y'):
5!
897
                print('Quitting. No snapshots have been deleted.')
×
UNCOV
898
                return 1
×
899
        count = self.urlwatcher.ssdb_storage.delete_latest(job.guid)
15✔
900
        if count:
15!
901
            print(f'Deleted last snapshot of {job.get_indexed_location()}; {len(history) - 1} snapshots left.')
15✔
902
            return 0
15✔
UNCOV
903
        print(f'No snapshots found for {job.get_indexed_location()}.')
×
UNCOV
904
        return 1
×
905

906
    def modify_urls(self) -> int:
15✔
907
        if self.urlwatch_config.delete is not None:
15✔
908
            job = self._find_job(self.urlwatch_config.delete)
15✔
909
            if job is not None:
15!
910
                if sys.__stdin__ and sys.__stdin__.isatty():
15✔
911
                    print(
5✔
912
                        f'WARNING: About to permanently delete {job.get_indexed_location()}.\n'
913
                        '         Job file will be overwritten and all remarks lost.\n'
914
                        '         This operation cannot be undone!\n'
915
                    )
916
                    resp = input("         Please enter 'Y' to proceed: ")
5✔
917
                    if not resp.upper().startswith('Y'):
5!
918
                        print(f'Quitting. Job {job.index_number} has not been deleted and job file is unmodified.')
×
UNCOV
919
                        return 1
×
920
                self.urlwatcher.jobs.remove(job)
15✔
921
                print(f'Removed {job}.')
15✔
922
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
15✔
923
            else:
UNCOV
924
                print(f'Job not found: {self.urlwatch_config.delete}.')
×
UNCOV
925
                return 1
×
926

927
        if self.urlwatch_config.add is not None:
15✔
928
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
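            # Example (hypothetical value): --add 'url=https://example.com,name=Example,filter=html2text,filter=strip'
            # becomes {'url': 'https://example.com', 'name': 'Example', 'filter': 'html2text,strip'}.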
929
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
15✔
930
            filters = [v for k, v in items if k == 'filter']
15✔
931
            items2 = [(k, v) for k, v in items if k != 'filter']
15✔
932
            d = dict(items2)
15✔
933
            if filters:
15!
UNCOV
934
                d['filter'] = ','.join(filters)
×
935

936
            job = JobBase.unserialize(d)
15✔
937
            print(f'Adding {job}.')
15✔
938
            self.urlwatcher.jobs.append(job)
15✔
939
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
15✔
940

941
        if self.urlwatch_config.change_location is not None:
15✔
942
            new_loc = self.urlwatch_config.change_location[1]
15✔
943
            # Ensure the user isn't overwriting an existing job with the change.
944
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
15!
UNCOV
945
                print(
×
946
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
947
                    f'different value.\n'
948
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
949
                )
UNCOV
950
                return 1
×
951
            job = self._find_job(self.urlwatch_config.change_location[0])
15✔
952
            if job is not None:
15!
953
                # Update the job's location (which will also update the guid) and move any history in the database
954
                # over to the job's updated guid.
955
                old_loc = job.get_location()
15✔
956
                print(f'Moving location of "{old_loc}" to "{new_loc}".')
15✔
957
                old_guid = job.guid
15✔
958
                if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
15✔
959
                    print(f'No snapshots found for "{old_loc}".')
15✔
960
                    return 1
15✔
961
                job.set_base_location(new_loc)
15✔
962
                num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.guid)
15✔
963
                if num_searched:
15!
964
                    print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}".')
15✔
965
            else:
UNCOV
966
                print(f'Job not found: "{self.urlwatch_config.change_location[0]}".')
×
UNCOV
967
                return 1
×
968
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
15✔
969
            if not input(message).lower().startswith('y'):
15!
UNCOV
970
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
×
971
            else:
972
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
15✔
973

974
        return 0
15✔
975

976
    def edit_config(self) -> int:
15✔
977
        return self.urlwatcher.config_storage.edit()
15✔
978

979
    def check_telegram_chats(self) -> None:
15✔
980
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']
15✔
981

982
        bot_token = config['bot_token']
15✔
983
        if not bot_token:
15✔
984
            print('You need to set up your bot token first (see documentation).')
15✔
985
            self._exit(1)
15✔
986

987
        get_client = httpx.Client(http2=h2 is not None).get if httpx else requests.get
15✔
988

989
        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
15✔
990
        if not info['ok']:
15!
991
            print(f'Error with token {bot_token}: {info["description"]}.')
15✔
992
            self._exit(1)
15✔
993

UNCOV
994
        chats = {}
×
UNCOV
995
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
×
UNCOV
996
        if 'result' in updates:
×
997
            for chat_info in updates['result']:
×
998
                chat = chat_info['message']['chat']
×
999
                if chat['type'] == 'private':
×
UNCOV
1000
                    chats[chat['id']] = (
×
1001
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
1002
                    )
1003

1004
        if not chats:
×
1005
            print(f'No chats found. Say hello to your bot at https://t.me/{info["result"]["username"]}.')
×
1006
            self._exit(1)
×
1007

1008
        headers = ('Chat ID', 'Name')
×
1009
        maxchat = max(len(headers[0]), max((len(k) for k, v in chats.items()), default=0))
×
UNCOV
1010
        maxname = max(len(headers[1]), max((len(v) for k, v in chats.items()), default=0))
×
1011
        fmt = f'%-{maxchat}s  %s'
×
UNCOV
1012
        print(fmt % headers)
×
UNCOV
1013
        print(fmt % ('-' * maxchat, '-' * maxname))
×
UNCOV
1014
        for k, v in sorted(chats.items(), key=lambda kv: kv[1]):
×
UNCOV
1015
            print(fmt % (k, v))
×
UNCOV
1016
        print(f'\nChat up your bot here: https://t.me/{info["result"]["username"]}.')

        self._exit(0)

    def check_test_reporter(
        self,
        job_state: JobState | None = None,
        label: str = 'test',
        report: Report | None = None,
    ) -> int:
        """Tests a reporter by creating pseudo-jobs with new, changed, unchanged, and error outcomes ('verb').

        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
        of the configuration file.

        :param job_state: The JobState (Optional).
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report object to use for testing (Optional).
        :return: 0 if successful, 1 otherwise.
        """
        from webchanges.reporters import ReporterBase

        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
            """Builds a pseudo-job for the reporter to run on."""
            job = JobBase.unserialize({'name': job_name, 'url': url})

            # Can pass None for ssdb_storage, as we are not going to load or save the job state for testing;
            # there is also no need to use it as a context manager, since no processing is run on the job.
            job_state = JobState(None, job)  # type: ignore[arg-type]

            job_state.old_data = old
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
            job_state.new_data = new
            job_state.new_timestamp = time.time()

            return job_state

        def set_error(job_state: 'JobState', message: str) -> JobState:
            """Sets a job error message on a JobState."""
            try:
                raise ValueError(message)
            except ValueError as e:
                job_state.exception = e
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())

            return job_state

        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(
                f'No such reporter: {reporter_name}.\n'
                f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}.\n'
            )
            return 1

        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][reporter_name]
        if job_state:  # we want a full report
            cfg['enabled'] = True
            self.urlwatcher.config_storage.config['display'][label] = True
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['stdout']['color'] = False
        elif not cfg['enabled']:
            print(
                f'WARNING: Reporter being tested is not enabled: {reporter_name}.\n'
                f'Will still attempt to test it, but this may not work.\n'
                f'Use {__project_name__} --edit-config to configure reporters.'
            )
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)  # type: ignore[arg-type]
        else:
            report.new(
                build_job(
                    'Sample job that was newly added',
                    'https://example.com/new',
                    '',
                    '',
                )
            )
            report.changed(
                build_job(
                    'Sample job where something changed',
                    'https://example.com/changed',
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
                )
            )
            report.unchanged(
                build_job(
                    'Sample job where nothing changed',
                    'http://example.com/unchanged',
                    'Same Old, Same Old\n',
                    'Same Old, Same Old\n',
                )
            )
            report.error(
                set_error(
                    build_job(
                        'Sample job where an error was encountered',
                        'https://example.com/error',
                        '',
                        '',
                    ),
                    'The error message would appear here.',
                )
            )

        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
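
    # Usage sketch: this method is driven by the reporter name held in urlwatch_config.test_reporter above
    # (assuming it is exposed on the command line as --test-reporter). For example, with the 'stdout' reporter
    # configured, a run along the lines of
    #
    #   webchanges --test-reporter stdout
    #
    # would build the four pseudo-jobs above (new, changed, unchanged, error) and route them through that single
    # reporter via report.finish_one().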

    def check_smtp_login(self) -> None:
        """Checks the SMTP configuration of the email reporter, optionally updates the stored password, and
        attempts to log into the SMTP server before exiting."""
        from webchanges.mailer import SMTPMailer, smtp_have_password, smtp_set_password

        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_config: _ConfigReportEmailSmtp = config['smtp']

        success = True

        if not config['enabled']:
            print('Please enable email reporting in the config first.')
            success = False

        if config['method'] != 'smtp':
            print('Please set the method to SMTP for the email reporter.')
            success = False

        smtp_auth = smtp_config['auth']
        if not smtp_auth:
            print('Authentication must be enabled for SMTP.')
            success = False

        smtp_hostname = smtp_config['host']
        if not smtp_hostname:
            print('Please configure the SMTP hostname in the config first.')
            success = False

        smtp_username = smtp_config['user'] or config['from']
        if not smtp_username:
            print('Please configure the SMTP user in the config first.')
            success = False

        if not success:
            self._exit(1)

        insecure_password = smtp_config['insecure_password']
        if insecure_password:
            print('The SMTP password is set in the config file (key "insecure_password").')
        elif smtp_have_password(smtp_hostname, smtp_username):
            message = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
            if not input(message).lower().startswith('y'):
                print('Password unchanged.')
            else:
                smtp_set_password(smtp_hostname, smtp_username)

        smtp_port = smtp_config['port']
        smtp_tls = smtp_config['starttls']

        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
        print('Trying to log into the SMTP server...')
        mailer.send(None)
        print('Successfully logged into SMTP server.')

        self._exit(0)
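
    # Configuration sketch for the keys read by check_smtp_login() above (key names come from the code; the
    # values shown are placeholders, not defaults):
    #
    #   report:
    #     email:
    #       enabled: true
    #       method: smtp
    #       from: 'me@example.com'
    #       smtp:
    #         host: 'smtp.example.com'
    #         port: 587
    #         starttls: true
    #         auth: true
    #         user: ''               # falls back to 'from' when empty
    #         insecure_password: ''  # leave empty to use the password stored via smtp_set_password()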

    def check_xmpp_login(self) -> None:
        """Checks the XMPP reporter configuration and, if needed, prompts for and stores the XMPP password before
        exiting."""
        from webchanges.reporters import xmpp_have_password, xmpp_set_password

        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        success = True

        if not xmpp_config['enabled']:
            print('Please enable XMPP reporting in the config first.')
            success = False

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            print('Please configure the XMPP sender in the config first.')
            success = False

        if not xmpp_config['recipient']:
            print('Please configure the XMPP recipient in the config first.')
            success = False

        if not success:
            self._exit(1)

        if 'insecure_password' in xmpp_config:
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
            if input(message).lower() != 'y':
                print('Password unchanged.')
                self._exit(0)

        if success:
            xmpp_set_password(xmpp_sender)

        self._exit(0)
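
    # Configuration sketch for the keys read by check_xmpp_login() above (key names come from the code; values
    # are placeholders):
    #
    #   report:
    #     xmpp:
    #       enabled: true
    #       sender: 'bot@example.org'
    #       recipient: 'me@example.org'
    #       # insecure_password: '...'  # if set, the method reports it and exits without touching stored passwords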

    @staticmethod
    def playwright_install_chrome() -> int:  # pragma: no cover
        """Replicates the playwright.__main__.main() function, which is called by the playwright executable, in
        order to install the Chrome browser executable.

        :return: Playwright's executable return code.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:  # pragma: no cover
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        driver_executable = compute_driver_executable()
        env = os.environ.copy()
        env['PW_CLI_TARGET_LANG'] = 'python'
        cmd = [str(driver_executable), 'install', 'chrome']
        logger.info(f'Running playwright CLI: {" ".join(cmd)}')
        completed_process = subprocess.run(cmd, check=False, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
        if completed_process.returncode:
            print(completed_process.stderr)
            return completed_process.returncode
        if completed_process.stdout:
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
        return 0
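
    # Equivalent CLI sketch: the subprocess call above drives Playwright's bundled driver directly, roughly what
    #
    #   playwright install chrome
    #
    # does when the playwright package is installed, except that output is captured here: stderr is printed on
    # failure and stdout is logged on success.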

    def handle_actions(self) -> None:
        """Handles the actions for command line arguments and exits."""
        if self.urlwatch_config.list_jobs:
            self.list_jobs(self.urlwatch_config.list_jobs)
            self._exit(0)

        if self.urlwatch_config.errors:
            self._exit(self.list_error_jobs())

        if self.urlwatch_config.test_job:
            self.test_job(self.urlwatch_config.test_job)
            self._exit(0)

        if self.urlwatch_config.prepare_jobs:
            self.prepare_jobs()
            self._exit(0)

        if self.urlwatch_config.test_differ:
            self._exit(self.test_differ(self.urlwatch_config.test_differ))

        if self.urlwatch_config.dump_history:
            self._exit(self.dump_history(self.urlwatch_config.dump_history))

        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
            self._exit(self.modify_urls())

        if self.urlwatch_config.test_reporter:
            self._exit(self.check_test_reporter())

        if self.urlwatch_config.smtp_login:
            self.check_smtp_login()

        if self.urlwatch_config.telegram_chats:
            self.check_telegram_chats()

        if self.urlwatch_config.xmpp_login:
            self.check_xmpp_login()

        if self.urlwatch_config.edit:
            self._exit(self.urlwatcher.jobs_storage.edit())

        if self.urlwatch_config.edit_config:
            self._exit(self.edit_config())

        if self.urlwatch_config.edit_hooks:
            self._exit(self.edit_hooks())

        if self.urlwatch_config.gc_database:
            self.urlwatcher.ssdb_storage.gc(
                [job.guid for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        if self.urlwatch_config.clean_database:
            self.urlwatcher.ssdb_storage.clean_ssdb(
                [job.guid for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        if self.urlwatch_config.rollback_database:
            exit_arg = self.rollback_database(self.urlwatch_config.rollback_database)
            self.urlwatcher.ssdb_storage.close()
            self._exit(exit_arg)

        if self.urlwatch_config.delete_snapshot:
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))

        if self.urlwatch_config.features:
            self._exit(self.show_features())

        if self.urlwatch_config.detailed_versions:
            self._exit(self.show_detailed_versions())
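
        # Note: every branch above ends in self._exit(), either directly or inside the method it calls, so at most
        # one command-line action runs per invocation; when no action flag is set, handle_actions() falls through
        # and run() below continues on to run_jobs().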

    def run(self) -> None:  # pragma: no cover
        """The main run logic."""
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote

        self.handle_actions()

        self.urlwatcher.run_jobs()

        self.urlwatcher.close()

        self._exit(0)