• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

andreoliwa / nitpick / 8668991803

12 Apr 2024 11:09PM UTC coverage: 96.489%. First build
8668991803

Pull #649

github

web-flow
Merge 0119a6cc8 into 7c4ae187c
Pull Request #649: Add GitLab fetcher

610 of 649 branches covered (93.99%)

Branch coverage included in aggregate %.

75 of 79 new or added lines in 2 files covered. (94.94%)

2166 of 2228 relevant lines covered (97.22%)

4.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.25
/src/nitpick/style.py
1
# pylint: disable=too-many-lines # TODO: refactor: break this into separate modules in a follow-up PR
2
"""Style parsing and merging."""
5✔
3

4
from __future__ import annotations
5✔
5

6
import os
5✔
7
from contextlib import suppress
5✔
8
from dataclasses import dataclass, field
5✔
9
from datetime import timedelta
5✔
10
from enum import auto
5✔
11
from functools import lru_cache
5✔
12
from pathlib import Path
5✔
13
from typing import TYPE_CHECKING, ClassVar, Iterable, Iterator, Literal, NoReturn, Sequence, cast
5✔
14

15
import attr
5✔
16
import click
5✔
17
import requests
5✔
18
import tomlkit
5✔
19
from flatten_dict import flatten, unflatten
5✔
20
from furl import furl
5✔
21
from identify import identify
5✔
22
from loguru import logger
5✔
23
from more_itertools import always_iterable, peekable
5✔
24
from requests import Session
5✔
25
from requests_cache import CachedSession
5✔
26
from slugify import slugify
5✔
27
from strenum import LowercaseStrEnum
5✔
28
from toml import TomlDecodeError
5✔
29

30
from nitpick import compat, fields
5✔
31
from nitpick.blender import SEPARATOR_FLATTEN, TomlDoc, custom_reducer, custom_splitter, search_json
5✔
32
from nitpick.constants import (
5✔
33
    CACHE_DIR_NAME,
34
    CACHE_EXPIRATION_DEFAULTS,
35
    DOT,
36
    GIT_AT_REFERENCE,
37
    GITHUB_COM,
38
    GITHUB_COM_API,
39
    GITHUB_COM_QUERY_STRING_TOKEN,
40
    GITHUB_COM_RAW,
41
    GITLAB_BRANCH_REFERENCE,
42
    GITLAB_COM,
43
    JMEX_NITPICK_STYLES_INCLUDE,
44
    MERGED_STYLE_TOML,
45
    NITPICK_STYLE_TOML,
46
    PROJECT_NAME,
47
    PROJECT_OWNER,
48
    PYTHON_PYPROJECT_TOML,
49
    REGEX_CACHE_UNIT,
50
    TOML_EXTENSION,
51
    WRITE_STYLE_MAX_ATTEMPTS,
52
    CachingEnum,
53
    Flake8OptionEnum,
54
)
55
from nitpick.exceptions import Deprecation, QuitComplainingError, pretty_exception
5✔
56
from nitpick.generic import glob_files, url_to_python_path
5✔
57
from nitpick.plugins.info import FileInfo
5✔
58
from nitpick.schemas import BaseStyleSchema, NitpickSectionSchema, flatten_marshmallow_errors
5✔
59
from nitpick.violations import Fuss, Reporter, StyleViolations
5✔
60

61
try:
5✔
62
    # DeprecationWarning: The dpath.util package is being deprecated.
63
    # All util functions have been moved to dpath package top level.
64
    from dpath import merge as dpath_merge
5✔
65
except ImportError:  # pragma: no cover
66
    from dpath.util import merge as dpath_merge
67

68
GIT_API_SESSION = Session()  # Dedicated session to reuse connections
5✔
69

70
if TYPE_CHECKING:
71
    from marshmallow import Schema
72

73
    from nitpick.core import Project
74
    from nitpick.plugins.base import NitpickPlugin
75
    from nitpick.typedefs import JsonDict
76

77

78
@lru_cache
def builtin_resources_root() -> Path:
    """Return the location of the packaged ``nitpick.resources`` as a ``Path`` (memoized)."""
    resources = compat.files("nitpick.resources")
    return Path(str(resources))
82

83

84
@lru_cache
def repo_root() -> Path:
    """Return the repository root: the built-in resources root, moved up three parent levels."""
    root = builtin_resources_root()
    for _ in range(3):
        root = root.parent
    return root
88

89

90
def builtin_styles() -> Iterable[Path]:
    """Yield every ``*.toml`` file found recursively under the built-in resources root."""
    for style_path in builtin_resources_root().glob("**/*.toml"):
        yield style_path
93

94

95
@lru_cache
def github_default_branch(api_url: str, *, token: str | None = None) -> str:
    """Ask the GitHub API for the default branch of a repository.

    When no token is given the request is sent without an ``Authorization``
    header, so GitHub may answer with a rate-limit error such as:
    ``requests.exceptions.HTTPError: 403 Client Error: rate limit exceeded for url``

    ``lru_cache()`` acts as a simple memoizer here: repeated calls with the same
    arguments reuse the first answer, which helps to stay below that rate limit.

    :param api_url: URL of the repository on the GitHub API.
    :param token: Optional token, sent as ``Authorization: token <token>``.
    :return: The ``default_branch`` value of the JSON response.
    :raises requests.HTTPError: If the API answers with an error status.
    """
    if token:
        headers = {"Authorization": f"token {token}"}
    else:
        headers = None

    api_response = GIT_API_SESSION.get(api_url, headers=headers)
    api_response.raise_for_status()

    payload = api_response.json()
    return payload["default_branch"]
110

111

112
def parse_cache_option(cache_option: str) -> tuple[CachingEnum, timedelta | int]:
    """Parse the cache option provided on pyproject.toml.

    The option is first tried as the name of a ``CachingEnum`` member (case and
    surrounding whitespace are ignored). Anything else is treated as an
    expiration expression matched by ``REGEX_CACHE_UNIT``; only the first match
    is used.

    If no cache is provided or is invalid, the default is *one hour*.

    :return: The caching mode and its expiration (the default for that mode, or the parsed ``timedelta``).
    """
    clean_cache_option = cache_option.strip().upper() if cache_option else ""

    try:
        caching = CachingEnum[clean_cache_option]
    except KeyError:
        # Not a simple option name: assume an expiration expression
        caching = CachingEnum.EXPIRES
    else:
        logger.info(f"Simple cache option: {caching.name}")

    expires_after = CACHE_EXPIRATION_DEFAULTS[caching]
    if caching is not CachingEnum.EXPIRES or not clean_cache_option:
        return caching, expires_after

    first_match = next(REGEX_CACHE_UNIT.finditer(clean_cache_option), None)
    if first_match is None:
        logger.warning(f"Invalid cache option: {clean_cache_option}. Defaulting to 1 hour")
    else:
        # timedelta() keyword arguments are plural and lowercase (e.g. "hours")
        plural_unit = first_match.group("unit").lower() + "s"
        number = int(first_match.group("number"))
        logger.info(f"Cache option with unit: {number} {plural_unit}")
        expires_after = timedelta(**{plural_unit: number})

    return caching, expires_after
136

137

138
def raise_gitlab_incorrect_url_error(url: furl) -> NoReturn:
    """Always raise ``ValueError``, reporting ``url`` as an invalid GitLab URL."""
    details = f"Invalid GitLab URL: {url}"
    raise ValueError(details)
142

143

144
@dataclass()
class StyleManager:  # pylint: disable=too-many-instance-attributes
    """Include styles recursively from one another.

    Styles are fetched through a ``StyleFetcherManager``, validated by a
    ``ConfigValidator`` and accumulated, flattened, in ``_merged_styles``.
    """

    project: Project
    offline: bool
    cache_option: str

    # Both fields are filled lazily (see ``cache_dir`` and ``load_fixed_name_plugins()``);
    # until then the attribute does not exist and reading it raises ``AttributeError``.
    _cache_dir: Path = field(init=False)
    _fixed_name_classes: set = field(init=False)

    def __post_init__(self) -> None:
        """Initialize dependent fields."""
        self._merged_styles: JsonDict = {}
        self._already_included: set[str] = set()
        self._dynamic_schema_class: type = BaseStyleSchema
        self._style_fetcher_manager = StyleFetcherManager(self.offline, self.cache_dir, self.cache_option)
        self._config_validator = ConfigValidator(self.project)
        self.rebuild_dynamic_schema()

    def __hash__(self):
        """Calculate hash on hashable items so lru_cache knows how to cache data from this class."""
        return hash((self.project, self.offline, self.cache_option))

    @property
    def cache_dir(self) -> Path:
        """Return the cache directory under the project root, creating it on first access."""
        try:
            path = self._cache_dir
        except AttributeError:
            self._cache_dir = path = self.project.root / CACHE_DIR_NAME / PROJECT_NAME
            # TODO: fix: check if the merged style file is still needed
            #  if not, this line can be removed
            path.mkdir(parents=True, exist_ok=True)
        return path

    @staticmethod
    def get_default_style_url(github=False) -> furl:
        """Return the URL of the default style/preset.

        :param github: When true, return the ``github://`` URL of the style file at the
            tag of the running version; otherwise return a ``py://`` URL of the built-in preset.
        """
        if github:
            from nitpick import __version__  # pylint: disable=import-outside-toplevel

            return GitHubURL(PROJECT_OWNER, PROJECT_NAME, f"v{__version__}", (NITPICK_STYLE_TOML,)).long_protocol_url

        return furl(scheme=Scheme.PY, host=PROJECT_NAME, path=["resources", "presets", PROJECT_NAME])

    def find_initial_styles(self, configured_styles: Sequence[str], base: str | None = None) -> Iterator[Fuss]:
        """Find the initial style(s) and include them.

        base is the URL for the source of the initial styles, and is used to
        resolve relative references. If omitted, defaults to the project root.

        When no style is configured, the first (sorted) style file returned by
        ``glob_files()`` is used; if there is none, a single
        ``NO_STYLE_CONFIGURED`` violation is yielded.
        """
        project_root = self.project.root
        base_url = furl(base or project_root.resolve().as_uri())

        if configured_styles:
            chosen_styles = configured_styles
            config_file = base_url.path.segments[-1] if base else PYTHON_PYPROJECT_TOML
            logger.info(f"Using styles configured in {config_file}: {', '.join(chosen_styles)}")
        else:
            paths = glob_files(project_root, [NITPICK_STYLE_TOML])
            if paths:
                chosen_styles = [sorted(paths)[0].expanduser().resolve().as_uri()]
                log_message = "Using local style found climbing the directory tree"
            else:
                yield Reporter().make_fuss(StyleViolations.NO_STYLE_CONFIGURED)
                return
            logger.info(f"{log_message}: {chosen_styles[0]}")

        yield from self.include_multiple_styles(
            self._style_fetcher_manager.normalize_url(ref, base_url) for ref in chosen_styles
        )

    def include_multiple_styles(self, chosen_styles: Iterable[furl]) -> Iterator[Fuss]:
        """Include a list of styles (or just one) into this style tree."""
        for style_url in chosen_styles:
            yield from self._include_style(style_url)

    def _include_style(self, style_url: furl) -> Iterator[Fuss]:
        """Fetch, validate and merge one style, then include the styles it references.

        A URL that was already included is skipped, and so is a style whose
        fetch returned ``None``.
        """
        if style_url.url in self._already_included:
            return
        self._already_included.add(style_url.url)

        file_contents = self._style_fetcher_manager.fetch(style_url)
        if file_contents is None:
            return

        # generate a 'human readable' version of the URL; a relative path for local files
        # and the URL otherwise.
        display_name = style_url.url
        if style_url.scheme == "file":
            path = url_to_python_path(style_url)
            # relative_to() raises ValueError for files outside the project root: keep the full path then
            with suppress(ValueError):
                path = path.relative_to(self.project.root)
            display_name = str(path)

        read_toml_dict = self._read_toml(file_contents, display_name)

        # normalize sub-style URIs, before merging
        sub_styles = [
            self._style_fetcher_manager.normalize_url(ref, style_url)
            for ref in always_iterable(search_json(read_toml_dict, JMEX_NITPICK_STYLES_INCLUDE, []))
        ]
        if sub_styles:
            read_toml_dict.setdefault("nitpick", {}).setdefault("styles", {})["include"] = [
                str(url) for url in sub_styles
            ]

        toml_dict, validation_errors = self._config_validator.validate(read_toml_dict)

        if validation_errors:
            yield Reporter(FileInfo(self.project, display_name)).make_fuss(
                StyleViolations.INVALID_CONFIG, flatten_marshmallow_errors(validation_errors)
            )

        # The style is merged even when validation errors were reported above
        dpath_merge(self._merged_styles, flatten(toml_dict, custom_reducer(SEPARATOR_FLATTEN)))

        yield from self.include_multiple_styles(sub_styles)

    def _read_toml(self, file_contents: str, display_name: str) -> JsonDict:
        """Parse the contents of a style.

        :raises QuitComplainingError: With an ``INVALID_TOML`` violation if the contents can't be parsed.
        """
        toml = TomlDoc(string=file_contents)
        try:
            read_toml_dict = toml.as_object
        # TODO: refactor: replace by TOMLKitError when using tomlkit only in the future:
        except TomlDecodeError as err:
            # If the TOML itself could not be parsed, we can't go on
            raise QuitComplainingError(
                Reporter(FileInfo(self.project, display_name)).make_fuss(
                    StyleViolations.INVALID_TOML, exception=pretty_exception(err)
                )
            ) from err
        return read_toml_dict

    def merge_toml_dict(self) -> JsonDict:
        """Merge all included styles into a TOML (actually JSON) dictionary.

        The merged style is also written to the cache directory on a best-effort
        basis: writing is tried up to ``WRITE_STYLE_MAX_ATTEMPTS`` times, and a
        warning is logged (no exception is raised) if every attempt fails.
        """
        merged_dict = unflatten(self._merged_styles, custom_splitter(SEPARATOR_FLATTEN))
        # TODO: fix: check if the merged style file is still needed
        merged_style_path: Path = self.cache_dir / MERGED_STYLE_TOML
        toml = TomlDoc(obj=merged_dict)

        if not self._write_text_with_retries(merged_style_path, toml.reformatted, WRITE_STYLE_MAX_ATTEMPTS):
            logger.warning(
                f"Could not write the merged style to {merged_style_path}"
                f" after {WRITE_STYLE_MAX_ATTEMPTS} attempts"
            )

        return merged_dict

    @staticmethod
    def _write_text_with_retries(path: Path, text: str, max_attempts: int) -> bool:
        """Write text to a path, retrying on ``OSError``.

        :param max_attempts: Total number of write attempts (not the number of retries).
        :return: True as soon as one attempt succeeds, False if all of them failed.
        """
        for _ in range(max_attempts):
            try:
                path.write_text(text)
            except OSError:
                continue
            return True
        return False

    @staticmethod
    def file_field_pair(filename: str, base_file_class: type[NitpickPlugin]) -> dict[str, fields.Field]:
        """Return a schema field with info from a config file class.

        The key of the returned single-item dict is the slugified file name
        (with underscores); the field itself uses the original file name as ``data_key``.
        """
        unique_filename_with_underscore = slugify(filename, separator="_")

        kwargs = {"data_key": filename}
        if base_file_class.validation_schema:
            file_field = fields.Nested(base_file_class.validation_schema, **kwargs)
        else:
            # For some files (e.g.: TOML/ INI files), there is no strict schema;
            # it can be anything they allow.
            # It's out of Nitpick's scope to validate those files.
            file_field = fields.Dict(fields.String, **kwargs)
        return {unique_filename_with_underscore: file_field}

    def load_fixed_name_plugins(self) -> set[type[NitpickPlugin]]:
        """Separate classes with fixed file names from classes with dynamic files names.

        The result is computed once, from the plugin classes with a truthy
        ``filename``, and reused on later calls.
        """
        try:
            fixed_name_classes = self._fixed_name_classes
        except AttributeError:
            fixed_name_classes = self._fixed_name_classes = {
                plugin_class
                for plugin_class in self.project.plugin_manager.hook.plugin_class()  # pylint: disable=no-member
                if plugin_class.filename
            }
        return fixed_name_classes

    def rebuild_dynamic_schema(self) -> None:
        """Rebuild the dynamic Marshmallow schema when needed, adding new fields that were found on the style."""
        new_files_found: dict[str, fields.Field] = {}

        fixed_name_classes = self.load_fixed_name_plugins()

        for subclass in fixed_name_classes:
            new_files_found.update(self.file_field_pair(subclass.filename, subclass))

        # Only recreate the schema if new fields were found.
        if new_files_found:
            self._dynamic_schema_class = type("DynamicStyleSchema", (self._dynamic_schema_class,), new_files_found)

    def _find_subclasses(self, data, handled_tags, new_files_found):
        """Add to ``new_files_found`` a field for each file in ``data`` with a tag handled by a plugin.

        ``handled_tags`` maps a file tag (as returned by ``identify``) to the class that handles it.
        """
        for possible_file in data:
            found_subclasses = []
            for file_tag in identify.tags_from_filename(possible_file):
                handler_subclass = handled_tags.get(file_tag)
                if handler_subclass:
                    found_subclasses.append(handler_subclass)

            for found_subclass in found_subclasses:
                new_files_found.update(self.file_field_pair(possible_file, found_subclass))
344

345

346
@dataclass(repr=True)  # TODO: refactor: use attrs instead
class ConfigValidator:
    """Validate a nitpick configuration."""

    project: Project

    def validate(self, config_dict: dict) -> tuple[dict, dict]:
        """Validate an already parsed toml file.

        :return: A tuple with the configuration re-keyed by each file's path from the
            project root, and a dict with all the validation errors that were found.
        """
        rekeyed_config = {}
        errors_found = {}
        for key, value_dict in config_dict.items():
            info = FileInfo.create(self.project, key)
            rekeyed_config[info.path_from_root] = value_dict
            item_errors = self._validate_item(key, info, value_dict)
            errors_found.update(item_errors)
        return rekeyed_config, errors_found

    def _validate_item(self, key, info, value_dict):
        """Validate one top-level key of the configuration and return its errors."""
        item_errors = {}
        if key == PROJECT_NAME:
            candidates = [NitpickSectionSchema]
        else:
            # peekable() allows the truth test below without consuming the generator
            candidates = peekable(self._get_validation_schemas_for_file(info))
            if not candidates:
                item_errors[key] = [BaseStyleSchema.error_messages["unknown"]]

        is_valid, schema_errors = self._validate_schemas(info, candidates, value_dict)
        if is_valid:
            return item_errors

        Deprecation.jsonfile_section(schema_errors)
        item_errors.update(schema_errors)
        return item_errors

    def _get_validation_schemas_for_file(self, info):
        """Yield the validation schema of every plugin class that can handle the file."""
        yield from (
            plugin_class.validation_schema
            for plugin_class in self.project.plugin_manager.hook.can_handle(info=info)  # pylint: disable=no-member
        )

    def _validate_schemas(self, info, schemas, value_dict):
        """Try each schema in turn.

        :return: ``(True, {})`` as soon as one schema reports no errors;
            otherwise ``(False, errors)`` with the errors of all schemas merged.
        """
        collected = {}
        for candidate in schemas:
            errors = self._validate_schema(candidate, info.path_from_root, value_dict)
            if errors:
                collected.update(errors)
                continue

            # When multiple schemas match a file type, exit when a valid schema is found
            return True, {}

        return False, collected

    @staticmethod
    def _validate_schema(schema: type[Schema], path_from_root: str, original_data: JsonDict) -> dict[str, list[str]]:
        """Validate the schema for the file.

        A falsy schema means there is nothing to validate. ``BaseStyleSchema`` itself
        validates only the file name; any other schema validates the data, and
        its errors are nested under the file path.
        """
        if not schema:
            return {}

        is_inherited = schema is not BaseStyleSchema
        if is_inherited:
            payload = original_data
        else:
            payload = {path_from_root: None}

        errors = schema().validate(payload)
        if errors and is_inherited:
            return {path_from_root: errors}
        return errors
405

406

407
class Scheme(LowercaseStrEnum):
    """URL schemes understood by the style fetchers.

    Each member is declared with ``auto()``; the schemes are used in this
    module in their lowercase form (e.g. ``"file"``, ``"gh"``).
    """

    # keep-sorted start
    FILE = auto()  # local files
    GH = auto()  # short GitHub protocol
    GITHUB = auto()  # long GitHub protocol
    GITLAB = auto()
    GL = auto()
    HTTP = auto()
    HTTPS = auto()
    PY = auto()  # used by the URL of the built-in default style
    PYPACKAGE = auto()
    # keep-sorted end
421

422

423
@dataclass()
class StyleFetcherManager:
    """Manager that controls which fetcher to be used given a protocol."""

    offline: bool
    cache_dir: Path
    cache_option: str

    session: CachedSession = field(init=False)
    fetchers: dict[str, StyleFetcher] = field(init=False)
    schemes: tuple[str, ...] = field(init=False)

    def __post_init__(self) -> None:
        """Create the cached session, the fetchers and the known scheme prefixes."""
        caching, expire_after = parse_cache_option(self.cache_option)

        # honour caching headers on the response when an expiration time has
        # been set meaning that the server can dictate cache expiration
        # overriding the local expiration time. This may need to become a
        # separate configuration option in future.
        honour_cache_headers = caching is CachingEnum.EXPIRES
        cache_name = str(self.cache_dir / "styles")
        self.session = CachedSession(cache_name, expire_after=expire_after, cache_control=honour_cache_headers)

        self.fetchers = _get_fetchers(self.session)

        # used to test if a string URL is relative or not. These strings
        # *include the colon*.
        known_protocols = {protocol for fetcher in self.fetchers.values() for protocol in fetcher.protocols}
        self.schemes = tuple(f"{protocol}:" for protocol in known_protocols)

    def normalize_url(self, url: str | furl, base: furl) -> furl:
        """Normalize a style URL.

        A string that doesn't start with a known scheme is first preprocessed by
        the fetcher of the base URL. The URL is then made absolute against base,
        and the fetcher of the absolute URL produces its canonical version.
        """
        is_relative_string = isinstance(url, str) and not url.startswith(self.schemes)
        if is_relative_string:
            base_fetcher = self._fetcher_for(base)
            url = base_fetcher.preprocess_relative_url(url)

        absolute = base.copy().join(url)
        target_fetcher = self._fetcher_for(absolute)
        return target_fetcher.normalize(absolute)

    def fetch(self, url: furl) -> str | None:
        """Determine which fetcher to be used and fetch from it.

        Returns None when offline is True and the fetcher would otherwise
        require a connection.
        """
        chosen = self._fetcher_for(url)
        needs_network = self.offline and chosen.requires_connection
        return None if needs_network else chosen.fetch(url)

    def _fetcher_for(self, url: furl) -> StyleFetcher:
        """Determine which fetcher to be used.

        Try a fetcher by domain first, then by protocol scheme.

        :raises RuntimeError: If no fetcher is registered for the host nor for the scheme.
        """
        by_domain = self.fetchers.get(url.host) if url.host else None
        chosen = by_domain or self.fetchers.get(url.scheme)
        if chosen:
            return chosen

        msg = f"URL protocol {url.scheme!r} is not supported"
        raise RuntimeError(msg)
488

489

490
@dataclass(frozen=True)
class StyleFetcher:
    """Base class of all fetchers, it encapsulates get/fetch from a specific source."""

    requires_connection: ClassVar[bool] = False

    # only set when requires_connection is True
    session: CachedSession | None = None
    protocols: tuple[str, ...] = ()
    domains: tuple[str, ...] = ()

    def __post_init__(self):
        """Validate that session has been passed in for requires_connection == True."""
        if not self.requires_connection or self.session is not None:
            return
        msg = "session is required"
        raise ValueError(msg)

    def preprocess_relative_url(self, url: str) -> str:  # pylint: disable=no-self-use
        """Preprocess a relative URL.

        Only called for urls that lack a scheme (at the very least), being resolved
        against a base URL that matches this specific fetcher.

        The base implementation returns the URL untouched.
        """
        return url

    def _normalize_url_path(self, url: furl) -> furl:  # pylint: disable=no-self-use
        """Normalize the path component of a URL.

        A copy with the TOML extension appended to the last path segment is
        returned when that segment lacks it; otherwise the same URL is returned.
        """
        basename = url.path.segments[-1]
        if basename.endswith(TOML_EXTENSION):
            return url

        with_extension = url.copy()
        with_extension.path.segments[-1] = f"{basename}{TOML_EXTENSION}"
        return with_extension

    def _normalize_scheme(self, scheme: str) -> str:  # pylint: disable=no-self-use
        """Normalize the scheme component of a URL (unchanged in the base implementation)."""
        return scheme

    def normalize(self, url: furl) -> furl:
        """Normalize a URL.

        Produces a canonical URL, meant to be used to uniquely identify a style resource.

        - The base name has .toml appended if not already ending in that extension
        - Individual fetchers can further normalize the path and scheme.
        """
        canonical_scheme = self._normalize_scheme(url.scheme)
        if canonical_scheme != url.scheme:
            # Work on a copy, so the URL of the caller is never modified
            url = url.copy().set(scheme=canonical_scheme)
        return self._normalize_url_path(url)

    def fetch(self, url: furl) -> str:
        """Fetch a style from a specific fetcher (to be implemented by subclasses)."""
        raise NotImplementedError
542

543

544
def _get_fetchers(session: CachedSession) -> dict[str, StyleFetcher]:
    """Instantiate every fetcher and map each of its protocols and domains to it.

    The session is only passed to the fetcher classes that require a connection.
    """
    fetcher_classes: tuple[type[StyleFetcher], ...] = (
        FileFetcher,
        HttpFetcher,
        GitHubFetcher,
        GitLabFetcher,
        PythonPackageFetcher,
    )
    instances = [klass(session) if klass.requires_connection else klass() for klass in fetcher_classes]
    return dict(_fetchers_to_pairs(instances))
556

557

558
def _fetchers_to_pairs(fetchers: Iterable[StyleFetcher]) -> Iterator[tuple[str, StyleFetcher]]:
5✔
559
    for fetcher in fetchers:
5✔
560
        for protocol in fetcher.protocols:
5✔
561
            yield protocol, fetcher
5✔
562
        for domain in fetcher.domains:
5✔
563
            yield domain, fetcher
5✔
564

565

566
@dataclass(frozen=True)
class FileFetcher(StyleFetcher):  # pylint: disable=too-few-public-methods
    """Fetch a style from a local file."""

    protocols: tuple[str, ...] = (Scheme.FILE,)  # type: ignore[assignment]

    def preprocess_relative_url(self, url: str) -> str:
        """Preprocess a relative URL.

        Only called for urls that lack a scheme (at the very least), being resolved
        against a base URL that matches this specific fetcher.

        Relative paths are file paths; any ~ home reference is expanded at this point.
        """
        # We have to expand ~ values before trying to resolve a path as a file URL
        expanded = Path(url).expanduser()
        if expanded.is_absolute():
            # return absolute paths as URLs as on Windows they could otherwise not resolve
            # cleanly against a file:// base.
            return expanded.as_uri()
        # Relative paths should use POSIX conventions.
        return expanded.as_posix()

    def _normalize_url_path(self, url: furl) -> furl:
        """Apply the base normalization, then return the URL of the resolved local path."""
        with_extension = super()._normalize_url_path(url)
        resolved_path = url_to_python_path(with_extension).resolve()
        return furl(resolved_path.as_uri())

    def fetch(self, url: furl) -> str:
        """Read the local file the URL points to, as UTF-8 text."""
        style_path = url_to_python_path(url)
        return style_path.read_text(encoding="UTF-8")
593

594

595
@dataclass(frozen=True)
class GitHubURL:
    """Represent a GitHub URL, created from a URL or from its parts."""

    owner: str
    repository: str
    git_reference: str
    path: tuple[str, ...] = ()
    auth_token: str | None = None
    query_params: tuple[tuple[str, str], ...] | None = None

    @property
    def default_branch(self) -> str:
        """Default branch of the repository, as returned by the (memoized) GitHub API call."""
        api_url = self.api_url.url
        return github_default_branch(api_url, token=self.token)  # function is memoized

    @property
    def token(self) -> str | None:
        """Token encoded in this URL.

        If present and it starts with a ``$``, it will be replaced with the
        value of the environment corresponding to the remaining part of the
        string.
        """
        raw_token = self.auth_token
        if raw_token is None or not raw_token.startswith("$"):
            return raw_token
        return os.getenv(raw_token[1:])

    @property
    def authorization_header(self) -> dict[str, str] | None:
        """Authorization header built from the token, or None when there is no token."""
        token = self.token
        if not token:
            return None
        return {"Authorization": f"token {token}"}

    @property
    def git_reference_or_default(self) -> str:
        """Return the Git reference if informed, or return the default branch."""
        if self.git_reference:
            return self.git_reference
        return self.default_branch

    @property
    def url(self) -> furl:
        """Default URL (the ``blob`` page of the file) built from attributes."""
        segments = [self.owner, self.repository, "blob", self.git_reference_or_default, *self.path]
        return furl(scheme=Scheme.HTTPS, host=GITHUB_COM, path=segments, query_params=self.query_params)

    @property
    def raw_content_url(self) -> furl:
        """Raw content URL for this path."""
        segments = [self.owner, self.repository, self.git_reference_or_default, *self.path]
        return furl(scheme=Scheme.HTTPS, host=GITHUB_COM_RAW, path=segments, query_params=self.query_params)

    @classmethod
    def from_furl(cls, url: furl) -> GitHubURL:
        """Create an instance from a parsed URL in any accepted format.

        The token is read from the user name of the URL or from the token query
        parameter; all the other query parameters are preserved.

        See the code for ``test_parsing_github_urls()`` for more examples.
        """
        auth_token = url.username or url.args.get(GITHUB_COM_QUERY_STRING_TOKEN)
        query_params = tuple(
            (name, value) for name, value in url.args.items() if name != GITHUB_COM_QUERY_STRING_TOKEN
        )

        segments = url.path.segments
        if url.scheme in GitHubFetcher.protocols:
            # Short format: the owner is the host, and the first segment is "repo" or "repo@reference"
            owner = url.host
            repo_with_git_reference, *path = segments
            repo, _, git_reference = repo_with_git_reference.partition(GIT_AT_REFERENCE)
        else:  # github.com URL (raw.githubusercontent.com is handled by the HTTP fetcher)
            # Skip the 'blob' component in the github.com URL.
            owner, repo, _, git_reference, *path = segments

        if path and not path[-1]:
            # strip trailing slashes
            path = path[:-1]

        return cls(owner, repo, git_reference, tuple(path), auth_token, query_params)

    @property
    def api_url(self) -> furl:
        """API URL for this repo."""
        return furl(scheme=Scheme.HTTPS, host=GITHUB_COM_API, path=["repos", self.owner, self.repository])

    @property
    def short_protocol_url(self) -> furl:
        """Short protocol URL (``gh``)."""
        scheme = cast(str, Scheme.GH)
        return self._build_url(scheme)

    @property
    def long_protocol_url(self) -> furl:
        """Long protocol URL (``github``)."""
        scheme = cast(str, Scheme.GITHUB)
        return self._build_url(scheme)

    def _build_url(self, scheme: str) -> furl:
        """Build the protocol URL; the Git reference is only added when it isn't the default branch."""
        # The default branch is only queried when a Git reference was informed
        has_custom_reference = bool(self.git_reference) and self.git_reference != self.default_branch
        at_reference = f"{GIT_AT_REFERENCE}{self.git_reference}" if has_custom_reference else ""
        first_segment = f"{self.repository}{at_reference}"
        return furl(scheme=scheme, host=self.owner, path=[first_segment, *self.path])
699

700

701
@dataclass(frozen=True)
class HttpFetcher(StyleFetcher):
    """Fetch a style from an http/https server."""

    requires_connection = True

    protocols: tuple[str, ...] = (Scheme.HTTP, Scheme.HTTPS)  # type: ignore[assignment]

    def fetch(self, url: furl) -> str:
        """Fetch the style from a web location.

        On a connection error the exception is logged, a hint is printed in red
        on stderr, and an empty string is returned instead of raising.
        """
        try:
            return self._download(url)
        except requests.ConnectionError as err:
            logger.exception(f"Request failed with {err}")
            hint = (
                f"The URL {url} could not be downloaded. Either your network is unreachable or the URL is broken."
                f" Check the URL, fix your connection, or use "
                f" {Flake8OptionEnum.OFFLINE.as_flake8_flag()} / {Flake8OptionEnum.OFFLINE.as_envvar()}=1"
            )
            click.secho(hint, fg="red", err=True)
            return ""

    def _download(self, url: furl, **kwargs) -> str:
        """Download the URL with the session and return the text of the response.

        Extra keyword arguments are passed on to ``session.get()``.

        :raises RuntimeError: If this fetcher has no session.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        logger.info(f"Downloading style from {url}")
        session = self.session
        if session is None:
            msg = "No session provided to fetcher"
            raise RuntimeError(msg)

        response = session.get(url.url, **kwargs)
        response.raise_for_status()
        return response.text
733

734

735
@dataclass(frozen=True)
class GitHubFetcher(HttpFetcher):  # pylint: disable=too-few-public-methods
    """Fetch styles from GitHub repositories."""

    protocols: tuple[str, ...] = (Scheme.GH, Scheme.GITHUB)  # type: ignore[assignment,has-type]
    domains: tuple[str, ...] = (GITHUB_COM,)

    def _normalize_scheme(self, scheme: str) -> str:  # pylint: disable=no-self-use
        """Return the scheme for the canonical URL: ``gh`` becomes ``github``, anything else is kept."""
        if scheme == Scheme.GH:
            return Scheme.GITHUB  # type: ignore[return-value]
        return scheme

    def _download(self, url: furl, **kwargs) -> str:
        """Download the raw content behind a GitHub URL.

        The authorization header encoded in the URL is used unless the caller
        already passed its own ``headers``.
        """
        parsed_url = GitHubURL.from_furl(url)
        default_headers = parsed_url.authorization_header
        kwargs.setdefault("headers", default_headers)
        raw_url = parsed_url.raw_content_url
        return super()._download(raw_url, **kwargs)
5✔
750

751

752
@dataclass(frozen=True)
class GitLabURL:
    """Represent a GitLab URL, created from a URL or from its parts.

    Two flavours of URL are accepted by :meth:`from_furl`:

    - URLs whose scheme is one of ``GitLabFetcher.protocols``: ``project`` holds a single segment
      and ``path`` is the file path joined with ``/``;
    - any other URL (GitLab web URLs): ``project`` holds every segment before ``-``
      and ``path`` is the list of segments after the Git reference.
    """

    scheme: str
    host: str
    project: list[str]
    # A "/"-joined string for the GitLab schemes, a list of path segments for web URLs.
    path: str | list[str]
    git_reference: str
    query_params: tuple[tuple[str, str], ...]
    auth_token: str | None = None

    @property
    def token(self) -> str | None:
        """Token encoded in this URL.

        If present, and it starts with a ``$``, it will be replaced with the
        value of the environment variable named by the remaining part of the
        string (``None`` when that variable is not set).
        """
        token = self.auth_token
        if token is not None and token.startswith("$"):
            token = os.getenv(token[1:])
        return token

    @property
    def authorization_header(self) -> dict[Literal["PRIVATE-TOKEN"], str] | None:
        """Authorization header encoded in this URL, or ``None`` when there is no (non-empty) token."""
        # Resolve the token once: it may involve an environment lookup.
        token = self.token
        return {"PRIVATE-TOKEN": token} if token else None

    @property
    def raw_content_url(self) -> furl:
        """Raw content URL for this path.

        URLs using one of the GitLab schemes are converted to the "get raw file" API endpoint;
        other URLs are converted to the ``/-/raw/`` URL of the GitLab site.
        """
        if self.scheme in GitLabFetcher.protocols:
            query_params = self.query_params
            if self.git_reference:
                # If the branch was not specified for the raw file, GitLab itself will substitute the HEAD branch
                # https://docs.gitlab.com/ee/api/repository_files.html#get-raw-file-from-repository
                query_params += ((GITLAB_BRANCH_REFERENCE, self.git_reference),)

            return furl(
                scheme=Scheme.HTTPS,
                host=self.host,
                # The whole file path is passed as a single path segment here.
                path=["api", "v4", "projects", *self.project, "repository", "files", self.path, "raw"],
                query_params=query_params,
            )

        return furl(
            scheme=Scheme.HTTPS,
            host=self.host,
            path=[*self.project, "-", "raw", self.git_reference, *self.path],
            query_params=self.query_params,
        )

    @classmethod
    def _from_http_scheme_furl(cls, url: furl) -> GitLabURL:
        """Create an instance from a parsed URL in accepted format.

        Gitlab GUI uses named path like:
        - https://gitlab.com/group_URL/subgroup/project_name/-/blob/branch/folder/file
        - https://gitlab.com/group_URL/sub_group/project_name/-/raw/branch/folder/file
        See the code for ``test_parsing_gitlab_http_api_urls()`` for more examples.

        Any URL without a ``-`` segment, without ``blob``/``raw`` right after it, or without
        a Git reference after that is rejected through ``raise_gitlab_incorrect_url_error()``.
        """
        auth_token = url.username
        query_params = tuple(url.args.items())

        segments = url.path.segments
        try:
            dash_index = segments.index("-")
            marker = segments[dash_index + 1]  # "blob" or "raw" should immediately follow
            blob_index = dash_index + 2
            # Looked up inside the guarded block, so a URL that stops right after "blob"/"raw"
            # is reported as an incorrect URL instead of leaking a bare IndexError.
            git_reference = segments[blob_index]  # The first argument after "blob"
        except (ValueError, IndexError):
            raise_gitlab_incorrect_url_error(url)
        if marker not in {"blob", "raw"}:
            raise_gitlab_incorrect_url_error(url)

        project = segments[:dash_index]  # Everything before the "-"
        path = segments[blob_index + 1 :]  # Everything after the git_reference

        return cls(
            scheme=url.scheme,
            host=url.host,
            project=project,
            path=path,
            git_reference=git_reference,
            query_params=query_params,
            auth_token=auth_token,
        )

    @classmethod
    def _from_gitlab_scheme_furl(cls, url: furl) -> GitLabURL:
        """Create an instance from a parsed URL in accepted format.

        The Gitlab API does not pay attention to the groups and subgroups the project is in,
        instead it uses the project number and the URL encoded full path to the file:
        https://gitlab.com/api/v4/projects/project_number/repository/files/folder%2Ffile/raw?ref=branch_name

        Documentation https://docs.gitlab.com/ee/api/repository_files.html#get-raw-file-from-repository
        See the code for ``test_parsing_gitlab_gl_api_urls()`` for more examples.
        """
        auth_token = url.username
        query_params = tuple(url.args.items())

        # First segment is "<project>[@<git_reference>]", the remaining ones are the file path.
        project_with_git_reference, *path_segments = url.path.segments
        project_name, _, git_reference = project_with_git_reference.partition(GIT_AT_REFERENCE)

        return cls(
            scheme=url.scheme,
            host=url.host,
            project=[project_name],
            path="/".join(path_segments),
            git_reference=git_reference,
            query_params=query_params,
            auth_token=auth_token,
        )

    @classmethod
    def from_furl(cls, url: furl) -> GitLabURL:
        """Create an instance from a parsed URL in any accepted format.

        The gitlab:// scheme uses the Gitlab API and takes a project number.
        The https:// scheme uses the Gitlab site and takes the path to the project.
        """
        if url.scheme in GitLabFetcher.protocols:
            return cls._from_gitlab_scheme_furl(url)
        return cls._from_http_scheme_furl(url)
5✔
881

882

883
@dataclass(frozen=True)
class GitLabFetcher(HttpFetcher):  # pylint: disable=too-few-public-methods
    """Fetch styles from GitLab repositories via API."""

    protocols: tuple[str, ...] = (Scheme.GL, Scheme.GITLAB)  # type: ignore[assignment,has-type]
    domains: tuple[str, ...] = (GITLAB_COM,)

    def _normalize_scheme(self, scheme: str) -> str:  # pylint: disable=no-self-use
        """Return the scheme for the canonical URL: ``gl`` becomes ``gitlab``, anything else is kept."""
        if scheme == Scheme.GL:
            return Scheme.GITLAB  # type: ignore[return-value]
        return scheme

    def _download(self, url: furl, **kwargs) -> str:
        """Download the raw content behind a GitLab URL.

        The authorization header encoded in the URL is used unless the caller
        already passed its own ``headers``.
        """
        parsed_url = GitLabURL.from_furl(url)
        default_headers = parsed_url.authorization_header
        kwargs.setdefault("headers", default_headers)
        raw_url = parsed_url.raw_content_url
        return super()._download(raw_url, **kwargs)
×
902

903

904
@dataclass(frozen=True)
class PythonPackageURL:
    """Represent a resource file in installed Python package."""

    import_path: str
    resource_name: str

    @classmethod
    def from_furl(cls, url: furl) -> PythonPackageURL:
        """Create an instance from a parsed URL in any accepted format.

        The host and every path segment but the last form the dotted import path;
        the last segment is the resource name. A trailing slash is ignored.

        See the code for ``test_parsing_python_package_urls()`` for more examples.
        """
        segments = url.path.segments
        if segments and not segments[-1]:
            # An empty last segment means the URL ended with a slash: drop it
            segments = segments[:-1]

        *package_parts, resource_name = segments
        dotted_import_path = DOT.join([url.host, *package_parts])
        return cls(import_path=dotted_import_path, resource_name=resource_name)

    @property
    def content_path(self) -> Path:
        """Raw path of resource file."""
        package_dir = Path(str(compat.files(self.import_path)))
        return package_dir / self.resource_name
5✔
930

931

932
@dataclass(frozen=True)
class PythonPackageFetcher(StyleFetcher):  # pylint: disable=too-few-public-methods
    """Fetch a style from an installed Python package.

    URL schemes:
    - ``py://import/path/of/style/file/<style_file_name>``
    - ``pypackage://import/path/of/style/file/<style_file_name>``

    E.g. ``py://some_package/path/nitpick.toml``.
    """

    protocols: tuple[str, ...] = (Scheme.PY, Scheme.PYPACKAGE)  # type: ignore[assignment]

    def _normalize_scheme(self, scheme: str) -> str:  # noqa: ARG002
        """Return ``py`` whatever the given scheme is: the canonical URL always uses the shorter name."""
        short_scheme = cast(str, Scheme.PY)
        return short_scheme

    def fetch(self, url: furl) -> str:
        """Read the style from the package resource designated by ``url`` and return its text."""
        style_file = PythonPackageURL.from_furl(url).content_path
        return style_file.read_text(encoding="UTF-8")
5✔
953

954

955
@attr.mutable(kw_only=True)
class BuiltinStyle:  # pylint: disable=too-few-public-methods
    """A built-in style file in TOML format."""

    formatted: str
    path_from_resources_root: str

    # The attributes below are not constructor arguments: they are filled in by from_path()
    identify_tag: str = attr.field(init=False)
    name: str = attr.field(init=False)
    # Taken from [nitpick.meta] with .get(), so it is None when the style declares no URL
    url: str = attr.field(init=False)
    files: list[str] = attr.field(init=False)

    @classmethod
    def from_path(cls, resource_path: Path, library_dir: Path | None = None) -> BuiltinStyle:
        """Create a style from its path.

        :param resource_path: Path of the TOML style file.
        :param library_dir: Directory the style is relative to; when empty or ``None``,
            the style is taken to be part of the built-in library and gets a ``py://`` URL.
        :raises SyntaxError: When the style has no ``[nitpick.meta]`` table or no ``name`` in it.
        """
        without_suffix = resource_path.with_suffix("")
        if library_dir:
            # Style in a directory
            from_resources_root = without_suffix.relative_to(library_dir)
            bis = BuiltinStyle(
                formatted=str(without_suffix),
                path_from_resources_root=from_resources_root.as_posix(),
            )
        else:
            # Style from the built-in library
            package_path = resource_path.relative_to(builtin_resources_root().parent.parent)
            from_resources_root = without_suffix.relative_to(builtin_resources_root())
            root, *path_remainder = package_path.parts
            # Same path inside the package, with the file suffix dropped from the last part
            path_remainder_without_suffix = (*path_remainder[:-1], without_suffix.parts[-1])
            bis = BuiltinStyle(
                formatted=furl(scheme=Scheme.PY, host=root, path=path_remainder_without_suffix).url,
                path_from_resources_root=from_resources_root.as_posix(),
            )
        bis.identify_tag = from_resources_root.parts[0]
        toml_dict = tomlkit.loads(resource_path.read_text(encoding="UTF-8"))

        # Keep every top-level key except the project's own table. The key is filtered out
        # rather than removed, so a style without that table gets to the SyntaxError below
        # instead of failing here with an unrelated ValueError.
        bis.files = [key for key in toml_dict if key != PROJECT_NAME]

        try:
            # Intentionally break the doc generation when styles don't have [nitpick.meta]name
            meta = toml_dict["nitpick"]["meta"]  # pylint: disable=invalid-sequence-index
            bis.name = meta["name"]
            bis.url = meta.get("url")
        except KeyError as err:
            msg = f"Style file missing [nitpick.meta] information: {bis}"
            raise SyntaxError(msg) from err
        return bis
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc