• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 21264706899

22 Jan 2026 09:00PM UTC coverage: 80.255% (+1.6%) from 78.666%
21264706899

Pull #23031

github

web-flow
Merge 8385604a3 into d250c80fe
Pull Request #23031: Enable publish without package

32 of 60 new or added lines in 6 files covered. (53.33%)

2 existing lines in 2 files now uncovered.

78788 of 98172 relevant lines covered (80.26%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.94
/src/python/pants/core/goals/publish.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
"""Goal for publishing packaged targets to any repository or registry etc.
4

5
Plugins implement the publish protocol that provides this goal with the processes to run in order to
6
publish the artifacts.
7

8
The publish protocol consists of defining two union members and one rule, returning the processes to
9
run. See the doc for the corresponding classes in this module for details on the classes to define.
10

11
Example rule:
12

13
    @rule
14
    async def publish_example(request: PublishToMyRepoRequest, ...) -> PublishProcesses:
15
      # Create `InteractiveProcess` instances or `Process` instances as required by the `request`.
16
      return PublishProcesses(...)
17
"""
18

19
from __future__ import annotations
12✔
20

21
import json
12✔
22
import logging
12✔
23
from abc import ABCMeta
12✔
24
from collections import defaultdict
12✔
25
from collections.abc import Coroutine, Mapping, Sequence
12✔
26
from dataclasses import asdict, dataclass, field, is_dataclass, replace
12✔
27
from enum import Enum
12✔
28
from itertools import chain
12✔
29
from typing import Any, ClassVar, Generic, TypeVar, cast, final
12✔
30

31
from pants.core.goals.package import (
12✔
32
    BuiltPackage,
33
    EnvironmentAwarePackageRequest,
34
    PackageFieldSet,
35
    environment_aware_package,
36
)
37
from pants.engine.addresses import Address
12✔
38
from pants.engine.collection import Collection
12✔
39
from pants.engine.console import Console
12✔
40
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
12✔
41
from pants.engine.goal import Goal, GoalSubsystem
12✔
42
from pants.engine.internals.graph import transitive_targets as transitive_targets_get
12✔
43
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
12✔
44
from pants.engine.intrinsics import execute_process, run_interactive_process_in_environment
12✔
45
from pants.engine.process import (
12✔
46
    FallibleProcessResult,
47
    InteractiveProcess,
48
    InteractiveProcessResult,
49
    Process,
50
    ProcessCacheScope,
51
)
52
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
12✔
53
from pants.engine.target import (
12✔
54
    FieldSet,
55
    ImmutableValue,
56
    NoApplicableTargetsBehavior,
57
    TargetRootsToFieldSetsRequest,
58
    TransitiveTargetsRequest,
59
)
60
from pants.engine.unions import UnionMembership, UnionRule, union
12✔
61
from pants.option.option_types import EnumOption, StrOption
12✔
62
from pants.util.frozendict import FrozenDict
12✔
63
from pants.util.strutil import softwrap
12✔
64

65
logger = logging.getLogger(__name__)
12✔
66

67

68
_F = TypeVar("_F", bound=FieldSet)
12✔
69

70

71
class PublishOutputData(FrozenDict[str, ImmutableValue]):
    """Frozen, string-keyed mapping of immutable values describing a publish result.

    Instances are attached to `PublishPackages` and collected by the `publish` goal for its
    structured (JSON) output.
    """
73

74

75
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class PublishRequest(Generic[_F]):
    """Union base for the request handed to a plugin's publish rule.

    To receive publish requests for targets compatible with a field set, implement a union
    member subclass of this class together with a `PublishFieldSet` subclass that appoints that
    member subclass via `publish_request_type`.

    The `packages` hold all artifacts produced for a given target to be published.

    Example:

        class PublishToMyRepoRequest(PublishRequest):
            pass

        class PublishToMyRepoFieldSet(PublishFieldSet):
            publish_request_type = PublishToMyRepoRequest

            # Standard FieldSet semantics from here on:
            required_fields = (MyRepositories,)
            ...
    """

    # The publish field set this request was created from (see `PublishFieldSet._request`).
    field_set: _F
    # The built packages made available to the publish rule.
    packages: tuple[BuiltPackage, ...]
99

100

101
_T = TypeVar("_T", bound=PublishRequest)
12✔
102

103

104
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class PublishFieldSet(Generic[_T], FieldSet, metaclass=ABCMeta):
    """Base field set that pairs a target with its `PublishRequest` union member.

    Union members may list any fields required to fulfill the instantiation of the
    `PublishProcesses` result of the publish rule.
    """

    # Subclasses must set this to their union member (subclass) of `PublishRequest`.
    publish_request_type: ClassVar[type[_T]]

    @final
    def _request(self, packages: tuple[BuiltPackage, ...]) -> _T:
        """Build the publish request for this field set (used by the core publish goal only)."""
        request_cls = self.publish_request_type
        return request_cls(field_set=self, packages=packages)

    @final
    @classmethod
    def rules(cls) -> tuple[UnionRule, ...]:
        """Return the union rules registering this field set and its request type."""
        field_set_rule = UnionRule(PublishFieldSet, cls)
        request_rule = UnionRule(PublishRequest, cls.publish_request_type)
        return (field_set_rule, request_rule)

    def get_output_data(self) -> PublishOutputData:
        """Return the structured output data identifying the target being published."""
        origin = {"target": self.address}
        return PublishOutputData(origin)

    def package_before_publish(self, package_fs: PackageFieldSet) -> bool:
        """Decide whether the `package` rule for `package_fs` should run before publishing.

        The target referred to by the PackageFieldSet is guaranteed to be a transitive dependency
        of the target referred to by `self`, including being the same target as `self`.

        The base implementation always asks for packaging; subclasses may override it.
        """
        return True
141

142

143
# This is the same as the Enum in the test goal.  It is initially separate as
144
# DRYing out is easier than undoing premature abstraction.
145
class ShowOutput(Enum):
    """Selects the publish actions for which detailed output is emitted.

    Used as the value type of the `noninteractive_process_output` option.
    """

    ALL = "all"
    FAILED = "failed"
    NONE = "none"
151

152

153
@dataclass(frozen=True)
class PublishPackages:
    """A single publish action covering one or more named artifacts.

    The `names` should list all artifacts being published by the `process` command.

    The `process` may be `None`, indicating that it will not be published. This will be logged as
    `skipped`. If the process returns a non-zero exit code, it will be logged as `failed`. The
    `process` can either be a Process or an InteractiveProcess. In most cases, InteractiveProcess
    will be wanted. However, some tools have non-interactive publishing modes and can leverage
    parallelism. See https://github.com/pantsbuild/pants/issues/17613#issuecomment-1323913381 for
    more context.

    The `description` may be a reason explaining why the publish was skipped, or identifying which
    repository the artifacts are published to.
    """

    names: tuple[str, ...]
    process: InteractiveProcess | Process | None = None
    description: str | None = None
    data: PublishOutputData = field(default_factory=PublishOutputData)

    def get_output_data(self, **extra_data) -> PublishOutputData:
        """Return the structured output for this action.

        The result starts from `names`, is overlaid with `self.data`, and finally with
        `extra_data`, so later sources win on key collisions.
        """
        merged: dict[str, Any] = {"names": self.names}
        merged.update(self.data)
        merged.update(extra_data)
        return PublishOutputData(merged)
182

183

184
class PublishProcesses(Collection[PublishPackages]):
    """The `PublishPackages` entries to run for all built packages.

    Implementing rules return this in response to a `PublishRequest`.

    How the work is partitioned (per artifact and/or per repository to publish to) depends on the
    capabilities of the publishing tool.
    """
192

193

194
@rule(polymorphic=True)
async def create_publish_processes(
    req: PublishRequest,
    environment_name: EnvironmentName,
) -> PublishProcesses:
    """Polymorphic entry point that turns a `PublishRequest` into `PublishProcesses`.

    This base body is only a placeholder and always raises `NotImplementedError`.
    NOTE(review): presumably the engine dispatches to the rule registered for the concrete
    `PublishRequest` union member, so this body is never meant to run -- confirm.
    """
    raise NotImplementedError()
200

201

202
@dataclass(frozen=True)
class PublishProcessesRequest:
    """Internal request carrying all field sets for a target.

    It is turned into a `PublishProcesses` collection (via registered publish plugins).
    """

    # Field sets describing what can be packaged for the target.
    package_field_sets: tuple[PackageFieldSet, ...]
    # Field sets describing how the target can be published.
    publish_field_sets: tuple[PublishFieldSet, ...]
209

210

211
class PublishSubsystem(GoalSubsystem):
    """Options for the `publish` goal."""

    name = "publish"
    help = "Publish deliverables (assets, distributions, images, etc)."

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        """Report whether both `PackageFieldSet` and `PublishFieldSet` are in `union_membership`."""
        return PackageFieldSet in union_membership and PublishFieldSet in union_membership

    # Optional path of a file that receives the JSON structured publish results.
    output = StrOption(
        default=None,
        help="Filename for JSON structured publish information.",
    )

    # Controls printing of captured stdout/stderr for non-interactive publish processes.
    noninteractive_process_output = EnumOption(
        default=ShowOutput.ALL,
        help=softwrap(
            """
            Show stdout/stderr when publishing
            noninteractively.  This only has an effect for those
            publish subsystems that support a noninteractive mode.
            """
        ),
    )
234

235

236
class Publish(Goal):
    """The `publish` goal, configured by `PublishSubsystem` and declared as using environments."""

    subsystem_cls = PublishSubsystem
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
239

240

241
def _to_publish_output_results_and_data(
    pub: PublishPackages, res: FallibleProcessResult | InteractiveProcessResult, console: Console
) -> tuple[list[str], list[PublishOutputData]]:
    """Summarize the outcome of one executed publish action.

    Returns a pair of: one human-readable result line per name in `pub.names`, and a
    single-element list holding the structured output data for `pub`.
    """
    succeeded = res.exit_code == 0
    sigil = console.sigil_succeeded() if succeeded else console.sigil_failed()
    status = "published" if succeeded else "failed"

    if pub.description:
        # Reads as "published to <description>" or "failed for <description>".
        preposition = "to" if succeeded else "for"
        status = f"{status} {preposition} {pub.description}"

    summary_lines = [f"{sigil} {name} {status}." for name in pub.names]
    # One data record per publish action, regardless of how many names it covers.
    record = pub.get_output_data(
        exit_code=res.exit_code,
        published=succeeded,
        status=status,
    )
    return summary_lines, [record]
269

270

271
@rule
async def package_for_publish(
    request: PublishProcessesRequest, local_environment: ChosenLocalEnvironmentName
) -> PublishProcesses:
    """Package what is needed, then collect the publish processes for every publish field set.

    A package field set is only built when at least one publish field set that (transitively)
    depends on its target answers `package_before_publish(...)` with True.
    """
    # Group the publish field sets by the address of their target.
    publish_fss_by_address: defaultdict[Address, list[PublishFieldSet]] = defaultdict(list)
    for fs in request.publish_field_sets:
        publish_fss_by_address[fs.address].append(fs)

    closures = await concurrently(
        transitive_targets_get(TransitiveTargetsRequest([addr]), **implicitly())
        for addr in publish_fss_by_address
    )

    # For each address, gather every publish field set that transitively depends on it
    # (including the field sets of that address itself).
    deciding_fss: defaultdict[Address, set[PublishFieldSet]] = defaultdict(set)
    for dependent_fss, transitive in zip(publish_fss_by_address.values(), closures):
        for tgt in transitive.closure:
            # `request.package_field_sets` and `request.publish_field_sets` refer to the same
            # set of addresses, so other addresses in the closure are irrelevant here.
            if tgt.address not in publish_fss_by_address:
                continue
            deciding_fss[tgt.address].update(dependent_fss)

    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(package_fs))
        for package_fs in request.package_field_sets
        if any(
            publish_fs.package_before_publish(package_fs)
            for publish_fs in deciding_fss[package_fs.address]
        )
    )

    # Log what was packaged: the artifact path if there is one, else its first extra log line.
    for built in packages:
        for artifact in built.artifacts:
            if artifact.relpath:
                logger.info(f"Packaged {artifact.relpath}")
            elif artifact.extra_log_lines:
                logger.info(str(artifact.extra_log_lines[0]))

    per_field_set = await concurrently(
        create_publish_processes(
            **implicitly(
                {
                    field_set._request(packages): PublishRequest,
                    local_environment.val: EnvironmentName,
                }
            )
        )
        for field_set in request.publish_field_sets
    )

    # Flatten, and dress each publish process with data about the field set it came from.
    dressed: list[PublishPackages] = []
    for processes, field_set in zip(per_field_set, request.publish_field_sets):
        for publish_process in processes:
            combined = PublishOutputData({**publish_process.data, **field_set.get_output_data()})
            dressed.append(replace(publish_process, data=combined))

    return PublishProcesses(dressed)
329

330

331
@goal_rule
async def run_publish(
    console: Console, publish: PublishSubsystem, local_environment: ChosenLocalEnvironmentName
) -> Publish:
    """Run the `publish` goal for the target roots.

    Finds the targets that have both package and publish field sets, obtains their publish
    processes, runs the non-interactive ones concurrently and the interactive ones one at a
    time, prints a sorted summary to stderr and optionally writes the structured results as
    JSON to the file named by the `output` option.

    The returned exit code is 0 unless a publish process failed, in which case it is the exit
    code of the last failing process that was handled.
    """
    target_roots_to_package_field_sets, target_roots_to_publish_field_sets = await concurrently(
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                PackageFieldSet,
                goal_description="",
                # Don't warn/error here because it's already covered by `PublishFieldSet`.
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
            ),
            **implicitly(),
        ),
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                PublishFieldSet,
                goal_description="the `publish` goal",
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.warn,
            ),
            **implicitly(),
        ),
    )

    # Only keep field sets that both package something, and have something to publish.
    targets = set(target_roots_to_package_field_sets.targets).intersection(
        set(target_roots_to_publish_field_sets.targets)
    )

    if not targets:
        return Publish(exit_code=0)

    # Build all packages and request the processes to run for each field set.
    processes = await concurrently(
        package_for_publish(
            PublishProcessesRequest(
                target_roots_to_package_field_sets.mapping[tgt],
                target_roots_to_publish_field_sets.mapping[tgt],
            ),
            **implicitly(),
        )
        for tgt in targets
    )

    exit_code: int = 0
    outputs: list[PublishOutputData] = []
    results: list[str] = []

    # Partition the publish actions by how (or whether) they are executed.
    flattened_processes = list(chain.from_iterable(processes))
    background_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if isinstance(pub.process, Process)
    ]
    foreground_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if isinstance(pub.process, InteractiveProcess)
    ]
    skipped_publishes: list[PublishPackages] = [
        pub for pub in flattened_processes if pub.process is None
    ]
    background_requests: list[Coroutine[Any, Any, FallibleProcessResult]] = []
    for pub in background_publishes:
        process = cast(Process, pub.process)
        # Because this is a publish process, we want to ensure we don't cache this process.
        assert process.cache_scope == ProcessCacheScope.PER_SESSION
        background_requests.append(
            execute_process(
                **implicitly({process: Process, local_environment.val: EnvironmentName})
            )
        )

    # Process all non-interactive publishes
    logger.debug(f"Awaiting {len(background_requests)} background publishes")
    background_results = await concurrently(background_requests)
    for pub, background_res in zip(background_publishes, background_results):
        logger.debug(f"Processing {pub.process} background process")
        pub_results, pub_output = _to_publish_output_results_and_data(pub, background_res, console)
        results.extend(pub_results)
        outputs.extend(pub_output)

        names = "'" + "', '".join(pub.names) + "'"
        output_msg = f"Output for publishing {names}"
        if background_res.stdout:
            output_msg += f"\n{background_res.stdout.decode()}"
        if background_res.stderr:
            output_msg += f"\n{background_res.stderr.decode()}"

        # `ALL` always shows the captured output; `FAILED` shows it only for processes that
        # actually failed (non-zero exit code); `NONE` never shows it.
        if publish.noninteractive_process_output == ShowOutput.ALL or (
            publish.noninteractive_process_output == ShowOutput.FAILED
            and background_res.exit_code != 0
        ):
            console.print_stdout(output_msg)

        if background_res.exit_code != 0:
            exit_code = background_res.exit_code

    # Report the publish actions that have no process to run.
    for pub in skipped_publishes:
        sigil = console.sigil_skipped()
        status = "skipped"
        if pub.description:
            status += f" {pub.description}"
        for name in pub.names:
            results.append(f"{sigil} {name} {status}.")
        outputs.append(pub.get_output_data(published=False, status=status))

    # Process all interactive publishes
    for pub in foreground_publishes:
        logger.debug(f"Execute {pub.process}")
        res = await run_interactive_process_in_environment(
            cast(InteractiveProcess, pub.process), local_environment.val
        )
        pub_results, pub_output = _to_publish_output_results_and_data(pub, res, console)
        results.extend(pub_results)
        outputs.extend(pub_output)
        if res.exit_code != 0:
            exit_code = res.exit_code

    console.print_stderr("")
    if not results:
        sigil = console.sigil_skipped()
        console.print_stderr(f"{sigil} Nothing published.")

    # We collect all results to the end, so all output from the interactive processes are done,
    # before printing the results.
    for line in sorted(results):
        console.print_stderr(line)

    # Log structured output
    output_data = json.dumps(outputs, cls=_PublishJsonEncoder, indent=2, sort_keys=True)
    logger.debug(f"Publish result data:\n{output_data}")
    if publish.output:
        with open(publish.output, mode="w") as fd:
            fd.write(output_data)

    return Publish(exit_code)
464

465

466
class _PublishJsonEncoder(json.JSONEncoder):
    """JSON encoder for publish output data that never fails on unknown value types."""

    safe_to_str_types = (Address,)

    def default(self, o):
        """Return a serializable object for o.

        Dataclasses become dicts, mappings become plain dicts and sequences become lists.
        Anything else is offered to the base encoder, falling back to `str(o)` when the base
        encoder rejects it with a `TypeError`.
        """
        if is_dataclass(o):
            converted = asdict(o)
        elif isinstance(o, Mapping):
            converted = dict(o)
        elif isinstance(o, Sequence):
            converted = list(o)
        else:
            try:
                converted = super().default(o)
            except TypeError:
                converted = str(o)
        return converted
481

482

483
def rules():
    """Return the rules collected from this module via `collect_rules()`."""
    collected = collect_rules()
    return collected
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc