• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26342152999

23 May 2026 07:59PM UTC coverage: 91.165% (-1.6%) from 92.792%
26342152999

push

github

web-flow
Run Linux ARM CI on Depot runners (#23363)

RunsOn is deprecating their v2 stack, and rather than migrate
to v3 we should use the resources kindly donated by Depot.

GitHub also now has Linux ARM runners, should we need them.

87305 of 95766 relevant lines covered (91.16%)

3.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.33
/src/python/pants/core/goals/publish.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
"""Goal for publishing packaged targets to any repository or registry etc.
4

5
Plugins implement the publish protocol that provides this goal with the processes to run in order to
6
publish the artifacts.
7

8
The publish protocol consists of defining two union members and one rule, returning the processes to
9
run. See the doc for the corresponding classes in this module for details on the classes to define.
10

11
Example rule:
12

13
    @rule
14
    async def publish_example(request: PublishToMyRepoRequest, ...) -> PublishProcesses:
15
      # Create `InteractiveProcess` instances or `Process` instances as required by the `request`.
16
      return PublishProcesses(...)
17
"""
18

19
from __future__ import annotations
11✔
20

21
import itertools
11✔
22
import json
11✔
23
import logging
11✔
24
from abc import ABCMeta
11✔
25
from collections.abc import Coroutine, Iterable, Mapping, Sequence
11✔
26
from dataclasses import asdict, dataclass, field, is_dataclass, replace
11✔
27
from enum import Enum
11✔
28
from itertools import chain
11✔
29
from typing import Any, ClassVar, Generic, Literal, Self, TypeVar, cast, final, overload
11✔
30

31
from pants.core.goals.package import (
11✔
32
    BuiltPackage,
33
    EnvironmentAwarePackageRequest,
34
    PackageFieldSet,
35
    environment_aware_package,
36
)
37
from pants.engine.addresses import Address
11✔
38
from pants.engine.collection import Collection
11✔
39
from pants.engine.console import Console
11✔
40
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
11✔
41
from pants.engine.goal import Goal, GoalSubsystem
11✔
42
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
11✔
43
from pants.engine.intrinsics import execute_process, run_interactive_process_in_environment
11✔
44
from pants.engine.process import (
11✔
45
    FallibleProcessResult,
46
    InteractiveProcess,
47
    InteractiveProcessResult,
48
    Process,
49
    ProcessCacheScope,
50
)
51
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
11✔
52
from pants.engine.target import (
11✔
53
    FieldSet,
54
    ImmutableValue,
55
    NoApplicableTargetsBehavior,
56
    TargetRootsToFieldSets,
57
    TargetRootsToFieldSetsRequest,
58
)
59
from pants.engine.unions import UnionMembership, UnionRule, union
11✔
60
from pants.option.option_types import EnumOption, StrOption
11✔
61
from pants.util.frozendict import FrozenDict
11✔
62
from pants.util.strutil import softwrap
11✔
63

64
logger = logging.getLogger(__name__)
11✔
65

66

67
_F = TypeVar("_F", bound=FieldSet)
11✔
68

69

70
class PublishOutputData(FrozenDict[str, ImmutableValue]):
    """Frozen, string-keyed mapping holding the structured data reported for a publish action."""
72

73

74
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class PublishRequest(Generic[_F]):
    """Union base for requests to publish the packages built for one target.

    To receive publish requests, a plugin declares a union member subclass of this class and a
    `PublishFieldSet` subclass whose `publish_request_type` names that member. Requests are then
    created for targets compatible with that field set.

    `packages` holds all artifacts produced for the target to be published.

    Example:

        class PublishToMyRepoRequest(PublishRequest):
          pass

        class PublishToMyRepoFieldSet(PublishFieldSet):
          publish_request_type = PublishToMyRepoRequest

          # Standard FieldSet semantics from here on:
          required_fields = (MyRepositories,)
          ...
    """

    # The publish field set instance that created this request.
    field_set: _F
    # Every built package handed over for publishing.
    packages: tuple[BuiltPackage, ...]
98

99

100
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class CheckSkipRequest(Generic[_F]):
    """Union base for asking whether a package/publish field set pair can be skipped up front.

    Instances are produced by `PublishFieldSet.check_skip_request()` and answered with a
    `CheckSkipResult`.
    """

    # The package field set that would have to be built for the publish.
    package_fs: PackageFieldSet
    # The publish field set the skip check is made on behalf of.
    publish_fs: _F

    @property
    def address(self) -> Address:
        """The address of the publish field set."""
        publish_field_set = self.publish_fs
        return publish_field_set.address
109

110

111
_T = TypeVar("_T", bound=PublishRequest)
11✔
112

113

114
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class PublishFieldSet(Generic[_T], FieldSet, metaclass=ABCMeta):
    """FieldSet for PublishRequest.

    Union members may list whatever fields they need in order to build the `PublishProcesses`
    result returned by their publish rule.
    """

    # Subclasses must set this to their union member (subclass) of `PublishRequest`.
    publish_request_type: ClassVar[type[_T]]

    @final
    def _request(self, packages: tuple[BuiltPackage, ...]) -> _T:
        """Internal helper for the core publish goal: wrap `packages` in this field set's
        request type."""
        request_cls = self.publish_request_type
        return request_cls(field_set=self, packages=packages)

    def check_skip_request(self, package_fs: PackageFieldSet) -> CheckSkipRequest[Self] | None:
        """Hook for subclasses that want to preempt packaging for publish requests that are
        just going to be skipped.

        The default returns `None`, i.e. no skip check is requested.
        """
        return None

    @final
    @classmethod
    def rules(cls) -> tuple[UnionRule, ...]:
        """Helper for registering both union members: the field set and its request type."""
        field_set_rule = UnionRule(PublishFieldSet, cls)
        request_rule = UnionRule(PublishRequest, cls.publish_request_type)
        return (field_set_rule, request_rule)

    def get_output_data(self) -> PublishOutputData:
        """Return the structured output data for this field set: its address under `target`."""
        origin = {"target": self.address}
        return PublishOutputData(origin)
147

148

149
# This mirrors the Enum in the test goal.  It starts out as a separate copy because
# DRYing out later is easier than undoing premature abstraction.
class ShowOutput(Enum):
    """Which publish actions to emit detailed output for."""

    # Emit output for every publish action.
    ALL = "all"
    # Emit output only for publish actions that failed.
    FAILED = "failed"
    # Never emit detailed output.
    NONE = "none"
157

158

159
@dataclass(frozen=True)
class PublishPackages:
    """Processes to run in order to publish the named artifacts.

    `names` should list every artifact being published by the `process` command.

    `process` may be `None`, meaning nothing will be published; this is logged as `skipped`.
    A process that returns a non-zero exit code is logged as `failed`. The process may be either
    a `Process` or an `InteractiveProcess`. In most cases an `InteractiveProcess` is wanted, but
    some tools have non-interactive publishing modes and can leverage parallelism. See
    https://github.com/pantsbuild/pants/issues/17613#issuecomment-1323913381 for more context.

    `description` may give the reason the publish was skipped, or identify which repository the
    artifacts are published to.
    """

    names: tuple[str, ...]
    process: InteractiveProcess | Process | None = None
    description: str | None = None
    # Extra structured data that is merged into the reported output data.
    data: PublishOutputData = field(default_factory=PublishOutputData)

    def get_output_data(self, **extra_data) -> PublishOutputData:
        """Return the output data for this entry.

        Starts from `names`, then layers `self.data` and finally `extra_data` on top, so later
        sources override earlier ones on key collisions.
        """
        merged: dict[str, Any] = {"names": self.names}
        merged.update(self.data)
        merged.update(extra_data)
        return PublishOutputData(merged)
188

189

190
@dataclass(frozen=True)
11✔
191
class CheckSkipResult:
11✔
192
    """PublishPackages that were pre-emptively skipped.
193

194
    If `skipped_packages` is empty, this indicates that this request should NOT be skipped.
195
    """
196

197
    skipped_packages: tuple[PublishPackages, ...]
11✔
198
    _skip_packaging_only: bool
11✔
199

200
    def __init__(self, inner: Iterable[PublishPackages], skip_packaging_only: bool = False) -> None:
11✔
201
        object.__setattr__(self, "skipped_packages", tuple(inner))
2✔
202
        object.__setattr__(self, "_skip_packaging_only", skip_packaging_only)
2✔
203

204
    def __post_init__(self):
11✔
205
        if any(pp.process is not None for pp in self.skipped_packages):
×
206
            raise ValueError("CheckSkipResult must not have any non-None processes")
×
207

208
    @property
11✔
209
    def skip_publish(self) -> bool:
11✔
210
        return bool(self.skipped_packages)
×
211

212
    @property
11✔
213
    def skip_package(self) -> bool:
11✔
214
        return self.skip_publish or self._skip_packaging_only
×
215

216
    @overload
217
    @classmethod
218
    def skip(cls, *, skip_packaging_only: Literal[True]) -> Self: ...
219

220
    @overload
221
    @classmethod
222
    def skip(
223
        cls,
224
        *,
225
        names: Iterable[str],
226
        description: str | None = None,
227
        data: Mapping[str, Any] | None = None,
228
    ) -> Self: ...
229

230
    @classmethod
11✔
231
    def skip(
11✔
232
        cls,
233
        *,
234
        skip_packaging_only: bool = False,
235
        names: Iterable[str] = (),
236
        description: str | None = None,
237
        data: Mapping[str, Any] | None = None,
238
    ) -> Self:
239
        args = (
2✔
240
            ((), True)
241
            if skip_packaging_only
242
            else (
243
                [
244
                    PublishPackages(
245
                        names=tuple(names),
246
                        description=description,
247
                        data=PublishOutputData.deep_freeze(data) if data else PublishOutputData(),
248
                    )
249
                ],
250
                False,
251
            )
252
        )
253
        return cls(*args)
2✔
254

255
    @classmethod
11✔
256
    def no_skip(cls) -> Self:
11✔
257
        return cls((), False)
2✔
258

259

260
class PublishProcesses(Collection[PublishPackages]):
    """Collection of the processes to run for all built packages.

    Implementing rules return this in response to a PublishRequest.

    How the work is partitioned (by number of artifacts and/or by repository to publish to)
    depends on the capabilities of the publishing tool.
    """
268

269

270
@rule(polymorphic=True)
async def preemptive_skip_publish_packages(
    request: CheckSkipRequest, environment_name: EnvironmentName
) -> CheckSkipResult:
    """Polymorphic rule signature for answering a `CheckSkipRequest`.

    This base body always raises `NotImplementedError`; the real work is presumably supplied by
    rules registered for concrete `CheckSkipRequest` union members.
    """
    raise NotImplementedError()
275

276

277
@rule(polymorphic=True)
async def create_publish_processes(
    req: PublishRequest,
    environment_name: EnvironmentName,
) -> PublishProcesses:
    """Polymorphic rule signature for turning a `PublishRequest` into `PublishProcesses`.

    This base body always raises `NotImplementedError`; the real work is presumably supplied by
    rules registered for concrete `PublishRequest` union members.
    """
    raise NotImplementedError()
283

284

285
@dataclass(frozen=True)
class PublishProcessesRequest:
    """Internal request that carries all field sets for a target, to be turned into a
    `PublishProcesses` collection (via registered publish plugins)."""

    # Field sets to package before publishing.
    package_field_sets: tuple[PackageFieldSet, ...]
    # Field sets to request publish processes for.
    publish_field_sets: tuple[PublishFieldSet, ...]
292

293

294
class PublishSubsystem(GoalSubsystem):
    # Subsystem backing the `publish` goal: its name, help text and options.
    name = "publish"
    help = "Publish deliverables (assets, distributions, images, etc)."

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        """Report the goal as active only when both `PackageFieldSet` and `PublishFieldSet` are
        present in the union membership."""
        return PackageFieldSet in union_membership and PublishFieldSet in union_membership

    # When set, the structured publish results are written to this file as JSON.
    output = StrOption(
        default=None,
        help="Filename for JSON structured publish information.",
    )

    # Controls whether stdout/stderr of non-interactive publish processes is printed.
    noninteractive_process_output = EnumOption(
        default=ShowOutput.ALL,
        help=softwrap(
            """
            Show stdout/stderr when publishing noninteractively.  This only has an
            effect for those publish subsystems that support a noninteractive mode.
            """
        ),
    )
317

318

319
class Publish(Goal):
    # Goal type returned by `run_publish`; configured through `PublishSubsystem`.
    subsystem_cls = PublishSubsystem
    # The goal opts in to environment-aware execution.
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
322

323

324
def _to_publish_output_results_and_data(
    pub: PublishPackages, res: FallibleProcessResult | InteractiveProcessResult, console: Console
) -> tuple[list[str], list[PublishOutputData]]:
    """Summarize the outcome of one executed publish process.

    Returns a pair of:
      * one human-readable result line per name in `pub.names`, and
      * a one-element list with the structured output data for `pub`, extended with the exit
        code, a `published` flag and the status text.

    An exit code of 0 counts as published; anything else counts as failed.
    """
    succeeded = res.exit_code == 0
    if succeeded:
        sigil, status, prep = console.sigil_succeeded(), "published", "to"
    else:
        sigil, status, prep = console.sigil_failed(), "failed", "for"

    # The description names the repository, e.g. "published to <repo>" or "failed for <repo>".
    if pub.description:
        status = f"{status} {prep} {pub.description}"

    results = [f"{sigil} {name} {status}." for name in pub.names]
    output_data = [
        pub.get_output_data(
            exit_code=res.exit_code,
            published=succeeded,
            status=status,
        )
    ]
    return results, output_data
352

353

354
@rule
async def package_for_publish(
    request: PublishProcessesRequest, local_environment: ChosenLocalEnvironmentName
) -> PublishProcesses:
    """Build the requested packages, then ask each publish field set for its publish processes.

    Every publish field set receives all of the built packages. The returned collection is the
    flattened list of publish processes, each with the output data of the field set it came from
    merged into its `data`.
    """
    built_packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(package_fs))
        for package_fs in request.package_field_sets
    )

    # Log one line per artifact: its path if it has one, else its first extra log line.
    for artifact in chain.from_iterable(built.artifacts for built in built_packages):
        if artifact.relpath:
            logger.info(f"Packaged {artifact.relpath}")
        elif artifact.extra_log_lines:
            logger.info(str(artifact.extra_log_lines[0]))

    processes_per_field_set = await concurrently(
        create_publish_processes(
            **implicitly(
                {
                    field_set._request(built_packages): PublishRequest,
                    local_environment.val: EnvironmentName,
                }
            )
        )
        for field_set in request.publish_field_sets
    )

    # Flatten and dress each publish processes collection with data about its origin.
    dressed: list[PublishPackages] = []
    for processes, field_set in zip(processes_per_field_set, request.publish_field_sets):
        for publish_process in processes:
            origin_data = PublishOutputData(
                {**publish_process.data, **field_set.get_output_data()}
            )
            dressed.append(replace(publish_process, data=origin_data))

    return PublishProcesses(dressed)
393

394

395
@goal_rule
async def run_publish(
    console: Console,
    publish: PublishSubsystem,
    local_environment: ChosenLocalEnvironmentName,
) -> Publish:
    """Run the `publish` goal for the target roots.

    In order:
      1. Find the package and publish field sets of the target roots, keeping only targets that
         have both.
      2. Run the pre-emptive skip checks that the publish field sets ask for.
      3. Package and request publish processes for everything that was not skipped.
      4. Run the non-interactive (`Process`) publishes concurrently, then the interactive ones
         one at a time.
      5. Print one result line per artifact and, if the `output` option is set, write the
         structured results to that file as JSON.

    The returned exit code is 0 unless a publish process failed, in which case it is the exit
    code of the failing process that was handled last.
    """
    target_roots_to_publish_field_sets: TargetRootsToFieldSets[PublishFieldSet]
    target_roots_to_package_field_sets, target_roots_to_publish_field_sets = await concurrently(
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                PackageFieldSet,
                goal_description="",
                # Don't warn/error here because it's already covered by `PublishFieldSet`.
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
            ),
            **implicitly(),
        ),
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                PublishFieldSet,
                goal_description="the `publish` goal",
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.warn,
            ),
            **implicitly(),
        ),
    )

    # Only keep field sets that both package something, and have something to publish.
    targets = set(target_roots_to_package_field_sets.targets).intersection(
        set(target_roots_to_publish_field_sets.targets)
    )

    if not targets:
        return Publish(exit_code=0)

    # One skip check per (package field set, publish field set) pair that asks for one.
    skip_check_requests = [
        skip_request
        for tgt in targets
        for package_fs in target_roots_to_package_field_sets.mapping[tgt]
        for publish_fs in target_roots_to_publish_field_sets.mapping[tgt]
        if (skip_request := publish_fs.check_skip_request(package_fs))
    ]
    skip_check_results = await concurrently(
        preemptive_skip_publish_packages(
            **implicitly({skip_request: CheckSkipRequest, local_environment.val: EnvironmentName})
        )
        for skip_request in skip_check_requests
    )
    # In `package_skips`, True means skip, False means a definitive non-skip, and a missing key
    # means we don't know yet.
    package_skips: dict[PackageFieldSet, bool] = {}
    # In `publish_skips`, a list of PublishPackages means skip, None means a definitive non-skip,
    # and a missing key means we don't know yet.
    publish_skips: dict[PublishFieldSet, list[PublishPackages] | None] = {}
    for skip_request, maybe_skip in zip(skip_check_requests, skip_check_results):
        skip_package = maybe_skip.skip_package
        # A non-skip always wins; a skip is only recorded the first time this package_fs is seen.
        if not skip_package or skip_request.package_fs not in package_skips:
            package_skips[skip_request.package_fs] = skip_package
        if maybe_skip.skip_publish:
            try:
                skip_publish_packages = publish_skips[skip_request.publish_fs]
            except KeyError:
                publish_skips[skip_request.publish_fs] = list(maybe_skip.skipped_packages)
            else:
                # A previously recorded non-skip (None) stays a non-skip.
                if skip_publish_packages is not None:
                    skip_publish_packages.extend(maybe_skip.skipped_packages)
        else:
            publish_skips[skip_request.publish_fs] = None

    skipped_publishes: list[PublishPackages] = list(
        itertools.chain.from_iterable(pubskip for pubskip in publish_skips.values() if pubskip)
    )
    # Build all packages and request the processes to run for each field set.
    processes = await concurrently(
        package_for_publish(
            PublishProcessesRequest(
                tuple(
                    pfs
                    for pfs in target_roots_to_package_field_sets.mapping[tgt]
                    if not package_skips.get(pfs, False)
                ),
                tuple(
                    pfs
                    for pfs in target_roots_to_publish_field_sets.mapping[tgt]
                    if not publish_skips.get(pfs)
                ),
            ),
            **implicitly(),
        )
        for tgt in targets
    )

    exit_code: int = 0
    outputs: list[PublishOutputData] = []
    results: list[str] = []

    # Partition the publishes by how they have to be run.
    flattened_processes = list(chain.from_iterable(processes))
    background_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if isinstance(pub.process, Process)
    ]
    foreground_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if isinstance(pub.process, InteractiveProcess)
    ]
    skipped_publishes.extend(pub for pub in flattened_processes if pub.process is None)
    background_requests: list[Coroutine[Any, Any, FallibleProcessResult]] = []
    for pub in background_publishes:
        process = cast(Process, pub.process)
        # Because this is a publish process, we want to ensure we don't cache this process.
        assert process.cache_scope == ProcessCacheScope.PER_SESSION
        background_requests.append(
            execute_process(
                **implicitly({process: Process, local_environment.val: EnvironmentName})
            )
        )

    # Process all non-interactive publishes
    logger.debug(f"Awaiting {len(background_requests)} background publishes")
    background_results = await concurrently(background_requests)
    for pub, background_res in zip(background_publishes, background_results):
        logger.debug(f"Processing {pub.process} background process")
        pub_results, pub_output = _to_publish_output_results_and_data(pub, background_res, console)
        results.extend(pub_results)
        outputs.extend(pub_output)

        names = "'" + "', '".join(pub.names) + "'"
        output_msg = f"Output for publishing {names}"
        # The publish has already happened at this point, so tool output that is not valid UTF-8
        # must not abort the goal before the results are reported: undecodable bytes are replaced.
        if background_res.stdout:
            output_msg += f"\n{background_res.stdout.decode(errors='replace')}"
        if background_res.stderr:
            output_msg += f"\n{background_res.stderr.decode(errors='replace')}"

        if publish.noninteractive_process_output == ShowOutput.ALL or (
            publish.noninteractive_process_output == ShowOutput.FAILED
            and background_res.exit_code != 0
        ):
            console.print_stdout(output_msg)

        if background_res.exit_code != 0:
            exit_code = background_res.exit_code

    for pub in skipped_publishes:
        sigil = console.sigil_skipped()
        status = "skipped"
        if pub.description:
            status += f" {pub.description}"
        for name in pub.names:
            results.append(f"{sigil} {name} {status}.")
        outputs.append(pub.get_output_data(published=False, status=status))

    # Process all interactive publishes
    for pub in foreground_publishes:
        logger.debug(f"Execute {pub.process}")
        res = await run_interactive_process_in_environment(
            cast(InteractiveProcess, pub.process), local_environment.val
        )
        pub_results, pub_output = _to_publish_output_results_and_data(pub, res, console)
        results.extend(pub_results)
        outputs.extend(pub_output)
        if res.exit_code != 0:
            exit_code = res.exit_code

    console.print_stderr("")
    if not results:
        sigil = console.sigil_skipped()
        console.print_stderr(f"{sigil} Nothing published.")

    # We collect all results to the end, so all output from the interactive processes are done,
    # before printing the results.
    for line in sorted(results):
        console.print_stderr(line)

    # Log structured output
    output_data = json.dumps(outputs, cls=_PublishJsonEncoder, indent=2, sort_keys=True)
    logger.debug(f"Publish result data:\n{output_data}")
    if publish.output:
        with open(publish.output, mode="w") as fd:
            fd.write(output_data)

    return Publish(exit_code)
574

575

576
class _PublishJsonEncoder(json.JSONEncoder):
    """JSON encoder for the structured publish output.

    Values that the stock encoder cannot serialize are converted as follows: dataclass instances
    to dicts, mappings to dicts, sequences to lists, and anything else to its `str()`.
    """

    # NOTE(review): not referenced anywhere in the visible code; kept for compatibility.
    safe_to_str_types = (Address,)

    def default(self, o):
        """Return a serializable object for o."""
        # `is_dataclass` is also true for dataclass *classes*, which `asdict` rejects with a
        # TypeError; only instances are expanded, classes fall through to `str(o)` below.
        if is_dataclass(o) and not isinstance(o, type):
            return asdict(o)
        if isinstance(o, Mapping):
            return dict(o)
        if isinstance(o, Sequence):
            return list(o)
        try:
            return super().default(o)
        except TypeError:
            # Last resort: fall back to the string form rather than failing the whole dump.
            return str(o)
591

592

593
def rules():
    """Return the result of `collect_rules()` (presumably the rules declared in this module)."""
    return collect_rules()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc