• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 21856489627

10 Feb 2026 07:57AM UTC coverage: 73.228% (-0.09%) from 73.318%
21856489627

push

github

mborsetti
Version 3.34.0rc0

1424 of 2298 branches covered (61.97%)

Branch coverage included in aggregate %.

4766 of 6155 relevant lines covered (77.43%)

11.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.43
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE.md file, which is part of the source code.
4

5
from __future__ import annotations
15✔
6

7
import difflib
15✔
8
import email.utils
15✔
9
import gc
15✔
10
import importlib.metadata
15✔
11
import logging
15✔
12
import os
15✔
13
import platform
15✔
14
import re
15✔
15
import shutil
15✔
16
import sqlite3
15✔
17
import subprocess
15✔
18
import sys
15✔
19
import time
15✔
20
import traceback
15✔
21
from concurrent.futures import ThreadPoolExecutor
15✔
22
from contextlib import ExitStack
15✔
23
from datetime import datetime, tzinfo
15✔
24
from pathlib import Path
15✔
25
from typing import TYPE_CHECKING, Iterable, Iterator
15✔
26
from urllib.parse import unquote_plus
15✔
27
from zoneinfo import ZoneInfo
15✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
15✔
30
from webchanges.handler import JobState, Report
15✔
31
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
15✔
32
from webchanges.util import dur_text, edit_file, import_module_from_source
15✔
33

34
try:
15✔
35
    import httpx
15✔
36
except ImportError:  # pragma: no cover
37
    httpx = None  # type: ignore[assignment]
38
    print("Required package 'httpx' not found; will attempt to run using 'requests'.")
39
    try:
40
        import requests
41
    except ImportError as e:  # pragma: no cover
42
        raise RuntimeError(
43
            f"A Python HTTP client package (either 'httpx' or 'requests' is required to run {__project_name__}; "
44
            'neither can be imported.'
45
        ) from e
46
if httpx is not None:
15✔
47
    try:
15✔
48
        import h2
15✔
49
    except ImportError:  # pragma: no cover
50
        h2 = None  # type: ignore[assignment]
51

52
if TYPE_CHECKING:
53
    from webchanges.main import Urlwatch
54
    from webchanges.reporters import _ConfigReportersList
55
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
56

57
logger = logging.getLogger(__name__)
15✔
58

59

60
class UrlwatchCommand:
15✔
61
    """The class that runs the program after initialization and CLI arguments parsing."""
62

63
    def __init__(self, urlwatcher: Urlwatch) -> None:
15✔
64
        self.urlwatcher = urlwatcher
15✔
65
        self.urlwatch_config = urlwatcher.urlwatch_config
15✔
66

67
    def _exit(self, arg: str | int | None) -> None:
15✔
68
        logger.info(f'Exiting with exit code {arg}')
15✔
69

70
        self.urlwatcher.ssdb_storage.close()
15✔
71
        sys.exit(arg)
15✔
72

73
    def jobs_from_joblist(self) -> Iterator[JobBase]:
15✔
74
        """Generates the jobs to process from the joblist entered in the CLI."""
75
        if self.urlwatcher.urlwatch_config.joblist:
15✔
76
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
15✔
77
            enabled_jobs = {job for job in jobs if job.is_enabled()}
15✔
78
            disabled = len(enabled_jobs) - len(jobs)
15✔
79
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
15✔
80
            logger.debug(
15✔
81
                f'Processing {len(enabled_jobs)} job{"s" if enabled_jobs else ""}{disabled_str} as specified in '
82
                f'command line: {", ".join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}'
83
            )
84
        else:
85
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
15✔
86
            disabled = len(enabled_jobs) - len(self.urlwatcher.jobs)
15✔
87
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
15✔
88
            logger.debug(f'Processing {len(enabled_jobs)} job{"s" if enabled_jobs else ""}{disabled_str}')
15✔
89
        for job in enabled_jobs:
15✔
90
            yield job.with_defaults(self.urlwatcher.config_storage.config)
15✔
91

92
    def edit_hooks(self) -> int:
        """Edit hooks file.

        Each hooks file is copied to a sibling '*_edit' working copy, opened in the user's editor, and
        re-imported to check for syntax/import errors before the edit replaces the original file.

        :returns: 0 if edit is successful, 1 otherwise.
        """
        # Similar code to BaseTextualFileStorage.edit()
        for hooks_file in self.urlwatch_config.hooks_files:
            logger.debug(f'Edit file {hooks_file}')
            # Edit a working copy so a failed or abandoned edit never clobbers the original file.
            hooks_edit = hooks_file.with_stem(hooks_file.stem + '_edit')
            if hooks_file.exists():
                shutil.copy(hooks_file, hooks_edit)
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
            #         self.urlwatch_config.hooks_py_example):
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)

            # Loop until the edited copy imports cleanly or the user gives up.
            while True:
                try:
                    edit_file(hooks_edit)
                    # Re-import the edited copy; any exception here means the edit is invalid.
                    import_module_from_source('hooks', hooks_edit)
                    break  # stop if no exception on parser
                except SystemExit:
                    raise
                except Exception as e:  # noqa: BLE001 Do not catch blind exception: `Exception`
                    print('Parsing failed:')
                    print('======')
                    print(e)
                    print('======')
                    print()
                    print(f'The file {hooks_file} was NOT updated.')
                    user_input = input('Do you want to retry the same edit? (Y/n)')
                    if not user_input or user_input.lower()[0] == 'y':
                        continue
                    # User gave up: discard the working copy; the original file is left untouched.
                    hooks_edit.unlink()
                    print('No changes have been saved.')
                    return 1

            if hooks_file.is_symlink():
                # Write through the symlink rather than replacing it, so the link itself is preserved.
                hooks_file.write_text(hooks_edit.read_text())
            else:
                hooks_edit.replace(hooks_file)
            # missing_ok covers the replace() branch, where the working copy no longer exists.
            hooks_edit.unlink(missing_ok=True)
            print(f'Saved edits in {hooks_file}.')

        return 0
136

137
    @staticmethod
    def show_features() -> int:
        """Prints the "features", i.e. a list of job types, filters, differs and reporters.

        :return: 0.
        """
        from webchanges.differs import DifferBase
        from webchanges.filters import FilterBase
        from webchanges.reporters import ReporterBase

        # The same pointer to the documentation is printed both first and last.
        docs_pointer = f'Please see full documentation at {__docs_url__}.'
        print(docs_pointer)
        print()
        print('Supported jobs:\n')
        print(JobBase.job_documentation())
        print('Supported filters:\n')
        print(FilterBase.filter_documentation())
        print()
        print('Supported differs:\n')
        print(DifferBase.differ_documentation())
        print()
        print('Supported reporters:\n')
        print(ReporterBase.reporter_documentation())
        print()
        print(docs_pointer)

        return 0
163

164
    @staticmethod
    def show_detailed_versions() -> int:
        """Prints the detailed versions, including of dependencies.

        Reports, in order: software versions, system information (when psutil is importable), versions of
        relevant PyPi packages and their requirements, the Playwright browser (when installed), and — on
        POSIX systems — the dpkg packages backing optional features.

        :return: 0.
        """

        def dependencies() -> list[str]:
            # Ask pip's (private) metadata API for this project's declared dependencies; fall back to a
            # hard-coded list of every possible optional dependency when pip is unavailable or the project's
            # distribution cannot be found.
            try:
                from pip._internal.metadata import get_default_environment

                env = get_default_environment()
                dist = None
                for dist in env.iter_all_distributions():
                    if dist.canonical_name == __project_name__:
                        break
                if dist and dist.canonical_name == __project_name__:
                    requires_dist = dist.metadata_dict.get('requires_dist', [])
                    # Keep only the bare package name, stripping version specifiers, markers and extras.
                    dependencies = [re.split('[ <>=;#^[]', d)[0] for d in requires_dist]
                    dependencies.extend(('packaging', 'simplejson'))
                    return sorted(dependencies, key=str.lower)
            except ImportError:
                pass

            # default list of all possible dependencies
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
            return [
                'aioxmpp',
                'beautifulsoup4',
                'chump',
                'colorama',
                'cryptography',
                'cssbeautifier',
                'cssselect',
                'deepdiff',
                'h2',
                'html2text',
                'httpx',
                'jq',
                'jsbeautifier',
                'keyring',
                'lxml',
                'markdown2',
                'matrix_client',
                'msgpack',
                'packaging',
                'pdftotext',
                'Pillow',
                'platformdirs',
                'playwright',
                'psutil',
                'pushbullet.py',
                'pypdf',
                'pytesseract',
                'pyyaml',
                'redis',
                'requests',
                'simplejson',
                'tzdata',
                'vobject',
            ]

        print('Software:')
        print(f'• {__project_name__}: {__version__}')
        print(
            f'• {platform.python_implementation()}: {platform.python_version()} '
            f'{platform.python_build()} {platform.python_compiler()}'
        )
        print(f'• SQLite: {sqlite3.sqlite_version}')

        # System information is optional: it is skipped entirely when psutil is not installed.
        try:
            import psutil
            from psutil._common import bytes2human

            print()
            print('System:')
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
            print(f'• Processor: {platform.processor()}')
            print(f'• CPUs (logical): {psutil.cpu_count()}')
            try:
                virt_mem = psutil.virtual_memory().available
                print(
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
                )
            except psutil.Error as e:  # pragma: no cover
                print(f'• Free memory: Could not read information: {e}')
            print(
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
                f'({100 - psutil.disk_usage("/").percent:.1f}%)'
            )
            # NOTE(review): _max_workers is a private attribute of ThreadPoolExecutor; used here only to
            # report the interpreter's default worker count.
            executor = ThreadPoolExecutor()
            print(f'• --max-threads default: {executor._max_workers}')
        except ImportError:
            pass

        print()
        print('Relevant PyPi packages:')
        for module_name in dependencies():
            try:
                mod = importlib.metadata.distribution(module_name)
            except ModuleNotFoundError:
                continue
            print(f'• {module_name}: {mod.version}')
            # package requirements
            if mod.requires:
                for req_name in [i.split()[0] for i in mod.requires]:
                    try:
                        req = importlib.metadata.distribution(req_name)
                    except ModuleNotFoundError:
                        continue
                    print(f'  - {req_name}: {req.version}')

        # playwright
        try:
            from playwright.sync_api import Error as PlaywrightError
            from playwright.sync_api import sync_playwright

            with sync_playwright() as p:
                try:
                    print()
                    print('Playwright browser:')
                    browser = p.chromium.launch(channel='chrome')
                    print(f'• Name: {browser.browser_type.name}')
                    print(f'• Version: {browser.version}')
                    print(f'• Executable: {browser.browser_type.executable_path}')
                    # NOTE(review): if the 'import psutil' above failed, 'psutil' is undefined here and this
                    # line raises NameError (not caught by the ImportError handler below) — confirm intent.
                    if psutil:
                        browser.new_page()
                        try:
                            virt_mem = psutil.virtual_memory().available
                            print(
                                f'• Free memory with browser loaded: '
                                f'{bytes2human(virt_mem)} physical plus '
                                f'{bytes2human(psutil.swap_memory().free)} swap'
                            )
                        except psutil.Error:
                            pass
                except PlaywrightError as e:
                    print()
                    print('Playwright browser:')
                    print(f'• Error: {e}')
        except ImportError:
            pass

        # On POSIX systems, also report the dpkg packages required by the installed optional Python modules.
        if os.name == 'posix':
            print()
            print('Installed dpkg dependencies:')
            try:
                import apt  # ty:ignore[unresolved-import]

                apt_cache = apt.Cache()

                def print_version(libs: list[str]) -> None:
                    # Print the installed version of each apt package found in the cache.
                    for lib in libs:
                        if lib in apt_cache and apt_cache[lib].versions:
                            ver = apt_cache[lib].versions
                            print(f'   - {ver[0].package}: {ver[0].version}')

                installed_packages = {dist.metadata['Name'] for dist in importlib.metadata.distributions()}
                for module, apt_dists in (
                    ('jq', ['jq']),
                    # https://github.com/jalan/pdftotext#os-dependencies
                    ('pdftotext', ['libpoppler-cpp-dev']),
                    # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
                    (
                        'Pillow',
                        [
                            'libjpeg-dev',
                            'zlib-dev',
                            'zlib1g-dev',
                            'libtiff-dev',
                            'libfreetype-dev',
                            'littlecms-dev',
                            'libwebp-dev',
                            'tcl/tk-dev',
                            'openjpeg-dev',
                            'libimagequant-dev',
                            'libraqm-dev',
                            'libxcb-dev',
                            'libxcb1-dev',
                        ],
                    ),
                    ('playwright', ['google-chrome-stable']),
                    # https://tesseract-ocr.github.io/tessdoc/Installation.html
                    ('pytesseract', ['tesseract-ocr']),
                ):
                    if module in installed_packages:
                        importlib.metadata.distribution(module)
                        print(f'• {module}')
                        print_version(apt_dists)
            except ImportError:
                print('Dependencies cannot be printed as python3-apt is not installed.')
                print("Run 'sudo apt-get install python3-apt' to install.")
        print()
        return 0
359

360
    def list_jobs(self, regex: bool | str) -> None:
15✔
361
        """Lists the job and their respective _index_number.
362

363
        :return: None.
364
        """
365
        if isinstance(regex, str):
15!
366
            print(f"List of jobs matching the RegEx '{regex}':")
×
367
        else:
368
            print('List of jobs:')
15✔
369
        for job in self.urlwatcher.jobs:
15✔
370
            if self.urlwatch_config.verbose:
15✔
371
                job_desc = f'{job.index_number:3}: {job!r}'
15✔
372
            else:
373
                pretty_name = job.pretty_name()
15✔
374
                location = job.get_location()
15✔
375
                if pretty_name != location:
15!
376
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
15✔
377
                else:
378
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
379
            if isinstance(regex, bool) or re.findall(regex, job_desc):
15!
380
                print(job_desc)
15✔
381

382
        if len(self.urlwatch_config.jobs_files) > 1:
15✔
383
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
15✔
384
        elif len(self.urlwatch_config.jobs_files) == 1:
15✔
385
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
15✔
386
        else:
387
            jobs_files = []
15✔
388
        print('\n   '.join(jobs_files))
15✔
389

390
    def _find_job(self, query: str | int) -> JobBase:
15✔
391
        """Finds the job based on a query.
392

393
        It is matched to the job index (also negative) or a job location (i.e. the url/user_visible_url or command).
394

395
        :param query: The query.
396
        :return: The matching JobBase.
397
        :raises IndexError: If job is not found.
398
        """
399
        if isinstance(query, int):
15✔
400
            index = query
15✔
401
        else:
402
            try:
15✔
403
                index = int(query)
15✔
404
            except ValueError:
15✔
405
                query = unquote_plus(query)
15✔
406
                try:
15✔
407
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
15✔
408
                except StopIteration:
15✔
409
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
15✔
410

411
        if index == 0:
15✔
412
            raise ValueError(f'Job index {index} out of range.')
15✔
413
        try:
15✔
414
            if index <= 0:
15✔
415
                return self.urlwatcher.jobs[index]
15✔
416
            return self.urlwatcher.jobs[index - 1]
15✔
417
        except IndexError as e:
15✔
418
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
15✔
419

420
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
15✔
421
        """Returns the job with defaults based on job_id.
422

423
        This could match an index or a location (url/user_visible_url or command). Accepts negative numbers.
424

425
        :param query: The query.
426
        :return: The matching JobBase with defaults.
427
        :raises SystemExit: If job is not found.
428
        """
429
        job = self._find_job(query)
15✔
430
        return job.with_defaults(self.urlwatcher.config_storage.config)
15✔
431

432
    def test_job(self, job_id: bool | str | int) -> None:
        """Tests the running of a single job outputting the filtered text to --test-reporter (default is stdout).

        If job_id is True, don't run any jobs but load config, jobs and hook files to trigger any syntax errors.

        :param job_id: The job_id or True.

        :return: None.

        :raises Exception: The Exception when raised by a job, loading of hooks files, etc.
        """
        if job_id is True:  # Load to trigger any eventual syntax errors
            # Reaching here means config, jobs and (optionally) hooks files already loaded without error;
            # build a human-readable confirmation listing what was checked.
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
            # 'hooks' being in sys.modules means a hooks file was loaded and will be mentioned last.
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
            if len(self.urlwatch_config.jobs_files) == 1:
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
            else:
                message.append(
                    '\n   '.join(
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
                    )
                )
            if 'hooks' in sys.modules:
                message.append(f'\nand hooks file {sys.modules["hooks"].__file__}')
            print(f'{"".join(message)}.')
            return

        job = self._find_job_with_defaults(job_id)

        if isinstance(job, UrlJob):
            # Force re-retrieval of job, as we're testing filters
            job.ignore_cached = True

        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
            # duration = time.perf_counter() - start
            job_state.process(headless=not self.urlwatch_config.no_headless)
            if job_state.job.name is None:
                job_state.job.name = ''
            # if job_state.job.note is None:
            #     job_state.job.note = ''
            # Prepend diagnostic details (GUID, media type, ETag, and any error) to the retrieved data so
            # they appear in the test report; falsy entries are dropped by filter(None, ...).
            data_info = '\n'.join(
                filter(
                    None,
                    (
                        f'• [GUID: {job_state.job.guid}]',
                        f'• [Media type: {job_state.new_mime_type}]' if job_state.new_mime_type else None,
                        f'• [ETag: {job_state.new_etag}]' if job_state.new_etag else None,
                        f'\nERROR {job_state.new_error_data["type"]}: {job_state.new_error_data["message"]}'
                        if job_state.new_error_data
                        else None,
                    ),
                )
            )
            job_state.new_data = f'{data_info}\n\n{job_state.new_data!s}'
            if self.urlwatch_config.test_reporter is None:
                self.urlwatch_config.test_reporter = 'stdout'  # default
            report = Report(self.urlwatcher)
            report.job_states = []  # required
            errorlevel = self.check_test_reporter(
                job_state,
                label='test',
                report=report,
            )
            if errorlevel:
                self._exit(errorlevel)
        return

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
501

502
    def prepare_jobs(self) -> None:
15✔
503
        """Runs jobs that have no history to populate the snapshot database when they're newly added."""
504
        new_jobs = set()
15✔
505
        for idx, job in enumerate(self.urlwatcher.jobs):
15✔
506
            has_history = bool(self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid))
15✔
507
            if not has_history:
15!
508
                print(f'Running new {job.get_indexed_location()}.')
15✔
509
                new_jobs.add(idx + 1)
15✔
510
        if not new_jobs and not self.urlwatch_config.joblist:
15!
511
            print('Found no new jobs to run.')
×
512
            return
×
513
        self.urlwatcher.urlwatch_config.joblist = set(self.urlwatcher.urlwatch_config.joblist).union(new_jobs)
15✔
514
        self.urlwatcher.run_jobs()
15✔
515
        return
15✔
516

517
    def test_differ(self, arg_test_differ: list[str]) -> int:
        """Runs diffs for a job on all the saved snapshots.

        Outputs the result to stdout or the reporter selected with --test-reporter.

        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs].
        :return: 1 if error, 0 if successful.
        """
        report = Report(self.urlwatcher)
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
        if len(arg_test_differ) == 1:
            job_id = arg_test_differ[0]
            max_diffs = None
        elif len(arg_test_differ) == 2:
            job_id, max_diffs_str = arg_test_differ
            max_diffs = int(max_diffs_str)
        else:
            raise ValueError('--test-differ takes a maximum of two arguments')

        job = self._find_job_with_defaults(job_id)

        # Snapshots are ordered newest-first (index 0 is the most recent).
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)

        num_snapshots = len(history_data)
        if num_snapshots == 0:
            print('This job has never been run before.')
            return 1
        if num_snapshots < 2:
            print('Not enough historic data available (need at least 2 different snapshots).')
            return 1

        if job.compared_versions and job.compared_versions != 1:
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")

        # By default, diff every successive pair of snapshots.
        max_diffs = max_diffs or num_snapshots - 1
        for i in range(max_diffs):
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
                job_state.new_data = history_data[i].data
                job_state.new_timestamp = history_data[i].timestamp
                job_state.new_etag = history_data[i].etag
                job_state.new_mime_type = history_data[i].mime_type
                if not job.compared_versions or job.compared_versions == 1:
                    # Standard behavior: compare against the immediately preceding snapshot.
                    job_state.old_data = history_data[i + 1].data
                    job_state.old_timestamp = history_data[i + 1].timestamp
                    job_state.old_etag = history_data[i + 1].etag
                    job_state.old_mime_type = history_data[i + 1].mime_type
                else:
                    # compared_versions > 1: among up to that many older snapshots, pick the one whose data
                    # is closest to the new data (fuzzy match) and compare against it.
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
                    close_matches: list[str] = difflib.get_close_matches(
                        str(job_state.new_data),
                        history_dic_snapshots.keys(),
                        n=1,
                    )  # ty:ignore[no-matching-overload]
                    if close_matches:
                        job_state.old_data = close_matches[0]
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type

                if self.urlwatch_config.test_reporter is None:
                    self.urlwatch_config.test_reporter = 'stdout'  # default
                report.job_states = []  # required
                if job_state.new_data == job_state.old_data:
                    # Identical data: label accordingly and suppress the diff report.
                    label = (
                        f'No change (snapshots {-i:2} vs. {-(i + 1):2}) with '
                        f"'compared_versions: {job.compared_versions}'"
                    )
                    job_state.verb = 'changed,no_report'
                else:
                    label = f'Filtered diff (snapshots {-i:2} vs. {-(i + 1):2})'
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
                if errorlevel:
                    self._exit(errorlevel)

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)

        return 0
595

596
    def dump_history(self, job_id: str) -> int:
15✔
597
        """Displays the historical data stored in the snapshot database for a job.
598

599
        :param job_id: The Job ID.
600
        :return: An argument to be used in sys.exit.
601
        """
602
        try:
15✔
603
            job = self._find_job_with_defaults(job_id)
15✔
604
        except ValueError:
×
605
            print(f"No Job found matching '{job_id}'. Searching database using calculated GUID.")
×
606
            job = JobBase.unserialize({'url': job_id})
×
607

608
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
15✔
609

610
        title = f'History for {job.get_indexed_location()}'
15✔
611
        print(f'{title}\nGUID: {job.guid}')
15✔
612
        if history_data:
15✔
613
            print('=' * max(len(title), 46))
15✔
614
        total_failed = 0
15✔
615
        for i, snapshot in enumerate(history_data):
15✔
616
            mime_type = f' | Media type: {snapshot.mime_type}' if snapshot.mime_type else ''
15✔
617
            etag = f' | ETag: {snapshot.etag}' if snapshot.etag else ''
15✔
618
            tries = f' | Error run (number {snapshot.tries})' if snapshot.tries else ''
15✔
619
            total_failed += snapshot.tries > 0
15✔
620
            tz = self.urlwatcher.report.config['report']['tz']
15✔
621
            tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
15✔
622
            dt = datetime.fromtimestamp(snapshot.timestamp, tz_info)
15✔
623
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
15✔
624
            sep_len = max(50, len(header))
15✔
625
            print(header)
15✔
626
            print('-' * sep_len)
15✔
627
            if snapshot.error_data:
15!
628
                print(f'{snapshot.error_data.get("type")}: {snapshot.error_data.get("message")}')
×
629
                print()
×
630
                print('Last good data:')
×
631
            print(snapshot.data)
15✔
632
            print('=' * sep_len, '\n')
15✔
633

634
        print(
15✔
635
            f'Found {len(history_data) - total_failed}'
636
            + (' good' if total_failed else '')
637
            + ' snapshot'
638
            + ('s' if len(history_data) - total_failed != 1 else '')
639
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
640
            + '.'
641
        )
642

643
        return 0
15✔
644

645
    def list_error_jobs(self) -> int:
        """Runs all enabled jobs and reports the ones that fail with an exception or return no data.

        The result is either printed to stdout or sent through the reporter named in
        ``self.urlwatch_config.errors``.

        :return: A sys.exit code (0 for success, 1 if the reporter name is invalid).
        """
        from webchanges.reporters import ReporterBase

        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
            print(f'Invalid reporter {self.urlwatch_config.errors}.')
            return 1

        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
            """A generator that outputs error text for jobs that fail with an exception or yield no data.

            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
            """

            def job_runner(
                stack: ExitStack,
                jobs: Iterable[JobBase],
                max_workers: int | None = None,
            ) -> Iterator[str]:
                """Modified worker.job_runner.

                Yields error text for jobs that fail with an exception or return no data.

                :param stack: The context manager.
                :param jobs: The jobs to run.
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
                :return: error text for jobs that fail with an exception or return no data.
                """
                # Use the executor as a context manager so its worker threads are always shut down.
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    for job_state in executor.map(
                        lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
                        (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
                    ):
                        # A NotModifiedError means the site reported no change: not an error.
                        if not isinstance(job_state.exception, NotModifiedError):
                            if job_state.exception is None:
                                # No exception: flag the job only if it produced empty data.
                                if (
                                    len(job_state.new_data.strip()) == 0
                                    if hasattr(job_state, 'new_data')
                                    else len(job_state.old_data.strip()) == 0
                                ):
                                    if self.urlwatch_config.verbose:
                                        yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
                                    else:
                                        pretty_name = job_state.job.pretty_name()
                                        location = job_state.job.get_location()
                                        if pretty_name != location:
                                            yield (
                                                f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
                                            )
                                        else:
                                            yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
                            else:
                                pretty_name = job_state.job.pretty_name()
                                location = job_state.job.get_location()
                                if pretty_name != location:
                                    yield (
                                        f'{job_state.job.index_number:3}: Error "{job_state.exception}": '
                                        f'{pretty_name} ({location})'
                                    )
                                else:
                                    # (fixed: removed a stray ')' that used to be appended to this message)
                                    yield f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name}'

            with ExitStack() as stack:
                # This code is from worker.run_jobs, modified to yield from job_runner.
                from webchanges.worker import get_virt_mem  # avoid circular imports

                # run non-BrowserJob jobs first
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
                if jobs_to_run:
                    logger.debug(
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
                        "Python's default max_workers."
                    )
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
                else:
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")

                # run BrowserJob jobs after
                jobs_to_run = [job for job in jobs if job.__is_browser__]
                if jobs_to_run:
                    gc.collect()
                    virt_mem = get_virt_mem()
                    if self.urlwatch_config.max_workers:
                        max_workers = self.urlwatch_config.max_workers
                    else:
                        # Cap browser workers by available memory (~200 MB per browser) and by CPU count.
                        max_workers = max(int(virt_mem / 200e6), 1)
                        max_workers = min(max_workers, os.cpu_count() or 1)
                    logger.debug(
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
                        f'{max_workers} max_workers.'
                    )
                    yield from job_runner(stack, jobs_to_run, max_workers)
                else:
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")

        start = time.perf_counter()

        # default max_workers (when not specified) to 1
        if self.urlwatch_config.max_workers is None:
            self.urlwatch_config.max_workers = 1

        if len(self.urlwatch_config.jobs_files) == 1:
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
        else:
            jobs_files = ['in the concatenation of the jobs files'] + [
                f'• {file},' for file in self.urlwatch_config.jobs_files
            ]
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)', *jobs_files])

        jobs = {
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
        }
        if self.urlwatch_config.errors == 'stdout':
            print(header)
            for line in error_jobs_lines(jobs):
                print(line)
            print('--')
            duration = time.perf_counter() - start
            # pluralize only when the count is not exactly 1
            print(
                f'Checked {len(jobs)} enabled job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.'
            )

        else:
            message = '\n'.join(error_jobs_lines(jobs))
            if message:
                # create a dummy job state to run a reporter on
                job_state = JobState(
                    None,  # type: ignore[arg-type]
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
                )
                job_state.traceback = f'{header}\n{message}'
                duration = time.perf_counter() - start
                self.urlwatcher.report.config['footnote'] = (
                    f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.'
                )
                self.urlwatcher.report.config['report']['html']['footer'] = False
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
                self.urlwatcher.report.config['report']['text']['footer'] = False
                self.urlwatcher.report.error(job_state)
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
            else:
                print(header)
                print('--')
                duration = time.perf_counter() - start
                print('Found no errors.')
                print(f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.')

        return 0
15✔
791

792
    def rollback_database(self, timespec: str) -> int:
        """Issues a warning, calls rollback() and prints out the result.

        :param timespec: A timespec that if numeric is interpreted as a Unix timestamp otherwise it's passed to
          dateutil.parser (if dateutil is installed) or datetime.fromisoformat to be converted into a date.

        :return: A sys.exit code (0 for success, 1 for failure)
        """

        def _convert_to_datetime(timespec: str, tz_info: ZoneInfo | tzinfo | None) -> datetime:
            """Converts inputted string to a datetime object, using dateutil if installed.

            :param timespec: The string.
            :param tz_info: The timezone.

            :return: The datetime object.
            :raises ValueError: If the string cannot be parsed by any method.
            """
            # --- 1. Try parsing as a numeric timestamp ---
            # This is the fastest check and should come first.
            if timespec.isnumeric() or (timespec.startswith('-') and timespec[1:].isnumeric()):
                try:
                    timestamp = float(timespec)
                    return datetime.fromtimestamp(timestamp, tz=tz_info)
                except (ValueError, TypeError):
                    # Pass to the next method if it's not a valid float (e.g., "123a")
                    pass

            # --- 2. Try parsing as ISO 8601 format ---
            # datetime.fromisoformat is very efficient for standard formats.
            try:
                dt = datetime.fromisoformat(timespec)
                # If the parsed datetime is naive (no timezone), apply the provided one.
                if dt.tzinfo is None:
                    return dt.replace(tzinfo=tz_info)
                return dt
            except ValueError:
                # Pass to the next method if it's not a valid ISO string.
                pass

            # --- 3. Try parsing with the flexible but slower dateutil library ---
            try:
                from dateutil import parser as dateutil_parser

                try:
                    # Set a default datetime to provide context and timezone for ambiguous strings like "Sunday at 4pm".
                    default_dt_with_tz = datetime.now(tz_info).replace(second=0, microsecond=0)
                    # NOTE(review): dateutil's handling of a tz-aware `default` may be surprising for some
                    # inputs — confirm against dateutil parser documentation.
                    return dateutil_parser.parse(timespec, default=default_dt_with_tz)
                except (ValueError, OverflowError):
                    # Pass to the next method if dateutil cannot parse.
                    pass
            except ImportError:
                # Pass to the next method if dateutil is not installed.
                pass

            # --- 4. If all parsing attempts fail ---
            raise ValueError(f'Cannot parse "{timespec}" into a date/time.')

        tz = self.urlwatcher.report.config['report']['tz']
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        dt = _convert_to_datetime(timespec, tz_info)
        timestamp_date = email.utils.format_datetime(dt)
        print(f'Rolling back database to {timestamp_date}.')
        if sys.__stdin__ and sys.__stdin__.isatty():
            print(
                f'WARNING: All snapshots after this date/time (check timezone) will be deleted.\n'
                f'         ☠  This operation cannot be undone!\n'
                f'         We suggest you make a backup of the database file before proceeding:\n'
                f'         {self.urlwatch_config.ssdb_file}'
            )
            resp = input("         Please enter 'Y' to proceed: ")
            if not resp.upper().startswith('Y'):
                print('Quitting rollback. No snapshots have been deleted.')
                return 1
        # Perform the rollback only once, and only after the user has confirmed (the previous code also
        # called rollback() before the prompt, deleting snapshots even when the user declined).
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
        if count:
            print(f'Deleted {count} snapshots taken after {timestamp_date}.')
        else:
            print(f'No snapshots found after {timestamp_date}')
        return 0
15✔
872

873
    def delete_snapshot(self, job_id: str | int) -> int:
        """Deletes the most recent snapshot of a job from the snapshot database.

        Prompts for confirmation when running interactively.

        :param job_id: The index number or location of the job.
        :return: A sys.exit code (0 for success, 1 for failure).
        """
        job = self._find_job_with_defaults(job_id)
        snapshots = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
        if not snapshots:
            print(f'No snapshots found for {job.get_indexed_location()}.')
            return 1

        tz = self.urlwatcher.report.config['report']['tz']
        zone = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        if sys.__stdin__ and sys.__stdin__.isatty():
            # Interactive session: list all snapshots, flag the one about to go, and ask for confirmation.
            print(f'WARNING: About to delete the latest snapshot of\n         {job.get_indexed_location()}:')
            for index, snapshot in enumerate(snapshots):
                marker = '❌ ' if index == 0 else '   '
                suffix = '  ⬅  ABOUT TO BE DELETED!' if index == 0 else ''
                when = email.utils.format_datetime(datetime.fromtimestamp(snapshot.timestamp).astimezone(zone))
                print(f'         {index + 1}. {marker}{when}{suffix}')
            print(
                f'         ☠  This operation cannot be undone!\n'
                f'         We suggest you make a backup of the database file before proceeding:\n'
                f'         {self.urlwatch_config.ssdb_file}'
            )
            answer = input("         Please enter 'Y' to proceed: ")
            if not answer.upper().startswith('Y'):
                print('Quitting. No snapshots have been deleted.')
                return 1

        deleted = self.urlwatcher.ssdb_storage.delete_latest(job.guid)
        if deleted:
            print(f'Deleted last snapshot of {job.get_indexed_location()}; {len(snapshots) - 1} snapshots left.')
            return 0
        print(f'No snapshots found for {job.get_indexed_location()}.')
        return 1
×
904

905
    def modify_urls(self) -> int:
        """Applies the command-line job-list modifications: --delete, --add and --change-location.

        :return: A sys.exit code (0 for success, 1 for failure).
        """
        if self.urlwatch_config.delete is not None:
            job = self._find_job(self.urlwatch_config.delete)
            if job is not None:
                if sys.__stdin__ and sys.__stdin__.isatty():
                    # fixed: the two indented warning lines used to be concatenated without a newline
                    print(
                        f'WARNING: About to permanently delete {job.get_indexed_location()}.\n'
                        '         Job file will be overwritten and all remarks lost.\n'
                        '         This operation cannot be undone!\n'
                    )
                    resp = input("         Please enter 'Y' to proceed: ")
                    if not resp.upper().startswith('Y'):
                        print(f'Quitting. Job {job.index_number} has not been deleted and job file is unmodified.')
                        return 1
                self.urlwatcher.jobs.remove(job)
                print(f'Removed {job}.')
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
            else:
                print(f'Job not found: {self.urlwatch_config.delete}.')
                return 1

        if self.urlwatch_config.add is not None:
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
            filters = [v for k, v in items if k == 'filter']
            items2 = [(k, v) for k, v in items if k != 'filter']
            d = dict(items2)
            if filters:
                d['filter'] = ','.join(filters)

            job = JobBase.unserialize(d)
            print(f'Adding {job}.')
            self.urlwatcher.jobs.append(job)
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.change_location is not None:
            new_loc = self.urlwatch_config.change_location[1]
            # Ensure the user isn't overwriting an existing job with the change.
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
                print(
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
                    f'different value.\n'
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
                )
                return 1
            job = self._find_job(self.urlwatch_config.change_location[0])
            if job is not None:
                # Update the job's location (which will also update the guid) and move any history in the database
                # over to the job's updated guid.
                old_loc = job.get_location()
                print(f'Moving location of "{old_loc}" to "{new_loc}".')
                old_guid = job.guid
                if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
                    print(f'No snapshots found for "{old_loc}".')
                    return 1
                job.set_base_location(new_loc)
                num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.guid)
                if num_searched:
                    print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}".')
            else:
                print(f'Job not found: "{self.urlwatch_config.change_location[0]}".')
                return 1
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
            if not input(message).lower().startswith('y'):
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
            else:
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        return 0
15✔
974

975
    def edit_config(self) -> int:
        """Opens the configuration file in an editor and returns the editor's exit status.

        :return: A sys.exit code (0 for success).
        """
        config_storage = self.urlwatcher.config_storage
        return config_storage.edit()
15✔
977

978
    def check_telegram_chats(self) -> None:
        """Retrieves the private chats visible to the configured Telegram bot and prints their IDs and names.

        Exits the program with code 0 on success or 1 if the bot token is missing/invalid or no chats are found.
        """
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']

        bot_token = config['bot_token']
        if not bot_token:
            print('You need to set up your bot token first (see documentation).')
            self._exit(1)

        with httpx.Client(http2=h2 is not None) if httpx else requests.Session() as http_client:
            info = http_client.get(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
            if not info['ok']:
                print(f'Error with token {bot_token}: {info["description"]}.')
                self._exit(1)

            chats = {}
            updates = http_client.get(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
        if 'result' in updates:
            for chat_info in updates['result']:
                # NOTE(review): assumes every update carries a 'message' key; other update types
                # (e.g. edited_message) would raise KeyError — confirm against the Telegram Bot API.
                chat = chat_info['message']['chat']
                if chat['type'] == 'private':
                    chats[chat['id']] = (
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
                    )

        if not chats:
            print(f'No chats found. Say hello to your bot at https://t.me/{info["result"]["username"]}.')
            self._exit(1)

        headers = ('Chat ID', 'Name')
        # Chat IDs are integers in the Telegram Bot API; convert to str before measuring display width
        # (len() on an int raises TypeError).
        maxchat = max(len(headers[0]), max((len(str(chat_id)) for chat_id in chats), default=0))
        maxname = max(len(headers[1]), max((len(name) for name in chats.values()), default=0))
        fmt = f'%-{maxchat}s  %s'
        print(fmt % headers)
        print(fmt % ('-' * maxchat, '-' * maxname))
        for chat_id, name in sorted(chats.items(), key=lambda kv: kv[1]):
            print(fmt % (chat_id, name))
        print(f'\nChat up your bot here: https://t.me/{info["result"]["username"]}.')

        self._exit(0)
×
1017

1018
    def check_test_reporter(
        self,
        job_state: JobState | None = None,
        label: str = 'test',
        report: Report | None = None,
    ) -> int:
        """Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').

        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
        of the configuration file.

        :param job_state: The JobState (Optional).
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report class to use for testing (Optional).
        :return: 0 if successful, 1 otherwise.
        """
        from webchanges.reporters import ReporterBase

        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
            """Builds a pseudo-job for the reporter to run on."""
            job = JobBase.unserialize({'name': job_name, 'url': url})

            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
            # testing; also no need to use it as context manager, since no processing is called on the job
            job_state = JobState(None, job)  # type: ignore[arg-type]

            job_state.old_data = old
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
            job_state.new_data = new
            job_state.new_timestamp = time.time()

            return job_state

        def set_error(job_state: 'JobState', message: str) -> JobState:
            """Sets a job error message on a JobState."""
            # Raise and catch so the job state carries a real exception with an attached traceback.
            try:
                raise ValueError(message)
            except ValueError as e:
                job_state.exception = e
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())

            return job_state

        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(
                f'No such reporter: {reporter_name}.\n'
                f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}.\n'
            )
            return 1

        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][reporter_name]
        if job_state:  # we want a full report
            # Force-enable the reporter and widen display settings so the supplied job state is fully reported.
            cfg['enabled'] = True
            self.urlwatcher.config_storage.config['display'][label] = True
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['stdout']['color'] = False
        elif not cfg['enabled']:
            # The reporter is tested anyway, but warn that its config says it is disabled.
            print(
                f'WARNING: Reporter being tested is not enabled: {reporter_name}.\n'
                f'Will still attempt to test it, but this may not work.\n'
                f'Use {__project_name__} --edit-config to configure reporters.'
            )
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)  # type: ignore[arg-type]
        else:
            # Build one pseudo-job per outcome so the reporter's formatting of each verb can be inspected.
            report.new(
                build_job(
                    'Sample job that was newly added',
                    'https://example.com/new',
                    '',
                    '',
                )
            )
            report.changed(
                build_job(
                    'Sample job where something changed',
                    'https://example.com/changed',
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
                )
            )
            report.unchanged(
                build_job(
                    'Sample job where nothing changed',
                    'http://example.com/unchanged',
                    'Same Old, Same Old\n',
                    'Same Old, Same Old\n',
                )
            )
            report.error(
                set_error(
                    build_job(
                        'Sample job where an error was encountered',
                        'https://example.com/error',
                        '',
                        '',
                    ),
                    'The error message would appear here.',
                )
            )

        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
15✔
1133

1134
    def check_smtp_login(self) -> None:
        """Validates the SMTP email-reporter configuration and attempts a test login.

        Optionally prompts to store/update the password in the system keyring, then exits the program
        (0 on success, 1 if the configuration is incomplete).
        """
        from webchanges.mailer import SMTPMailer, smtp_have_password, smtp_set_password

        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_config: _ConfigReportEmailSmtp = config['smtp']

        # Report every configuration problem before bailing out, so the user can fix them all at once.
        ok = True

        if not config['enabled']:
            print('Please enable email reporting in the config first.')
            ok = False

        if config['method'] != 'smtp':
            print('Please set the method to SMTP for the email reporter.')
            ok = False

        smtp_auth = smtp_config['auth']
        if not smtp_auth:
            print('Authentication must be enabled for SMTP.')
            ok = False

        smtp_hostname = smtp_config['host']
        if not smtp_hostname:
            print('Please configure the SMTP hostname in the config first.')
            ok = False

        smtp_username = smtp_config['user'] or config['from']
        if not smtp_username:
            print('Please configure the SMTP user in the config first.')
            ok = False

        if not ok:
            self._exit(1)

        insecure_password = smtp_config['insecure_password']
        if insecure_password:
            print('The SMTP password is set in the config file (key "insecure_password").')
        elif smtp_have_password(smtp_hostname, smtp_username):
            prompt = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
            answer = input(prompt).lower()
            if answer.startswith('y'):
                smtp_set_password(smtp_hostname, smtp_username)
            else:
                print('Password unchanged.')

        mailer = SMTPMailer(
            smtp_username,
            smtp_hostname,
            smtp_config['port'],
            smtp_config['starttls'],
            smtp_auth,
            insecure_password,
        )
        print('Trying to log into the SMTP server...')
        mailer.send(None)
        print('Successfully logged into SMTP server.')

        self._exit(0)
×
1187

1188
    def check_xmpp_login(self) -> None:
        """Checks the XMPP reporter configuration and, if needed, prompts to store the sender's password.

        Exits the program with code 0 on success or 1 if the configuration is incomplete.
        """
        from webchanges.reporters import xmpp_have_password, xmpp_set_password

        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        success = True

        if not xmpp_config['enabled']:
            print('Please enable XMPP reporting in the config first.')
            success = False

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            print('Please configure the XMPP sender in the config first.')
            success = False

        if not xmpp_config['recipient']:
            print('Please configure the XMPP recipient in the config first.')
            success = False

        if not success:
            self._exit(1)

        if 'insecure_password' in xmpp_config:
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
            # Accept any answer starting with 'y' (e.g. 'yes'), consistent with the other prompts
            # in this module (previously this required the answer to be exactly 'y').
            if not input(message).lower().startswith('y'):
                print('Password unchanged.')
                self._exit(0)

        # (removed a redundant `if success:` guard: success is always True past the _exit(1) gate above)
        xmpp_set_password(xmpp_sender)

        self._exit(0)
×
1225

1226
    @staticmethod
1227
    def playwright_install_chrome() -> int:  # pragma: no cover
1228
        """Replicates playwright.___main__.main() function, which is called by the playwright executable, in order to
1229
        install the browser executable.
1230

1231
        :return: Playwright's executable return code.
1232
        """
1233
        try:
1234
            from playwright._impl._driver import compute_driver_executable
1235
        except ImportError:  # pragma: no cover
1236
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None
1237

1238
        driver_executable = compute_driver_executable()
1239
        env = os.environ.copy()
1240
        env['PW_CLI_TARGET_LANG'] = 'python'
1241
        cmd = [str(driver_executable), 'install', 'chrome']
1242
        logger.info(f'Running playwright CLI: {" ".join(cmd)}')
1243
        completed_process = subprocess.run(cmd, check=False, env=env, capture_output=True, text=True)  # noqa: S603 subprocess call
1244
        if completed_process.returncode:
1245
            print(completed_process.stderr)
1246
            return completed_process.returncode
1247
        if completed_process.stdout:
1248
            logger.info(f'Success! Output of Playwright CLI: {completed_process.stdout}')
1249
        return 0
1250

1251
    def handle_actions(self) -> None:
        """Dispatch any command line action that was requested and exit.

        Checks each action flag in a fixed order; the first matching action runs and the program exits via
        :meth:`_exit` (directly or inside the called method).
        """
        cfg = self.urlwatch_config
        watcher = self.urlwatcher

        if cfg.list_jobs:
            self.list_jobs(cfg.list_jobs)
            self._exit(0)

        if cfg.errors:
            self._exit(self.list_error_jobs())

        if cfg.test_job:
            self.test_job(cfg.test_job)
            self._exit(0)

        if cfg.prepare_jobs:
            self.prepare_jobs()
            self._exit(0)

        if cfg.test_differ:
            self._exit(self.test_differ(cfg.test_differ))

        if cfg.dump_history:
            self._exit(self.dump_history(cfg.dump_history))

        if cfg.add or cfg.delete or cfg.change_location:
            self._exit(self.modify_urls())

        if cfg.test_reporter:
            self._exit(self.check_test_reporter())

        # The credential-check methods below are expected to exit on their own (no _exit() call here).
        if cfg.smtp_login:
            self.check_smtp_login()

        if cfg.telegram_chats:
            self.check_telegram_chats()

        if cfg.xmpp_login:
            self.check_xmpp_login()

        if cfg.edit:
            self._exit(watcher.jobs_storage.edit())

        if cfg.edit_config:
            self._exit(self.edit_config())

        if cfg.edit_hooks:
            self._exit(self.edit_hooks())

        if cfg.gc_database:
            guids = [job.guid for job in watcher.jobs]
            watcher.ssdb_storage.gc(guids, cfg.gc_database)
            self._exit(0)

        if cfg.clean_database:
            guids = [job.guid for job in watcher.jobs]
            watcher.ssdb_storage.clean_ssdb(guids, cfg.clean_database)
            self._exit(0)

        if cfg.rollback_database:
            self._exit(self.rollback_database(cfg.rollback_database))

        if cfg.delete_snapshot:
            self._exit(self.delete_snapshot(cfg.delete_snapshot))

        if cfg.features:
            self._exit(self.show_features())

        if cfg.detailed_versions:
            self._exit(self.show_detailed_versions())

1323
    def run(self) -> None:  # pragma: no cover
        """Run the program: propagate configuration to the report, handle CLI actions, then run all jobs."""
        report = self.urlwatcher.report
        report.config = self.urlwatcher.config_storage.config
        report.config['footnote'] = self.urlwatch_config.footnote

        # Any requested command line action runs and exits inside handle_actions(); otherwise fall through.
        self.handle_actions()

        self.urlwatcher.run_jobs()
        self.urlwatcher.close()

        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc