• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/core/goals/publish.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
"""Goal for publishing packaged targets to any repository or registry etc.
4

5
Plugins implement the publish protocol that provides this goal with the processes to run in order to
6
publish the artifacts.
7

8
The publish protocol consists of defining two union members and one rule, returning the processes to
9
run. See the doc for the corresponding classes in this module for details on the classes to define.
10

11
Example rule:
12

13
    @rule
14
    async def publish_example(request: PublishToMyRepoRequest, ...) -> PublishProcesses:
15
      # Create `InteractiveProcess` instances or `Process` instances as required by the `request`.
16
      return PublishProcesses(...)
17
"""
18

UNCOV
19
from __future__ import annotations
×
20

UNCOV
21
import collections
×
UNCOV
22
import json
×
UNCOV
23
import logging
×
UNCOV
24
from abc import ABCMeta
×
UNCOV
25
from collections.abc import Coroutine
×
UNCOV
26
from dataclasses import asdict, dataclass, field, is_dataclass, replace
×
UNCOV
27
from enum import Enum
×
UNCOV
28
from itertools import chain
×
UNCOV
29
from typing import Any, ClassVar, Generic, TypeVar, cast, final
×
30

UNCOV
31
from pants.core.goals.package import (
×
32
    BuiltPackage,
33
    EnvironmentAwarePackageRequest,
34
    PackageFieldSet,
35
    environment_aware_package,
36
)
UNCOV
37
from pants.engine.addresses import Address
×
UNCOV
38
from pants.engine.collection import Collection
×
UNCOV
39
from pants.engine.console import Console
×
UNCOV
40
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
×
UNCOV
41
from pants.engine.goal import Goal, GoalSubsystem
×
UNCOV
42
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
×
UNCOV
43
from pants.engine.intrinsics import execute_process, run_interactive_process_in_environment
×
UNCOV
44
from pants.engine.process import (
×
45
    FallibleProcessResult,
46
    InteractiveProcess,
47
    InteractiveProcessResult,
48
    Process,
49
    ProcessCacheScope,
50
)
UNCOV
51
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
×
UNCOV
52
from pants.engine.target import (
×
53
    FieldSet,
54
    ImmutableValue,
55
    NoApplicableTargetsBehavior,
56
    TargetRootsToFieldSetsRequest,
57
)
UNCOV
58
from pants.engine.unions import UnionMembership, UnionRule, union
×
UNCOV
59
from pants.option.option_types import EnumOption, StrOption
×
UNCOV
60
from pants.util.frozendict import FrozenDict
×
UNCOV
61
from pants.util.strutil import softwrap
×
62

UNCOV
63
logger = logging.getLogger(__name__)
×
64

65

UNCOV
66
_F = TypeVar("_F", bound=FieldSet)
×
67

68

UNCOV
69
class PublishOutputData(FrozenDict[str, ImmutableValue]):
    """Frozen, string-keyed mapping carrying structured data about a publish action."""
×
71

72

UNCOV
73
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class PublishRequest(Generic[_F]):
    """Base union type for the publish requests that plugin rules respond to.

    A plugin declares a union member subclass of this class together with a `PublishFieldSet`
    subclass whose `publish_request_type` names that member. Publish requests are then issued
    for the targets the field set is compatible with.

    `field_set` is the field set the request was created from, and `packages` holds all the
    artifacts produced for the target that is to be published.

    Example:

        PublishToMyRepoRequest(PublishRequest):
          pass

        PublishToMyRepoFieldSet(PublishFieldSet):
          publish_request_type = PublishToMyRepoRequest

          # Regular FieldSet semantics apply from here on:
          required_fields = (MyRepositories,)
          ...
    """

    field_set: _F
    packages: tuple[BuiltPackage, ...]
×
97

98

UNCOV
99
_T = TypeVar("_T", bound=PublishRequest)
×
100

101

UNCOV
102
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class PublishFieldSet(Generic[_T], FieldSet, metaclass=ABCMeta):
    """The `FieldSet` counterpart of `PublishRequest`.

    Union members declare whatever fields their publish rule needs in order to build its
    `PublishProcesses` result.
    """

    # Every subclass sets this to its own union member (subclass) of `PublishRequest`.
    publish_request_type: ClassVar[type[_T]]

    @final
    def _request(self, packages: tuple[BuiltPackage, ...]) -> _T:
        """Create the publish request for this field set (used by the core publish goal)."""
        request_cls = self.publish_request_type
        return request_cls(field_set=self, packages=packages)

    @final
    @classmethod
    def rules(cls) -> tuple[UnionRule, ...]:
        """Return the union rules registering this field set and its request type."""
        field_set_member = UnionRule(PublishFieldSet, cls)
        request_member = UnionRule(PublishRequest, cls.publish_request_type)
        return (field_set_member, request_member)

    def get_output_data(self) -> PublishOutputData:
        """Return the structured output data contributed by this field set: its target address."""
        origin = {"target": self.address}
        return PublishOutputData(origin)
×
130

131

132
# This is the same as the Enum in the test goal.  It is initially separate as
133
# DRYing out is easier than undoing premature abstraction.
UNCOV
134
class ShowOutput(Enum):
    """Selects the publish actions for which detailed output is emitted."""

    ALL = "all"
    FAILED = "failed"
    NONE = "none"
×
140

141

UNCOV
142
@dataclass(frozen=True)
class PublishPackages:
    """The process to run in order to publish a set of named artifacts.

    `names` lists every artifact that the `process` command publishes.

    `process` is either a `Process`, an `InteractiveProcess`, or `None`. With `None` nothing is
    published and the artifacts are logged as `skipped`; a process that exits with a non-zero
    code is logged as `failed`. An `InteractiveProcess` is what is wanted in most cases, but some
    tools offer a non-interactive publishing mode and can then make use of parallelism. See
    https://github.com/pantsbuild/pants/issues/17613#issuecomment-1323913381 for more context.

    `description` may give the reason a publish was skipped, or identify the repository that the
    artifacts are published to.

    `data` holds additional structured output data to report for this publish.
    """

    names: tuple[str, ...]
    process: InteractiveProcess | Process | None = None
    description: str | None = None
    data: PublishOutputData = field(default_factory=PublishOutputData)

    def get_output_data(self, **extra_data) -> PublishOutputData:
        """Return the structured output data: `names`, then `data`, then `extra_data`.

        Later sources take precedence when the same key occurs more than once.
        """
        merged = {"names": self.names}
        merged.update(self.data)
        merged.update(extra_data)
        return PublishOutputData(merged)
171

172

UNCOV
173
class PublishProcesses(Collection[PublishPackages]):
    """The `PublishPackages` to run for all the built packages.

    Implementing rules return this in response to a `PublishRequest`.

    How the work is partitioned, by number of artifacts and/or by the repositories to publish
    to, depends on what the publishing tool is capable of.
    """
181

182

UNCOV
183
@rule(polymorphic=True)
async def create_publish_processes(
    req: PublishRequest,
    environment_name: EnvironmentName,
) -> PublishProcesses:
    """Polymorphic rule turning a `PublishRequest` into `PublishProcesses`.

    This base body always raises `NotImplementedError`.

    NOTE(review): presumably the engine dispatches to the rule a plugin registers for its
    concrete `PublishRequest` union member -- confirm against the engine's polymorphic rule docs.
    """
    raise NotImplementedError()
×
189

190

UNCOV
191
@dataclass(frozen=True)
class PublishProcessesRequest:
    """Internal request bundling all the package and publish field sets of a single target.

    It is resolved into a `PublishProcesses` collection (via the registered publish plugins).
    """

    package_field_sets: tuple[PackageFieldSet, ...]
    publish_field_sets: tuple[PublishFieldSet, ...]
×
198

199

UNCOV
200
class PublishSubsystem(GoalSubsystem):
    """Options and activation check for the `publish` goal."""

    name = "publish"
    help = "Publish deliverables (assets, distributions, images, etc)."

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        """Return True only when both `PackageFieldSet` and `PublishFieldSet` are in the union
        membership."""
        return PackageFieldSet in union_membership and PublishFieldSet in union_membership

    # Optional path of a file that receives the structured publish results as JSON.
    output = StrOption(
        default=None,
        help="Filename for JSON structured publish information.",
    )

    # Controls for which non-interactive publish processes stdout/stderr is printed.
    noninteractive_process_output = EnumOption(
        default=ShowOutput.ALL,
        help=softwrap(
            """
            Show stdout/stderr when publishing
            noninteractively.  This only has an effect for those
            publish subsystems that support a noninteractive mode.
            """
        ),
    )
223

224

UNCOV
225
class Publish(Goal):
    """The `publish` goal, configured by `PublishSubsystem` and declared to use environments."""

    subsystem_cls = PublishSubsystem
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
×
228

229

UNCOV
230
def _to_publish_output_results_and_data(
    pub: PublishPackages, res: FallibleProcessResult | InteractiveProcessResult, console: Console
) -> tuple[list[str], list[PublishOutputData]]:
    """Turn the result of one publish process into summary lines and structured output data.

    Returns a pair: one summary line per name in `pub.names`, and a single-element list holding
    the output data for `pub`, extended with the exit code, a `published` flag and the status.
    """
    succeeded = res.exit_code == 0
    if succeeded:
        sigil, status, prep = console.sigil_succeeded(), "published", "to"
    else:
        sigil, status, prep = console.sigil_failed(), "failed", "for"

    # The description names the repository published to, or the one the publish failed for.
    if pub.description:
        status = f"{status} {prep} {pub.description}"

    results = [f"{sigil} {name} {status}." for name in pub.names]
    output_data = [
        pub.get_output_data(exit_code=res.exit_code, published=succeeded, status=status)
    ]
    return results, output_data
×
258

259

UNCOV
260
@rule
async def package_for_publish(
    request: PublishProcessesRequest, local_environment: ChosenLocalEnvironmentName
) -> PublishProcesses:
    """Build the packages of a request and collect the processes that publish them.

    All package field sets are built concurrently, and every publish field set then receives a
    publish request holding all of the built packages. The resulting `PublishPackages` are
    flattened into one collection, each with its `data` extended by the output data of the field
    set it originates from.
    """
    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in request.package_field_sets
    )

    # Log what was built: the artifact path if there is one, otherwise its first extra log line.
    for built_package in packages:
        for artifact in built_package.artifacts:
            if artifact.relpath:
                logger.info(f"Packaged {artifact.relpath}")
            elif artifact.extra_log_lines:
                logger.info(str(artifact.extra_log_lines[0]))

    processes_per_field_set = await concurrently(
        create_publish_processes(
            **implicitly(
                {
                    field_set._request(packages): PublishRequest,
                    local_environment.val: EnvironmentName,
                }
            )
        )
        for field_set in request.publish_field_sets
    )

    # Flatten, dressing every publish process with data about the field set it came from.
    dressed_processes = []
    for processes, field_set in zip(processes_per_field_set, request.publish_field_sets):
        for publish_process in processes:
            origin_data = {**publish_process.data, **field_set.get_output_data()}
            dressed_processes.append(
                replace(publish_process, data=PublishOutputData(origin_data))
            )

    return PublishProcesses(dressed_processes)
×
299

300

UNCOV
301
@goal_rule
async def run_publish(
    console: Console, publish: PublishSubsystem, local_environment: ChosenLocalEnvironmentName
) -> Publish:
    """Package and publish every target root that has both package and publish field sets.

    Non-interactive (`Process`) publishes are executed concurrently first, skipped publishes
    (those without a process) are reported next, and interactive (`InteractiveProcess`)
    publishes are then run one at a time. A summary line per artifact is printed to stderr. The
    structured result data is logged at debug level and, when the `output` option is set,
    written to that file as JSON.

    Returns a `Publish` result with exit code 0, unless a publish process failed, in which case
    the exit code of the last failing process that was handled is used.
    """
    target_roots_to_package_field_sets, target_roots_to_publish_field_sets = await concurrently(
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                PackageFieldSet,
                goal_description="",
                # Don't warn/error here because it's already covered by `PublishFieldSet`.
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
            ),
            **implicitly(),
        ),
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                PublishFieldSet,
                goal_description="the `publish` goal",
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.warn,
            ),
            **implicitly(),
        ),
    )

    # Only keep targets that both package something, and have something to publish.
    targets = set(target_roots_to_package_field_sets.targets).intersection(
        set(target_roots_to_publish_field_sets.targets)
    )

    if not targets:
        return Publish(exit_code=0)

    # Build all packages and request the processes to run for each field set.
    processes = await concurrently(
        package_for_publish(
            PublishProcessesRequest(
                target_roots_to_package_field_sets.mapping[tgt],
                target_roots_to_publish_field_sets.mapping[tgt],
            ),
            **implicitly(),
        )
        for tgt in targets
    )

    exit_code: int = 0
    outputs: list[PublishOutputData] = []
    results: list[str] = []

    # Partition the publishes by the kind of process they carry.
    flattened_processes = list(chain.from_iterable(processes))
    background_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if isinstance(pub.process, Process)
    ]
    foreground_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if isinstance(pub.process, InteractiveProcess)
    ]
    skipped_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if pub.process is None
    ]
    background_requests: list[Coroutine[Any, Any, FallibleProcessResult]] = []
    for pub in background_publishes:
        process = cast(Process, pub.process)
        # Because this is a publish process, we want to ensure we don't cache this process.
        assert process.cache_scope == ProcessCacheScope.PER_SESSION
        background_requests.append(
            execute_process(
                **implicitly({process: Process, local_environment.val: EnvironmentName})
            )
        )

    # Process all non-interactive publishes
    logger.debug(f"Awaiting {len(background_requests)} background publishes")
    background_results = await concurrently(background_requests)
    show_output = publish.noninteractive_process_output
    for pub, background_res in zip(background_publishes, background_results):
        logger.debug(f"Processing {pub.process} background process")
        pub_results, pub_output = _to_publish_output_results_and_data(pub, background_res, console)
        results.extend(pub_results)
        outputs.extend(pub_output)

        names = "'" + "', '".join(pub.names) + "'"
        output_msg = f"Output for publishing {names}"
        # Tool output is not guaranteed to be valid UTF-8; replace undecodable bytes rather than
        # failing the whole goal after the publish has already happened.
        if background_res.stdout:
            output_msg += f"\n{background_res.stdout.decode(errors='replace')}"
        if background_res.stderr:
            output_msg += f"\n{background_res.stderr.decode(errors='replace')}"

        # `FAILED` means: only show the output of the processes that did not succeed.
        if show_output == ShowOutput.ALL or (
            show_output == ShowOutput.FAILED and background_res.exit_code != 0
        ):
            console.print_stdout(output_msg)

        if background_res.exit_code != 0:
            exit_code = background_res.exit_code

    for pub in skipped_publishes:
        sigil = console.sigil_skipped()
        status = "skipped"
        if pub.description:
            status += f" {pub.description}"
        for name in pub.names:
            results.append(f"{sigil} {name} {status}.")
        outputs.append(pub.get_output_data(published=False, status=status))

    # Process all interactive publishes, one at a time.
    for pub in foreground_publishes:
        logger.debug(f"Execute {pub.process}")
        res = await run_interactive_process_in_environment(
            cast(InteractiveProcess, pub.process), local_environment.val
        )
        pub_results, pub_output = _to_publish_output_results_and_data(pub, res, console)
        results.extend(pub_results)
        outputs.extend(pub_output)
        if res.exit_code != 0:
            exit_code = res.exit_code

    console.print_stderr("")
    if not results:
        sigil = console.sigil_skipped()
        console.print_stderr(f"{sigil} Nothing published.")

    # We collect all results to the end, so all output from the interactive processes are done,
    # before printing the results.
    for line in sorted(results):
        console.print_stderr(line)

    # Log structured output
    output_data = json.dumps(outputs, cls=_PublishJsonEncoder, indent=2, sort_keys=True)
    logger.debug(f"Publish result data:\n{output_data}")
    if publish.output:
        with open(publish.output, mode="w") as fd:
            fd.write(output_data)

    return Publish(exit_code)
×
434

435

UNCOV
436
class _PublishJsonEncoder(json.JSONEncoder):
    """JSON encoder for publish output data holding values `json` cannot serialize natively."""

    safe_to_str_types = (Address,)

    def default(self, o):
        """Return a serializable object for o.

        Dataclass instances, mappings and sequences are converted to dicts, dicts and lists
        respectively. Anything else that the base encoder rejects is rendered with `str()`.
        """
        # `is_dataclass` is also true for dataclass *types*, which `asdict` rejects with a
        # TypeError. Only instances are converted here; types fall through to `str()` below.
        if is_dataclass(o) and not isinstance(o, type):
            return asdict(o)
        if isinstance(o, collections.abc.Mapping):
            return dict(o)
        if isinstance(o, collections.abc.Sequence):
            return list(o)
        try:
            return super().default(o)
        except TypeError:
            return str(o)
×
451

452

UNCOV
453
def rules():
    """Return the rules collected by `collect_rules()` for this module."""
    collected = collect_rules()
    return collected
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc