• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mborsetti / webchanges / 10371336087

13 Aug 2024 02:07PM UTC coverage: 77.832% (-0.2%) from 78.056%
10371336087

push

github

mborsetti
Version 3.25.0rc0

1751 of 2515 branches covered (69.62%)

Branch coverage included in aggregate %.

4446 of 5447 relevant lines covered (81.62%)

6.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.0
/webchanges/cli.py
1
#!/usr/bin/env python3
2

3
"""Module containing the entry point: the function main()."""
4

5
# See config module for the command line arguments.
6

7
# The code below is subject to the license contained in the LICENSE file, which is part of the source code.
8

9
from __future__ import annotations
8✔
10

11
import logging
8✔
12
import os
8✔
13
import platform
8✔
14
import shutil
8✔
15
import signal
8✔
16
import subprocess  # noqa: S404 Consider possible security implications associated with the subprocess module.
8✔
17
import sys
8✔
18
import warnings
8✔
19
from pathlib import Path, PurePath
8✔
20
from typing import Optional, Union
8✔
21

22
import platformdirs
8✔
23

24
from webchanges import __copyright__, __docs_url__, __min_python_version__, __project_name__, __version__
8✔
25
from webchanges.config import CommandConfig
8✔
26
from webchanges.util import file_ownership_checks, get_new_version_number, import_module_from_source
8✔
27

28
# Restore the default SIGPIPE ("broken pipe") disposition (SIG_DFL) for this process; this is about writes to a closed
# stdout pipe (see https://github.com/thp/urlwatch/issues/77)
if os.name != 'nt':  # Windows does not have signal.SIGPIPE
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # type: ignore[attr-defined]  # not defined in Windows

# Module-level logger, named after this module
logger = logging.getLogger(__name__)
33

34

35
def python_version_warning() -> None:
    """Warn when running on the minimum Python version supported.

    If the (major, minor) version of the running interpreter equals ``__min_python_version__``, print a notice to
    stdout and issue a :class:`PendingDeprecationWarning` carrying the same text. Does nothing on any other version.
    """
    if sys.version_info[0:2] != __min_python_version__:
        return

    current_minor_version = '.'.join(str(n) for n in sys.version_info[0:2])
    next_minor_version = f'{__min_python_version__[0]}.{__min_python_version__[1] + 1}'
    warning = (
        f'Support for Python {current_minor_version} will be ending three years from the date Python '
        f'{next_minor_version} was released'
    )
    print(f'WARNING: {warning}\n')
    # The warning has to go through warnings.warn(): merely instantiating the exception class has no effect.
    # stacklevel=2 attributes the warning to the caller rather than to this helper.
    warnings.warn(warning, PendingDeprecationWarning, stacklevel=2)
47

48

49
def migrate_from_legacy(
    legacy_package: str,
    config_file: Optional[Path] = None,
    jobs_file: Optional[Path] = None,
    hooks_file: Optional[Path] = None,
    ssdb_file: Optional[Path] = None,
) -> None:
    """Copy the files of a legacy package (configuration, jobs, Python hooks, snapshot database) to their new
    location and/or name. A file is copied only when the legacy file exists and the new one does not; the original
    files are never deleted.

    :param legacy_package: The name of the legacy package to migrate (e.g. urlwatch).
    :param config_file: The new Path to the configuration file (skipped if not given).
    :param jobs_file: The new Path to the jobs file (skipped if not given).
    :param hooks_file: The new Path to the hooks file (skipped if not given).
    :param ssdb_file: The new Path to the snapshot database file (skipped if not given).
    """
    legacy_home = Path.home().joinpath(f'.{legacy_package}')
    legacy_cache_dir = platformdirs.user_cache_path(legacy_package)

    # (legacy location, new location) pairs
    migrations = (
        (legacy_home.joinpath(f'{legacy_package}.yaml'), config_file),
        (legacy_home.joinpath('urls.yaml'), jobs_file),
        (legacy_home.joinpath('hooks.py'), hooks_file),
        (legacy_cache_dir.joinpath('cache.db'), ssdb_file),
    )
    for source, destination in migrations:
        # nothing to do if no destination was given, there is no legacy file, or the new file is already there
        if not destination or not source.is_file() or destination.is_file():
            continue
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)
        logger.warning(f"Copied {legacy_package} '{source}' file to {__project_name__} '{destination}'.")
        logger.warning(f"You can safely delete '{source}'.")
80

81

82
def setup_logger(verbose: Optional[int] = None) -> None:
    """Set up the logger.

    Configures the root logger through ``logging.basicConfig()`` and logs the program's and the interpreter's
    identification at INFO level. When not verbose, tracebacks are suppressed by setting ``sys.tracebacklimit`` to 0.

    :param verbose: The verbosity level: None or 0 = logging default level, 1 = INFO, 2 = DEBUG, 3 or higher =
       NOTSET. At 2 and above the ``DEBUG`` environment variable is also set to turn on Playwright's verbose API
       logs.
    """
    log_level = None
    if verbose is not None:
        if verbose >= 2:
            # https://playwright.dev/python/docs/debug#verbose-api-logs
            os.environ['DEBUG'] = 'pw:api pytest -s'
            # The two levels are mutually exclusive: 3 or higher must not be downgraded to DEBUG
            log_level = 'NOTSET' if verbose >= 3 else 'DEBUG'
        elif verbose == 1:
            log_level = 'INFO'

    if not verbose:
        sys.tracebacklimit = 0

    logging.basicConfig(format='%(asctime)s %(module)s[%(thread)s] %(levelname)s: %(message)s', level=log_level)
    logger.info(f'{__project_name__}: {__version__} {__copyright__}')
    logger.info(
        f'{platform.python_implementation()}: {platform.python_version()} '
        f'{platform.python_build()} {platform.python_compiler()}'
    )
    logger.info(f'System: {platform.platform()}')
110

111

112
def teardown_logger(verbose: Optional[int] = None) -> None:
    """Clean up after verbose logging.

    :param verbose: The verbosity level the program was run with; at 2 or higher the ``DEBUG`` environment variable
       (Playwright's verbose API logs) is removed if present. Nothing is done otherwise.
    """
    if verbose is not None and verbose >= 2:
        # https://playwright.dev/python/docs/debug#verbose-api-logs
        os.environ.pop('DEBUG', None)
121

122

123
def _expand_jobs_files(filename: Path, default_path: Path, ext: Optional[str] = None) -> list[Path]:
8✔
124
    """Searches for file both as specified and in the default directory, then retries with 'ext' extension if defined.
125

126
    :param filename: The filename.
127
    :param default_path: The default directory.
128
    :param ext: The extension, e.g. '.yaml', to add for searching if first scan fails.
129

130
    :returns: The filename, either original or one with path where found and/or extension.
131
    """
132
    search_filenames = [filename]
8✔
133

134
    # if ext is given, iterate both on raw filename and the filename with ext if different
135
    if ext and filename.suffix != ext:
8!
136
        search_filenames.append(filename.with_suffix(ext))
8✔
137
        # also iterate on file pre-pended with 'jobs-'
138
        search_filenames.append(filename.with_stem(f'jobs-{filename.stem}').with_suffix(ext))
8✔
139

140
    # try as given
141
    for file in search_filenames:
8✔
142
        # https://stackoverflow.com/questions/56311703/globbing-absolute-paths-with-pathlib
143
        file_list = list(Path(file.anchor).glob(str(file.relative_to(file.anchor))))
8✔
144
        if any(f.is_file() for f in file_list):
8!
145
            return file_list
×
146

147
        # no directory specified (and not in current one): add default one
148
        if not file.is_absolute() and not Path(file).parent == Path.cwd():
8!
149
            file_list = list(default_path.glob(str(file)))
8✔
150
            if any(f.is_file() for f in file_list):
8!
151
                return file_list
×
152

153
    # no matches found
154
    return [filename]
8✔
155

156

157
def locate_jobs_files(filenames: list[Path], default_path: Path, ext: Optional[str] = None) -> list[Path]:
    """Expands each of the jobs filenames with ``_expand_jobs_files()`` and merges the results, removing duplicates.

    The files are returned in the order in which they are first found (i.e. following the order of 'filenames'), so
    that the result is the same from one run to the next.

    :param filenames: The filenames.
    :param default_path: The default directory.
    :param ext: The extension, e.g. '.yaml', to add for searching if first scan fails.

    :returns: The list of unique files found.
    """
    # A dict keeps insertion order, so duplicates are dropped without shuffling the files the way a set would
    unique_files: dict[Path, None] = {}
    for filename in filenames:
        unique_files.update(dict.fromkeys(_expand_jobs_files(filename, default_path, ext)))
    return list(unique_files)
163

164

165
def locate_storage_file(filename: Path, default_path: Path, ext: Optional[str] = None) -> Path:
    """Looks for a file first as specified and then, if it has no directory component, in the default directory; if
    not found and 'ext' is defined (and is not already the file's suffix), repeats the search with the 'ext' suffix.

    :param filename: The filename.
    :param default_path: The default directory.
    :param ext: The extension, e.g. '.yaml', to add for searching if first scan fails.

    :returns: The path of the first existing file found, or the original filename if there is none.
    """
    names = [filename]
    if ext and filename.suffix != ext:
        names.append(filename.with_suffix(ext))

    for name in names:
        locations = [name]
        # no directory specified: the default one is searched too
        if name.parent == PurePath('.'):
            locations.append(default_path.joinpath(name))
        for location in locations:
            if location.is_file():
                return location

    # no matches found
    return filename
193

194

195
def first_run(command_config: CommandConfig) -> None:
    """Create whichever of the configuration file and the jobs file is missing, telling the user where it is.

    The jobs file is created (at the location of the first jobs file) only if none of the jobs files exists.

    :param command_config: the CommandConfig containing the command line arguments selected.
    """
    config_file = command_config.config_file
    if not config_file.is_file():
        config_file.parent.mkdir(parents=True, exist_ok=True)
        from webchanges.storage import YamlConfigStorage

        YamlConfigStorage.write_default_config(config_file)
        print(f'Created default config file at {config_file}')
        if not command_config.edit_config:
            print(f'> Edit it with {__project_name__} --edit-config')

    if any(f.is_file() for f in command_config.jobs_files):
        return

    jobs_file = command_config.jobs_files[0]
    jobs_file.parent.mkdir(parents=True, exist_ok=True)
    jobs_file.write_text(f'# {__project_name__} jobs file. See {__docs_url__}en/stable/jobs.html\n')
    print(f'Created default jobs file at {jobs_file}')
    if not command_config.edit:
        print(f'> Edit it with {__project_name__} --edit')
216

217

218
def load_hooks(hooks_file: Path) -> None:
    """Import the hooks file as module 'hooks'.

    The file is not imported, and an ImportWarning is issued instead, if it is not a file or if
    ``file_ownership_checks()`` reports any error for it.

    :param hooks_file: The Path to the hooks file.
    """
    if not hooks_file.is_file():
        warnings.warn(
            f'Hooks file not imported because {hooks_file} is not a file',
            ImportWarning,
        )
        return

    hooks_file_errors = file_ownership_checks(hooks_file)
    if not hooks_file_errors:
        logger.info(f'Importing hooks module from {hooks_file}')
        import_module_from_source('hooks', hooks_file)
        logger.info('Finished importing hooks module')
        return

    warnings.warn(
        f'Hooks file {hooks_file} not imported because '
        f" {' and '.join(hooks_file_errors)}.\n"
        f'(see {__docs_url__}en/stable/hooks.html#important-note-for-hooks-file)',
        ImportWarning,
    )
239

240

241
def handle_unitialized_actions(urlwatch_config: CommandConfig) -> None:
    """Runs the CLI actions that do not need all classes etc. to be initialized (nor command.py to be loaded); they
    are handled here for speed purposes. An action that is run ends the program through ``sys.exit()``.

    :param urlwatch_config: The CommandConfig containing the command line arguments selected.
    """

    def _exit(arg: Union[str, int, None]) -> None:
        """Log the exit code and exit the program with it."""
        logger.info(f'Exiting with exit code {arg}')
        sys.exit(arg)

    def print_new_version() -> int:
        """Print the running version and whether a newer release is available.

        :return: 0 if the latest release could be determined, 1 otherwise.
        """
        print(f'{__project_name__} {__version__}.', end='')
        latest_release = get_new_version_number(timeout=2)
        if latest_release:
            print(
                f'\nNew release version {latest_release} is available; we recommend updating using e.g. '
                f"'pip install -U {__project_name__}'."
            )
            return 0
        if latest_release == '':
            print(' You are running the latest release.')
            return 0
        print(' Error contacting PyPI to determine the latest release.')
        return 1

    def playwright_install_chrome() -> int:  # pragma: no cover
        """
        Replicates playwright.___main__.main() function, which is called by the playwright executable, in order to
        install the browser executable.

        :return: Playwright's executable return code.
        """
        try:
            from playwright._impl._driver import compute_driver_executable
        except ImportError:  # pragma: no cover
            raise ImportError('Python package playwright is not installed; cannot install the Chrome browser') from None

        driver_executable = compute_driver_executable()
        playwright_env = os.environ.copy()
        playwright_env['PW_CLI_TARGET_LANG'] = 'python'
        command = [str(driver_executable), 'install', 'chrome']
        logger.info(f"Running playwright CLI: {' '.join(command)}")
        result = subprocess.run(command, env=playwright_env, capture_output=True, text=True)  # noqa: S603
        if result.returncode:
            # failure: show what the Playwright CLI reported and pass its return code on
            print(result.stderr)
            return result.returncode
        if result.stdout:
            logger.info(f'Success! Output of Playwright CLI: {result.stdout}')
        return 0

    if urlwatch_config.check_new:
        _exit(print_new_version())

    if urlwatch_config.install_chrome:  # pragma: no cover
        _exit(playwright_install_chrome())
296

297

298
def main() -> None:  # pragma: no cover
    """The entry point run when __name__ == '__main__'.

    Contains all the high-level logic to instantiate all classes that run the program. In order: determines the default
    locations of the files, migrates legacy files, parses the command line, sets up logging, handles the actions that
    do not need the rest of the program, locates (and on first run creates) the files, loads the hooks and the
    configuration, selects the snapshot database storage, and finally runs the command.

    :raises NotImplementedError: If a `--database-engine` is specified that is not supported.
    :raises RuntimeError: If `--database-engine redis` is selected but `--cache` with a redis URI is not provided.
       NOTE(review): not raised directly in this function; presumably raised by the storage class -- confirm.
    """
    # Make sure that PendingDeprecationWarning are displayed from all modules (otherwise only those in __main__ are)
    warnings.filterwarnings('default', category=PendingDeprecationWarning)

    # Issue deprecation warning if running on minimum version supported
    python_version_warning()

    # Path where the config, jobs and hooks files are located
    if os.name != 'nt':
        config_path = platformdirs.user_config_path(__project_name__)  # typically ~/.config/{__project_name__}
    else:
        # in Windows, a {__project_name__} folder inside the user's documents folder
        config_path = platformdirs.user_documents_path().joinpath(__project_name__)

    # Path where the snapshot database is located; typically ~/.local/share/{__project_name__} or
    # $XDG_DATA_HOME/{__project_name__} # in linux, ~/Library/Application Support/webchanges in macOS  and
    # or %LOCALAPPDATA%\{__project_name__}\{__project_name__} in Windows
    data_path = platformdirs.user_data_path(__project_name__, __project_name__.capitalize())

    # Default config, jobs, hooks and ssdb (database) files
    default_config_file = config_path.joinpath('config.yaml')
    default_jobs_file = config_path.joinpath('jobs.yaml')
    default_hooks_file = config_path.joinpath('hooks.py')
    default_ssdb_file = data_path.joinpath('snapshots.db')

    # Check for and if found migrate snapshot database file from version <= 3.21, which was called cache.db and located
    # in user_cache_path
    migrate_from_legacy('webchanges', ssdb_file=default_ssdb_file)

    # Check for and if found migrate legacy (urlwatch) files
    migrate_from_legacy('urlwatch', default_config_file, default_jobs_file, default_hooks_file, default_ssdb_file)

    # Parse command line arguments
    command_config = CommandConfig(
        sys.argv[1:],
        config_path,
        default_config_file,
        default_jobs_file,
        default_hooks_file,
        default_ssdb_file,
    )

    # Set up the logger to verbose if needed
    setup_logger(command_config.verbose)

    # For speed, run these here (they exit the program if they run)
    handle_unitialized_actions(command_config)

    # Only now, after configuring logging, we can load other modules
    from webchanges.command import UrlwatchCommand
    from webchanges.main import Urlwatch
    from webchanges.storage import (
        SsdbDirStorage,
        SsdbRedisStorage,
        SsdbSQLite3Storage,
        SsdbStorage,
        YamlConfigStorage,
        YamlJobsStorage,
    )

    # Locate config, job and hooks files
    command_config.config_file = locate_storage_file(command_config.config_file, command_config.config_path, '.yaml')
    command_config.jobs_files = locate_jobs_files(command_config.jobs_files, command_config.config_path, '.yaml')
    command_config.hooks_file = locate_storage_file(command_config.hooks_file, command_config.config_path, '.py')

    # Check for first run (the config file is the default one and it does not exist)
    if command_config.config_file == default_config_file and not Path(command_config.config_file).is_file():
        first_run(command_config)

    # Setup config file API
    config_storage = YamlConfigStorage(command_config.config_file)  # storage.py

    # load config (which for syntax checking requires hooks to be loaded too)
    if command_config.hooks_file:
        load_hooks(command_config.hooks_file)
    config_storage.load()

    # Setup database API; the command line arguments take precedence over the values in the configuration file
    database_engine = (
        command_config.database_engine or config_storage.config.get('database', {}).get('engine') or 'sqlite3'
    )  # "or 'sqlite3'" is not needed except for a mypy bug; same for the "or 4" below
    max_snapshots = command_config.max_snapshots or config_storage.config.get('database', {}).get('max_snapshots') or 4
    if database_engine == 'sqlite3':
        ssdb_storage: SsdbStorage = SsdbSQLite3Storage(command_config.ssdb_file, max_snapshots)  # storage.py
    elif any(str(command_config.ssdb_file).startswith(prefix) for prefix in {'redis://', 'rediss://'}):
        # the database "file" is a redis URI
        ssdb_storage = SsdbRedisStorage(command_config.ssdb_file)  # storage.py
    elif database_engine.startswith('redis'):
        # the database engine itself is passed on to the redis storage
        ssdb_storage = SsdbRedisStorage(database_engine)
    elif database_engine == 'textfiles':
        ssdb_storage = SsdbDirStorage(command_config.ssdb_file)  # storage.py
    elif database_engine == 'minidb':
        # legacy code imported only if needed (requires minidb, which is not a dependency)
        from webchanges.storage_minidb import SsdbMiniDBStorage

        ssdb_storage = SsdbMiniDBStorage(command_config.ssdb_file)  # storage.py
    else:
        raise NotImplementedError(f'Database engine {database_engine} not implemented')

    # Setup jobs file API
    jobs_storage = YamlJobsStorage(command_config.jobs_files)  # storage.py

    # Setup 'webchanges'
    urlwatcher = Urlwatch(command_config, config_storage, ssdb_storage, jobs_storage)  # main.py
    urlwatch_command = UrlwatchCommand(urlwatcher)  # command.py

    # Run 'webchanges', starting with processing command line arguments
    urlwatch_command.run()

    # Remove Playwright debug mode if there
    teardown_logger(command_config.verbose)
414

415

416
# Run the program when this module is executed as a script
if __name__ == '__main__':
    main()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc