• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/core/goals/deploy.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

UNCOV
4
from __future__ import annotations
×
5

UNCOV
6
import logging
×
UNCOV
7
from abc import ABCMeta
×
UNCOV
8
from collections.abc import Iterable
×
UNCOV
9
from dataclasses import dataclass
×
UNCOV
10
from itertools import chain
×
UNCOV
11
from typing import cast
×
12

UNCOV
13
from pants.core.goals.package import PackageFieldSet
×
UNCOV
14
from pants.core.goals.publish import (
×
15
    PublishFieldSet,
16
    PublishProcesses,
17
    PublishProcessesRequest,
18
    package_for_publish,
19
)
UNCOV
20
from pants.engine.console import Console
×
UNCOV
21
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
×
UNCOV
22
from pants.engine.goal import Goal, GoalSubsystem
×
UNCOV
23
from pants.engine.internals.graph import find_valid_field_sets
×
UNCOV
24
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
×
UNCOV
25
from pants.engine.intrinsics import execute_process, run_interactive_process
×
UNCOV
26
from pants.engine.process import (
×
27
    FallibleProcessResult,
28
    InteractiveProcess,
29
    InteractiveProcessResult,
30
    Process,
31
)
UNCOV
32
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
×
UNCOV
33
from pants.engine.target import (
×
34
    FieldSet,
35
    FieldSetsPerTargetRequest,
36
    NoApplicableTargetsBehavior,
37
    Target,
38
    TargetRootsToFieldSetsRequest,
39
)
UNCOV
40
from pants.engine.unions import union
×
UNCOV
41
from pants.option.option_types import BoolOption
×
UNCOV
42
from pants.util.strutil import pluralize, softwrap
×
43

UNCOV
44
logger = logging.getLogger(__name__)
×
45

46

UNCOV
47
@union(in_scope_types=[EnvironmentName])
×
UNCOV
48
@dataclass(frozen=True)
×
UNCOV
49
class DeployFieldSet(FieldSet, metaclass=ABCMeta):
×
50
    """The FieldSet type for the `deploy` goal.
51

52
    Union members may list any fields required to fulfill the instantiation of the `DeployProcess`
53
    result of the deploy rule.
54
    """
55

56

UNCOV
57
@dataclass(frozen=True)
×
UNCOV
58
class DeployProcess:
×
59
    """A process that when executed will have the side effect of deploying a target.
60

61
    To provide the ability to deploy a given target, create a custom `DeployFieldSet` for
62
    that given target and implement a rule that returns `DeployProcess` for that custom field set:
63

64
    Example:
65

66
        @dataclass(frozen=True)
67
        class MyDeploymentFieldSet(DeployFieldSet):
68
            pass
69

70
        @rule
71
        async def my_deployment_process(field_set: MyDeploymentFieldSet) -> DeployProcess:
72
            # Create the underlying process that executes the deployment
73
            process = Process(...)
74
            return DeployProcess(
75
                name="my_deployment",
76
                process=InteractiveProcess.from_process(process)
77
            )
78

79
        def rules():
80
            return [
81
                *collect_rules(),
82
                UnionRule(DeployFieldSet, MyDeploymentFieldSet)
83
            ]
84

85
    Use the `publish_dependencies` field to provide a list of targets that produce packages
86
    which need to be externally published before the deployment process is executed.
87
    """
88

UNCOV
89
    name: str
×
UNCOV
90
    process: InteractiveProcess | None
×
UNCOV
91
    publish_dependencies: tuple[Target, ...] = ()
×
UNCOV
92
    description: str | None = None
×
93

94

UNCOV
95
@rule(polymorphic=True)
×
UNCOV
96
async def create_deploy_process(
×
97
    req: DeployFieldSet,
98
    environment_name: EnvironmentName,
99
) -> DeployProcess:
100
    raise NotImplementedError()
×
101

102

UNCOV
103
class DeploySubsystem(GoalSubsystem):
×
UNCOV
104
    name = "experimental-deploy"
×
UNCOV
105
    help = "Perform a deployment process."
×
106

UNCOV
107
    dry_run = BoolOption(
×
108
        default=False,
109
        help=softwrap(
110
            """
111
            If true, perform a dry run without deploying anything.
112
            For example, when deploying a terraform_deployment, a plan will be executed instead of an apply.
113
            """
114
        ),
115
    )
UNCOV
116
    publish_dependencies = BoolOption(
×
117
        default=True,
118
        help=softwrap(
119
            """
120
            If false, don't publish target dependencies before deploying the target.
121
            For example, when deploying a helm_deployment, dependent docker images will not be published.
122
            """
123
        ),
124
    )
125

UNCOV
126
    required_union_implementation = (DeployFieldSet,)
×
127

128

UNCOV
129
@dataclass(frozen=True)
×
UNCOV
130
class Deploy(Goal):
×
UNCOV
131
    subsystem_cls = DeploySubsystem
×
UNCOV
132
    environment_behavior = Goal.EnvironmentBehavior.LOCAL_ONLY  # TODO(#17129) — Migrate this.
×
133

134

UNCOV
135
@dataclass(frozen=True)
×
UNCOV
136
class _PublishProcessesForTargetRequest:
×
UNCOV
137
    target: Target
×
138

139

UNCOV
140
@rule
×
UNCOV
141
async def publish_process_for_target(
×
142
    request: _PublishProcessesForTargetRequest,
143
) -> PublishProcesses:
144
    package_field_sets, publish_field_sets = await concurrently(
×
145
        find_valid_field_sets(
146
            FieldSetsPerTargetRequest(PackageFieldSet, [request.target]), **implicitly()
147
        ),
148
        find_valid_field_sets(
149
            FieldSetsPerTargetRequest(PublishFieldSet, [request.target]), **implicitly()
150
        ),
151
    )
152

153
    return await package_for_publish(
×
154
        PublishProcessesRequest(
155
            package_field_sets=package_field_sets.field_sets,
156
            publish_field_sets=publish_field_sets.field_sets,
157
        ),
158
        **implicitly(),
159
    )
160

161

UNCOV
162
async def _all_publish_processes(targets: Iterable[Target]) -> PublishProcesses:
×
163
    processes_per_target = await concurrently(
×
164
        publish_process_for_target(_PublishProcessesForTargetRequest(target)) for target in targets
165
    )
166

167
    return PublishProcesses(chain.from_iterable(processes_per_target))
×
168

169

UNCOV
170
def _process_results_to_string(
×
171
    console: Console,
172
    res: InteractiveProcessResult | FallibleProcessResult,
173
    *,
174
    names: Iterable[str],
175
    success_status: str,
176
    description: str | None = None,
177
) -> tuple[int, tuple[str, ...]]:
178
    results = []
×
179
    if res.exit_code == 0:
×
180
        sigil = console.sigil_succeeded()
×
181
        status = success_status
×
182
        prep = "to"
×
183
    else:
184
        sigil = console.sigil_failed()
×
185
        status = "failed"
×
186
        prep = "for"
×
187

188
    if description:
×
189
        status += f" {prep} {description}"
×
190

191
    for name in names:
×
192
        results.append(f"{sigil} {name} {status}")
×
193
    return res.exit_code, tuple(results)
×
194

195

UNCOV
196
async def _invoke_process(
×
197
    console: Console,
198
    process: InteractiveProcess | None,
199
    *,
200
    names: Iterable[str],
201
    success_status: str,
202
    description: str | None = None,
203
) -> tuple[int, tuple[str, ...]]:
204
    results = []
×
205

206
    if not process:
×
207
        sigil = console.sigil_skipped()
×
208
        status = "skipped"
×
209
        if description:
×
210
            status += f" {description}"
×
211
        for name in names:
×
212
            results.append(f"{sigil} {name} {status}.")
×
213
        return 0, tuple(results)
×
214

215
    logger.debug(f"Execute {process}")
×
216
    res = await run_interactive_process(process)
×
217
    return _process_results_to_string(
×
218
        console, res, names=names, success_status=success_status, description=description
219
    )
220

221

UNCOV
222
@goal_rule
×
UNCOV
223
async def run_deploy(
×
224
    console: Console,
225
    deploy_subsystem: DeploySubsystem,
226
    local_environment: ChosenLocalEnvironmentName,
227
) -> Deploy:
228
    target_roots_to_deploy_field_sets = await find_valid_field_sets_for_target_roots(
×
229
        TargetRootsToFieldSetsRequest(
230
            DeployFieldSet,
231
            goal_description=f"the `{deploy_subsystem.name}` goal",
232
            no_applicable_targets_behavior=NoApplicableTargetsBehavior.error,
233
        ),
234
        **implicitly(),
235
    )
236

237
    deploy_processes = await concurrently(
×
238
        create_deploy_process(**implicitly({field_set: DeployFieldSet}))
239
        for field_set in target_roots_to_deploy_field_sets.field_sets
240
    )
241

242
    publish_targets = (
×
243
        set(chain.from_iterable([deploy.publish_dependencies for deploy in deploy_processes]))
244
        if deploy_subsystem.publish_dependencies
245
        else set()
246
    )
247

248
    logger.debug(f"Found {pluralize(len(publish_targets), 'dependency')}")
×
249
    publish_processes = await _all_publish_processes(publish_targets)
×
250

251
    exit_code: int = 0
×
252
    results: list[str] = []
×
253

254
    if publish_processes:
×
255
        logger.info(f"Publishing {pluralize(len(publish_processes), 'dependency')}...")
×
256
        background_publish_processes = [
×
257
            publish for publish in publish_processes if isinstance(publish.process, Process)
258
        ]
259
        foreground_publish_processes = [
×
260
            publish
261
            for publish in publish_processes
262
            if isinstance(publish.process, InteractiveProcess) or publish.process is None
263
        ]
264

265
        # Run all background publish processes first (concurrently).
266
        background_results = await concurrently(
×
267
            execute_process(
268
                **implicitly(
269
                    {
270
                        cast(Process, publish.process): Process,
271
                        local_environment.val: EnvironmentName,
272
                    }
273
                )
274
            )
275
            for publish in background_publish_processes
276
        )
277
        for pub, res in zip(background_publish_processes, background_results):
×
278
            ec, statuses = _process_results_to_string(
×
279
                console,
280
                res,
281
                names=pub.names,
282
                description=pub.description,
283
                success_status="published",
284
            )
285
            exit_code = ec if ec != 0 else exit_code
×
286
            results.extend(statuses)
×
287

288
        # Run the foreground (interactive or skipped) publish processes next, one at a time.
289
        for publish in foreground_publish_processes:
×
290
            process = cast(InteractiveProcess | None, publish.process)
×
291
            ec, statuses = await _invoke_process(
×
292
                console,
293
                process,
294
                names=publish.names,
295
                description=publish.description,
296
                success_status="published",
297
            )
298
            exit_code = ec if ec != 0 else exit_code
×
299
            results.extend(statuses)
×
300

301
    # Only proceed to deploy if all dependencies have been successfully published.
302
    if exit_code == 0 and deploy_processes:
×
303
        logger.info("Deploying targets...")
×
304

305
        for deploy in deploy_processes:
×
306
            # Invoke the deployment.
307
            ec, statuses = await _invoke_process(
×
308
                console,
309
                deploy.process,
310
                names=[deploy.name],
311
                success_status="deployed",
312
                description=deploy.description,
313
            )
314
            exit_code = ec if ec != 0 else exit_code
×
315
            results.extend(statuses)
×
316

317
    console.print_stderr("")
×
318
    if not results:
×
319
        sigil = console.sigil_skipped()
×
320
        console.print_stderr(f"{sigil} Nothing deployed.")
×
321

322
    for line in results:
×
323
        console.print_stderr(line)
×
324

325
    return Deploy(exit_code)
×
326

327

UNCOV
328
def rules():
×
UNCOV
329
    return collect_rules()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc