• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 21494992949

29 Jan 2026 09:15PM UTC coverage: 80.245%. First build
21494992949

Pull #23053

github

web-flow
Merge 16a8312ae into 78e2689de
Pull Request #23053: Skip Preemptive Python

71 of 124 new or added lines in 5 files covered. (57.26%)

78854 of 98267 relevant lines covered (80.24%)

3.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.42
/src/python/pants/core/goals/publish.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
"""Goal for publishing packaged targets to any repository or registry etc.
4

5
Plugins implement the publish protocol that provides this goal with the processes to run in order to
6
publish the artifacts.
7

8
The publish protocol consists of defining two union members and one rule, returning the processes to
9
run. See the doc for the corresponding classes in this module for details on the classes to define.
10

11
Example rule:
12

13
    @rule
14
    async def publish_example(request: PublishToMyRepoRequest, ...) -> PublishProcesses:
15
      # Create `InteractiveProcess` instances or `Process` instances as required by the `request`.
16
      return PublishProcesses(...)
17
"""
18

19
from __future__ import annotations
11✔
20

21
import itertools
11✔
22
import json
11✔
23
import logging
11✔
24
from abc import ABCMeta
11✔
25
from collections.abc import Coroutine, Iterable, Mapping, Sequence
11✔
26
from dataclasses import asdict, dataclass, field, is_dataclass, replace
11✔
27
from enum import Enum
11✔
28
from itertools import chain
11✔
29
from typing import Any, ClassVar, Generic, Literal, Self, TypeVar, cast, final, overload
11✔
30

31
from pants.core.goals.package import (
11✔
32
    BuiltPackage,
33
    EnvironmentAwarePackageRequest,
34
    PackageFieldSet,
35
    environment_aware_package,
36
)
37
from pants.engine.addresses import Address
11✔
38
from pants.engine.collection import Collection
11✔
39
from pants.engine.console import Console
11✔
40
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
11✔
41
from pants.engine.goal import Goal, GoalSubsystem
11✔
42
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
11✔
43
from pants.engine.intrinsics import execute_process, run_interactive_process_in_environment
11✔
44
from pants.engine.process import (
11✔
45
    FallibleProcessResult,
46
    InteractiveProcess,
47
    InteractiveProcessResult,
48
    Process,
49
    ProcessCacheScope,
50
)
51
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
11✔
52
from pants.engine.target import (
11✔
53
    FieldSet,
54
    ImmutableValue,
55
    NoApplicableTargetsBehavior,
56
    TargetRootsToFieldSets,
57
    TargetRootsToFieldSetsRequest,
58
)
59
from pants.engine.unions import UnionMembership, UnionRule, union
11✔
60
from pants.option.option_types import EnumOption, StrOption
11✔
61
from pants.util.frozendict import FrozenDict
11✔
62
from pants.util.strutil import softwrap
11✔
63

64
logger = logging.getLogger(__name__)
11✔
65

66

67
_F = TypeVar("_F", bound=FieldSet)
11✔
68

69

70
class PublishOutputData(FrozenDict[str, ImmutableValue]):
11✔
71
    pass
11✔
72

73

74
@union(in_scope_types=[EnvironmentName])
11✔
75
@dataclass(frozen=True)
11✔
76
class PublishRequest(Generic[_F]):
11✔
77
    """Implement a union member subclass of this union class along with a PublishFieldSet subclass
78
    that appoints that member subclass in order to receive publish requests for targets compatible
79
    with the field set.
80

81
    The `packages` hold all artifacts produced for a given target to be published.
82

83
    Example:
84

85
        PublishToMyRepoRequest(PublishRequest):
86
          pass
87

88
        PublishToMyRepoFieldSet(PublishFieldSet):
89
          publish_request_type = PublishToMyRepoRequest
90

91
          # Standard FieldSet semantics from here on:
92
          required_fields = (MyRepositories,)
93
          ...
94
    """
95

96
    field_set: _F
11✔
97
    packages: tuple[BuiltPackage, ...]
11✔
98

99

100
@union(in_scope_types=[EnvironmentName])
11✔
101
@dataclass(frozen=True)
11✔
102
class CheckSkipRequest(Generic[_F]):
11✔
103
    package_fs: PackageFieldSet
11✔
104
    publish_fs: _F
11✔
105

106
    @property
11✔
107
    def address(self) -> Address:
11✔
NEW
108
        return self.publish_fs.address
×
109

110

111
_T = TypeVar("_T", bound=PublishRequest)
11✔
112

113

114
@union(in_scope_types=[EnvironmentName])
11✔
115
@dataclass(frozen=True)
11✔
116
class PublishFieldSet(Generic[_T], FieldSet, metaclass=ABCMeta):
11✔
117
    """FieldSet for PublishRequest.
118

119
    Union members may list any fields required to fulfill the instantiation of the
120
    `PublishProcesses` result of the publish rule.
121
    """
122

123
    # Subclasses must provide this, to a union member (subclass) of `PublishRequest`.
124
    publish_request_type: ClassVar[type[_T]]
11✔
125

126
    @final
11✔
127
    def _request(self, packages: tuple[BuiltPackage, ...]) -> _T:
11✔
128
        """Internal helper for the core publish goal."""
129
        return self.publish_request_type(field_set=self, packages=packages)
3✔
130

131
    def check_skip_request(self, package_fs: PackageFieldSet) -> CheckSkipRequest[Self] | None:
11✔
132
        """Subclasses can override this method if they want to preempt packaging for publish
133
        requests that are just going to be skipped."""
NEW
134
        return None
×
135

136
    @final
11✔
137
    @classmethod
11✔
138
    def rules(cls) -> tuple[UnionRule, ...]:
11✔
139
        """Helper method for registering the union members."""
140
        return (
6✔
141
            UnionRule(PublishFieldSet, cls),
142
            UnionRule(PublishRequest, cls.publish_request_type),
143
        )
144

145
    def get_output_data(self) -> PublishOutputData:
11✔
146
        return PublishOutputData({"target": self.address})
×
147

148

149
# This is the same as the Enum in the test goal.  It is initially separate as
150
# DRYing out is easier than undoing pre-mature abstraction.
151
class ShowOutput(Enum):
11✔
152
    """Which publish actions to emit detailed output for."""
153

154
    ALL = "all"
11✔
155
    FAILED = "failed"
11✔
156
    NONE = "none"
11✔
157

158

159
@dataclass(frozen=True)
11✔
160
class PublishPackages:
11✔
161
    """Processes to run in order to publish the named artifacts.
162

163
    The `names` should list all artifacts being published by the `process` command.
164

165
    The `process` may be `None`, indicating that it will not be published. This will be logged as
166
    `skipped`. If the process returns a non-zero exit code, it will be logged as `failed`. The `process`
167
    can either be a Process or an InteractiveProcess. In most cases, InteractiveProcess will be wanted.
168
    However, some tools have non-interactive publishing modes and can leverage parallelism. See
169
    https://github.com/pantsbuild/pants/issues/17613#issuecomment-1323913381 for more context.
170

171
    The `description` may be a reason explaining why the publish was skipped, or identifying which
172
    repository the artifacts are published to.
173
    """
174

175
    names: tuple[str, ...]
11✔
176
    process: InteractiveProcess | Process | None = None
11✔
177
    description: str | None = None
11✔
178
    data: PublishOutputData = field(default_factory=PublishOutputData)
11✔
179

180
    def get_output_data(self, **extra_data) -> PublishOutputData:
11✔
181
        return PublishOutputData(
×
182
            {
183
                "names": self.names,
184
                **self.data,
185
                **extra_data,
186
            }
187
        )
188

189

190
@dataclass(frozen=True)
11✔
191
class CheckSkipResult:
11✔
192
    """PublishPackages that were pre-emptively skipped.
193

194
    If `inner` is None, this indicates that this request should NOT be skipped.
195
    """
196

197
    inner: tuple[PublishPackages, ...]
11✔
198
    _skip_packaging_only: bool
11✔
199

200
    def __init__(self, inner: Iterable[PublishPackages], skip_packaging_only: bool = False) -> None:
11✔
NEW
201
        object.__setattr__(self, "inner", tuple(inner))
×
NEW
202
        object.__setattr__(self, "_skip_packaging_only", skip_packaging_only)
×
203

204
    def __post_init__(self):
11✔
NEW
205
        if any(pp.process is not None for pp in self.inner):
×
NEW
206
            raise ValueError("CheckSkipResult must not have any non-None processes")
×
207

208
    @property
11✔
209
    def skip_publish(self) -> bool:
11✔
NEW
210
        return bool(self.inner)
×
211

212
    @property
11✔
213
    def skip_package(self) -> bool:
11✔
NEW
214
        return self.skip_publish or self._skip_packaging_only
×
215

216
    @overload
217
    @classmethod
218
    def skip(cls, *, skip_packaging_only: Literal[True]) -> Self: ...
219

220
    @overload
221
    @classmethod
222
    def skip(
223
        cls,
224
        *,
225
        names: Iterable[str],
226
        description: str | None = None,
227
        data: Mapping[str, Any] | None = None,
228
    ) -> Self: ...
229

230
    @classmethod
11✔
231
    def skip(
11✔
232
        cls,
233
        *,
234
        skip_packaging_only: bool = False,
235
        names: Iterable[str] = (),
236
        description: str | None = None,
237
        data: Mapping[str, Any] | None = None,
238
    ) -> Self:
NEW
239
        args = (
×
240
            ((), True)
241
            if skip_packaging_only
242
            else (
243
                [
244
                    PublishPackages(
245
                        names=tuple(names),
246
                        description=description,
247
                        data=PublishOutputData.deep_freeze(data) if data else PublishOutputData(),
248
                    )
249
                ],
250
                False,
251
            )
252
        )
NEW
253
        return cls(*args)
×
254

255
    @classmethod
11✔
256
    def no_skip(cls) -> Self:
11✔
NEW
257
        return cls((), False)
×
258

259

260
class PublishProcesses(Collection[PublishPackages]):
11✔
261
    """Collection of what processes to run for all built packages.
262

263
    This is returned from implementing rules in response to a PublishRequest.
264

265
    Depending on the capabilities of the publishing tool, the work may be partitioned based on
266
    number of artifacts and/or repositories to publish to.
267
    """
268

269

270
@rule(polymorphic=True)
11✔
271
async def preemptive_skip_publish_packages(
11✔
272
    request: CheckSkipRequest, environment_name: EnvironmentName
273
) -> CheckSkipResult:
NEW
274
    raise NotImplementedError()
×
275

276

277
@rule(polymorphic=True)
11✔
278
async def create_publish_processes(
11✔
279
    req: PublishRequest,
280
    environment_name: EnvironmentName,
281
) -> PublishProcesses:
282
    raise NotImplementedError()
×
283

284

285
@dataclass(frozen=True)
11✔
286
class PublishProcessesRequest:
11✔
287
    """Internal request taking all field sets for a target and turning it into a `PublishProcesses`
288
    collection (via registered publish plugins)."""
289

290
    package_field_sets: tuple[PackageFieldSet, ...]
11✔
291
    publish_field_sets: tuple[PublishFieldSet, ...]
11✔
292

293

294
class PublishSubsystem(GoalSubsystem):
11✔
295
    name = "publish"
11✔
296
    help = "Publish deliverables (assets, distributions, images, etc)."
11✔
297

298
    @classmethod
11✔
299
    def activated(cls, union_membership: UnionMembership) -> bool:
11✔
300
        return PackageFieldSet in union_membership and PublishFieldSet in union_membership
×
301

302
    output = StrOption(
11✔
303
        default=None,
304
        help="Filename for JSON structured publish information.",
305
    )
306

307
    noninteractive_process_output = EnumOption(
11✔
308
        default=ShowOutput.ALL,
309
        help=softwrap(
310
            """
311
            Show stdout/stderr when publishing with
312
            noninteractively.  This only has an effect for those
313
            publish subsystems that support a noninteractive mode.
314
            """
315
        ),
316
    )
317

318

319
class Publish(Goal):
11✔
320
    subsystem_cls = PublishSubsystem
11✔
321
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
11✔
322

323

324
def _to_publish_output_results_and_data(
11✔
325
    pub: PublishPackages, res: FallibleProcessResult | InteractiveProcessResult, console: Console
326
) -> tuple[list[str], list[PublishOutputData]]:
327
    if res.exit_code == 0:
×
328
        sigil = console.sigil_succeeded()
×
329
        status = "published"
×
330
        prep = "to"
×
331
    else:
332
        sigil = console.sigil_failed()
×
333
        status = "failed"
×
334
        prep = "for"
×
335

336
    if pub.description:
×
337
        status += f" {prep} {pub.description}"
×
338

339
    results = []
×
340
    output_data = []
×
341
    for name in pub.names:
×
342
        results.append(f"{sigil} {name} {status}.")
×
343

344
    output_data.append(
×
345
        pub.get_output_data(
346
            exit_code=res.exit_code,
347
            published=res.exit_code == 0,
348
            status=status,
349
        )
350
    )
351
    return results, output_data
×
352

353

354
@rule
11✔
355
async def package_for_publish(
11✔
356
    request: PublishProcessesRequest, local_environment: ChosenLocalEnvironmentName
357
) -> PublishProcesses:
358
    packages = await concurrently(
×
359
        environment_aware_package(EnvironmentAwarePackageRequest(package_fs))
360
        for package_fs in request.package_field_sets
361
    )
362

363
    for pkg in packages:
×
364
        for artifact in pkg.artifacts:
×
365
            if artifact.relpath:
×
366
                logger.info(f"Packaged {artifact.relpath}")
×
367
            elif artifact.extra_log_lines:
×
368
                logger.info(str(artifact.extra_log_lines[0]))
×
369

370
    publish = await concurrently(
×
371
        create_publish_processes(
372
            **implicitly(
373
                {
374
                    field_set._request(packages): PublishRequest,
375
                    local_environment.val: EnvironmentName,
376
                }
377
            )
378
        )
379
        for field_set in request.publish_field_sets
380
    )
381

382
    # Flatten and dress each publish processes collection with data about its origin.
383
    publish_processes = [
×
384
        replace(
385
            publish_process,
386
            data=PublishOutputData({**publish_process.data, **field_set.get_output_data()}),
387
        )
388
        for processes, field_set in zip(publish, request.publish_field_sets)
389
        for publish_process in processes
390
    ]
391

392
    return PublishProcesses(publish_processes)
×
393

394

395
@goal_rule
11✔
396
async def run_publish(
11✔
397
    console: Console,
398
    publish: PublishSubsystem,
399
    local_environment: ChosenLocalEnvironmentName,
400
) -> Publish:
401
    target_roots_to_publish_field_sets: TargetRootsToFieldSets[PublishFieldSet]
402
    target_roots_to_package_field_sets, target_roots_to_publish_field_sets = await concurrently(
×
403
        find_valid_field_sets_for_target_roots(
404
            TargetRootsToFieldSetsRequest(
405
                PackageFieldSet,
406
                goal_description="",
407
                # Don't warn/error here because it's already covered by `PublishFieldSet`.
408
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
409
            ),
410
            **implicitly(),
411
        ),
412
        find_valid_field_sets_for_target_roots(
413
            TargetRootsToFieldSetsRequest(
414
                PublishFieldSet,
415
                goal_description="the `publish` goal",
416
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.warn,
417
            ),
418
            **implicitly(),
419
        ),
420
    )
421

422
    # Only keep field sets that both package something, and have something to publish.
423
    targets = set(target_roots_to_package_field_sets.targets).intersection(
×
424
        set(target_roots_to_publish_field_sets.targets)
425
    )
426

427
    if not targets:
×
428
        return Publish(exit_code=0)
×
429

NEW
430
    skip_check_requests = [
×
431
        skip_request
432
        for tgt in targets
433
        for package_fs in target_roots_to_package_field_sets.mapping[tgt]
434
        for publish_fs in target_roots_to_publish_field_sets.mapping[tgt]
435
        if (skip_request := publish_fs.check_skip_request(package_fs))
436
    ]
NEW
437
    skip_check_results = await concurrently(
×
438
        preemptive_skip_publish_packages(
439
            **implicitly({skip_request: CheckSkipRequest, local_environment.val: EnvironmentName})
440
        )
441
        for skip_request in skip_check_requests
442
    )
443
    # In `package_skips`, True represents skip, False represents a definitive non-skip, and not present means we don't know yet.
NEW
444
    package_skips: dict[PackageFieldSet, bool] = {}
×
445
    # In `publish_skips`, the value is a list of PublishPackages means skip, None is a non-skip, and not present means we don't know yet.
NEW
446
    publish_skips: dict[PublishFieldSet, list[PublishPackages] | None] = {}
×
NEW
447
    for skip_request, maybe_skip in zip(skip_check_requests, skip_check_results):
×
NEW
448
        skip_package = maybe_skip.skip_package
×
NEW
449
        package_skip_seen = skip_request.package_fs in package_skips
×
450
        # If skip_package is False, set to False, otherwise set only if this package_fs has not been seen yet.
NEW
451
        if (package_skip_seen and not skip_package) or not package_skip_seen:
×
NEW
452
            package_skips[skip_request.package_fs] = skip_package
×
NEW
453
        if maybe_skip.skip_publish:
×
NEW
454
            try:
×
NEW
455
                skip_publish_packages = publish_skips[skip_request.publish_fs]
×
NEW
456
            except KeyError:
×
NEW
457
                publish_skips[skip_request.publish_fs] = list(maybe_skip.inner)
×
458
            else:
NEW
459
                if skip_publish_packages is not None:
×
NEW
460
                    skip_publish_packages.extend(maybe_skip.inner)
×
461
        else:
NEW
462
            publish_skips[skip_request.publish_fs] = None
×
463

NEW
464
    skipped_publishes: list[PublishPackages] = list(
×
465
        itertools.chain.from_iterable(pubskip for pubskip in publish_skips.values() if pubskip)
466
    )
467
    # Build all packages and request the processes to run for each field set.
468
    processes = await concurrently(
×
469
        package_for_publish(
470
            PublishProcessesRequest(
471
                tuple(
472
                    pfs
473
                    for pfs in target_roots_to_package_field_sets.mapping[tgt]
474
                    if not package_skips.get(pfs, False)
475
                ),
476
                tuple(
477
                    pfs
478
                    for pfs in target_roots_to_publish_field_sets.mapping[tgt]
479
                    if not publish_skips.get(pfs)
480
                ),
481
            ),
482
            **implicitly(),
483
        )
484
        for tgt in targets
485
    )
486

487
    exit_code: int = 0
×
488
    outputs: list[PublishOutputData] = []
×
489
    results: list[str] = []
×
490

491
    flattened_processes = list(chain.from_iterable(processes))
×
492
    background_publishes: list[PublishPackages] = [
×
493
        pub for pub in flattened_processes if isinstance(pub.process, Process)
494
    ]
495
    foreground_publishes: list[PublishPackages] = [
×
496
        pub for pub in flattened_processes if isinstance(pub.process, InteractiveProcess)
497
    ]
NEW
498
    skipped_publishes.extend(pub for pub in flattened_processes if pub.process is None)
×
499
    background_requests: list[Coroutine[Any, Any, FallibleProcessResult]] = []
×
500
    for pub in background_publishes:
×
501
        process = cast(Process, pub.process)
×
502
        # Because this is a publish process, we want to ensure we don't cache this process.
503
        assert process.cache_scope == ProcessCacheScope.PER_SESSION
×
504
        background_requests.append(
×
505
            execute_process(
506
                **implicitly({process: Process, local_environment.val: EnvironmentName})
507
            )
508
        )
509

510
    # Process all non-interactive publishes
511
    logger.debug(f"Awaiting {len(background_requests)} background publishes")
×
512
    background_results = await concurrently(background_requests)
×
513
    for pub, background_res in zip(background_publishes, background_results):
×
514
        logger.debug(f"Processing {pub.process} background process")
×
515
        pub_results, pub_output = _to_publish_output_results_and_data(pub, background_res, console)
×
516
        results.extend(pub_results)
×
517
        outputs.extend(pub_output)
×
518

519
        names = "'" + "', '".join(pub.names) + "'"
×
520
        output_msg = f"Output for publishing {names}"
×
521
        if background_res.stdout:
×
522
            output_msg += f"\n{background_res.stdout.decode()}"
×
523
        if background_res.stderr:
×
524
            output_msg += f"\n{background_res.stderr.decode()}"
×
525

526
        if publish.noninteractive_process_output == ShowOutput.ALL or (
×
527
            publish.noninteractive_process_output == ShowOutput.FAILED
528
            and background_res.exit_code == 0
529
        ):
530
            console.print_stdout(output_msg)
×
531

532
        if background_res.exit_code != 0:
×
533
            exit_code = background_res.exit_code
×
534

535
    for pub in skipped_publishes:
×
536
        sigil = console.sigil_skipped()
×
537
        status = "skipped"
×
538
        if pub.description:
×
539
            status += f" {pub.description}"
×
540
        for name in pub.names:
×
541
            results.append(f"{sigil} {name} {status}.")
×
542
        outputs.append(pub.get_output_data(published=False, status=status))
×
543

544
    # Process all interactive publishes
545
    for pub in foreground_publishes:
×
546
        logger.debug(f"Execute {pub.process}")
×
547
        res = await run_interactive_process_in_environment(
×
548
            cast(InteractiveProcess, pub.process), local_environment.val
549
        )
550
        pub_results, pub_output = _to_publish_output_results_and_data(pub, res, console)
×
551
        results.extend(pub_results)
×
552
        outputs.extend(pub_output)
×
553
        if res.exit_code != 0:
×
554
            exit_code = res.exit_code
×
555

556
    console.print_stderr("")
×
557
    if not results:
×
558
        sigil = console.sigil_skipped()
×
559
        console.print_stderr(f"{sigil} Nothing published.")
×
560

561
    # We collect all results to the end, so all output from the interactive processes are done,
562
    # before printing the results.
563
    for line in sorted(results):
×
564
        console.print_stderr(line)
×
565

566
    # Log structured output
567
    output_data = json.dumps(outputs, cls=_PublishJsonEncoder, indent=2, sort_keys=True)
×
568
    logger.debug(f"Publish result data:\n{output_data}")
×
569
    if publish.output:
×
570
        with open(publish.output, mode="w") as fd:
×
571
            fd.write(output_data)
×
572

573
    return Publish(exit_code)
×
574

575

576
class _PublishJsonEncoder(json.JSONEncoder):
11✔
577
    safe_to_str_types = (Address,)
11✔
578

579
    def default(self, o):
11✔
580
        """Return a serializable object for o."""
581
        if is_dataclass(o):
×
582
            return asdict(o)
×
NEW
583
        if isinstance(o, Mapping):
×
584
            return dict(o)
×
NEW
585
        if isinstance(o, Sequence):
×
586
            return list(o)
×
587
        try:
×
588
            return super().default(o)
×
589
        except TypeError:
×
590
            return str(o)
×
591

592

593
def rules():
11✔
594
    return collect_rules()
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc