• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 8292053873

15 Mar 2024 06:07AM UTC coverage: 73.083% (+0.09%) from 72.995%
8292053873

push

github

mborsetti
Version 3.20rc0

1468 of 2275 branches covered (64.53%)

Branch coverage included in aggregate %.

21 of 32 new or added lines in 5 files covered. (65.63%)

2 existing lines in 2 files now uncovered.

3737 of 4847 relevant lines covered (77.1%)

6.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.76
/webchanges/command.py
1
"""Take actions from command line arguments."""
2

3
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
4

5
from __future__ import annotations
8✔
6

7
import contextlib
8✔
8
import difflib
8✔
9
import email.utils
8✔
10
import importlib.metadata
8✔
11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import shutil
8✔
15
import sqlite3
8✔
16
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
17
import sys
8✔
18
import time
8✔
19
import traceback
8✔
20
from concurrent.futures import ThreadPoolExecutor
8✔
21
from datetime import datetime
8✔
22
from pathlib import Path
8✔
23
from typing import Generator, Iterable, Optional, TYPE_CHECKING, Union
8✔
24
from zoneinfo import ZoneInfo
8✔
25

26
try:
8✔
27
    import httpx
8✔
28
except ImportError:
29
    httpx = None  # type: ignore[assignment]
30
    import requests
31

32
if httpx is not None:
8!
33
    try:
8✔
34
        import h2
8✔
35
    except ImportError:
36
        h2 = None  # type: ignore[assignment]
37

38

39
try:
8✔
40
    import apt
8✔
41
except ImportError:
42
    apt = None  # type: ignore[assignment]
43

44
try:
8✔
45
    from pip._internal.metadata import get_default_environment
8✔
46
except ImportError:
47
    get_default_environment = None  # type: ignore[assignment]
48

49
try:
8✔
50
    from playwright.sync_api import sync_playwright
8✔
51
except ImportError:
52
    sync_playwright = None  # type: ignore[assignment]
53

54
try:
8✔
55
    import psutil
8✔
56
    from psutil._common import bytes2human
8✔
57
except ImportError:
58
    psutil = None  # type: ignore[assignment]
59
    bytes2human = None  # type: ignore[assignment]
60

61
from webchanges import __docs_url__, __project_name__, __version__
8✔
62
from webchanges.filters import FilterBase
8✔
63
from webchanges.handler import JobState, Report, SnapshotShort
8✔
64
from webchanges.jobs import BrowserJob, JobBase, NotModifiedError, UrlJob
8✔
65
from webchanges.mailer import smtp_have_password, smtp_set_password, SMTPMailer
8✔
66
from webchanges.main import Urlwatch
8✔
67
from webchanges.reporters import ReporterBase, xmpp_have_password, xmpp_set_password
8✔
68
from webchanges.util import dur_text, edit_file, import_module_from_source
8✔
69

70
logger = logging.getLogger(__name__)
8✔
71

72
if TYPE_CHECKING:
73
    from webchanges.reporters import _ConfigReportersList
74
    from webchanges.storage import _ConfigReportEmail, _ConfigReportEmailSmtp, _ConfigReportTelegram, _ConfigReportXmpp
75

76

77
class UrlwatchCommand:
8✔
78
    """The class that runs the program after initialization and CLI arguments parsing."""
79

80
    def __init__(self, urlwatcher: Urlwatch) -> None:
8✔
81
        self.urlwatcher = urlwatcher
8✔
82
        self.urlwatch_config = urlwatcher.urlwatch_config
8✔
83

84
    @staticmethod
    def _exit(arg: Union[str, int, None]) -> None:
        """Log the exit code at INFO level, then terminate through sys.exit().

        :param arg: The value passed unchanged to sys.exit().
        :raises SystemExit: Always (raised by sys.exit()).
        """
        exit_message = f'Exiting with exit code {arg}'
        logger.info(exit_message)
        sys.exit(arg)
8✔
88

89
    def edit_hooks(self) -> int:
        """Edit hooks file.

        A working copy of the hooks file (same directory, ``_edit`` appended to the stem) is handed to edit_file()
        and then loaded with import_module_from_source(). If either step raises, the error is printed and the user
        is asked whether to retry the edit; declining discards the working copy and leaves the hooks file untouched.
        On success the working copy replaces the hooks file (or its content is written through a symlink).

        :returns: 0 if edit is successful, 1 otherwise.
        :raises SystemExit: Re-raised unchanged if raised while editing or importing.
        """
        # Similar code to BaseTextualFileStorage.edit()
        hooks_file = self.urlwatch_config.hooks_file
        logger.debug(f'Edit file {hooks_file}')
        # with_stem() keeps only the final suffix, so 'my.hooks.py' becomes 'my.hooks_edit.py'; joining all the
        # suffixes after the stem would have produced 'my.hooks_edit.hooks.py'.
        hooks_edit = hooks_file.with_stem(f'{hooks_file.stem}_edit')
        if hooks_file.exists():
            shutil.copy(hooks_file, hooks_edit)

        while True:
            try:
                edit_file(hooks_edit)
                import_module_from_source('hooks', hooks_edit)
                break  # stop if no exception on parser
            except SystemExit:
                raise
            except Exception as e:
                print('Parsing failed:')
                print('======')
                print(e)
                print('======')
                print('')
                print(f'The file {hooks_file} was NOT updated.')
                user_input = input('Do you want to retry the same edit? (Y/n)')
                if not user_input or user_input.lower()[0] == 'y':
                    continue
                # The working copy may never have been created (no pre-existing hooks file and a failed edit).
                hooks_edit.unlink(missing_ok=True)
                print('No changes have been saved.')
                return 1

        if hooks_file.is_symlink():
            # Write through the symlink rather than replacing it; copying bytes avoids any re-encoding or newline
            # translation of the edited content.
            hooks_file.write_bytes(hooks_edit.read_bytes())
        else:
            hooks_edit.replace(hooks_file)
        hooks_edit.unlink(missing_ok=True)
        print(f'Saved edits in {hooks_file}')
        return 0
8✔
134

135
    @staticmethod
    def show_features() -> int:
        """
        Prints the "features": the documentation of the supported job types, filters and reporters, with a pointer
        to the full documentation before and after.

        :return: 0.
        """
        docs_pointer = f'Please see full documentation at {__docs_url__}'
        # (heading, callable producing the documentation text, whether a blank line follows the text)
        sections = (
            ('Supported jobs:\n', JobBase.job_documentation, False),
            ('Supported filters:\n', FilterBase.filter_documentation, True),
            ('Supported reporters:\n', ReporterBase.reporter_documentation, True),
        )

        print(docs_pointer)
        print()
        for heading, documentation, blank_after in sections:
            print(heading)
            print(documentation())
            if blank_after:
                print()
        print(docs_pointer)

        return 0
8✔
155

156
    @staticmethod
    def show_detailed_versions() -> int:
        """
        Prints the detailed versions, including of dependencies.

        Sections printed: software (this project, Python, SQLite); system information (only if psutil is
        importable); installed PyPI dependencies with the installed packages each of them requires; the Playwright
        browser (only if playwright is importable); and, on POSIX systems where the apt module is importable, the
        versions of related dpkg packages.

        :return: 0.
        """
        import re  # local import: only this diagnostic command needs it

        # Leading distribution name of a requirement string, e.g. 'h2' in 'h2<5,>=3; extra == "http2"'.
        requirement_name = re.compile(r'[A-Za-z0-9][A-Za-z0-9._-]*')

        def req_name(requirement: str) -> str:
            """Return the bare distribution name of a requirement string ('' if none can be found)."""
            match = requirement_name.match(requirement.strip())
            return match.group() if match else ''

        def dependencies() -> list[str]:
            """Return the project's dependency names from pip metadata, or a built-in list if not available."""
            if get_default_environment is not None:
                env = get_default_environment()
                for dist in env.iter_all_distributions():
                    if dist.canonical_name == __project_name__:
                        requires = dist.metadata_dict.get('requires_dist', [])
                        return sorted({name for d in requires if (name := req_name(d))}, key=str.lower)

            # default list of all possible dependencies
            logger.info(f'Found no pip distribution for {__project_name__}; returning all possible dependencies.')
            return [
                'aioxmpp',
                'beautifulsoup4',
                'chump',
                'colorama',
                'cryptography',
                'cssbeautifier',
                'cssselect',
                'deepdiff',
                'h2',
                'html2text',
                'httpx',
                'jq',
                'jsbeautifier',
                'keyring',
                'lxml',
                'markdown2',
                'matrix_client',
                'msgpack',
                'pdftotext',
                'Pillow',
                'platformdirs',
                'playwright',
                'psutil',
                'pushbullet.py',
                'pypdf',
                'pytesseract',
                'pyyaml',
                'redis',
                'requests',
                'tzdata',
                'vobject',
            ]

        print('Software:')
        print(f'• {__project_name__}: {__version__}')
        print(
            f'• {platform.python_implementation()}: {platform.python_version()} '
            f'{platform.python_build()} {platform.python_compiler()}'
        )
        print(f'• SQLite: {sqlite3.sqlite_version}')

        if psutil:
            print()
            print('System:')
            print(f'• Platform: {platform.platform()}, {platform.machine()}')
            print(f'• Processor: {platform.processor()}')
            print(f'• CPUs (logical): {psutil.cpu_count()}')
            try:
                virt_mem = psutil.virtual_memory().available
                print(
                    f'• Free memory: {bytes2human(virt_mem)} physical plus '
                    f'{bytes2human(psutil.swap_memory().free)} swap.'
                )
            except psutil.Error as e:  # pragma: no cover
                print(f'• Free memory: Could not read information: {e}')
            root_disk = psutil.disk_usage('/')  # queried once so both figures come from the same reading
            print(f"• Free disk '/': {bytes2human(root_disk.free)} ({100 - root_disk.percent:.1f}%)")

        print()
        print('Installed PyPi dependencies:')
        for module_name in dependencies():
            try:
                mod = importlib.metadata.distribution(module_name)
            except importlib.metadata.PackageNotFoundError:
                continue  # not installed: nothing to report
            print(f'• {module_name}: {mod.version}')
            # package requirements; dict.fromkeys() drops repeats (same package under several markers) in order
            for required in dict.fromkeys(req_name(r) for r in (mod.requires or [])):
                if not required:
                    continue
                try:
                    req = importlib.metadata.distribution(required)
                except importlib.metadata.PackageNotFoundError:
                    continue
                print(f'  - {required}: {req.version}')

        # playwright
        if sync_playwright is not None:
            with sync_playwright() as p:
                browser = p.chromium.launch(channel='chrome')
                print()
                print('Playwright browser:')
                print(f'• Name: {browser.browser_type.name}')
                print(f'• Version: {browser.version}')
                if psutil:
                    browser.new_page()
                    try:
                        virt_mem = psutil.virtual_memory().available
                        print(
                            f'• Free memory with browser loaded: {bytes2human(virt_mem)} physical plus '
                            f'{bytes2human(psutil.swap_memory().free)} swap'
                        )
                    except psutil.Error:
                        # best effort: the memory figure is informational only
                        pass

        if os.name == 'posix' and apt:
            apt_cache = apt.Cache()

            def print_version(libs: list[str]) -> None:
                """Print package name and first listed version of each lib found in the apt cache."""
                for lib in libs:
                    if lib in apt_cache:
                        if ver := apt_cache[lib].versions:
                            print(f'   - {ver[0].package}: {ver[0].version}')

            print()
            print('Installed dpkg dependencies:')
            for module, apt_dists in (
                ('jq', ['jq']),
                # https://github.com/jalan/pdftotext#os-dependencies
                ('pdftotext', ['libpoppler-cpp-dev']),
                # https://pillow.readthedocs.io/en/latest/installation.html#external-libraries
                (
                    'Pillow',
                    [
                        'libjpeg-dev',
                        'zlib-dev',
                        'zlib1g-dev',
                        'libtiff-dev',
                        'libfreetype-dev',
                        'littlecms-dev',
                        'libwebp-dev',
                        'tcl/tk-dev',
                        'openjpeg-dev',
                        'libimagequant-dev',
                        'libraqm-dev',
                        'libxcb-dev',
                        'libxcb1-dev',
                    ],
                ),
                ('playwright', ['google-chrome-stable']),
                # https://tesseract-ocr.github.io/tessdoc/Installation.html
                ('pytesseract', ['tesseract-ocr']),
            ):
                try:
                    importlib.metadata.distribution(module)
                    print(f'• {module}')
                    print_version(apt_dists)
                except importlib.metadata.PackageNotFoundError:
                    pass
        return 0
×
319

320
    def list_jobs(self) -> None:
8✔
321
        """
322
        Lists the job and their respective _index_number.
323

324
        :return: None.
325
        """
326
        for job in self.urlwatcher.jobs:
8✔
327
            if self.urlwatch_config.verbose:
8✔
328
                print(f'{job.index_number:3}: {job!r}')
8✔
329
            else:
330
                pretty_name = job.pretty_name()
8✔
331
                location = job.get_location()
8✔
332
                if pretty_name != location:
8!
333
                    print(f'{job.index_number:3}: {pretty_name} ({location})')
8✔
334
                else:
335
                    print(f'{job.index_number:3}: {pretty_name}')
×
336
        if len(self.urlwatch_config.jobs_files) > 1:
8!
337
            jobs_files = ['Jobs files concatenated:'] + [f'• {file}' for file in self.urlwatch_config.jobs_files]
×
338
        elif len(self.urlwatch_config.jobs_files) == 1:
8!
339
            jobs_files = [f'Jobs file: {self.urlwatch_config.jobs_files[0]}']
8✔
340
        else:
341
            jobs_files = []
×
342
        print('\n   '.join(jobs_files))
8✔
343

344
    def _find_job(self, query: Union[str, int]) -> Optional[JobBase]:
8✔
345
        try:
8✔
346
            index = int(query)
8✔
347
            if index == 0:
8✔
348
                return None
8✔
349
            try:
8✔
350
                if index <= 0:
8!
351
                    return self.urlwatcher.jobs[index]
×
352
                else:
353
                    return self.urlwatcher.jobs[index - 1]
8✔
354
            except IndexError:
8✔
355
                return None
8✔
356
        except ValueError:
8✔
357
            return next((job for job in self.urlwatcher.jobs if job.get_location() == query), None)
8✔
358

359
    def _get_job(self, job_id: Union[str, int]) -> JobBase:
8✔
360
        """
361
        Finds the job based on job_id, which could match an index, be a range, or match a url or command field.
362

363
        :param job_id:
364
        :return: JobBase.
365
        :raises SystemExit: If job is not found, setting argument to 1.
366
        """
367
        try:
8✔
368
            job_id = int(job_id)
8✔
369
            if job_id < 0:
8✔
370
                job_id = len(self.urlwatcher.jobs) + job_id + 1
8✔
371
        except ValueError:
×
372
            pass
×
373
        job = self._find_job(job_id)
8✔
374
        if job is None:
8✔
375
            print(f'Job not found: {job_id}')
8✔
376
            raise SystemExit(1)
8✔
377
        return job.with_defaults(self.urlwatcher.config_storage.config)
8✔
378

379
    def test_job(self, job_id: Union[bool, str, int]) -> None:
        """
        Runs a single job once and prints its name, its note (if any) and the resulting data to stdout, followed
        by the time taken.  If job_id is True no job is run: only a message confirming that the config, jobs and
        (if loaded) hooks files have no syntax errors is printed.

        :param job_id: The job_id or True.

        :return: None.

        :raises Exception: The exception recorded in the job state while processing the job, if any.
        :raises SystemExit: Via _get_job() if the job is not found.
        """
        if job_id is True:
            hooks_loaded = 'hooks' in sys.modules
            conj = ',\n' if hooks_loaded else '\nand '
            parts = [f'No syntax errors in config file {self.urlwatch_config.config_file}']
            jobs_files = self.urlwatch_config.jobs_files
            if len(jobs_files) == 1:
                parts.append(f'{conj}jobs file {jobs_files[0]}')
            else:
                bullets = [f'• {file}' for file in sorted(jobs_files)]
                parts.append('\n   '.join([f'{conj}jobs files', *bullets]))
            if hooks_loaded:
                parts.append(f",\nand hooks file {sys.modules['hooks'].__file__}")
            print(f"{''.join(parts)}.")
            return

        job = self._get_job(job_id)
        start = time.perf_counter()

        # Force re-retrieval of job, as we're testing filters
        if isinstance(job, UrlJob):
            job.ignore_cached = True

        # Add defaults, as if when run
        job = job.with_defaults(self.urlwatcher.config_storage.config)

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
        with JobState(self.urlwatcher.cache_storage, job) as job_state:
            job_state.process(headless=not self.urlwatch_config.no_headless)
            duration = time.perf_counter() - start
            if job_state.exception is not None:
                raise job_state.exception
            tested_job = job_state.job
            print(tested_job.pretty_name())
            print('-' * len(tested_job.pretty_name()))  # underline as long as the name
            if tested_job.note:
                print(tested_job.note)
            print()
            print(job_state.new_data)
            print()
            print('--')
            print(f'Job tested in {dur_text(duration)} with {__project_name__} {__version__}.')
436

437
    def test_diff(self, job_id: str) -> int:
        """
        Walks through the saved snapshots of a job, comparing each one with an earlier one, and sends every result
        to the reporter selected with --test-reporter (stdout if none was selected).

        When the job's 'compared_versions' is unset or 1, each snapshot is compared with the one that follows it in
        the history; otherwise with the closest match (per difflib) among the following 'compared_versions'
        snapshots.

        :param job_id: The job_id.
        :return: 1 if error, 0 if successful.
        """
        report = Report(self.urlwatcher)
        self.urlwatch_config.jobs_files = [Path('--test-diff')]  # for report footer
        job = self._get_job(job_id)

        # TODO: The below is a hack; must find whether it's markdown programmatically (e.g. save it in database)
        if job.filter:
            job.is_markdown = any('html2text' in filter_type for filter_type in job.filter)

        history_data = self.urlwatcher.cache_storage.get_history_snapshots(job.get_guid())

        num_snapshots = len(history_data)
        if num_snapshots < 2:
            if num_snapshots == 0:
                print('This job has never been run before.')
            else:
                print('Not enough historic data available (need at least 2 different snapshots).')
            return 1

        if job.compared_versions and job.compared_versions != 1:
            print(f"Note: The job's 'compared_versions' directive is set to {job.compared_versions}.")

        # We do not save the job state or job on purpose here, since we are possibly modifying the job
        # (ignore_cached) and we do not want to store the newly-retrieved data yet (filter testing)
        for newer_idx in range(num_snapshots - 1):
            older_idx = newer_idx + 1
            with JobState(self.urlwatcher.cache_storage, job) as job_state:
                newer = history_data[newer_idx]
                job_state.new_data = newer.data
                job_state.new_timestamp = newer.timestamp
                job_state.new_etag = newer.etag
                if job.compared_versions and job.compared_versions != 1:
                    # Pick, among the next 'compared_versions' snapshots, the one closest to the newer data
                    candidates = {
                        s.data: SnapshotShort(s.timestamp, s.tries, s.etag)
                        for s in history_data[older_idx : older_idx + job.compared_versions]
                    }
                    close_matches: list[str] = difflib.get_close_matches(
                        job_state.new_data, candidates.keys(), n=1
                    )
                    if close_matches:
                        best_match = close_matches[0]
                        job_state.old_data = best_match
                        job_state.old_timestamp = candidates[best_match].timestamp
                        job_state.old_etag = candidates[best_match].etag
                else:
                    older = history_data[older_idx]
                    job_state.old_data = older.data
                    job_state.old_timestamp = older.timestamp
                    job_state.old_etag = older.etag

                # TODO: setting of job_state.job.is_markdown = True when it had been set by a filter.
                # Ideally it should be saved as an attribute when saving "data".
                if self.urlwatch_config.test_reporter is None:
                    self.urlwatch_config.test_reporter = 'stdout'  # default
                report.job_states = []  # required
                if job_state.new_data != job_state.old_data:
                    label = f'Filtered diff (snapshots {-newer_idx:2} and {-older_idx:2})'
                else:
                    label = (
                        f'No change (snapshots {-newer_idx:2} AND {-older_idx:2}) with '
                        f"'compared_versions: {job.compared_versions}'"
                    )
                errorlevel = self.check_test_reporter(job_state, label=label, report=report)
                if errorlevel:
                    self._exit(errorlevel)

        return 0
8✔
508

509
    def dump_history(self, job_id: str) -> int:
8✔
510
        job = self._get_job(job_id)
8✔
511
        history_data = self.urlwatcher.cache_storage.get_history_snapshots(job.get_guid())
8✔
512

513
        print(f'History for job {job.get_indexed_location()}:')
8✔
514
        print(f'(ID: {job.get_guid()})')
8✔
515
        total_failed = 0
8✔
516
        if history_data:
8✔
517
            print('=' * 50)
8✔
518
        for i, snapshot in enumerate(history_data):
8✔
519
            etag = f'; ETag: {snapshot[3]}' if snapshot[3] else ''
8✔
520
            tries = f'; error run (number {snapshot[2]})' if snapshot[2] else ''
8✔
521
            total_failed += snapshot[2] > 0
8✔
522
            tz = self.urlwatcher.report.config['report']['tz']
8✔
523
            tzinfo = ZoneInfo(tz) if tz else datetime.now().astimezone().tzinfo  # from machine
8✔
524
            dt = datetime.fromtimestamp(snapshot[1], tzinfo)
8✔
525
            header = f'{i + 1}) {email.utils.format_datetime(dt)}{etag}{tries}'
8✔
526
            sep_len = max(50, len(header))
8✔
527
            print(header)
8✔
528
            print('-' * sep_len)
8✔
529
            print(snapshot[0])
8✔
530
            print('=' * sep_len, '\n')
8✔
531

532
        print(
8✔
533
            f'Found {len(history_data) - total_failed}'
534
            + (' good' if total_failed else '')
535
            + ' snapshot'
536
            + ('s' if len(history_data) - total_failed != 1 else '')
537
            + (f' and {total_failed} error capture' + ('s' if total_failed != 1 else '') if total_failed else '')
538
            + '.'
539
        )
540

541
        return 0
8✔
542

543
    def list_error_jobs(self) -> int:
        """
        Runs the jobs (all of them, or only those selected with the joblist command line setting) and reports the
        ones that end with an exception or yield no data.

        The result is printed to stdout when the reporter named in urlwatch_config.errors is 'stdout'; otherwise,
        if any problem was found, it is sent through that reporter as the traceback of a dummy job.

        :return: 1 if urlwatch_config.errors does not name a known reporter, 0 otherwise.
        :raises IndexError: If an index in joblist is out of range.
        """
        if self.urlwatch_config.errors not in ReporterBase.__subclasses__:
            print(f'Invalid reporter {self.urlwatch_config.errors}')
            return 1

        def plural(count: int) -> str:
            """Return the plural suffix for count items ('' for exactly one, 's' otherwise)."""
            return '' if count == 1 else 's'

        def error_jobs_lines(jobs: Iterable[JobBase]) -> Generator[str, None, None]:
            """A generator that outputs error text for jobs who fail with an exception or yield no data.

            Do not use it to test newly modified jobs since it does conditional requests on the websites (i.e. uses
            stored data if the website reports no changes in the data since the last time it downloaded it -- see
            https://developer.mozilla.org/en-US/docs/Web/HTTP/Conditional_requests).
            """
            with contextlib.ExitStack() as stack:
                max_workers = min(32, os.cpu_count() or 1) if any(isinstance(job, BrowserJob) for job in jobs) else None
                logger.debug(f'Max_workers set to {max_workers}')
                # Registered with the stack so that the pool's worker threads are shut down when we are done
                executor = stack.enter_context(ThreadPoolExecutor(max_workers=max_workers))

                for job_state in executor.map(
                    lambda jobstate: jobstate.process(headless=not self.urlwatch_config.no_headless),
                    (stack.enter_context(JobState(self.urlwatcher.cache_storage, job)) for job in jobs),
                ):
                    checked_job = job_state.job
                    if job_state.exception is None or isinstance(job_state.exception, NotModifiedError):
                        data = job_state.new_data if hasattr(job_state, 'new_data') else job_state.old_data
                        if data.strip():
                            continue  # the job produced data: nothing to report
                        if self.urlwatch_config.verbose:
                            yield f'{checked_job.index_number:3}: No data: {checked_job!r}'
                            continue
                        problem = 'No data'
                    else:
                        problem = f'Error "{job_state.exception}"'
                    pretty_name = checked_job.pretty_name()
                    location = checked_job.get_location()
                    # Only show the location when it adds information
                    suffix = f' ({location})' if pretty_name != location else ''
                    yield f'{checked_job.index_number:3}: {problem}: {pretty_name}{suffix}'

        start = time.perf_counter()
        if len(self.urlwatch_config.jobs_files) == 1:
            jobs_files = [f'in jobs file {self.urlwatch_config.jobs_files[0]}:']
        else:
            jobs_files = ['in the concatenation of the jobs files'] + [
                f'• {file}' for file in self.urlwatch_config.jobs_files
            ]
        header = '\n   '.join(['Jobs with errors or returning no data (after unmodified filters, if any)'] + jobs_files)

        # extract subset of jobs to run if joblist CLI was set
        if self.urlwatcher.urlwatch_config.joblist:
            num_jobs = len(self.urlwatcher.jobs)
            for idx in self.urlwatcher.urlwatch_config.joblist:
                if not (-num_jobs <= idx <= -1 or 1 <= idx <= num_jobs):
                    raise IndexError(f'Job index {idx} out of range (found {num_jobs} jobs).')
            # normalize negative (from the end) indices to positive ones
            self.urlwatcher.urlwatch_config.joblist = [
                jn if jn > 0 else num_jobs + jn + 1 for jn in self.urlwatcher.urlwatch_config.joblist
            ]
            jobs = [
                job.with_defaults(self.urlwatcher.config_storage.config)
                for job in self.urlwatcher.jobs
                if job.index_number in self.urlwatcher.urlwatch_config.joblist
            ]
            processing = (
                f'Processing {len(jobs)} job{plural(len(jobs))} as specified in command line: # '
                f"{', '.join(str(j) for j in self.urlwatcher.urlwatch_config.joblist)}"
            )
            logger.debug(processing)
            header += f'\n{processing}'
        else:
            jobs = [job.with_defaults(self.urlwatcher.config_storage.config) for job in self.urlwatcher.jobs]
            logger.debug(f'Processing {len(jobs)} job{plural(len(jobs))}')

        def checked_text() -> str:
            """Return the closing line with the number of jobs checked and the time elapsed so far."""
            duration = time.perf_counter() - start
            return f'Checked {len(jobs)} job{plural(len(jobs))} for errors in {dur_text(duration)}.'

        if self.urlwatch_config.errors == 'stdout':
            print(header)
            for line in error_jobs_lines(jobs):
                print(line)
            print('--')
            print(checked_text())

        else:
            message = '\n'.join(error_jobs_lines(jobs))
            if message:
                # create a dummy job state to run a reporter on
                job_state = JobState(
                    None,  # type: ignore[arg-type]
                    JobBase.unserialize({'command': f'{__project_name__} --errors'}),
                )
                job_state.traceback = f'{header}\n{message}'
                self.urlwatcher.report.config['footnote'] = checked_text()
                self.urlwatcher.report.config['report']['html']['footer'] = False
                self.urlwatcher.report.config['report']['markdown']['footer'] = False
                self.urlwatcher.report.config['report']['text']['footer'] = False
                self.urlwatcher.report.error(job_state)
                self.urlwatcher.report.finish_one(self.urlwatch_config.errors, check_enabled=False)
            else:
                print(header)
                print('--')
                closing = checked_text()
                print('Found no errors')
                print(closing)

        return 0
8✔
659

660
    def delete_snapshot(self, job_id: Union[str, int]) -> int:
        """
        Deletes the most recent snapshot stored for a job and prints the outcome.

        :param job_id: The identifier of the job, as accepted by ``self._get_job()``.
        :return: 0 if a snapshot was deleted, 1 if there was none to delete.
        """
        target = self._get_job(job_id)

        if not self.urlwatcher.cache_storage.delete_latest(target.get_guid()):
            print(f'No snapshots found to be deleted for {target.get_indexed_location()}')
            return 1

        print(f'Deleted last snapshot of {target.get_indexed_location()}')
        return 0
670

671
    def modify_urls(self) -> int:
        """
        Deletes, adds and/or relocates a job as requested on the command line.

        Acts, in this order, on the ``delete``, ``add`` and ``change_location`` attributes of
        ``self.urlwatch_config``; each one is skipped when it is None.

        :return: 0 if successful; 1 if a job cannot be found, the ``add`` specification is malformed, the new
           location is already used by another job, or the job to relocate has no snapshots.
        """
        if self.urlwatch_config.delete is not None:
            job = self._find_job(self.urlwatch_config.delete)
            if job is None:
                print(f'Job not found: {self.urlwatch_config.delete}')
                return 1
            self.urlwatcher.jobs.remove(job)
            print(f'Removed {job}')
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.add is not None:
            # Allow multiple specifications of filter=, so that multiple filters can be specified on the CLI
            d = {}
            filters = []
            for item in self.urlwatch_config.add.split(','):
                key, separator, value = item.partition('=')
                if not separator:
                    # Without this check an item such as a bare URL would surface as an unpacking ValueError.
                    print(f'Invalid job specification "{item}": expected key=value pairs separated by commas')
                    return 1
                if key == 'filter':
                    filters.append(value)
                else:
                    d[key] = value
            if filters:
                d['filter'] = ','.join(filters)

            job = JobBase.unserialize(d)
            print(f'Adding {job}')
            self.urlwatcher.jobs.append(job)
            self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        if self.urlwatch_config.change_location is not None:
            new_loc = self.urlwatch_config.change_location[1]
            # Ensure the user isn't overwriting an existing job with the change.
            if new_loc in (j.get_location() for j in self.urlwatcher.jobs):
                print(
                    f'The new location "{new_loc}" already exists for a job. Delete the existing job or choose a '
                    f'different value.\n'
                    f'Hint: you have to run --change-location before you update the jobs.yaml file!'
                )
                return 1

            job = self._find_job(self.urlwatch_config.change_location[0])
            if job is None:
                print(f'Job not found: "{self.urlwatch_config.change_location[0]}"')
                return 1

            # Update the job's location (which will also update the guid) and move any history in the database
            # over to the job's updated guid.
            old_loc = job.get_location()
            print(f'Moving location of "{old_loc}" to "{new_loc}"')
            old_guid = job.get_guid()
            if old_guid not in self.urlwatcher.cache_storage.get_guids():
                print(f'No snapshots found for "{old_loc}"')
                return 1
            job.set_base_location(new_loc)
            num_searched = self.urlwatcher.cache_storage.move(old_guid, job.get_guid())
            if num_searched:
                print(f'Searched through {num_searched:,} snapshots and moved "{old_loc}" to "{new_loc}"')

            # Saving rewrites the jobs file, so ask first.
            message = 'Do you want me to update the jobs file (remarks will be lost)? [y/N] '
            if not input(message).lower().startswith('y'):
                print(f'Please update the jobs file to reflect "{new_loc}".')
            else:
                self.urlwatcher.jobs_storage.save(self.urlwatcher.jobs)

        return 0
731

732
    def edit_config(self) -> int:
        """
        Hands over to the configuration storage's ``edit()`` method.

        :return: Whatever ``config_storage.edit()`` returns.
        """
        return self.urlwatcher.config_storage.edit()
735

736
    def check_telegram_chats(self) -> None:
        """
        Prints a table of the private chats found in the Telegram bot's pending updates, then calls ``self._exit()``.

        Calls ``self._exit(1)`` if no bot token is configured, if Telegram rejects the token, or if no private chat
        is found; calls ``self._exit(0)`` after printing the table.
        """
        config: _ConfigReportTelegram = self.urlwatcher.config_storage.config['report']['telegram']

        bot_token = config['bot_token']
        if not bot_token:
            print('You need to set up your bot token first (see documentation)')
            self._exit(1)

        # Prefer httpx (with HTTP/2 when h2 is importable); fall back to requests otherwise.
        if httpx:
            get_client = httpx.Client(http2=h2 is not None).get
        else:
            get_client = requests.get  # type: ignore[assignment]

        info = get_client(f'https://api.telegram.org/bot{bot_token}/getMe', timeout=60).json()
        if not info['ok']:
            print(f"Error with token {bot_token}: {info['description']}")
            self._exit(1)

        chats = {}
        updates = get_client(f'https://api.telegram.org/bot{bot_token}/getUpdates', timeout=60).json()
        if 'result' in updates:
            for chat_info in updates['result']:
                # Not every update carries a 'message' (e.g. edited messages, channel posts); skip those.
                message = chat_info.get('message')
                if not message:
                    continue
                chat = message['chat']
                if chat['type'] == 'private':
                    chats[chat['id']] = (
                        ' '.join((chat['first_name'], chat['last_name'])) if 'last_name' in chat else chat['first_name']
                    )

        if not chats:
            print(f"No chats found. Say hello to your bot at https://t.me/{info['result']['username']}")
            self._exit(1)

        headers = ('Chat ID', 'Name')
        # Chat ids are integers in the Telegram Bot API, so measure their printed form.
        maxchat = max(len(headers[0]), max((len(str(k)) for k in chats), default=0))
        maxname = max(len(headers[1]), max((len(v) for v in chats.values()), default=0))
        fmt = f'%-{maxchat}s  %s'
        print(fmt % headers)
        print(fmt % ('-' * maxchat, '-' * maxname))
        for k, v in sorted(chats.items(), key=lambda kv: kv[1]):
            print(fmt % (k, v))
        print(f"\nChat up your bot here: https://t.me/{info['result']['username']}")

        self._exit(0)
779

780
    def check_test_reporter(
        self,
        job_state: Optional[JobState] = None,
        label: str = 'test',
        report: Optional[Report] = None,
    ) -> int:
        """
        Exercises one reporter, either with the JobState supplied or with four built-in sample jobs (new, changed,
        unchanged, error).

        :param job_state: The JobState to report on (Optional); when given, a full report is forced.
        :param label: The label to be used in the report; defaults to 'test'.
        :param report: A Report instance to use for testing (Optional); one is created if omitted.
        :return: 0 if successful, 1 if the reporter name is not a known reporter.
        """

        def build_job(job_name: str, url: str, old: str, new: str) -> JobState:
            """Creates the JobState of a pseudo-job for the reporter to run on."""
            # None is passed for cache_storage since the state is never loaded or saved during a test; for the
            # same reason the JobState is not used as a context manager (no processing is called on the job).
            sample = JobState(None, JobBase.unserialize({'name': job_name, 'url': url}))  # type: ignore[arg-type]

            sample.old_data = old
            sample.old_timestamp = 1605147837.511478  # initial release of webchanges!
            sample.new_data = new
            sample.new_timestamp = time.time()

            return sample

        def set_error(job_state: 'JobState', message: str) -> JobState:
            """Attaches an exception and its formatted traceback to a JobState."""
            try:
                # Raised for real so that traceback.format_exc() has something to format.
                raise ValueError(message)
            except ValueError as err:
                job_state.exception = err
                job_state.traceback = job_state.job.format_error(err, traceback.format_exc())

            return job_state

        reporter_name = self.urlwatch_config.test_reporter
        if reporter_name not in ReporterBase.__subclasses__:
            print(f'No such reporter: {reporter_name}')
            print(f'\nSupported reporters:\n{ReporterBase.reporter_documentation()}\n')
            return 1

        report_config = self.urlwatcher.config_storage.config['report']
        cfg: _ConfigReportersList = report_config[reporter_name]  # type: ignore[literal-required]
        if job_state:  # we want a full report
            cfg['enabled'] = True
            for kind in ('text', 'markdown'):
                for option, value in (('details', True), ('footer', True), ('minimal', False)):
                    report_config[kind][option] = value  # type: ignore[literal-required]
        if not cfg['enabled']:
            print(f'WARNING: Reporter being tested is not enabled: {reporter_name}')
            print('Will still attempt to test it, but this may not work')
            print(f'Use {__project_name__} --edit-config to configure reporters')
            cfg['enabled'] = True

        if report is None:
            report = Report(self.urlwatcher)

        if job_state:
            report.custom(job_state, label)
        else:
            added = build_job('Sample job that was newly added', 'https://example.com/new', '', '')
            report.new(added)

            modified = build_job(
                'Sample job where something changed',
                'https://example.com/changed',
                'Unchanged Line\nPrevious Content\nAnother Unchanged Line\n',
                'Unchanged Line\nUpdated Content\nAnother Unchanged Line\n',
            )
            report.changed(modified)

            same = build_job(
                'Sample job where nothing changed',
                'http://example.com/unchanged',
                'Same Old, Same Old\n',
                'Same Old, Same Old\n',
            )
            report.unchanged(same)

            failed = build_job('Sample job where an error was encountered', 'https://example.com/error', '', '')
            report.error(set_error(failed, 'The error message would appear here.'))

        report.finish_one(reporter_name, jobs_file=self.urlwatch_config.jobs_files)

        return 0
888

889
    def check_smtp_login(self) -> None:
        """
        Validates the email reporter's SMTP settings, optionally updates the stored password, and attempts a login
        to the SMTP server.

        Every configuration problem found is printed; if there is any, ``self._exit(1)`` is called. After a
        successful login ``self._exit(0)`` is called.
        """
        email_cfg: _ConfigReportEmail = self.urlwatcher.config_storage.config['report']['email']
        smtp_cfg: _ConfigReportEmailSmtp = email_cfg['smtp']

        problems = 0

        if not email_cfg['enabled']:
            print('Please enable email reporting in the config first.')
            problems += 1

        if email_cfg['method'] != 'smtp':
            print('Please set the method to SMTP for the email reporter.')
            problems += 1

        auth = smtp_cfg['auth']
        if not auth:
            print('Authentication must be enabled for SMTP.')
            problems += 1

        host = smtp_cfg['host']
        if not host:
            print('Please configure the SMTP hostname in the config first.')
            problems += 1

        # The sender address doubles as the user name when no SMTP user is configured.
        user = smtp_cfg['user'] or email_cfg['from']
        if not user:
            print('Please configure the SMTP user in the config first.')
            problems += 1

        if problems:
            self._exit(1)

        insecure_password = smtp_cfg['insecure_password']
        if insecure_password:
            print('The SMTP password is set in the config file (key "insecure_password")')
        elif smtp_have_password(host, user):
            prompt = f'Password for {user} / {host} already set, update? [y/N] '
            if input(prompt).lower().startswith('y'):
                smtp_set_password(host, user)
            else:
                print('Password unchanged.')

        port = smtp_cfg['port']
        starttls = smtp_cfg['starttls']

        mailer = SMTPMailer(user, host, port, starttls, auth, insecure_password)
        print('Trying to log into the SMTP server...')
        mailer.send(None)
        print('Successfully logged into SMTP server')

        self._exit(0)
940

941
    def check_xmpp_login(self) -> None:
        """
        Validates the XMPP reporter's settings and, unless a password is already available, stores one for the
        sender.

        Every configuration problem found is printed; if there is any, ``self._exit(1)`` is called. Otherwise the
        method ends by calling ``self._exit(0)``.
        """
        xmpp_config: _ConfigReportXmpp = self.urlwatcher.config_storage.config['report']['xmpp']

        success = True

        if not xmpp_config['enabled']:
            print('Please enable XMPP reporting in the config first.')
            success = False

        xmpp_sender = xmpp_config['sender']
        if not xmpp_sender:
            print('Please configure the XMPP sender in the config first.')
            success = False

        if not xmpp_config['recipient']:
            print('Please configure the XMPP recipient in the config first.')
            success = False

        if not success:
            self._exit(1)

        # Test the value rather than the mere presence of the key, as check_smtp_login() does: an empty
        # 'insecure_password' entry does not mean that a password has been set.
        if xmpp_config.get('insecure_password'):
            print('The XMPP password is already set in the config (key "insecure_password").')
            self._exit(0)

        if xmpp_have_password(xmpp_sender):
            message = f'Password for {xmpp_sender} already set, update? [y/N] '
            # Accept any answer starting with 'y' ("y", "yes", ...), like the other confirmation prompts.
            if not input(message).lower().startswith('y'):
                print('Password unchanged.')
                self._exit(0)

        # Guard kept from the original code; it only matters if self._exit(1) above were to return.
        if success:
            xmpp_set_password(xmpp_sender)

        self._exit(0)
976

977
    @staticmethod
    def playwright_install_chrome() -> int:  # pragma: no cover
        """
        Installs the Chrome browser executable by running Playwright's driver with the ``install chrome`` arguments,
        replicating the playwright.___main__.main() function that the playwright executable calls.

        :return: Playwright's executable return code (0 if it succeeded).
        :raises ImportError: If the playwright package cannot be imported.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        cli_args = [str(compute_driver_executable()), 'install', 'chrome']
        cli_env = {**os.environ, 'PW_CLI_TARGET_LANG': 'python'}
        logger.info(f"Running playwright CLI: {' '.join(cli_args)}")
        outcome = subprocess.run(cli_args, env=cli_env, capture_output=True, text=True)  # noqa: S603 subprocess call

        if outcome.returncode:
            # Failure: show what the CLI wrote to stderr and pass its return code on.
            print(outcome.stderr)
            return outcome.returncode

        if outcome.stdout:
            logger.info(f'Success! Output of Playwright CLI: {outcome.stdout}')
        return 0
1002

1003
    def handle_actions(self) -> None:
        """
        Handles the actions requested through command line arguments, checking them in a fixed order.

        Each action finishes with a call to ``self._exit()``, made either here or by the method the action is
        delegated to. If no action was requested, the method simply returns.
        """
        args = self.urlwatch_config

        if args.list_jobs:
            self.list_jobs()
            self._exit(0)

        if args.errors:
            self._exit(self.list_error_jobs())

        if args.test_job:
            self.test_job(args.test_job)
            self._exit(0)

        if args.test_diff:
            self._exit(self.test_diff(args.test_diff))

        if args.dump_history:
            self._exit(self.dump_history(args.dump_history))

        if args.add or args.delete or args.change_location:
            self._exit(self.modify_urls())

        if args.test_reporter:
            self._exit(self.check_test_reporter())

        # The three checks below call self._exit() themselves.
        if args.smtp_login:
            self.check_smtp_login()

        if args.telegram_chats:
            self.check_telegram_chats()

        if args.xmpp_login:
            self.check_xmpp_login()

        if args.edit:
            self._exit(self.urlwatcher.jobs_storage.edit())

        if args.edit_config:
            self._exit(self.edit_config())

        if args.edit_hooks:
            self._exit(self.edit_hooks())

        if args.gc_database:
            cache = self.urlwatcher.cache_storage
            cache.gc([job.get_guid() for job in self.urlwatcher.jobs], args.gc_database)
            cache.close()
            self._exit(0)

        if args.clean_database:
            cache = self.urlwatcher.cache_storage
            cache.clean_cache([job.get_guid() for job in self.urlwatcher.jobs], args.clean_database)
            cache.close()
            self._exit(0)

        if args.rollback_database:
            tz = self.urlwatcher.report.config['report']['tz']
            cache = self.urlwatcher.cache_storage
            cache.rollback_cache(args.rollback_database, tz)
            cache.close()
            self._exit(0)

        if args.delete_snapshot:
            self._exit(self.delete_snapshot(args.delete_snapshot))

        if args.features:
            self._exit(self.show_features())

        if args.detailed_versions:
            self._exit(self.show_detailed_versions())
1074

1075
    def run(self) -> None:  # pragma: no cover
        """
        The main run logic: shares the configuration (plus the command line footnote) with the report, handles
        any command line actions, runs the jobs, closes the urlwatcher and calls ``self._exit(0)``.
        """
        watcher = self.urlwatcher
        watcher.report.config = watcher.config_storage.config
        watcher.report.config['footnote'] = self.urlwatch_config.footnote

        self.handle_actions()

        watcher.run_jobs()
        watcher.close()

        self._exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc