• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 21374897774

26 Jan 2026 09:37PM UTC coverage: 80.008% (-0.3%) from 80.269%
21374897774

Pull #23037

github

web-flow
Merge 4023b9eee into 09b8ecaa1
Pull Request #23037: Enable publish without package 2

105 of 178 new or added lines in 11 files covered. (58.99%)

238 existing lines in 14 files now uncovered.

78628 of 98275 relevant lines covered (80.01%)

3.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.21
/src/python/pants/core/goals/publish.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
"""Goal for publishing packaged targets to any repository or registry etc.
4

5
Plugins implement the publish protocol that provides this goal with the processes to run in order to
6
publish the artifacts.
7

8
The publish protocol consists of defining two union members and one rule, returning the processes to
9
run. See the doc for the corresponding classes in this module for details on the classes to define.
10

11
Example rule:
12

13
    @rule
14
    async def publish_example(request: PublishToMyRepoRequest, ...) -> PublishProcesses:
15
      # Create `InteractiveProcess` instances or `Process` instances as required by the `request`.
16
      return PublishProcesses(...)
17
"""
18

19
from __future__ import annotations
12✔
20

21
import json
12✔
22
import logging
12✔
23
from abc import ABCMeta
12✔
24
from collections.abc import Coroutine, Iterable, Mapping, Sequence
12✔
25
from dataclasses import asdict, dataclass, field, is_dataclass, replace
12✔
26
from enum import Enum
12✔
27
from itertools import chain
12✔
28
from typing import Any, ClassVar, Generic, Self, TypeVar, cast, final
12✔
29

30
from pants.core.goals.package import (
12✔
31
    BuiltPackage,
32
    EnvironmentAwarePackageRequest,
33
    PackageFieldSet,
34
    environment_aware_package,
35
)
36
from pants.engine.addresses import Address
12✔
37
from pants.engine.collection import Collection
12✔
38
from pants.engine.console import Console
12✔
39
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
12✔
40
from pants.engine.goal import Goal, GoalSubsystem
12✔
41
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
12✔
42
from pants.engine.intrinsics import execute_process, run_interactive_process_in_environment
12✔
43
from pants.engine.process import (
12✔
44
    FallibleProcessResult,
45
    InteractiveProcess,
46
    InteractiveProcessResult,
47
    Process,
48
    ProcessCacheScope,
49
)
50
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
12✔
51
from pants.engine.target import (
12✔
52
    FieldSet,
53
    ImmutableValue,
54
    NoApplicableTargetsBehavior,
55
    TargetRootsToFieldSets,
56
    TargetRootsToFieldSetsRequest,
57
)
58
from pants.engine.unions import UnionMembership, UnionRule, union
12✔
59
from pants.option.option_types import EnumOption, StrOption
12✔
60
from pants.util.frozendict import FrozenDict
12✔
61
from pants.util.strutil import softwrap
12✔
62

63
logger = logging.getLogger(__name__)
12✔
64

65

66
_F = TypeVar("_F", bound=FieldSet)
12✔
67

68

69
class PublishOutputData(FrozenDict[str, ImmutableValue]):
12✔
70
    pass
12✔
71

72

73
@union(in_scope_types=[EnvironmentName])
12✔
74
@dataclass(frozen=True)
12✔
75
class PublishRequest(Generic[_F]):
12✔
76
    """Implement a union member subclass of this union class along with a PublishFieldSet subclass
77
    that appoints that member subclass in order to receive publish requests for targets compatible
78
    with the field set.
79

80
    The `packages` hold all artifacts produced for a given target to be published.
81

82
    Example:
83

84
        PublishToMyRepoRequest(PublishRequest):
85
          pass
86

87
        PublishToMyRepoFieldSet(PublishFieldSet):
88
          publish_request_type = PublishToMyRepoRequest
89

90
          # Standard FieldSet semantics from here on:
91
          required_fields = (MyRepositories,)
92
          ...
93
    """
94

95
    field_set: _F
12✔
96
    packages: tuple[BuiltPackage, ...]
12✔
97

98

99
@union(in_scope_types=[EnvironmentName])
12✔
100
@dataclass(frozen=True)
12✔
101
class PreemptiveSkipRequest(Generic[_F]):
12✔
102
    package_fs: PackageFieldSet
12✔
103
    publish_fs: _F
12✔
104

105
    @property
12✔
106
    def address(self) -> Address:
12✔
NEW
107
        return self.publish_fs.address
×
108

109

110
_T = TypeVar("_T", bound=PublishRequest)
12✔
111

112

113
@union(in_scope_types=[EnvironmentName])
12✔
114
@dataclass(frozen=True)
12✔
115
class PublishFieldSet(Generic[_T], FieldSet, metaclass=ABCMeta):
12✔
116
    """FieldSet for PublishRequest.
117

118
    Union members may list any fields required to fulfill the instantiation of the
119
    `PublishProcesses` result of the publish rule.
120
    """
121

122
    # Subclasses must provide this, to a union member (subclass) of `PublishRequest`.
123
    publish_request_type: ClassVar[type[_T]]
12✔
124

125
    @final
12✔
126
    def _request(self, packages: tuple[BuiltPackage, ...]) -> _T:
12✔
127
        """Internal helper for the core publish goal."""
128
        return self.publish_request_type(field_set=self, packages=packages)
2✔
129

130
    def make_skip_request(self, package_fs: PackageFieldSet) -> PreemptiveSkipRequest[Self] | None:
12✔
131
        """Subclasses can override this method if they want to preempt packaging for publish
132
        requests that are just going to be skipped."""
NEW
133
        return None
×
134

135
    @final
12✔
136
    @classmethod
12✔
137
    def rules(cls) -> tuple[UnionRule, ...]:
12✔
138
        """Helper method for registering the union members."""
139
        return (
7✔
140
            UnionRule(PublishFieldSet, cls),
141
            UnionRule(PublishRequest, cls.publish_request_type),
142
        )
143

144
    def get_output_data(self) -> PublishOutputData:
12✔
145
        return PublishOutputData({"target": self.address})
×
146

147

148
# This is the same as the Enum in the test goal.  It is initially separate as
149
# DRYing out is easier than undoing pre-mature abstraction.
150
class ShowOutput(Enum):
12✔
151
    """Which publish actions to emit detailed output for."""
152

153
    ALL = "all"
12✔
154
    FAILED = "failed"
12✔
155
    NONE = "none"
12✔
156

157

158
@dataclass(frozen=True)
12✔
159
class PublishPackages:
12✔
160
    """Processes to run in order to publish the named artifacts.
161

162
    The `names` should list all artifacts being published by the `process` command.
163

164
    The `process` may be `None`, indicating that it will not be published. This will be logged as
165
    `skipped`. If the process returns a non-zero exit code, it will be logged as `failed`. The `process`
166
    can either be a Process or an InteractiveProcess. In most cases, InteractiveProcess will be wanted.
167
    However, some tools have non-interactive publishing modes and can leverage parallelism. See
168
    https://github.com/pantsbuild/pants/issues/17613#issuecomment-1323913381 for more context.
169

170
    The `description` may be a reason explaining why the publish was skipped, or identifying which
171
    repository the artifacts are published to.
172
    """
173

174
    names: tuple[str, ...]
12✔
175
    process: InteractiveProcess | Process | None = None
12✔
176
    description: str | None = None
12✔
177
    data: PublishOutputData = field(default_factory=PublishOutputData)
12✔
178

179
    def get_output_data(self, **extra_data) -> PublishOutputData:
12✔
180
        return PublishOutputData(
×
181
            {
182
                "names": self.names,
183
                **self.data,
184
                **extra_data,
185
            }
186
        )
187

188

189
@dataclass(frozen=True)
12✔
190
class SkippedPublishPackages:
12✔
191
    """PublishPackages that were pre-emptively skipped.
192

193
    If `inner` is None, this indicates that this request should NOT be skipped.
194
    """
195

196
    inner: tuple[PublishPackages, ...]
12✔
197

198
    def __init__(self, inner: Iterable[PublishPackages]) -> None:
12✔
NEW
199
        object.__setattr__(self, "inner", tuple(inner))
×
200

201
    def __post_init__(self):
12✔
NEW
202
        if any(pp.process is not None for pp in self.inner):
×
NEW
203
            raise ValueError("SkippedPublishPackages must not have any non-None processes")
×
204

205
    @classmethod
12✔
206
    def skip(
12✔
207
        cls,
208
        *,
209
        names: Iterable[str],
210
        description: str | None = None,
211
        data: PublishOutputData | None = None,
212
    ) -> Self:
NEW
213
        return cls(
×
214
            [
215
                PublishPackages(
216
                    names=tuple(names), description=description, data=data or PublishOutputData()
217
                )
218
            ]
219
        )
220

221
    @classmethod
12✔
222
    def no_skip(cls) -> Self:
12✔
NEW
223
        return cls(())
×
224

225

226
class PublishProcesses(Collection[PublishPackages]):
12✔
227
    """Collection of what processes to run for all built packages.
228

229
    This is returned from implementing rules in response to a PublishRequest.
230

231
    Depending on the capabilities of the publishing tool, the work may be partitioned based on
232
    number of artifacts and/or repositories to publish to.
233
    """
234

235

236
@rule(polymorphic=True)
12✔
237
async def preemptive_skip_publish_packages(
12✔
238
    request: PreemptiveSkipRequest, environment_name: EnvironmentName
239
) -> SkippedPublishPackages:
NEW
240
    raise NotImplementedError()
×
241

242

243
@rule(polymorphic=True)
12✔
244
async def create_publish_processes(
12✔
245
    req: PublishRequest,
246
    environment_name: EnvironmentName,
247
) -> PublishProcesses:
248
    raise NotImplementedError()
×
249

250

251
@dataclass(frozen=True)
12✔
252
class PublishProcessesRequest:
12✔
253
    """Internal request taking all field sets for a target and turning it into a `PublishProcesses`
254
    collection (via registered publish plugins)."""
255

256
    package_field_sets: tuple[PackageFieldSet, ...]
12✔
257
    publish_field_sets: tuple[PublishFieldSet, ...]
12✔
258

259

260
class PublishSubsystem(GoalSubsystem):
12✔
261
    name = "publish"
12✔
262
    help = "Publish deliverables (assets, distributions, images, etc)."
12✔
263

264
    @classmethod
12✔
265
    def activated(cls, union_membership: UnionMembership) -> bool:
12✔
266
        return PackageFieldSet in union_membership and PublishFieldSet in union_membership
×
267

268
    output = StrOption(
12✔
269
        default=None,
270
        help="Filename for JSON structured publish information.",
271
    )
272

273
    noninteractive_process_output = EnumOption(
12✔
274
        default=ShowOutput.ALL,
275
        help=softwrap(
276
            """
277
            Show stdout/stderr when publishing with
278
            noninteractively.  This only has an effect for those
279
            publish subsystems that support a noninteractive mode.
280
            """
281
        ),
282
    )
283

284

285
class Publish(Goal):
12✔
286
    subsystem_cls = PublishSubsystem
12✔
287
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
12✔
288

289

290
def _to_publish_output_results_and_data(
12✔
291
    pub: PublishPackages, res: FallibleProcessResult | InteractiveProcessResult, console: Console
292
) -> tuple[list[str], list[PublishOutputData]]:
293
    if res.exit_code == 0:
×
294
        sigil = console.sigil_succeeded()
×
295
        status = "published"
×
296
        prep = "to"
×
297
    else:
298
        sigil = console.sigil_failed()
×
299
        status = "failed"
×
300
        prep = "for"
×
301

302
    if pub.description:
×
303
        status += f" {prep} {pub.description}"
×
304

305
    results = []
×
306
    output_data = []
×
307
    for name in pub.names:
×
308
        results.append(f"{sigil} {name} {status}.")
×
309

310
    output_data.append(
×
311
        pub.get_output_data(
312
            exit_code=res.exit_code,
313
            published=res.exit_code == 0,
314
            status=status,
315
        )
316
    )
317
    return results, output_data
×
318

319

320
@rule
12✔
321
async def package_for_publish(
12✔
322
    request: PublishProcessesRequest, local_environment: ChosenLocalEnvironmentName
323
) -> PublishProcesses:
324
    packages = await concurrently(
×
325
        environment_aware_package(EnvironmentAwarePackageRequest(package_fs))
326
        for package_fs in request.package_field_sets
327
    )
328

329
    for pkg in packages:
×
330
        for artifact in pkg.artifacts:
×
331
            if artifact.relpath:
×
332
                logger.info(f"Packaged {artifact.relpath}")
×
333
            elif artifact.extra_log_lines:
×
334
                logger.info(str(artifact.extra_log_lines[0]))
×
335

336
    publish = await concurrently(
×
337
        create_publish_processes(
338
            **implicitly(
339
                {
340
                    field_set._request(packages): PublishRequest,
341
                    local_environment.val: EnvironmentName,
342
                }
343
            )
344
        )
345
        for field_set in request.publish_field_sets
346
    )
347

348
    # Flatten and dress each publish processes collection with data about its origin.
349
    publish_processes = [
×
350
        replace(
351
            publish_process,
352
            data=PublishOutputData({**publish_process.data, **field_set.get_output_data()}),
353
        )
354
        for processes, field_set in zip(publish, request.publish_field_sets)
355
        for publish_process in processes
356
    ]
357

358
    return PublishProcesses(publish_processes)
×
359

360

361
@goal_rule
12✔
362
async def run_publish(
12✔
363
    console: Console,
364
    publish: PublishSubsystem,
365
    local_environment: ChosenLocalEnvironmentName,
366
) -> Publish:
367
    target_roots_to_publish_field_sets: TargetRootsToFieldSets[PublishFieldSet]
UNCOV
368
    target_roots_to_package_field_sets, target_roots_to_publish_field_sets = await concurrently(
×
369
        find_valid_field_sets_for_target_roots(
370
            TargetRootsToFieldSetsRequest(
371
                PackageFieldSet,
372
                goal_description="",
373
                # Don't warn/error here because it's already covered by `PublishFieldSet`.
374
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
375
            ),
376
            **implicitly(),
377
        ),
378
        find_valid_field_sets_for_target_roots(
379
            TargetRootsToFieldSetsRequest(
380
                PublishFieldSet,
381
                goal_description="the `publish` goal",
382
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.warn,
383
            ),
384
            **implicitly(),
385
        ),
386
    )
387

388
    # Only keep field sets that both package something, and have something to publish.
389
    targets = set(target_roots_to_package_field_sets.targets).intersection(
×
390
        set(target_roots_to_publish_field_sets.targets)
391
    )
392

393
    if not targets:
×
394
        return Publish(exit_code=0)
×
395

NEW
396
    skip_requests: list[PreemptiveSkipRequest] = []
×
NEW
397
    for tgt, publish_fss in target_roots_to_publish_field_sets.mapping.items():
×
NEW
398
        for package_fs in target_roots_to_package_field_sets.mapping[tgt]:
×
NEW
399
            srs = []
×
NEW
400
            for publish_fs in publish_fss:
×
NEW
401
                maybe_skip_request = publish_fs.make_skip_request(package_fs)
×
NEW
402
                if not maybe_skip_request:
×
NEW
403
                    break
×
NEW
404
                srs.append(maybe_skip_request)
×
405
            # We use a for-else construct here because we're only interested in skip
406
            # requests if every combination for a single publish_fs or package_fs has
407
            # one. Otherwise, we already know we're going to have to package/publish.
408
            else:
NEW
409
                skip_requests.extend(srs)
×
NEW
410
    preemptive_skips = await concurrently(
×
411
        preemptive_skip_publish_packages(
412
            **implicitly(
413
                {skip_request: PreemptiveSkipRequest, local_environment.val: EnvironmentName}
414
            )
415
        )
416
        for skip_request in skip_requests
417
    )
NEW
418
    skipped_publishes: list[PublishPackages] = []
×
419
    # We track nonskips because, if a publish or package FS has a nonskip, then we need to include it
NEW
420
    nonskip_package_fs: set[PackageFieldSet] = set()
×
NEW
421
    nonskip_publish_fs: set[PublishFieldSet] = set()
×
NEW
422
    for skip_request, maybe_skip in zip(skip_requests, preemptive_skips):
×
NEW
423
        if maybe_skip.inner:
×
NEW
424
            skipped_publishes.extend(maybe_skip.inner)
×
425
        else:
NEW
426
            nonskip_package_fs.add(skip_request.package_fs)
×
NEW
427
            nonskip_publish_fs.add(skip_request.publish_fs)
×
NEW
428
    package_skips = {
×
429
        skip_request.package_fs
430
        for skip_request in skip_requests
431
        if skip_request.package_fs not in nonskip_package_fs
432
    }
NEW
433
    publish_skips = {
×
434
        skip_request.publish_fs
435
        for skip_request in skip_requests
436
        if skip_request.publish_fs not in nonskip_publish_fs
437
    }
438
    # Build all packages and request the processes to run for each field set.
439
    processes = await concurrently(
×
440
        package_for_publish(
441
            PublishProcessesRequest(
442
                tuple(
443
                    pfs
444
                    for pfs in target_roots_to_package_field_sets.mapping[tgt]
445
                    if pfs not in package_skips
446
                ),
447
                tuple(
448
                    pfs
449
                    for pfs in target_roots_to_publish_field_sets.mapping[tgt]
450
                    if pfs not in publish_skips
451
                ),
452
            ),
453
            **implicitly(),
454
        )
455
        for tgt in targets
456
    )
457

458
    exit_code: int = 0
×
459
    outputs: list[PublishOutputData] = []
×
460
    results: list[str] = []
×
461

462
    flattened_processes = list(chain.from_iterable(processes))
×
463
    background_publishes: list[PublishPackages] = [
×
464
        pub for pub in flattened_processes if isinstance(pub.process, Process)
465
    ]
466
    foreground_publishes: list[PublishPackages] = [
×
467
        pub for pub in flattened_processes if isinstance(pub.process, InteractiveProcess)
468
    ]
NEW
469
    skipped_publishes.extend(pub for pub in flattened_processes if pub.process is None)
×
UNCOV
470
    background_requests: list[Coroutine[Any, Any, FallibleProcessResult]] = []
×
471
    for pub in background_publishes:
×
472
        process = cast(Process, pub.process)
×
473
        # Because this is a publish process, we want to ensure we don't cache this process.
474
        assert process.cache_scope == ProcessCacheScope.PER_SESSION
×
475
        background_requests.append(
×
476
            execute_process(
477
                **implicitly({process: Process, local_environment.val: EnvironmentName})
478
            )
479
        )
480

481
    # Process all non-interactive publishes
482
    logger.debug(f"Awaiting {len(background_requests)} background publishes")
×
483
    background_results = await concurrently(background_requests)
×
484
    for pub, background_res in zip(background_publishes, background_results):
×
485
        logger.debug(f"Processing {pub.process} background process")
×
486
        pub_results, pub_output = _to_publish_output_results_and_data(pub, background_res, console)
×
487
        results.extend(pub_results)
×
488
        outputs.extend(pub_output)
×
489

490
        names = "'" + "', '".join(pub.names) + "'"
×
491
        output_msg = f"Output for publishing {names}"
×
492
        if background_res.stdout:
×
493
            output_msg += f"\n{background_res.stdout.decode()}"
×
494
        if background_res.stderr:
×
495
            output_msg += f"\n{background_res.stderr.decode()}"
×
496

497
        if publish.noninteractive_process_output == ShowOutput.ALL or (
×
498
            publish.noninteractive_process_output == ShowOutput.FAILED
499
            and background_res.exit_code == 0
500
        ):
501
            console.print_stdout(output_msg)
×
502

503
        if background_res.exit_code != 0:
×
504
            exit_code = background_res.exit_code
×
505

506
    for pub in skipped_publishes:
×
507
        sigil = console.sigil_skipped()
×
508
        status = "skipped"
×
509
        if pub.description:
×
510
            status += f" {pub.description}"
×
511
        for name in pub.names:
×
512
            results.append(f"{sigil} {name} {status}.")
×
513
        outputs.append(pub.get_output_data(published=False, status=status))
×
514

515
    # Process all interactive publishes
516
    for pub in foreground_publishes:
×
517
        logger.debug(f"Execute {pub.process}")
×
518
        res = await run_interactive_process_in_environment(
×
519
            cast(InteractiveProcess, pub.process), local_environment.val
520
        )
521
        pub_results, pub_output = _to_publish_output_results_and_data(pub, res, console)
×
522
        results.extend(pub_results)
×
523
        outputs.extend(pub_output)
×
524
        if res.exit_code != 0:
×
525
            exit_code = res.exit_code
×
526

527
    console.print_stderr("")
×
528
    if not results:
×
529
        sigil = console.sigil_skipped()
×
530
        console.print_stderr(f"{sigil} Nothing published.")
×
531

532
    # We collect all results to the end, so all output from the interactive processes are done,
533
    # before printing the results.
534
    for line in sorted(results):
×
535
        console.print_stderr(line)
×
536

537
    # Log structured output
538
    output_data = json.dumps(outputs, cls=_PublishJsonEncoder, indent=2, sort_keys=True)
×
539
    logger.debug(f"Publish result data:\n{output_data}")
×
540
    if publish.output:
×
541
        with open(publish.output, mode="w") as fd:
×
542
            fd.write(output_data)
×
543

544
    return Publish(exit_code)
×
545

546

547
class _PublishJsonEncoder(json.JSONEncoder):
12✔
548
    safe_to_str_types = (Address,)
12✔
549

550
    def default(self, o):
12✔
551
        """Return a serializable object for o."""
552
        if is_dataclass(o):
×
553
            return asdict(o)
×
NEW
554
        if isinstance(o, Mapping):
×
555
            return dict(o)
×
NEW
556
        if isinstance(o, Sequence):
×
557
            return list(o)
×
558
        try:
×
559
            return super().default(o)
×
560
        except TypeError:
×
561
            return str(o)
×
562

563

564
def rules():
12✔
565
    return collect_rules()
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc