• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18517631058

15 Oct 2025 04:18AM UTC coverage: 69.207% (-11.1%) from 80.267%
18517631058

Pull #22745

github

web-flow
Merge 642a76ca1 into 99919310e
Pull Request #22745: [windows] Add windows support in the stdio crate.

53815 of 77759 relevant lines covered (69.21%)

2.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.26
/src/python/pants/backend/docker/registries.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
6✔
5

6
from collections.abc import Iterator
6✔
7
from dataclasses import dataclass
6✔
8
from typing import Any
6✔
9

10
from pants.util.frozendict import FrozenDict
6✔
11
from pants.util.strutil import softwrap
6✔
12

13
# Placeholder accepted by `DockerRegistries.get()`: when passed in place of an alias or
# address (and not itself a configured key), it expands to every registry flagged as default.
ALL_DEFAULT_REGISTRIES = "<all default registries>"
6✔
14

15

16
class DockerRegistryError(ValueError):
    """Base class for the Docker registry configuration errors defined in this module."""
6✔
18

19

20
class DockerRegistryOptionsNotFoundError(DockerRegistryError):
    """Error for a registry lookup that matches no configured registry.

    The caller-supplied message is followed by a blank line and a hint that points at the
    `[docker].registries` option.
    """

    def __init__(self, message):
        hint = "Use the [docker].registries configuration option to define custom registries."
        super().__init__(f"{message}\n\n{hint}")
26

27

28
class DockerRegistryAddressCollisionError(DockerRegistryError):
    """Error for two registry entries that share the same `address`.

    `first` and `second` are the clashing entries; only their `alias` attributes are read
    to build the message.
    """

    def __init__(self, first, second):
        # Build the complete, soft-wrapped explanation first, then hand it to the base class.
        explanation = softwrap(
            f"""
            Duplicated docker registry address for aliases: {first.alias}, {second.alias}.
            Each registry `address` in `[docker].registries` must be unique.
            """
        )
        super().__init__(explanation)
×
38

39

40
@dataclass(frozen=True)
class DockerRegistryOptions:
    """Immutable settings for one Docker registry entry.

    Only `address` is required; every other field has a default.
    """

    address: str
    alias: str = ""
    default: bool = False
    skip_push: bool = False
    extra_image_tags: tuple[str, ...] = ()
    repository: str | None = None
    use_local_alias: bool = False

    @classmethod
    def from_dict(cls, alias: str, d: dict[str, Any]) -> DockerRegistryOptions:
        """Build an instance from a raw options mapping.

        Args:
            alias: Name this entry is configured under. An entry named "default" is
                treated as a default registry unless `d` sets "default" explicitly.
            d: Raw options. "address" is mandatory (a missing key raises `KeyError`);
                all other keys fall back to the field defaults.

        Returns:
            The populated `DockerRegistryOptions`.
        """
        # Look up the mandatory key first so a missing address fails before anything else.
        address = d["address"]
        raw_tags = d.get("extra_image_tags", DockerRegistryOptions.extra_image_tags)
        return cls(
            address=address,
            alias=alias,
            default=d.get("default", alias == "default"),
            skip_push=d.get("skip_push", DockerRegistryOptions.skip_push),
            extra_image_tags=tuple(raw_tags),
            repository=d.get("repository"),
            use_local_alias=d.get("use_local_alias", False),
        )

    def register(self, registries: dict[str, DockerRegistryOptions]) -> None:
        """Add this entry to `registries`, keyed by address and, if set, by "@alias".

        Args:
            registries: Mutable lookup table that is updated in place.

        Raises:
            DockerRegistryAddressCollisionError: If the address is already a key in
                `registries`.
        """
        key = self.address
        if key in registries:
            raise DockerRegistryAddressCollisionError(registries[key], self)
        registries[key] = self
        if not self.alias:
            return
        # Aliases are stored with an "@" prefix, separate from the plain address keys.
        registries[f"@{self.alias}"] = self
×
71

72

73
@dataclass(frozen=True)
class DockerRegistries:
    """Immutable collection of configured registries.

    `registries` maps both plain addresses and "@alias" keys to their options; `default`
    holds the entries flagged as default, ordered by address.
    """

    default: tuple[DockerRegistryOptions, ...]
    registries: FrozenDict[str, DockerRegistryOptions]

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> DockerRegistries:
        """Build the collection from a mapping of alias to raw registry options.

        Each entry is parsed with `DockerRegistryOptions.from_dict` and registered in
        turn, so errors raised by parsing or registration propagate to the caller.
        """
        lookup: dict[str, DockerRegistryOptions] = {}
        for alias, options in d.items():
            entry = DockerRegistryOptions.from_dict(alias, options)
            entry.register(lookup)

        # An entry can sit under both its address and its "@alias" key; the set collapses
        # those duplicates before sorting.
        unique_defaults = {entry for entry in lookup.values() if entry.default}
        ordered_defaults = sorted(unique_defaults, key=lambda r: r.address)
        return cls(default=tuple(ordered_defaults), registries=FrozenDict(lookup))

    def get(self, *aliases_or_addresses: str) -> Iterator[DockerRegistryOptions]:
        """Yield registry options for each requested "@alias" or address, in order.

        A configured key yields its options. `ALL_DEFAULT_REGISTRIES` yields every
        default registry. Any other value is taken to be an explicit address and yields
        fresh options for it.

        Raises:
            DockerRegistryOptionsNotFoundError: For an "@alias" that is not configured.
        """
        for key in aliases_or_addresses:
            if key in self.registries:
                # Get configured registry by "@alias" or "address".
                yield self.registries[key]
                continue
            if key.startswith("@"):
                raise DockerRegistryOptionsNotFoundError(
                    f"There is no Docker registry configured with alias: {key[1:]}."
                )
            if key == ALL_DEFAULT_REGISTRIES:
                yield from self.default
            else:
                # Assume an explicit address from the BUILD file.
                yield DockerRegistryOptions(address=key)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc