• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/docker/goals/publish.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

UNCOV
4
from __future__ import annotations
×
5

UNCOV
6
import logging
×
UNCOV
7
from collections import defaultdict
×
UNCOV
8
from dataclasses import dataclass
×
UNCOV
9
from itertools import chain
×
UNCOV
10
from typing import DefaultDict, cast
×
11

UNCOV
12
from pants.backend.docker.goals.package_image import BuiltDockerImage
×
UNCOV
13
from pants.backend.docker.subsystems.docker_options import DockerOptions
×
UNCOV
14
from pants.backend.docker.target_types import DockerImageRegistriesField, DockerImageSkipPushField
×
UNCOV
15
from pants.backend.docker.util_rules.docker_binary import DockerBinary
×
UNCOV
16
from pants.core.goals.publish import (
×
17
    PublishFieldSet,
18
    PublishOutputData,
19
    PublishPackages,
20
    PublishProcesses,
21
    PublishRequest,
22
)
UNCOV
23
from pants.core.util_rules.env_vars import environment_vars_subset
×
UNCOV
24
from pants.engine.env_vars import EnvironmentVarsRequest
×
UNCOV
25
from pants.engine.process import InteractiveProcess, Process
×
UNCOV
26
from pants.engine.rules import collect_rules, implicitly, rule
×
27

UNCOV
28
logger = logging.getLogger(__name__)
×
29

30

UNCOV
31
class PublishDockerImageRequest(PublishRequest):
×
UNCOV
32
    pass
×
33

34

UNCOV
35
@dataclass(frozen=True)
×
UNCOV
36
class PublishDockerImageFieldSet(PublishFieldSet):
×
UNCOV
37
    publish_request_type = PublishDockerImageRequest
×
UNCOV
38
    required_fields = (DockerImageRegistriesField,)
×
39

UNCOV
40
    registries: DockerImageRegistriesField
×
UNCOV
41
    skip_push: DockerImageSkipPushField
×
42

UNCOV
43
    def get_output_data(self) -> PublishOutputData:
×
44
        return PublishOutputData(
×
45
            {
46
                "publisher": "docker",
47
                "registries": self.registries.value or (),
48
                **super().get_output_data(),
49
            }
50
        )
51

52

UNCOV
53
@rule
×
UNCOV
54
async def push_docker_images(
×
55
    request: PublishDockerImageRequest,
56
    docker: DockerBinary,
57
    options: DockerOptions,
58
    options_env_aware: DockerOptions.EnvironmentAware,
59
) -> PublishProcesses:
60
    tags = tuple(
×
61
        chain.from_iterable(
62
            cast(BuiltDockerImage, image).tags
63
            for pkg in request.packages
64
            for image in pkg.artifacts
65
        )
66
    )
67

68
    if request.field_set.skip_push.value:
×
69
        return PublishProcesses(
×
70
            [
71
                PublishPackages(
72
                    names=tags,
73
                    description=f"(by `{request.field_set.skip_push.alias}` on {request.field_set.address})",
74
                ),
75
            ]
76
        )
77

78
    env = await environment_vars_subset(
×
79
        EnvironmentVarsRequest(options_env_aware.env_vars), **implicitly()
80
    )
81
    skip_push_reasons: DefaultDict[str, DefaultDict[str, set[str]]] = defaultdict(
×
82
        lambda: defaultdict(set)
83
    )
84
    jobs: list[PublishPackages] = []
×
85
    refs: list[str] = []
×
86
    processes: list[Process | InteractiveProcess] = []
×
87

88
    for tag in tags:
×
89
        for registry in options.registries().registries.values():
×
90
            if registry.skip_push and tag.startswith(f"{registry.address}/"):
×
91
                skip_push_reasons["skip_push"][registry.alias].add(tag)
×
92
                break
×
93
            if registry.use_local_alias and tag.startswith(f"{registry.alias}/"):
×
94
                skip_push_reasons["use_local_alias"][registry.alias].add(tag)
×
95
                break
×
96
        else:
97
            refs.append(tag)
×
98
            push_process = docker.push_image(tag, env)
×
99
            if options.publish_noninteractively:
×
100
                processes.append(push_process)
×
101
            else:
102
                processes.append(InteractiveProcess.from_process(push_process))
×
103

104
    for ref, process in zip(refs, processes):
×
105
        jobs.append(
×
106
            PublishPackages(
107
                names=(ref,),
108
                process=process,
109
            )
110
        )
111

112
    for reason, skip_push in skip_push_reasons.items():
×
113
        for name, skip_tags in skip_push.items():
×
114
            jobs.append(
×
115
                PublishPackages(
116
                    names=tuple(skip_tags),
117
                    description=f"(by `{reason}` on registry @{name})",
118
                ),
119
            )
120

121
    return PublishProcesses(jobs)
×
122

123

UNCOV
124
def rules():
×
UNCOV
125
    return (
×
126
        *collect_rules(),
127
        *PublishDockerImageFieldSet.rules(),
128
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc