• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 16871375955

11 Aug 2025 05:06AM UTC coverage: 72.561% (-1.9%) from 74.431%
16871375955

push

github

mborsetti
Version 3.31.1rc0

1749 of 2772 branches covered (63.1%)

Branch coverage included in aggregate %.

4574 of 5942 relevant lines covered (76.98%)

5.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.35
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import difflib
8✔
8
import email.utils
8✔
9
import gc
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import re
8✔
15
import shutil
8✔
16
import sqlite3
8✔
17
import subprocess
8✔
18
import sys
8✔
19
import time
8✔
20
import traceback
8✔
21
from concurrent.futures import ThreadPoolExecutor
8✔
22
from contextlib import ExitStack
8✔
23
from datetime import datetime, tzinfo
8✔
24
from pathlib import Path
8✔
25
from typing import TYPE_CHECKING, Iterable, Iterator
8✔
26
from urllib.parse import unquote_plus
8✔
27
from zoneinfo import ZoneInfo
8✔
28

29
from webchanges import __docs_url__, __project_name__, __version__
8✔
30
from webchanges.differs import DifferBase
8✔
31
from webchanges.filters import FilterBase
8✔
32
from webchanges.handler import JobState, Report
8✔
33
from webchanges.jobs import JobBase, NotModifiedError, UrlJob
8✔
34
from webchanges.mailer import SMTPMailer, smtp_have_password, smtp_set_password
8✔
35
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
8✔
36
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
37

38
try:
    import httpx
except ImportError:  # pragma: no cover
    httpx = None  # type: ignore[assignment]
    print("Required package 'httpx' not found; will attempt to run using 'requests'.")
    try:
        # Fall back to 'requests' so the program can still run without httpx installed.
        import requests
    except ImportError as e:  # pragma: no cover
        raise RuntimeError(
            # Fixed: the original message had an unbalanced parenthesis
            # ("(either 'httpx' or 'requests' is required...").
            f"A Python HTTP client package (either 'httpx' or 'requests') is required to run {__project_name__}; "
            'neither can be imported.'
        ) from e
50
if httpx is not None:
8!
51
    try:
8✔
52
        import h2
8✔
53
    except ImportError:  # pragma: no cover
54
        h2 = None  # type: ignore[assignment]
55

56
if os.name == 'posix':
8!
57
    try:
8✔
58
        import apt
8✔
59
    except ImportError:  # pragma: no cover
60
        apt = None
61

62
try:
8✔
63
    from pip._internal.metadata import get_default_environment
8✔
64
except ImportError:  # pragma: no cover
65
    get_default_environment = None  # type: ignore[assignment]
66

67
try:
8✔
68
    from playwright.sync_api import sync_playwright
8✔
69
except ImportError:  # pragma: no cover
70
    sync_playwright = None  # type: ignore[assignment]
71

72
try:
8✔
73
    import psutil
8✔
74
    from psutil._common import bytes2human
8✔
75
except ImportError:  # pragma: no cover
76
    psutil = None  # type: ignore[assignment]
77
    bytes2human = None  # type: ignore[assignment]
78

79
logger = logging.getLogger(__name__)
8✔
80

81
if TYPE_CHECKING:
82
    from webchanges.main import Urlwatch
83
    from webchanges.reporters import _ConfigReportersList
84
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
85

86

87
class UrlwatchCommand:
8✔
88
    """The class that runs the program after initialization and CLI arguments parsing."""
89

90
    def __init__(self, urlwatcher: Urlwatch) -> None:
        """Store the Urlwatch object and keep a shortcut to its CLI configuration.

        :param urlwatcher: The Urlwatch object holding jobs, storage and configuration.
        """
        self.urlwatcher = urlwatcher
        # Shortcut: the CLI configuration is referenced throughout this class.
        self.urlwatch_config = urlwatcher.urlwatch_config
93

94
    @staticmethod
8✔
95
    def _exit(arg: str | int | None) -> None:
8✔
96
        logger.info(f'Exiting with exit code {arg}')
8✔
97
        sys.exit(arg)
8✔
98

99
    def jobs_from_joblist(self) -> Iterator[JobBase]:
8✔
100
        """Generates the jobs to process from the joblist entered in the CLI."""
101
        if self.urlwatcher.urlwatch_config.joblist:
8✔
102
            jobs = {self._find_job(job_entry) for job_entry in self.urlwatcher.urlwatch_config.joblist}
8!
103
            enabled_jobs = {job for job in jobs if job.is_enabled()}
8!
104
            disabled = len(enabled_jobs) - len(jobs)
8✔
105
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
106
            logger.debug(
8✔
107
                f'Processing {len(enabled_jobs)} job{"s" if len(enabled_jobs) else ""}{disabled_str} as specified in '
108
                f'command line: {", ".join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}'
109
            )
110
        else:
111
            enabled_jobs = {job for job in self.urlwatcher.jobs if job.is_enabled()}
8!
112
            disabled = len(enabled_jobs) - len(self.urlwatcher.jobs)
8✔
113
            disabled_str = f' (excluding {disabled} disabled)' if disabled else ''
8✔
114
            logger.debug(f'Processing {len(enabled_jobs)} job{"s" if len(enabled_jobs) else ""}{disabled_str}')
8✔
115
        for job in enabled_jobs:
8✔
116
            yield job.with_defaults(self.urlwatcher.config_storage.config)
8✔
117

118
    def edit_hooks(self) -> int:
        """Edit hooks file(s) interactively, validating the result before saving.

        The edit is made on a temporary copy; it only replaces the original file once the
        edited module imports without raising. On a parse failure the user may retry or abort.

        :returns: 0 if edit is successful, 1 if the user aborts after a parse failure.
        """
        # Similar code to BaseTextualFileStorage.edit()
        for hooks_file in self.urlwatch_config.hooks_files:
            logger.debug(f'Edit file {hooks_file}')
            # Python 3.9: hooks_edit = self.urlwatch_config.hooks.with_stem(self.urlwatch_config.hooks.stem + '_edit')
            # Work on a sibling copy named '<stem>_edit<suffixes>' so the original survives a bad edit.
            hooks_edit = hooks_file.parent.joinpath(hooks_file.stem + '_edit' + ''.join(hooks_file.suffixes))
            if hooks_file.exists():
                shutil.copy(hooks_file, hooks_edit)
            # elif self.urlwatch_config.hooks_py_example is not None and os.path.exists(
            #         self.urlwatch_config.hooks_py_example):
            #     shutil.copy(self.urlwatch_config.hooks_py_example, hooks_edit, follow_symlinks=False)

            # Edit/validate loop: re-open the editor until the file imports cleanly or the user gives up.
            while True:
                try:
                    edit_file(hooks_edit)
                    # Importing the edited copy is the syntax/sanity check.
                    import_module_from_source('hooks', hooks_edit)
                    break  # stop if no exception on parser
                except SystemExit:
                    raise
                except Exception as e:
                    print('Parsing failed:')
                    print('======')
                    print(e)
                    print('======')
                    print('')
                    print(f'The file {hooks_file} was NOT updated.')
                    user_input = input('Do you want to retry the same edit? (Y/n)')
                    if not user_input or user_input.lower()[0] == 'y':
                        continue
                    hooks_edit.unlink()
                    print('No changes have been saved.')
                    return 1

            if hooks_file.is_symlink():
                # Write through the symlink to preserve it rather than replacing it with a file.
                hooks_file.write_text(hooks_edit.read_text())
            else:
                hooks_edit.replace(hooks_file)
            hooks_edit.unlink(missing_ok=True)
            print(f'Saved edits in {hooks_file}.')

        return 0
163

164
    @staticmethod
    def show_features() -> int:
        """Print the list of supported job types, filters, differs and reporters.

        :return: 0.
        """
        docs_note = f'Please see full documentation at {__docs_url__}.'
        # Assemble the full listing and emit it in a single print; each element becomes one
        # output line, so the result is byte-identical to printing them one at a time.
        sections = (
            docs_note,
            '',
            'Supported jobs:\n',
            str(JobBase.job_documentation()),
            'Supported filters:\n',
            str(FilterBase.filter_documentation()),
            '',
            'Supported differs:\n',
            str(DifferBase.differ_documentation()),
            '',
            'Supported reporters:\n',
            str(ReporterBase.reporter_documentation()),
            '',
            docs_note,
        )
        print('\n'.join(sections))
        return 0
187

188
    @staticmethod
    def show_detailed_versions() -> int:
        """
        Prints the detailed versions, including of dependencies.

        Output covers, in order: core software versions, system information (when psutil is
        installed), installed PyPi dependencies and their sub-dependencies, the Playwright
        browser (when installed), and dpkg libraries (on posix systems with python-apt).

        :return: 0.
        """

        def dependencies() -> list[str]:
            # Return the names of this project's declared dependencies, using pip's internal
            # metadata when available; otherwise fall back to a hard-coded list of all
            # possible (including optional) dependencies.
            if get_default_environment is not None:
                env = get_default_environment()
                dist = None
                for dist in env.iter_all_distributions():
                    if dist.canonical_name == __project_name__:
                        break
                if dist and dist.canonical_name == __project_name__:
                    return sorted(set(d.split()[0] for d in dist.metadata_dict['requires_dist']), key=str.lower)

            # default list of all possible dependencies
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
            return [
                'aioxmpp',
                'beautifulsoup4',
                'chump',
                'colorama',
                'cryptography',
                'cssbeautifier',
                'cssselect',
                'deepdiff',
                'h2',
                'html2text',
                'httpx',
                'jq',
                'jsbeautifier',
                'keyring',
                'lxml',
                'markdown2',
                'matrix_client',
                'msgpack',
                'pdftotext',
                'Pillow',
                'platformdirs',
                'playwright',
                'psutil',
                'pushbullet.py',
                'pypdf',
                'pytesseract',
                'pyyaml',
                'redis',
                'requests',
                'tzdata',
                'vobject',
            ]

        print('Software:')
        print(f'• {__project_name__}: {__version__}')
        print(
            f'• {platform.python_implementation()}: {platform.python_version()} '
            f'{platform.python_build()} {platform.python_compiler()}'
        )
        print(f'• SQLite: {sqlite3.sqlite_version}')

        if psutil:
            print()
            print('System:')
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
            print(f'• Processor: {platform.processor()}')
            print(f'• CPUs (logical): {psutil.cpu_count()}')
            try:
                virt_mem = psutil.virtual_memory().available
                print(
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
                )
            except psutil.Error as e:  # pragma: no cover
                print(f'• Free memory: Could not read information: {e}')
            print(
                f"• Free disk '/': {bytes2human(psutil.disk_usage('/').free)} "
                f'({100 - psutil.disk_usage("/").percent:.1f}%)'
            )
            # NOTE(review): reads ThreadPoolExecutor's private _max_workers to report the
            # default thread count; the executor is never shut down — harmless for a
            # diagnostics dump, but worth confirming intent.
            executor = ThreadPoolExecutor()
            print(f'• --max-threads default: {executor._max_workers}')

        print()
        print('Installed PyPi dependencies:')
        for module_name in dependencies():
            try:
                mod = importlib.metadata.distribution(module_name)
            except ModuleNotFoundError:
                # importlib.metadata.PackageNotFoundError subclasses ModuleNotFoundError;
                # packages that are not installed are simply skipped.
                continue
            print(f'• {module_name}: {mod.version}')
            # package requirements
            if mod.requires:
                for req_name in [i.split()[0] for i in mod.requires]:
                    try:
                        req = importlib.metadata.distribution(req_name)
                    except ModuleNotFoundError:
                        continue
                    print(f'  - {req_name}: {req.version}')

        # playwright
        if sync_playwright is not None:
            with sync_playwright() as p:
                browser = p.chromium.launch(channel='chrome')
                print()
                print('Playwright browser:')
                print(f'• Name: {browser.browser_type.name}')
                print(f'• Version: {browser.version}')
                if psutil:
                    # Open a page so the memory reading reflects a running browser.
                    browser.new_page()
                    try:
                        virt_mem = psutil.virtual_memory().available
                        print(
                            f'• Free memory with browser loaded: '
                            f'{bytes2human(virt_mem)} physical plus '
                            f'{bytes2human(psutil.swap_memory().free)} swap'
                        )
                    except psutil.Error:
                        pass

        if os.name == 'posix' and apt:
            apt_cache = apt.Cache()

            def print_version(libs: list[str]) -> None:
                # Print the installed version of each dpkg library present in the apt cache.
                for lib in libs:
                    if lib in apt_cache:
                        if ver := apt_cache[lib].versions:
                            print(f'   - {ver[0].package}: {ver[0].version}')
                return None

            print()
            print('Installed dpkg dependencies:')
            for module, apt_dists in (
                ('jq', ['jq']),
                # https://github.com/jalan/pdftotext#os-dependencies
                ('pdftotext', ['libpoppler-cpp-dev']),
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
                (
                    'Pillow',
                    [
                        'libjpeg-dev',
                        'zlib-dev',
                        'zlib1g-dev',
                        'libtiff-dev',
                        'libfreetype-dev',
                        'littlecms-dev',
                        'libwebp-dev',
                        'tcl/tk-dev',
                        'openjpeg-dev',
                        'libimagequant-dev',
                        'libraqm-dev',
                        'libxcb-dev',
                        'libxcb1-dev',
                    ],
                ),
                ('playwright', ['google-chrome-stable']),
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
                ('pytesseract', ['tesseract-ocr']),
            ):
                try:
                    # Only report dpkg libraries for Python packages that are actually installed.
                    importlib.metadata.distribution(module)
                    print(f'• {module}')
                    print_version(apt_dists)
                except importlib.metadata.PackageNotFoundError:
                    pass
        return 0
354

355
    def list_jobs(self, regex: bool | str) -> None:
8✔
356
        """
357
        Lists the job and their respective _index_number.
358

359
        :return: None.
360
        """
361
        if isinstance(regex, str):
8!
362
            print(f"List of jobs matching the RegEx '{regex}':")
×
363
        else:
364
            print('List of jobs:')
8✔
365
        for job in self.urlwatcher.jobs:
8✔
366
            if self.urlwatch_config.verbose:
8✔
367
                job_desc = f'{job.index_number:3}: {job!r}'
8✔
368
            else:
369
                pretty_name = job.pretty_name()
8✔
370
                location = job.get_location()
8✔
371
                if pretty_name != location:
8!
372
                    job_desc = f'{job.index_number:3}: {pretty_name} ({location})'
8✔
373
                else:
374
                    job_desc = f'{job.index_number:3}: {pretty_name}'
×
375
            if isinstance(regex, bool) or re.findall(regex, job_desc):
8!
376
                print(job_desc)
8✔
377

378
        if len(self.urlwatch_config.jobs_files) > 1:
8✔
379
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
8!
380
        elif len(self.urlwatch_config.jobs_files) == 1:
8✔
381
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
382
        else:
383
            jobs_files = []
8✔
384
        print('\n   '.join(jobs_files))
8✔
385

386
    def _find_job(self, query: str | int) -> JobBase:
8✔
387
        """Finds the job based on a query, which is matched to the job index (also negative) or a job location
388
        (i.e. the url/user_visible_url or command).
389

390
        :param query: The query.
391
        :return: The matching JobBase.
392
        :raises IndexError: If job is not found.
393
        """
394
        if isinstance(query, int):
8✔
395
            index = query
8✔
396
        else:
397
            try:
8✔
398
                index = int(query)
8✔
399
            except ValueError:
8✔
400
                query = unquote_plus(query)
8✔
401
                try:
8✔
402
                    return next((job for job in self.urlwatcher.jobs if unquote_plus(job.get_location()) == query))
8✔
403
                except StopIteration:
8✔
404
                    raise ValueError(f"Job {query} does not match any job's url/user_visible_url or command.") from None
8✔
405

406
        if index == 0:
8✔
407
            raise ValueError(f'Job index {index} out of range.')
8✔
408
        try:
8✔
409
            if index <= 0:
8✔
410
                return self.urlwatcher.jobs[index]
8✔
411
            else:
412
                return self.urlwatcher.jobs[index - 1]
8✔
413
        except IndexError as e:
8✔
414
            raise ValueError(f'Job index {index} out of range (found {len(self.urlwatcher.jobs)} jobs).') from e
8✔
415

416
    def _find_job_with_defaults(self, query: str | int) -> JobBase:
8✔
417
        """
418
        Returns the job with defaults based on job_id, which could match an index or match a location
419
        (url/user_visible_url or command). Accepts negative numbers.
420

421
        :param query: The query.
422
        :return: The matching JobBase with defaults.
423
        :raises SystemExit: If job is not found.
424
        """
425
        job = self._find_job(query)
8✔
426
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
427

428
    def test_job(self, job_id: bool | str | int) -> None:
        """
        Tests the running of a single job outputting the filtered text to --test-reporter (default is stdout). If
        job_id is True, don't run any jobs but load config, jobs and hook files to trigger any syntax errors.

        :param job_id: The job_id or True.

        :return: None.

        :raises Exception: The Exception when raised by a job. loading of hooks files, etc.
        """
        if job_id is True:  # Load to trigger any eventual syntax errors
            # Reaching this point means config/jobs/hooks parsed successfully; report that.
            message = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
            conj = ',\n' if 'hooks' in sys.modules else '\nand '
            if len(self.urlwatch_config.jobs_files) == 1:
                message.append(f'{conj}jobs file {self.urlwatch_config.jobs_files[0]},')
            else:
                message.append(
                    '\n   '.join(
                        [f'{conj}jobs files'] + [f'• {file},' for file in sorted(self.urlwatch_config.jobs_files)]
                    )
                )
            if 'hooks' in sys.modules:
                # The hooks module is only present in sys.modules if a hooks file was loaded.
                message.append(f'\nand hooks file {sys.modules["hooks"].__file__}')
            print(f'{"".join(message)}.')
            return

        job = self._find_job_with_defaults(job_id)

        if isinstance(job, UrlJob):
            # Force re-retrieval of job, as we're testing filters
            job.ignore_cached = True

        with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
            # duration = time.perf_counter() - start
            job_state.process(headless=not self.urlwatch_config.no_headless)
            if job_state.job.name is None:
                job_state.job.name = ''
            # if job_state.job.note is None:
            #     job_state.job.note = ''
            # Prepend a small header (GUID, media type, ETag when present) to the retrieved data.
            data_info = '\n'.join(
                filter(
                    None,
                    (
                        f'• [GUID: {job_state.job.guid}]',
                        f'• [Media type: {job_state.new_mime_type}]' if job_state.new_mime_type else None,
                        f'• [ETag: {job_state.new_etag}]' if job_state.new_etag else None,
                    ),
                )
            )
            job_state.new_data = f'{data_info}\n\n{job_state.new_data!s}'
            if self.urlwatch_config.test_reporter is None:
                self.urlwatch_config.test_reporter = 'stdout'  # default
            report = Report(self.urlwatcher)
            report.job_states = []  # required
            errorlevel = self.check_test_reporter(
                job_state,
                label='error' if job_state.exception else 'test',
                report=report,
            )
            if errorlevel:
                self._exit(errorlevel)
        return

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)

495
    def prepare_jobs(self) -> None:
8✔
496
        """
497
        Runs jobs that have no history to populate the snapshot database when they're newly added.
498
        """
499
        new_jobs = set()
8✔
500
        for idx, job in enumerate(self.urlwatcher.jobs):
8✔
501
            has_history = bool(self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid))
8✔
502
            if not has_history:
8!
503
                print(f'Running new {job.get_indexed_location()}.')
8✔
504
                new_jobs.add(idx + 1)
8✔
505
        if not new_jobs and not self.urlwatch_config.joblist:
8!
506
            print('Found no new jobs to run.')
×
507
            return
×
508
        self.urlwatcher.urlwatch_config.joblist = set(self.urlwatcher.urlwatch_config.joblist).union(new_jobs)
8✔
509
        self.urlwatcher.run_jobs()
8✔
510
        self.urlwatcher.close()
8✔
511
        return
8✔
512

513
    def test_differ(self, arg_test_differ: list[str]) -> int:
        """
        Runs diffs for a job on all the saved snapshots and outputs the result to stdout or the reporter selected
        with --test-reporter.

        :param arg_test_differ: Either the job_id or a list containing [job_id, max_diffs]
        :return: 1 if error, 0 if successful.
        """
        report = Report(self.urlwatcher)
        self.urlwatch_config.jobs_files = [Path('--test-differ')]  # for report footer
        if len(arg_test_differ) == 1:
            job_id = arg_test_differ[0]
            max_diffs = None
        elif len(arg_test_differ) == 2:
            job_id, max_diffs_str = arg_test_differ
            max_diffs = int(max_diffs_str)
        else:
            raise ValueError('--test-differ takes a maximum of two arguments')

        job = self._find_job_with_defaults(job_id)

        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)

        num_snapshots = len(history_data)
        if num_snapshots == 0:
            print('This job has never been run before.')
            return 1
        elif num_snapshots < 2:
            # At least two snapshots are needed to produce one diff.
            print('Not enough historic data available (need at least 2 different snapshots).')
            return 1

        if job.compared_versions and job.compared_versions != 1:
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")

        # Default: diff every consecutive pair of snapshots.
        max_diffs = max_diffs or num_snapshots - 1
        for i in range(max_diffs):
            with JobState(self.urlwatcher.ssdb_storage, job) as job_state:
                # The newer snapshot of the pair.
                job_state.new_data = history_data[i].data
                job_state.new_timestamp = history_data[i].timestamp
                job_state.new_etag = history_data[i].etag
                job_state.new_mime_type = history_data[i].mime_type
                if not job.compared_versions or job.compared_versions == 1:
                    # Simple case: compare against the immediately preceding snapshot.
                    job_state.old_data = history_data[i + 1].data
                    job_state.old_timestamp = history_data[i + 1].timestamp
                    job_state.old_etag = history_data[i + 1].etag
                    job_state.old_mime_type = history_data[i + 1].mime_type
                else:
                    # compared_versions > 1: pick the closest match among the next
                    # compared_versions older snapshots (keyed by their data).
                    history_dic_snapshots = {s.data: s for s in history_data[i + 1 : i + 1 + job.compared_versions]}
                    close_matches: list[str] = difflib.get_close_matches(
                        str(job_state.new_data),
                        history_dic_snapshots.keys(),  # type: ignore[arg-type]
                        n=1,
                    )
                    if close_matches:
                        job_state.old_data = close_matches[0]
                        job_state.old_timestamp = history_dic_snapshots[close_matches[0]].timestamp
                        job_state.old_etag = history_dic_snapshots[close_matches[0]].etag
                        job_state.old_mime_type = history_dic_snapshots[close_matches[0]].mime_type

                if self.urlwatch_config.test_reporter is None:
                    self.urlwatch_config.test_reporter = 'stdout'  # default
                report.job_states = []  # required
                if job_state.new_data == job_state.old_data:
                    label = (
                        f'No change (snapshots {-i:2} AND {-(i + 1):2}) with '
                        f"'compared_versions: {job.compared_versions}'"
                    )
                    # Suppress reporting for identical snapshots.
                    job_state.verb = 'changed,no_report'
                else:
                    label = f'Filtered diff (snapshots {-i:2} and {-(i + 1):2})'
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
                if errorlevel:
                    self._exit(errorlevel)

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)

        return 0
591

592
    def dump_history(self, job_id: str) -> int:
8✔
593
        """
594
        Displays the historical data stored in the snapshot database for a job.
595

596
        :param job_id: The Job ID.
597
        :return: An argument to be used in sys.exit.
598
        """
599

600
        job = self._find_job_with_defaults(job_id)
8✔
601
        history_data = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
8✔
602

603
        title = f'History for {job.get_indexed_location()}'
8✔
604
        print(f'{title}\nGUID: {job.guid}')
8✔
605
        if history_data:
8✔
606
            print('=' * max(len(title), 46))
8✔
607
        total_failed = 0
8✔
608
        for i, snapshot in enumerate(history_data):
8✔
609
            mime_type = f' | Media type: {snapshot.mime_type}' if snapshot.mime_type else ''
8✔
610
            etag = f' | ETag: {snapshot.etag}' if snapshot.etag else ''
8✔
611
            tries = f' | Error run (number {snapshot.tries})' if snapshot.tries else ''
8✔
612
            total_failed += snapshot.tries > 0
8✔
613
            tz = self.urlwatcher.report.config['report']['tz']
8✔
614
            tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
8✔
615
            dt = datetime.fromtimestamp(snapshot.timestamp, tz_info)
8✔
616
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{mime_type}{etag}{tries}'
8✔
617
            sep_len = max(50, len(header))
8✔
618
            print(header)
8✔
619
            print('-' * sep_len)
8✔
620
            if snapshot.error_data:
8!
621
                print(f'{snapshot.error_data.get("type")}: {snapshot.error_data.get("message")}')
×
622
                print()
×
623
                print('Last good data:')
×
624
            print(snapshot.data)
8✔
625
            print('=' * sep_len, '\n')
8✔
626

627
        print(
8✔
628
            f'Found {len(history_data) - total_failed}'
629
            + (' good' if total_failed else '')
630
            + ' snapshot'
631
            + ('s' if len(history_data) - total_failed != 1 else '')
632
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
633
            + '.'
634
        )
635

636
        return 0
8✔
637

638
    def list_error_jobs(self) -> int:
        """Runs all enabled jobs and reports those that fail with an exception or return no data.

        The output is either printed to stdout or dispatched through the reporter whose name is given in
        ``self.urlwatch_config.errors``.

        :return: A sys.exit code (0 for success, 1 if the requested reporter does not exist).
        """
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
            print(f'Invalid reporter {self.urlwatch_config.errors}.')
            return 1

        def error_jobs_lines(jobs: Iterable[JobBase]) -> Iterator[str]:
            """A generator that outputs error text for jobs who fail with an exception or yield no data.

            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
            """

            def job_runner(
                stack: ExitStack,
                jobs: Iterable[JobBase],
                max_workers: int | None = None,
            ) -> Iterator[str]:
                """
                Modified worker.job_runner that yields error text for jobs who fail with an exception or yield no data.

                :param stack: The context manager.
                :param jobs: The jobs to run.
                :param max_workers: The number of maximum workers for ThreadPoolExecutor.
                :return: error text for jobs who fail with an exception or yield no data.
                """
                # Use the executor as a context manager so its worker threads are shut down deterministically
                # (the original left the executor dangling).
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    for job_state in executor.map(
                        lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
                        (stack.enter_context(JobState(self.urlwatcher.ssdb_storage, job)) for job in jobs),
                    ):
                        # NotModifiedError means the website reported no change since last download: not an error.
                        if not isinstance(job_state.exception, NotModifiedError):
                            if job_state.exception is None:
                                # No exception: flag the job only if it produced no (stripped) data.
                                if (
                                    len(job_state.new_data.strip()) == 0
                                    if hasattr(job_state, 'new_data')
                                    else len(job_state.old_data.strip()) == 0
                                ):
                                    if self.urlwatch_config.verbose:
                                        yield f'{job_state.job.index_number:3}: No data: {job_state.job!r}'
                                    else:
                                        pretty_name = job_state.job.pretty_name()
                                        location = job_state.job.get_location()
                                        if pretty_name != location:
                                            yield (
                                                f'{job_state.job.index_number:3}: No data: {pretty_name} ({location})'
                                            )
                                        else:
                                            yield f'{job_state.job.index_number:3}: No data: {pretty_name}'
                            else:
                                pretty_name = job_state.job.pretty_name()
                                location = job_state.job.get_location()
                                if pretty_name != location:
                                    yield (
                                        f'{job_state.job.index_number:3}: Error "{job_state.exception}": '
                                        f'{pretty_name} ({location})'
                                    )
                                else:
                                    # Fixed: removed a stray closing parenthesis at the end of this message.
                                    yield (
                                        f'{job_state.job.index_number:3}: Error "{job_state.exception}": {pretty_name}'
                                    )

            with ExitStack() as stack:
                # This code is from worker.run_jobs, modified to yield from job_runner.
                from webchanges.worker import get_virt_mem  # avoid circular imports

                # Run non-browser jobs (without 'use_browser: true') first.
                jobs_to_run = [job for job in jobs if not job.__is_browser__]
                if jobs_to_run:
                    logger.debug(
                        "Running jobs that do not require Chrome (without 'use_browser: true') in parallel with "
                        "Python's default max_workers."
                    )
                    yield from job_runner(stack, jobs_to_run, self.urlwatch_config.max_workers)
                else:
                    logger.debug("Found no jobs that do not require Chrome (i.e. without 'use_browser: true').")

                # Run browser (Chrome) jobs after, bounding concurrency by available memory (~200 MB per worker)
                # and CPU count unless max_workers was explicitly set.
                jobs_to_run = [job for job in jobs if job.__is_browser__]
                if jobs_to_run:
                    gc.collect()
                    virt_mem = get_virt_mem()
                    if self.urlwatch_config.max_workers:
                        max_workers = self.urlwatch_config.max_workers
                    else:
                        max_workers = max(int(virt_mem / 200e6), 1)
                        max_workers = min(max_workers, os.cpu_count() or 1)
                    logger.debug(
                        f"Running jobs that require Chrome (i.e. with 'use_browser: true') in parallel with "
                        f'{max_workers} max_workers.'
                    )
                    yield from job_runner(stack, jobs_to_run, max_workers)
                else:
                    logger.debug("Found no jobs that require Chrome (i.e. with 'use_browser: true').")

        start = time.perf_counter()

        # Default max_workers (when not specified) to 1.
        if self.urlwatch_config.max_workers is None:
            self.urlwatch_config.max_workers = 1

        if len(self.urlwatch_config.jobs_files) == 1:
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
        else:
            jobs_files = ['in the concatenation of the jobs files'] + [
                f'• {file},' for file in self.urlwatch_config.jobs_files
            ]
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)', *jobs_files])

        jobs = {
            job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs if job.is_enabled()
        }
        if self.urlwatch_config.errors == 'stdout':
            print(header)
            for line in error_jobs_lines(jobs):
                print(line)
            print('--')
            duration = time.perf_counter() - start
            # Fixed: pluralize only when the count differs from 1 (previously printed '1 enabled jobs').
            print(
                f'Checked {len(jobs)} enabled job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.'
            )

        else:
            message = '\n'.join(error_jobs_lines(jobs))
            if message:
                # Create a dummy job state to run a reporter on.
                job_state = JobState(
                    None,  # type: ignore[arg-type]
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
                )
                job_state.traceback = f'{header}\n{message}'
                duration = time.perf_counter() - start
                self.urlwatcher.report.config['footnote'] = (
                    f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.'
                )
                self.urlwatcher.report.config['report']['html']['footer'] = False
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
                self.urlwatcher.report.config['report']['text']['footer'] = False
                self.urlwatcher.report.error(job_state)
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
            else:
                print(header)
                print('--')
                duration = time.perf_counter() - start
                print('Found no errors.')
                print(f'Checked {len(jobs)} job{"s" if len(jobs) != 1 else ""} for errors in {dur_text(duration)}.')

        return 0
8✔
781

782
    def rollback_database(self, timespec: str) -> int:
        """Issues a warning, calls rollback() and prints out the result.

        :param timespec: A timespec that if numeric is interpreted as a Unix timestamp otherwise it's passed to
          dateutil.parser (if dateutil is installed) or datetime.fromisoformat to be converted into a date.

        :return: A sys.exit code (0 for success, 1 for failure)
        """

        def _convert_to_datetime(timespec: str, tz_info: ZoneInfo | tzinfo | None) -> datetime:
            """Converts inputted string to a datetime object, using dateutil if installed.

            :param timespec: The string.
            :param tz_info: The timezone.

            :return: The datetime object.
            """
            try:
                # First try to interpret the spec as a numeric Unix timestamp.
                timestamp = float(timespec)
                return datetime.fromtimestamp(timestamp, tz_info)
            except ValueError:
                try:
                    from dateutil import parser as dateutil_parser

                    # Components missing from the spec default to "now" (truncated to the minute) in tz_info.
                    default_dt_with_tz = datetime.now(tz_info).replace(second=0, microsecond=0)
                    return dateutil_parser.parse(timespec, default=default_dt_with_tz)
                    # return dateutil_parser.parse(timespec)
                except ImportError:
                    # Fallback when dateutil is not installed: strict ISO-8601 parsing.
                    dt = datetime.fromisoformat(timespec)
                    if not dt.tzinfo:
                        dt = dt.replace(tzinfo=tz_info)
                    return dt

        # Timezone from the report config, falling back to the machine's local timezone.
        tz = self.urlwatcher.report.config['report']['tz']
        tz_info = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
        dt = _convert_to_datetime(timespec, tz_info)
        timestamp_date = email.utils.format_datetime(dt)
        # NOTE(review): rollback() is invoked here, *before* the interactive confirmation below, and then a second
        # time after it. If rollback() is destructive, snapshots are deleted before the user confirms (and the
        # second call would then find nothing left, so 'No snapshots found' is printed) -- verify intended behavior.
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
        print(f'Rolling back database to {timestamp_date}.')
        if sys.__stdin__ and sys.__stdin__.isatty():
            # Interactive session: ask for confirmation before (the second) rollback.
            print(
                f'WARNING: All {count} snapshots after this date/time (check timezone) will be deleted.\n'
                '         💀 This operation cannot be undone!\n'
                '         We suggest you make a backup of the database file before proceeding.\n'
            )
            resp = input("         Please enter 'Y' to proceed: ")
            if not resp.upper().startswith('Y'):
                print('Quitting rollback. No snapshots have been deleted.')
                return 1
        count = self.urlwatcher.ssdb_storage.rollback(dt.timestamp())
        if count:
            print(f'Deleted {count} snapshots taken after {timestamp_date}.')
            self.urlwatcher.ssdb_storage.close()
        else:
            print(f'No snapshots found after {timestamp_date}')
        return 0
8✔
838

839
    def delete_snapshot(self, job_id: str | int) -> int:
8✔
840
        job = self._find_job_with_defaults(job_id)
8✔
841
        history = self.urlwatcher.ssdb_storage.get_history_snapshots(job.guid)
8✔
842
        if not history:
8✔
843
            print(f'No snapshots found for {job.get_indexed_location()}.')
8✔
844
            return 1
8✔
845
        if sys.__stdin__ and sys.__stdin__.isatty():
8!
846
            print(f'WARNING: About to delete the latest snapshot of\n         {job.get_indexed_location()}:')
×
847
            for i, history_job in enumerate(history):
×
848
                print(
×
849
                    f'         {i + 1}. {"❌ " if i == 0 else "   "}'
850
                    f'{email.utils.format_datetime(datetime.fromtimestamp(history_job.timestamp))}'  # noqa: DTZ006
851
                    f'{"  ⬅  ABOUT TO BE DELETED!" if i == 0 else ""}'
852
                )
853
            print(
×
854
                '         This operation cannot be undone!\n'
855
                '         We suggest you make a backup of the database file before proceeding.\n'
856
            )
857
            resp = input("         Please enter 'Y' to proceed: ")
×
858
            if not resp.upper().startswith('Y'):
×
859
                print('Quitting. No snapshots have been deleted.')
×
860
                return 1
×
861
        count = self.urlwatcher.ssdb_storage.delete_latest(job.guid)
8✔
862
        if count:
8!
863
            print(f'Deleted last snapshot of {job.get_indexed_location()}; {len(history) - 1} snapshots left.')
8✔
864
            return 0
8✔
865
        else:
866
            print(f'No snapshots found for {job.get_indexed_location()}.')
×
867
            return 1
×
868

869
    def modify_urls(self) -> int:
        """Handles the job-list editing command line arguments (--delete, --add and --change-location), updating
        the jobs list and saving the jobs file as needed.

        :return: A sys.exit code (0 for success, 1 for failure).
        """
        if self.urlwatch_config.delete is not None:
            job = self._find_job(self.urlwatch_config.delete)
            if job is not None:
                if sys.__stdin__ and sys.__stdin__.isatty():
                    # Interactive session: confirm before the permanent deletion.
                    # Fixed: added the '\n' missing after 'all remarks lost.' so the two warning lines no longer
                    # run together on one line (matches the formatting of the sibling warnings).
                    print(
                        f'WARNING: About to permanently delete {job.get_indexed_location()}.\n'
                        '         Job file will be overwritten and all remarks lost.\n'
                        '         This operation cannot be undone!\n'
                    )
                    resp = input("         Please enter 'Y' to proceed: ")
                    if not resp.upper().startswith('Y'):
                        print(f'Quitting. Job {job.index_number} has not been deleted and job file is unmodified.')
                        return 1
                self.urlwatcher.jobs.remove(job)
                print(f'Removed {job}.')
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)
            else:
                print(f'Job not found: {self.urlwatch_config.delete}.')
                return 1

        if self.urlwatch_config.add is not None:
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
            items = [item.split('=', 1) for item in self.urlwatch_config.add.split(',')]
            filters = [v for k, v in items if k == 'filter']
            d = dict((k, v) for k, v in items if k != 'filter')
            if filters:
                d['filter'] = ','.join(filters)

            job = JobBase.unserialize(d)
            print(f'Adding {job}.')
            self.urlwatcher.jobs.append(job)
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.change_location is not None:
            new_loc = self.urlwatch_config.change_location[1]
            # Ensure the user isn't overwriting an existing job with the change.
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
                print(
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
                    f'different value.\n'
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
                )
                return 1
            else:
                job = self._find_job(self.urlwatch_config.change_location[0])
                if job is not None:
                    # Update the job's location (which will also update the guid) and move any history in the database
                    # over to the job's updated guid.
                    old_loc = job.get_location()
                    print(f'Moving location of "{old_loc}" to "{new_loc}".')
                    old_guid = job.guid
                    if old_guid not in self.urlwatcher.ssdb_storage.get_guids():
                        print(f'No snapshots found for "{old_loc}".')
                        return 1
                    job.set_base_location(new_loc)
                    num_searched = self.urlwatcher.ssdb_storage.move(old_guid, job.guid)
                    if num_searched:
                        print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}".')
                else:
                    print(f'Job not found: "{self.urlwatch_config.change_location[0]}".')
                    return 1
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
            if not input(message).lower().startswith('y'):
                print(f'Please manually update the jobs file by replacing "{old_loc}" with "{new_loc}".')
            else:
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        return 0
8✔
939

940
    def edit_config(self) -> int:
        """Opens the configuration file for editing via the config storage.

        :return: The exit code returned by the storage's edit operation.
        """
        return self.urlwatcher.config_storage.edit()
8✔
943

944
    def check_telegram_chats(self) -> None:
        """Lists the private Telegram chats known to the bot configured in the config file.

        Validates the bot token against the Telegram Bot API getMe method, collects private chats from getUpdates,
        and prints a table of chat IDs and names.  Always terminates via self._exit().
        """
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']

        bot_token = config['bot_token']
        if not bot_token:
            print('You need to set up your bot token first (see documentation).')
            self._exit(1)

        # Prefer httpx (with HTTP/2 when the h2 package is available); fall back to requests.
        if httpx:
            get_client = httpx.Client(http2=h2 is not None).get
        else:
            get_client = requests.get  # type: ignore[assignment]

        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
        if not info['ok']:
            print(f'Error with token {bot_token}: {info["description"]}.')
            self._exit(1)

        chats = {}
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
        if 'result' in updates:
            for chat_info in updates['result']:
                chat = chat_info['message']['chat']
                if chat['type'] == 'private':
                    chats[chat['id']] = (
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
                    )

        if not chats:
            print(f'No chats found. Say hello to your bot at https://t.me/{info["result"]["username"]}.')
            self._exit(1)

        headers = ('Chat ID', 'Name')
        # Fixed: chat IDs arrive from the Telegram API as JSON integers, so they must be stringified before
        # measuring their display width -- len() on an int raises TypeError (len(str(...)) is also safe for
        # string ids).
        maxchat = max(len(headers[0]), max((len(str(chat_id)) for chat_id in chats), default=0))
        maxname = max(len(headers[1]), max((len(name) for name in chats.values()), default=0))
        fmt = f'%-{maxchat}s  %s'
        print(fmt % headers)
        print(fmt % ('-' * maxchat, '-' * maxname))
        # Sort the table rows by chat name.
        for chat_id, name in sorted(chats.items(), key=lambda kv: kv[1]):
            print(fmt % (chat_id, name))
        print(f'\nChat up your bot here: https://t.me/{info["result"]["username"]}.')

        self._exit(0)
×
987

988
    def check_test_reporter(
        self,
        job_state: JobState | None = None,
        label: str = 'test',
        report: Report | None = None,
    ) -> int:
        """
        Tests a reporter by creating pseudo-jobs of new, changed, unchanged, and error outcomes ('verb').

        Note: The report will only show new, unchanged and error content if enabled in the respective `display` keys
        of the configuration file.

        :param job_state: The JobState (Optional).
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report class to use for testing (Optional).
        :return: 0 if successful, 1 otherwise.
        """

        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
            """Builds a pseudo-job for the reporter to run on."""
            job = JobBase.unserialize({'name': job_name, 'url': url})

            # Can pass in None for ssdb_storage, as we are not going to load or save the job state for
            # testing; also no need to use it as context manager, since no processing is called on the job
            job_state = JobState(None, job)  # type: ignore[arg-type]

            job_state.old_data = old
            job_state.old_timestamp = 1605147837.511478  # initial release of webchanges!
            job_state.new_data = new
            job_state.new_timestamp = time.time()

            return job_state

        def set_error(job_state: 'JobState', message: str) -> JobState:
            """Sets a job error message on a JobState."""
            # Raise and catch so that the exception carries a real traceback for format_error().
            try:
                raise ValueError(message)
            except ValueError as e:
                job_state.exception = e
                job_state.traceback = job_state.job.format_error(e, traceback.format_exc())

            return job_state

        # Validate the requested reporter name against the registered reporter subclasses.
        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(
                f'No such reporter: {reporter_name}.\n'
                f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}.\n'
            )
            return 1

        cfg: _ConfigReportersList = self.urlwatcher.config_storage.config['report'][
            reporter_name  # type: ignore[literal-required]
        ]
        if job_state:  # we want a full report
            # Force-enable the reporter and all display/detail options so the supplied job state is fully rendered.
            cfg['enabled'] = True
            self.urlwatcher.config_storage.config['display'][label] = True  # type: ignore[literal-required]
            self.urlwatcher.config_storage.config['report']['text']['details'] = True
            self.urlwatcher.config_storage.config['report']['text']['footer'] = True
            self.urlwatcher.config_storage.config['report']['text']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['markdown']['details'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['footer'] = True
            self.urlwatcher.config_storage.config['report']['markdown']['minimal'] = False
            self.urlwatcher.config_storage.config['report']['stdout']['color'] = False
        elif not cfg['enabled']:
            print(
                f'WARNING: Reporter being tested is not enabled: {reporter_name}.\n'
                f'Will still attempt to test it, but this may not work.\n'
                f'Use {__project_name__} --edit-config to configure reporters.'
            )
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)  # type: ignore[arg-type]
        else:
            # No JobState supplied: build one pseudo-job per outcome so the reporter renders all four verbs.
            report.new(
                build_job(
                    'Sample job that was newly added',
                    'https://example.com/new',
                    '',
                    '',
                )
            )
            report.changed(
                build_job(
                    'Sample job where something changed',
                    'https://example.com/changed',
                    'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                    'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
                )
            )
            report.unchanged(
                build_job(
                    'Sample job where nothing changed',
                    'http://example.com/unchanged',
                    'Same Old, Same Old\n',
                    'Same Old, Same Old\n',
                )
            )
            report.error(
                set_error(
                    build_job(
                        'Sample job where an error was encountered',
                        'https://example.com/error',
                        '',
                        '',
                    ),
                    'The error message would appear here.',
                )
            )

        # Render and dispatch the report through the reporter under test.
        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
8✔
1105

1106
    def check_smtp_login(self) -> None:
        """Validates the SMTP e-mail reporter configuration, manages the keyring password, and attempts a login.

        Exits with code 1 if the configuration is incomplete, otherwise exits with code 0 after a successful
        login attempt.
        """
        config: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_config: _ConfigReportEmailSmtp = config['smtp']

        # Collect all configuration problems before bailing out.
        problems: list[str] = []

        if not config['enabled']:
            problems.append('Please enable email reporting in the config first.')

        if config['method'] != 'smtp':
            problems.append('Please set the method to SMTP for the email reporter.')

        smtp_auth = smtp_config['auth']
        if not smtp_auth:
            problems.append('Authentication must be enabled for SMTP.')

        smtp_hostname = smtp_config['host']
        if not smtp_hostname:
            problems.append('Please configure the SMTP hostname in the config first.')

        smtp_username = smtp_config['user'] or config['from']
        if not smtp_username:
            problems.append('Please configure the SMTP user in the config first.')

        if problems:
            for problem in problems:
                print(problem)
            self._exit(1)

        # Password handling: a password in the config file wins; otherwise offer to update the keyring entry.
        insecure_password = smtp_config['insecure_password']
        if insecure_password:
            print('The SMTP password is set in the config file (key "insecure_password").')
        elif smtp_have_password(smtp_hostname, smtp_username):
            prompt = f'Password for {smtp_username} / {smtp_hostname} already set, update? [y/N] '
            if input(prompt).lower().startswith('y'):
                smtp_set_password(smtp_hostname, smtp_username)
            else:
                print('Password unchanged.')

        smtp_port = smtp_config['port']
        smtp_tls = smtp_config['starttls']

        # An empty send exercises the connection/login path only.
        mailer = SMTPMailer(smtp_username, smtp_hostname, smtp_port, smtp_tls, smtp_auth, insecure_password)
        print('Trying to log into the SMTP server...')
        mailer.send(None)
        print('Successfully logged into SMTP server.')

        self._exit(0)
×
1157

1158
    def check_xmpp_login(self) -> None:
        """Validates the XMPP reporter configuration and manages the XMPP password in the keyring.

        Exits with code 1 if the configuration is incomplete, otherwise exits with code 0.
        """
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        # Collect all configuration problems before bailing out.
        problems: list[str] = []

        if not xmpp_config['enabled']:
            problems.append('Please enable XMPP reporting in the config first.')

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            problems.append('Please configure the XMPP sender in the config first.')

        if not xmpp_config['recipient']:
            problems.append('Please configure the XMPP recipient in the config first.')

        if problems:
            for problem in problems:
                print(problem)
            self._exit(1)

        # A password in the config file takes precedence over the keyring.
        if 'insecure_password' in xmpp_config:
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            prompt = f'Password for {xmpp_sender} already set, update? [y/N] '
            if input(prompt).lower() != 'y':
                print('Password unchanged.')
                self._exit(0)

        # Only reached with a clean configuration: store the new password in the keyring.
        if not problems:
            xmpp_set_password(xmpp_sender)

        self._exit(0)
×
1193

1194
    @staticmethod
    def playwright_install_chrome() -> int:  # pragma: no cover
        """
        Replicates playwright.___main__.main() function, which is called by the playwright executable, in order to
        install the browser executable.

        :return: Playwright's executable return code.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:  # pragma: no cover
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        # Build the same CLI invocation the playwright executable would run.
        install_cmd = [str(compute_driver_executable()), 'install', 'chrome']
        environment = os.environ.copy()
        environment['PW_CLI_TARGET_LANG'] = 'python'
        logger.info(f'Running playwright CLI: {" ".join(install_cmd)}')
        proc = subprocess.run(install_cmd, env=environment, capture_output=True, text=True)  # noqa: S603 subprocess call
        if proc.returncode:
            # Surface the driver's stderr and propagate its non-zero exit code.
            print(proc.stderr)
            return proc.returncode
        if proc.stdout:
            logger.info(f'Success! Output of Playwright CLI: {proc.stdout}')
        return 0
1219

1220
    def handle_actions(self) -> None:
        """Handles the actions for command line arguments and exits."""
        # Each matched action performs its work and then terminates via self._exit() (directly here, or inside the
        # called method for the smtp/telegram/xmpp checks); only when no action argument was given does this method
        # return to the caller.
        if self.urlwatch_config.list_jobs:
            self.list_jobs(self.urlwatch_config.list_jobs)
            self._exit(0)

        if self.urlwatch_config.errors:
            self._exit(self.list_error_jobs())

        if self.urlwatch_config.test_job:
            self.test_job(self.urlwatch_config.test_job)
            self._exit(0)

        if self.urlwatch_config.prepare_jobs:
            self.prepare_jobs()
            self._exit(0)

        if self.urlwatch_config.test_differ:
            self._exit(self.test_differ(self.urlwatch_config.test_differ))

        if self.urlwatch_config.dump_history:
            self._exit(self.dump_history(self.urlwatch_config.dump_history))

        # The job-list editing actions (--add / --delete / --change-location) share one handler.
        if self.urlwatch_config.add or self.urlwatch_config.delete or self.urlwatch_config.change_location:
            self._exit(self.modify_urls())

        if self.urlwatch_config.test_reporter:
            self._exit(self.check_test_reporter())

        # These three checks call self._exit() internally.
        if self.urlwatch_config.smtp_login:
            self.check_smtp_login()

        if self.urlwatch_config.telegram_chats:
            self.check_telegram_chats()

        if self.urlwatch_config.xmpp_login:
            self.check_xmpp_login()

        if self.urlwatch_config.edit:
            self._exit(self.urlwatcher.jobs_storage.edit())

        if self.urlwatch_config.edit_config:
            self._exit(self.edit_config())

        if self.urlwatch_config.edit_hooks:
            self._exit(self.edit_hooks())

        # Database maintenance actions close the snapshot database before exiting.
        if self.urlwatch_config.gc_database:
            self.urlwatcher.ssdb_storage.gc(
                [job.guid for job in self.urlwatcher.jobs], self.urlwatch_config.gc_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        if self.urlwatch_config.clean_database:
            self.urlwatcher.ssdb_storage.clean_ssdb(
                [job.guid for job in self.urlwatcher.jobs], self.urlwatch_config.clean_database
            )
            self.urlwatcher.ssdb_storage.close()
            self._exit(0)

        if self.urlwatch_config.rollback_database:
            exit_arg = self.rollback_database(self.urlwatch_config.rollback_database)
            self.urlwatcher.ssdb_storage.close()
            self._exit(exit_arg)

        if self.urlwatch_config.delete_snapshot:
            self._exit(self.delete_snapshot(self.urlwatch_config.delete_snapshot))

        if self.urlwatch_config.features:
            self._exit(self.show_features())

        if self.urlwatch_config.detailed_versions:
            self._exit(self.show_detailed_versions())
8✔
1294

1295
    def run(self) -> None:  # pragma: no cover
        """The main run logic."""
        # Make the report use the loaded configuration, with the footnote taken from the command line.
        self.urlwatcher.report.config = self.urlwatcher.config_storage.config
        self.urlwatcher.report.config['footnote'] = self.urlwatch_config.footnote

        # Handle any command-line action; every action path in handle_actions() ends in self._exit().
        self.handle_actions()

        # No action was requested: run all the jobs.
        self.urlwatcher.run_jobs()

        self.urlwatcher.close()

        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc