• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18517631058

15 Oct 2025 04:18AM UTC coverage: 69.207% (-11.1%) from 80.267%
18517631058

Pull #22745

github

web-flow
Merge 642a76ca1 into 99919310e
Pull Request #22745: [windows] Add windows support in the stdio crate.

53815 of 77759 relevant lines covered (69.21%)

2.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.97
/src/python/pants/backend/helm/resolve/remotes.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
7✔
5

6
from collections.abc import Iterator
7✔
7
from dataclasses import dataclass
7✔
8
from typing import Any, cast
7✔
9

10
from pants.util.frozendict import FrozenDict
7✔
11
from pants.util.memo import memoized_method
7✔
12

13
ALL_DEFAULT_HELM_REGISTRIES = "<ALL DEFAULT HELM REGISTRIES>"
7✔
14

15
OCI_REGISTRY_PROTOCOL = "oci://"
7✔
16

17

18
class InvalidHelmRegistryAddress(ValueError):
7✔
19
    def __init__(self, alias: str, address: str) -> None:
7✔
20
        super().__init__(
×
21
            f"The registry '{alias}' needs to have a valid OCI address URL "
22
            f"(using protocol '{OCI_REGISTRY_PROTOCOL}'). The given address was instead: {address}"
23
        )
24

25

26
class HelmRemoteAliasNotFoundError(ValueError):
7✔
27
    def __init__(self, alias: str) -> None:
7✔
28
        super().__init__(f"There is no Helm remote configured with alias: {alias}")
×
29

30

31
@dataclass(frozen=True)
7✔
32
class HelmRegistry:
7✔
33
    address: str
7✔
34
    alias: str = ""
7✔
35
    default: bool = False
7✔
36

37
    @classmethod
7✔
38
    def from_dict(cls, alias: str, d: dict[str, Any]) -> HelmRegistry:
7✔
39
        return cls(
×
40
            alias=alias,
41
            address=cast(str, d["address"]).rstrip("/"),
42
            default=d.get("default", alias == "default"),
43
        )
44

45
    def __post_init__(self) -> None:
7✔
46
        if not self.address.startswith(OCI_REGISTRY_PROTOCOL):
×
47
            raise InvalidHelmRegistryAddress(self.alias, self.address)
×
48

49
    def register(self, remotes: dict[str, HelmRegistry]) -> None:
7✔
50
        remotes[self.address] = self
×
51
        if self.alias:
×
52
            remotes[f"@{self.alias}"] = self
×
53

54
    def repository_ref(self, repository: str | None) -> str:
7✔
55
        return f"{self.address}/{repository or ''}".rstrip("/")
×
56

57
    def package_ref(self, artifact_name: str, *, repository: str | None) -> str:
7✔
58
        repo_ref = self.repository_ref(repository)
×
59
        return f"{repo_ref}/{artifact_name}"
×
60

61

62
@dataclass(frozen=True)
7✔
63
class HelmRemotes:
7✔
64
    default: tuple[HelmRegistry, ...]
7✔
65
    all: FrozenDict[str, HelmRegistry]
7✔
66

67
    @classmethod
7✔
68
    def from_dict(cls, d_regs: dict[str, Any]) -> HelmRemotes:
7✔
69
        remotes: dict[str, HelmRegistry] = {}
×
70
        for alias, opts in d_regs.items():
×
71
            HelmRegistry.from_dict(alias, opts).register(remotes)
×
72
        return cls(
×
73
            default=tuple(
74
                sorted(
75
                    {r for r in remotes.values() if isinstance(r, HelmRegistry) and r.default},
76
                    key=lambda r: r.address,
77
                )
78
            ),
79
            all=FrozenDict(remotes),
80
        )
81

82
    def get(self, *aliases_or_addresses: str) -> Iterator[HelmRegistry]:
7✔
83
        for alias_or_address in aliases_or_addresses:
×
84
            if alias_or_address in self.all:
×
85
                yield self.all[alias_or_address]
×
86
            elif alias_or_address.startswith("@"):
×
87
                raise HelmRemoteAliasNotFoundError(alias_or_address)
×
88
            elif alias_or_address == ALL_DEFAULT_HELM_REGISTRIES:
×
89
                yield from self.default
×
90
            elif alias_or_address.startswith(OCI_REGISTRY_PROTOCOL):
×
91
                yield HelmRegistry(address=alias_or_address)
×
92

93
    @memoized_method
7✔
94
    def registries(self) -> tuple[HelmRegistry, ...]:
7✔
95
        return tuple(set(self.all.values()))
×
96

97
    @property
7✔
98
    def default_registry(self) -> HelmRegistry | None:
7✔
99
        remote = self.all.get("default")
×
100
        if not remote and self.default:
×
101
            remote = self.default[0]
×
102
        return remote
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc