• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 24060868650

07 Apr 2026 02:12AM UTC coverage: 92.903% (-0.005%) from 92.908%
24060868650

Pull #23225

github

web-flow
Merge 86ff62991 into 542ca048d
Pull Request #23225: Add --test-show-all-batch-targets to expose all targets in batched pytest

10 of 17 new or added lines in 2 files covered. (58.82%)

1 existing line in 1 file now uncovered.

91544 of 98537 relevant lines covered (92.9%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.72
/src/python/pants/core/goals/test.py
1
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import itertools
12✔
7
import json
12✔
8
import logging
12✔
9
import os
12✔
10
import shlex
12✔
11
from abc import ABC, ABCMeta
12✔
12
from collections.abc import Coroutine, Iterable, Sequence
12✔
13
from dataclasses import dataclass, field
12✔
14
from datetime import datetime
12✔
15
from enum import Enum
12✔
16
from pathlib import PurePath
12✔
17
from typing import Any, ClassVar, TypeVar, cast
12✔
18

19
from pants.core.environments.rules import (
12✔
20
    ChosenLocalEnvironmentName,
21
    EnvironmentName,
22
    SingleEnvironmentNameRequest,
23
    resolve_single_environment_name,
24
)
25
from pants.core.goals.multi_tool_goal_helper import SkippableSubsystem
12✔
26
from pants.core.goals.package import (
12✔
27
    BuiltPackage,
28
    EnvironmentAwarePackageRequest,
29
    PackageFieldSet,
30
    environment_aware_package,
31
)
32
from pants.core.subsystems.debug_adapter import DebugAdapterSubsystem
12✔
33
from pants.core.util_rules.distdir import DistDir
12✔
34
from pants.core.util_rules.env_vars import environment_vars_subset
12✔
35
from pants.core.util_rules.partitions import (
12✔
36
    PartitionerType,
37
    PartitionMetadataT,
38
    Partitions,
39
    _BatchBase,
40
    _PartitionFieldSetsRequestBase,
41
)
42
from pants.engine.addresses import Address
12✔
43
from pants.engine.collection import Collection
12✔
44
from pants.engine.console import Console
12✔
45
from pants.engine.desktop import OpenFilesRequest, find_open_program
12✔
46
from pants.engine.engine_aware import EngineAwareReturnType
12✔
47
from pants.engine.env_vars import EXTRA_ENV_VARS_USAGE_HELP, EnvironmentVars, EnvironmentVarsRequest
12✔
48
from pants.engine.fs import EMPTY_FILE_DIGEST, FileDigest, MergeDigests, Snapshot, Workspace
12✔
49
from pants.engine.goal import Goal, GoalSubsystem
12✔
50
from pants.engine.internals.graph import find_valid_field_sets, resolve_targets
12✔
51
from pants.engine.internals.session import RunId
12✔
52
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
12✔
53
from pants.engine.intrinsics import merge_digests, run_interactive_process_in_environment
12✔
54
from pants.engine.process import (
12✔
55
    FallibleProcessResult,
56
    InteractiveProcess,
57
    ProcessCacheScope,
58
    ProcessResultMetadata,
59
)
60
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
12✔
61
from pants.engine.target import (
12✔
62
    FieldSet,
63
    FieldSetsPerTargetRequest,
64
    IntField,
65
    NoApplicableTargetsBehavior,
66
    SourcesField,
67
    SpecialCasedDependencies,
68
    StringField,
69
    StringSequenceField,
70
    TargetRootsToFieldSets,
71
    TargetRootsToFieldSetsRequest,
72
    ValidNumbers,
73
    parse_shard_spec,
74
)
75
from pants.engine.unions import UnionMembership, UnionRule, distinct_union_type_per_subclass, union
12✔
76
from pants.option.option_types import BoolOption, EnumOption, IntOption, StrListOption, StrOption
12✔
77
from pants.util.collections import partition_sequentially
12✔
78
from pants.util.dirutil import safe_open
12✔
79
from pants.util.docutil import bin_name
12✔
80
from pants.util.logging import LogLevel
12✔
81
from pants.util.memo import memoized, memoized_property
12✔
82
from pants.util.meta import classproperty
12✔
83
from pants.util.strutil import Simplifier, help_text, softwrap
12✔
84

85
# Module-level logger, named after this module per the stdlib `logging` convention.
logger = logging.getLogger(__name__)
86

87

88
@dataclass(frozen=True)
class TestResult(EngineAwareReturnType):
    """The outcome of running one test (or one batch of tests), rendered by the `test` goal."""

    # A None exit_code indicates a backend that performs its own test discovery/selection
    # (rather than delegating that to the underlying test tool), and discovered no tests.
    exit_code: int | None
    stdout_bytes: bytes
    stdout_digest: FileDigest
    stderr_bytes: bytes
    stderr_digest: FileDigest
    # Every target address covered by this result (a single address, or all addresses of a batch).
    addresses: tuple[Address, ...]
    # Controls whether stdout/stderr are included in `message()`.
    output_setting: ShowOutput
    # A None result_metadata indicates a backend that performs its own test discovery/selection
    # and either discovered no tests, or encountered an error, such as a compilation error, in
    # the attempt.
    result_metadata: ProcessResultMetadata | None  # TODO: Merge elapsed MS of all subprocesses
    partition_description: str | None = None

    coverage_data: CoverageData | None = None
    # TODO: Rename this to `reports`. There is no guarantee that every language will produce
    #  XML reports, or only XML reports.
    xml_results: Snapshot | None = None
    # Any extra output (such as from plugins) that the test runner was configured to output.
    extra_output: Snapshot | None = None
    # True if the core test rules should log that extra output was written.
    log_extra_output: bool = False
    # All results including failed attempts
    process_results: tuple[FallibleProcessResult, ...] = field(default_factory=tuple)

    # Applied to stdout/stderr before display (see `_simplified_output`).
    output_simplifier: Simplifier = Simplifier()

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @staticmethod
    def no_tests_found(address: Address, output_setting: ShowOutput) -> TestResult:
        """Used when we do test discovery ourselves, and we didn't find any."""
        return TestResult(
            exit_code=None,
            stdout_bytes=b"",
            stderr_bytes=b"",
            stdout_digest=EMPTY_FILE_DIGEST,
            stderr_digest=EMPTY_FILE_DIGEST,
            addresses=(address,),
            output_setting=output_setting,
            result_metadata=None,
        )

    @staticmethod
    def no_tests_found_in_batch(
        batch: TestRequest.Batch[_TestFieldSetT, Any], output_setting: ShowOutput
    ) -> TestResult:
        """Used when we do test discovery ourselves, and we didn't find any."""
        return TestResult(
            exit_code=None,
            stdout_bytes=b"",
            stderr_bytes=b"",
            stdout_digest=EMPTY_FILE_DIGEST,
            stderr_digest=EMPTY_FILE_DIGEST,
            addresses=tuple(field_set.address for field_set in batch.elements),
            output_setting=output_setting,
            result_metadata=None,
            partition_description=batch.partition_metadata.description,
        )

    @staticmethod
    def from_fallible_process_result(
        process_results: tuple[FallibleProcessResult, ...],
        address: Address,
        output_setting: ShowOutput,
        *,
        coverage_data: CoverageData | None = None,
        xml_results: Snapshot | None = None,
        extra_output: Snapshot | None = None,
        log_extra_output: bool = False,
        output_simplifier: Simplifier = Simplifier(),
    ) -> TestResult:
        """Build a result for a single address from one or more process attempts.

        Only the final attempt determines the reported exit code and output; all attempts
        are retained in `process_results`.
        """
        process_result = process_results[-1]
        return TestResult(
            exit_code=process_result.exit_code,
            stdout_bytes=process_result.stdout,
            stdout_digest=process_result.stdout_digest,
            stderr_bytes=process_result.stderr,
            stderr_digest=process_result.stderr_digest,
            addresses=(address,),
            output_setting=output_setting,
            result_metadata=process_result.metadata,
            coverage_data=coverage_data,
            xml_results=xml_results,
            extra_output=extra_output,
            log_extra_output=log_extra_output,
            process_results=process_results,
            output_simplifier=output_simplifier,
        )

    @staticmethod
    def from_batched_fallible_process_result(
        process_results: tuple[FallibleProcessResult, ...],
        batch: TestRequest.Batch[_TestFieldSetT, Any],
        output_setting: ShowOutput,
        *,
        coverage_data: CoverageData | None = None,
        xml_results: Snapshot | None = None,
        extra_output: Snapshot | None = None,
        log_extra_output: bool = False,
        output_simplifier: Simplifier = Simplifier(),
    ) -> TestResult:
        """Like `from_fallible_process_result`, but covering every address in a batch."""
        process_result = process_results[-1]
        return TestResult(
            exit_code=process_result.exit_code,
            stdout_bytes=process_result.stdout,
            stdout_digest=process_result.stdout_digest,
            stderr_bytes=process_result.stderr,
            stderr_digest=process_result.stderr_digest,
            addresses=tuple(field_set.address for field_set in batch.elements),
            output_setting=output_setting,
            result_metadata=process_result.metadata,
            coverage_data=coverage_data,
            xml_results=xml_results,
            extra_output=extra_output,
            log_extra_output=log_extra_output,
            output_simplifier=output_simplifier,
            partition_description=batch.partition_metadata.description,
            process_results=process_results,
        )

    @property
    def description(self) -> str:
        """A human-readable description: the single address, or the first address plus a count."""
        if len(self.addresses) == 1:
            return self.addresses[0].spec

        return f"{self.addresses[0].spec} and {len(self.addresses) - 1} other files"

    @property
    def path_safe_description(self) -> str:
        """Like `description`, but built from path-safe specs for use in file names."""
        if len(self.addresses) == 1:
            return self.addresses[0].path_safe_spec

        return f"{self.addresses[0].path_safe_spec}+{len(self.addresses) - 1}"

    def __lt__(self, other: Any) -> bool:
        """We sort first by exit code, then alphanumerically within each group."""
        if not isinstance(other, TestResult):
            return NotImplemented
        if self.exit_code == other.exit_code:
            return self.description < other.description
        # A None exit code ("no tests found") sorts before any real exit code.
        if self.exit_code is None:
            return True
        if other.exit_code is None:
            return False
        return abs(self.exit_code) < abs(other.exit_code)

    def artifacts(self) -> dict[str, FileDigest | Snapshot] | None:
        """Expose stdout/stderr digests (and any XML results) for the engine to capture."""
        output: dict[str, FileDigest | Snapshot] = {
            "stdout": self.stdout_digest,
            "stderr": self.stderr_digest,
        }
        if self.xml_results:
            output["xml_results"] = self.xml_results
        return output

    def level(self) -> LogLevel:
        """DEBUG when no tests were found, INFO on success, ERROR on failure."""
        if self.exit_code is None:
            return LogLevel.DEBUG
        return LogLevel.INFO if self.exit_code == 0 else LogLevel.ERROR

    def _simplified_output(self, v: bytes) -> str:
        # Decode permissively: test output is not guaranteed to be valid UTF-8.
        return self.output_simplifier.simplify(v.decode(errors="replace"))

    @memoized_property
    def stdout_simplified_str(self) -> str:
        return self._simplified_output(self.stdout_bytes)

    @memoized_property
    def stderr_simplified_str(self) -> str:
        return self._simplified_output(self.stderr_bytes)

    def message(self) -> str:
        """The status line, plus captured output when `output_setting` calls for it."""
        if self.exit_code is None:
            return "no tests found."
        status = "succeeded" if self.exit_code == 0 else f"failed (exit code {self.exit_code})"
        message = f"{status}."
        if self.partition_description:
            message += f"\nPartition: {self.partition_description}"
        # Suppress output entirely, or for successful runs under `ShowOutput.FAILED`.
        if self.output_setting == ShowOutput.NONE or (
            self.output_setting == ShowOutput.FAILED and self.exit_code == 0
        ):
            return message
        output = ""
        if self.stdout_bytes:
            output += f"\n{self.stdout_simplified_str}"
        if self.stderr_bytes:
            output += f"\n{self.stderr_simplified_str}"
        if output:
            output = f"{output.rstrip()}\n\n"
        return f"{message}{output}"

    def metadata(self) -> dict[str, Any]:
        return {"addresses": [address.spec for address in self.addresses]}

    def cacheable(self) -> bool:
        """Is marked uncacheable to ensure that it always renders."""
        return False
290

291

292
class ShowOutput(Enum):
    """Which tests to emit detailed output for."""

    # Show stdout/stderr for every test.
    ALL = "all"
    # Show stdout/stderr only for failing tests (the default for `[test].output`).
    FAILED = "failed"
    # Never show stdout/stderr.
    NONE = "none"
298

299

300
@dataclass(frozen=True)
class TestDebugRequest:
    """Wraps the interactive process used to run a test batch under `--debug`."""

    # The foreground process to run for the user.
    process: InteractiveProcess

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False
306

307

308
class TestDebugAdapterRequest(TestDebugRequest):
    """Like TestDebugRequest, but launches the test process using the relevant Debug Adapter server.

    The process should be launched waiting for the client to connect.
    """

    # Intentionally adds no fields: the subclass itself signals debug-adapter launching.
313

314

315
@union
@dataclass(frozen=True)
class TestFieldSet(FieldSet, metaclass=ABCMeta):
    """The fields necessary to run tests on a target."""

    # Every testable target must have sources.
    sources: SourcesField

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False
323

324

325
# Type variable bound to TestFieldSet; parameterizes TestRequest's nested generic classes.
_TestFieldSetT = TypeVar("_TestFieldSetT", bound=TestFieldSet)
326

327

328
@union
class TestRequest:
    """Base class for plugin types wanting to be run as part of `test`.

    Plugins should define a new type which subclasses this type, and set the
    appropriate class variables.
    E.g.
        class DryCleaningRequest(TestRequest):
            tool_subsystem = DryCleaningSubsystem
            field_set_type = DryCleaningFieldSet

    Then register the rules which tell Pants about your plugin.
    E.g.
        def rules():
            return [
                *collect_rules(),
                *DryCleaningRequest.rules(),
            ]
    """

    # The tool's subsystem (a `SkippableSubsystem`); also supplies the options scope.
    tool_subsystem: ClassVar[type[SkippableSubsystem]]
    # The concrete TestFieldSet subclass this runner consumes.
    field_set_type: ClassVar[type[TestFieldSet]]
    # How matching field sets are grouped into partitions/batches.
    partitioner_type: ClassVar[PartitionerType] = PartitionerType.DEFAULT_ONE_PARTITION_PER_INPUT

    # Whether this runner implements `--debug` / `--debug-adapter`. When False, `rules()`
    # registers fallback rules (see `_unsupported_debug_rules` / `_unsupported_debug_adapter_rules`).
    supports_debug: ClassVar[bool] = False
    supports_debug_adapter: ClassVar[bool] = False

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @classproperty
    def tool_name(cls) -> str:
        # The options scope doubles as the user-facing tool name.
        return cls.tool_subsystem.options_scope

    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
    class PartitionRequest(_PartitionFieldSetsRequestBase[_TestFieldSetT]):
        def metadata(self) -> dict[str, Any]:
            return {"addresses": [field_set.address.spec for field_set in self.field_sets]}

    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
    class Batch(_BatchBase[_TestFieldSetT, PartitionMetadataT]):
        @property
        def single_element(self) -> _TestFieldSetT:
            """Return the single element of this batch.

            NOTE: Accessing this property will raise a `TypeError` if this `Batch` contains
            >1 elements. It is only safe to be used by test runners utilizing the "default"
            one-input-per-partition partitioner type.
            """

            if len(self.elements) != 1:
                description = ""
                if self.partition_metadata.description:
                    description = f" from partition '{self.partition_metadata.description}'"
                raise TypeError(
                    f"Expected a single element in batch{description}, but found {len(self.elements)}"
                )

            return self.elements[0]

        @property
        def description(self) -> str:
            """A human-readable description of this batch, naming its partition when known."""
            if self.partition_metadata and self.partition_metadata.description:
                return f"test batch from partition '{self.partition_metadata.description}'"
            return "test batch"

        def debug_hint(self) -> str:
            # Truncated to the first address plus a count when the batch has >1 elements.
            if len(self.elements) == 1:
                return self.elements[0].address.spec

            return f"{self.elements[0].address.spec} and {len(self.elements) - 1} other files"

        def metadata(self) -> dict[str, Any]:
            return {
                "addresses": [field_set.address.spec for field_set in self.elements],
                "partition_description": self.partition_metadata.description,
            }

    @classmethod
    def rules(cls) -> Iterable:
        """Yield the rules that register this test runner with the engine."""
        yield from cls.partitioner_type.default_rules(cls, by_file=False)

        yield UnionRule(TestFieldSet, cls.field_set_type)
        yield UnionRule(TestRequest, cls)
        yield UnionRule(TestRequest.PartitionRequest, cls.PartitionRequest)
        yield UnionRule(TestRequest.Batch, cls.Batch)

        if not cls.supports_debug:
            yield from _unsupported_debug_rules(cls)

        if not cls.supports_debug_adapter:
            yield from _unsupported_debug_adapter_rules(cls)
419

420

421
@rule(polymorphic=True)
async def partition_tests(req: TestRequest.PartitionRequest) -> Partitions:
    # Polymorphic stub: the engine dispatches to the concrete `PartitionRequest` union
    # member's rule, so this body should never run.
    raise NotImplementedError()
424

425

426
@rule(polymorphic=True)
async def test_batch_to_debug_request(batch: TestRequest.Batch) -> TestDebugRequest:
    # Polymorphic stub: the engine dispatches to the concrete `Batch` union member's rule,
    # so this body should never run.
    raise NotImplementedError()
429

430

431
@rule(polymorphic=True)
async def test_batch_to_debug_adapter_request(batch: TestRequest.Batch) -> TestDebugAdapterRequest:
    # Polymorphic stub: the engine dispatches to the concrete `Batch` union member's rule,
    # so this body should never run.
    raise NotImplementedError()
434

435

436
@rule(polymorphic=True)
async def run_test_batch(batch: TestRequest.Batch) -> TestResult:
    # Polymorphic stub: the engine dispatches to the concrete `Batch` union member's rule,
    # so this body should never run.
    raise NotImplementedError()
439

440

441
class CoverageData(ABC):
    """Base class for inputs to a coverage report.

    Subclasses should add whichever fields they require - snapshots of coverage output, XML files,
    etc. Instances are aggregated via `CoverageDataCollection` to produce `CoverageReports`.
    """
447

448

449
# Type variable for CoverageData subclasses; parameterizes CoverageDataCollection.
_CD = TypeVar("_CD", bound=CoverageData)
450

451

452
@union(in_scope_types=[EnvironmentName])
class CoverageDataCollection(Collection[_CD]):
    """A homogeneous collection of `CoverageData`, used to request `CoverageReports`."""

    # The concrete `CoverageData` subclass held by this collection.
    element_type: ClassVar[type[_CD]]
455

456

457
@dataclass(frozen=True)
class CoverageReport(ABC):
    """Represents a code coverage report that can be materialized to the terminal or disk."""

    # Some coverage systems can determine, based on a configurable threshold, whether coverage
    # was sufficient or not. The test goal will fail the build if coverage was deemed insufficient.
    coverage_insufficient: bool

    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
        """Materialize this code coverage report to the terminal or disk.

        :param console: A handle to the terminal.
        :param workspace: A handle to local disk.
        :return: If a report was materialized to disk, the path of the file in the report one might
                 open first to start examining the report.
        """
        ...

    def get_artifact(self) -> tuple[str, Snapshot] | None:
        # Overridden by subclasses that write files (see `FilesystemCoverageReport`); the
        # (name, snapshot) pair is surfaced via `CoverageReports.artifacts`.
        return None
477

478

479
@dataclass(frozen=True)
class ConsoleCoverageReport(CoverageReport):
    """A coverage report rendered directly in the user's terminal."""

    # The pre-formatted textual report to display.
    report: str

    def materialize(self, console: Console, workspace: Workspace) -> None:
        """Print the report to stderr; nothing is written to disk, so no path is returned."""
        console.print_stderr("\n" + self.report)
        return None
488

489

490
@dataclass(frozen=True)
class FilesystemCoverageReport(CoverageReport):
    """A coverage report whose files are written out to disk."""

    # Snapshot holding the report files to write out.
    result_snapshot: Snapshot
    # Directory to write the snapshot into.
    directory_to_materialize_to: PurePath
    # The file a user would open first to browse the report, if any.
    report_file: PurePath | None
    # Short label for the report format; also used to name the artifact.
    report_type: str

    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
        """Write the report files to disk and tell the user where they landed."""
        destination = str(self.directory_to_materialize_to)
        workspace.write_digest(self.result_snapshot.digest, path_prefix=destination)
        console.print_stderr(
            f"\nWrote {self.report_type} coverage report to `{self.directory_to_materialize_to}`"
        )
        return self.report_file

    def get_artifact(self) -> tuple[str, Snapshot] | None:
        """Expose the written report as a named artifact."""
        return "coverage_" + self.report_type, self.result_snapshot
510

511

512
@dataclass(frozen=True)
class CoverageReports(EngineAwareReturnType):
    """All coverage reports produced for a test run."""

    reports: tuple[CoverageReport, ...]

    @property
    def coverage_insufficient(self) -> bool:
        """Whether to fail the build due to insufficient coverage."""
        return any(r.coverage_insufficient for r in self.reports)

    def materialize(self, console: Console, workspace: Workspace) -> tuple[PurePath, ...]:
        """Materialize every report in order, collecting the paths of those written to disk."""
        materialized = (r.materialize(console, workspace) for r in self.reports)
        return tuple(path for path in materialized if path)

    def artifacts(self) -> dict[str, Snapshot | FileDigest] | None:
        """Collect each report's named artifact; None when no report produced one."""
        collected: dict[str, Snapshot | FileDigest] = {}
        for report in self.reports:
            pair = report.get_artifact()
            if pair:
                name, snapshot = pair
                collected[name] = snapshot
        return collected or None
537

538

539
@rule(polymorphic=True)
async def create_coverage_report(req: CoverageDataCollection) -> CoverageReports:
    # Polymorphic stub: each backend's `CoverageDataCollection` union member supplies the
    # real implementation, so this body should never run.
    raise NotImplementedError()
542

543

544
class TestSubsystem(GoalSubsystem):
    """Options for the `test` goal."""

    name = "test"
    help = "Run tests."

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        # The goal is only active when at least one backend registered a `TestRequest`.
        return TestRequest in union_membership

    class EnvironmentAware:
        extra_env_vars = StrListOption(
            help=softwrap(
                f"""
                Additional environment variables to include in test processes.

                {EXTRA_ENV_VARS_USAGE_HELP}
                """
            ),
        )

    debug = BoolOption(
        default=False,
        help=softwrap(
            """
            Run tests sequentially in an interactive process. This is necessary, for
            example, when you add breakpoints to your code.
            """
        ),
    )
    # See also `run.py`'s same option
    debug_adapter = BoolOption(
        default=False,
        help=softwrap(
            """
            Run tests sequentially in an interactive process, using a Debug Adapter
            (https://microsoft.github.io/debug-adapter-protocol/) for the language if supported.

            The interactive process used will be immediately blocked waiting for a client before
            continuing.

            This option implies `--debug`.
            """
        ),
    )
    force = BoolOption(
        default=False,
        help="Force the tests to run, even if they could be satisfied from cache.",
    )

    @property
    def default_process_cache_scope(self) -> ProcessCacheScope:
        # `--force` limits cache reuse to the current session so tests actually rerun.
        return ProcessCacheScope.PER_SESSION if self.force else ProcessCacheScope.SUCCESSFUL

    output = EnumOption(
        default=ShowOutput.FAILED,
        help="Show stdout/stderr for these tests.",
    )
    use_coverage = BoolOption(
        default=False,
        help="Generate a coverage report if the test runner supports it.",
    )
    open_coverage = BoolOption(
        default=False,
        help=softwrap(
            """
            If a coverage report file is generated, open it on the local system if the
            system supports this.
            """
        ),
    )
    report = BoolOption(default=False, advanced=True, help="Write test reports to `--report-dir`.")
    default_report_path = str(PurePath("{distdir}", "test", "reports"))
    _report_dir = StrOption(
        default=default_report_path,
        advanced=True,
        help="Path to write test reports to. Must be relative to the build root.",
    )
    shard = StrOption(
        default="",
        help=softwrap(
            """
            A shard specification of the form "k/N", where N is a positive integer and k is a
            non-negative integer less than N.

            If set, the request input targets will be deterministically partitioned into N disjoint
            subsets of roughly equal size, and only the k'th subset will be used, with all others
            discarded.

            Useful for splitting large numbers of test files across multiple machines in CI.
            For example, you can run three shards with `--shard=0/3`, `--shard=1/3`, `--shard=2/3`.

            Note that the shards are roughly equal in size as measured by number of files.
            No attempt is made to consider the size of different files, the time they have
            taken to run in the past, or other such sophisticated measures.
            """
        ),
    )
    timeouts = BoolOption(
        default=True,
        help=softwrap(
            """
            Enable test target timeouts. If timeouts are enabled then test targets with a
            `timeout=` parameter set on their target will time out after the given number of
            seconds if not completed. If no timeout is set, then either the default timeout
            is used or no timeout is configured.
            """
        ),
    )
    timeout_default = IntOption(
        default=None,
        advanced=True,
        help=softwrap(
            """
            The default timeout (in seconds) for a test target if the `timeout` field is not
            set on the target.
            """
        ),
    )
    timeout_maximum = IntOption(
        default=None,
        advanced=True,
        help="The maximum timeout (in seconds) that may be used on a test target.",
    )
    _attempts_default = IntOption(
        default=1,
        help=softwrap(
            """
            The number of attempts to run tests, in case of a test failure.
            Tests that were retried will include the number of attempts in the summary output.
            """
        ),
    )

    batch_size = IntOption(
        "--batch-size",
        default=128,
        advanced=True,
        help=softwrap(
            """
            The target maximum number of files to be included in each run of batch-enabled
            test runners.

            Some test runners can execute tests from multiple files in a single run. Test
            implementations will return all tests that _can_ run together as a single group -
            and then this may be further divided into smaller batches, based on this option.
            This is done:

              1. to avoid OS argument length limits (in processes which don't support argument files)
              2. to support more stable cache keys than would be possible if all files were operated \
                 on in a single batch
              3. to allow for parallelism in test runners which don't have internal \
                 parallelism, or -- if they do support internal parallelism -- to improve scheduling \
                 behavior when multiple processes are competing for cores and so internal parallelism \
                 cannot be used perfectly

            In order to improve cache hit rates (see 2.), batches are created at stable boundaries,
            and so this value is only a "target" max batch size (rather than an exact value).

            NOTE: This parameter has no effect on test runners/plugins that do not implement support
            for batched testing.
            """
        ),
    )

    show_rerun_command = BoolOption(
        default="CI" in os.environ,
        advanced=True,
        help=softwrap(
            f"""
            If tests fail, show an appropriate `{bin_name()} {name} ...` invocation to rerun just
            those tests.

            This is to make it easy to run those tests on a new machine (for instance, run tests
            locally if they fail in CI): caching of successful tests means that rerunning the exact
            same command on the same machine will already automatically only rerun the failures.

            This defaults to `True` when running in CI (as determined by the `CI` environment
            variable being set) but `False` elsewhere.
            """
        ),
    )
    experimental_report_test_result_info = BoolOption(
        default=False,
        advanced=True,
        help=softwrap(
            """
            Report information about the test results.

            For now, it reports only the source from where the test results were fetched. When running tests,
            they may be executed locally or remotely, but if there are results of previous runs available,
            they may be retrieved from the local or remote cache, or be memoized. Knowing where the test
            results come from might be useful when evaluating the efficiency of the cache and the nature of
            the changes in the source code that may lead to frequent cache invalidations.
            """
        ),
    )
    show_all_batch_targets = BoolOption(
        default=False,
        help=softwrap(
            """
            When tests are batched via `batch_compatibility_tag`, show all target addresses in
            the batch in test result summaries, workunit descriptions, and warning messages.

            By default, batched test descriptions are truncated to show only the first target
            address (e.g. "path/to:tests and 3 other files"). When this option is enabled, all
            target addresses in the batch are listed (e.g.
            "path/to:tests, path/to:tests2, path/to:tests3, path/to:tests4").

            This is useful for CI environments where you need to know exactly which targets
            are grouped together in each test invocation.
            """
        ),
    )

    def report_dir(self, distdir: DistDir) -> PurePath:
        # `{distdir}` in the configured template is substituted with the actual dist dir.
        return PurePath(self._report_dir.format(distdir=distdir.relpath))

    @property
    def attempts_default(self) -> int:
        """Validated accessor for `--test-attempts-default`; raises if the value is < 1."""
        if self._attempts_default < 1:
            raise ValueError(
                "The `--test-attempts-default` option must have a value equal or greater than 1. "
                f"Instead, it was set to {self._attempts_default}."
            )
        return self._attempts_default
771

772

773
class Test(Goal):
    """The `test` goal: runs tests for the requested targets."""

    # Prevent pytest from collecting this class as a test case.
    __test__ = False

    subsystem_cls = TestSubsystem
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
778

779

780
class TestTimeoutField(IntField, metaclass=ABCMeta):
    """Base field class for implementing timeouts for test targets.

    Each test target that wants to implement a timeout must define its own concrete
    field class extending this one.
    """

    # Prevent pytest from collecting this class as a test case.
    __test__ = False

    alias = "timeout"
    required = False
    valid_numbers = ValidNumbers.positive_only
    help = help_text(
        """
        A timeout (in seconds) used by each test file belonging to this target.

        If unset, will default to `[test].timeout_default`; if that option is also unset,
        then the test will never time out. Will never exceed `[test].timeout_maximum`. Only
        applies if the option `--test-timeouts` is set to true (the default).
        """
    )

    def calculate_from_global_options(self, test: TestSubsystem) -> int | None:
        """Resolve the effective timeout by combining this field with `[test]` options.

        Returns None when timeouts are disabled, or when neither this field nor
        `[test].timeout_default` provides a value. A non-None result is capped at
        `[test].timeout_maximum` when that option is set.
        """
        if not test.timeouts:
            return None
        # Field value wins; fall back to the global default.
        timeout = self.value if self.value is not None else test.timeout_default
        if timeout is None:
            return None
        maximum = test.timeout_maximum
        return timeout if maximum is None else min(timeout, maximum)
814

815

816
class TestExtraEnvVarsField(StringSequenceField, metaclass=ABCMeta):
    alias = "extra_env_vars"
    help = help_text(
        f"""
        Additional environment variables to include in test processes.

        {EXTRA_ENV_VARS_USAGE_HELP}

        This will be merged with and override values from `[test].extra_env_vars`.
        """
    )

    def sorted(self) -> tuple[str, ...]:
        """Return the configured env var specs in sorted order (empty tuple if unset)."""
        specs = list(self.value or ())
        specs.sort()
        return tuple(specs)
830

831

832
class TestsBatchCompatibilityTagField(StringField, metaclass=ABCMeta):
    # NOTE: no `help` is set here; concrete subclasses are expected to generate it via
    # `format_help` with their backend-specific target and runner names.
    alias = "batch_compatibility_tag"

    @classmethod
    def format_help(cls, target_name: str, test_runner_name: str) -> str:
        # Returns the raw (still-indented) help text with the backend's target and
        # runner names interpolated; presumably dedented/softwrapped by the caller —
        # TODO(review): confirm against subclass usage.
        return f"""
        An arbitrary value used to mark the test files belonging to this target as valid for
        batched execution.

        It's _sometimes_ safe to run multiple `{target_name}`s within a single test runner process,
        and doing so can give significant wins by allowing reuse of expensive test setup /
        teardown logic. To opt into this behavior, set this field to an arbitrary non-empty
        string on all the `{target_name}` targets that are safe/compatible to run in the same
        process.

        If this field is left unset on a target, the target is assumed to be incompatible with
        all others and will run in a dedicated `{test_runner_name}` process.

        If this field is set on a target, and its value is different from the value on some
        other test `{target_name}`, then the two targets are explicitly incompatible and are guaranteed
        to not run in the same `{test_runner_name}` process.

        If this field is set on a target, and its value is the same as the value on some other
        `{target_name}`, then the two targets are explicitly compatible and _may_ run in the same
        test runner process. Compatible tests may not end up in the same test runner batch if:

          * There are "too many" compatible tests in a partition, as determined by the \
            `[test].batch_size` config parameter, or
          * Compatible tests have some incompatibility in Pants metadata (i.e. different \
            `resolve`s or `extra_env_vars`).

        When tests with the same `batch_compatibility_tag` have incompatibilities in some other
        Pants metadata, they will be automatically split into separate batches. This way you can
        set a high-level `batch_compatibility_tag` using `__defaults__` and then have tests
        continue to work as you tweak BUILD metadata on specific targets.
        """
868

869

870
async def _get_test_batches(
    core_request_types: Iterable[type[TestRequest]],
    targets_to_field_sets: TargetRootsToFieldSets,
    local_environment_name: ChosenLocalEnvironmentName,
    test_subsystem: TestSubsystem,
) -> list[TestRequest.Batch]:
    """Partition the applicable field sets per test-request type into runnable batches.

    For every registered `TestRequest` type, collect the field sets applicable to the
    requested targets, ask that type's partitioner for partitions, then split each
    partition into batches of roughly `[test].batch_size` elements.
    """

    def partitions_call(request_type: type[TestRequest]) -> Coroutine[Any, Any, Partitions]:
        # Only field sets whose type applies to a given target participate in this
        # request type's partitioning.
        partition_type = cast(TestRequest, request_type)
        field_set_type = partition_type.field_set_type
        applicable_field_sets: list[TestFieldSet] = []
        for target, field_sets in targets_to_field_sets.mapping.items():
            if field_set_type.is_applicable(target):
                applicable_field_sets.extend(field_sets)

        # Partitioning runs in the local environment; batches may later resolve to
        # other environments (see `run_tests`).
        partition_request = partition_type.PartitionRequest(tuple(applicable_field_sets))
        return partition_tests(
            **implicitly(
                {
                    partition_request: TestRequest.PartitionRequest,
                    local_environment_name.val: EnvironmentName,
                },
            )
        )

    # Partition all request types concurrently.
    all_partitions = await concurrently(
        partitions_call(request_type) for request_type in core_request_types
    )

    return [
        request_type.Batch(
            cast(TestRequest, request_type).tool_name, tuple(batch), partition.metadata
        )
        for request_type, partitions in zip(core_request_types, all_partitions)
        for partition in partitions
        # Batches target `[test].batch_size` elements, with 2x as a hard cap.
        for batch in partition_sequentially(
            partition.elements,
            key=lambda x: str(x.address) if isinstance(x, FieldSet) else str(x),
            size_target=test_subsystem.batch_size,
            size_max=2 * test_subsystem.batch_size,
        )
    ]
911

912

913
async def _run_debug_tests(
    batches: Iterable[TestRequest.Batch],
    environment_names: Sequence[EnvironmentName],
    test_subsystem: TestSubsystem,
    debug_adapter: DebugAdapterSubsystem,
) -> Test:
    """Run each test batch interactively for `--debug` / `--debug-adapter`.

    Requests are prepared concurrently, but the interactive processes themselves run
    one at a time. Returns a `Test` whose exit code is 0 if every batch succeeded,
    otherwise the exit code of the last failing batch.
    """
    debug_requests = await concurrently(
        (
            test_batch_to_debug_request(
                **implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName})
            )
            if not test_subsystem.debug_adapter
            else test_batch_to_debug_adapter_request(
                **implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName})
            )
        )
        for batch, environment_name in zip(batches, environment_names)
    )
    exit_code = 0
    for debug_request, environment_name in zip(debug_requests, environment_names):
        if test_subsystem.debug_adapter:
            logger.info(
                softwrap(
                    f"""
                    Launching debug adapter at '{debug_adapter.host}:{debug_adapter.port}',
                    which will wait for a client connection...
                    """
                )
            )

        # Interactive processes must run sequentially: they own the console.
        debug_result = await run_interactive_process_in_environment(
            debug_request.process, environment_name
        )
        if debug_result.exit_code != 0:
            exit_code = debug_result.exit_code
    return Test(exit_code)
949

950

951
def _save_test_result_info_report_file(run_id: RunId, results: dict[str, dict]) -> None:
    """Save a JSON file with the information about the test results."""
    # Include the run id and a timestamp in the filename so repeated runs don't
    # clobber each other's reports.
    stamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    report = {"timestamp": stamp, "run_id": run_id, "info": results}
    filename = f"test_result_info_report_runid{run_id}_{stamp}.json"
    with safe_open(filename, "w") as fh:
        fh.write(json.dumps(report))
957

958

959
@goal_rule
async def run_tests(
    console: Console,
    test_subsystem: TestSubsystem,
    debug_adapter: DebugAdapterSubsystem,
    workspace: Workspace,
    union_membership: UnionMembership,
    distdir: DistDir,
    run_id: RunId,
    local_environment_name: ChosenLocalEnvironmentName,
) -> Test:
    """Entry point for the `test` goal.

    Partitions the requested targets into batches, runs them (concurrently, or
    interactively for `--debug`/`--debug-adapter`), prints per-result summaries and a
    rerun command, and optionally writes XML reports, coverage reports, and the
    experimental result-info JSON file.
    """
    # `--debug`/`--debug-adapter` error (rather than warn) when nothing matched,
    # since an interactive session with no tests is certainly a user mistake.
    if test_subsystem.debug_adapter:
        goal_description = f"`{test_subsystem.name} --debug-adapter`"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
    elif test_subsystem.debug:
        goal_description = f"`{test_subsystem.name} --debug`"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
    else:
        goal_description = f"The `{test_subsystem.name}` goal"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.warn

    shard, num_shards = parse_shard_spec(test_subsystem.shard, "the [test].shard option")
    targets_to_valid_field_sets = await find_valid_field_sets_for_target_roots(
        TargetRootsToFieldSetsRequest(
            TestFieldSet,
            goal_description=goal_description,
            no_applicable_targets_behavior=no_applicable_targets_behavior,
            shard=shard,
            num_shards=num_shards,
        ),
        **implicitly(),
    )

    request_types = union_membership.get(TestRequest)
    test_batches = await _get_test_batches(
        request_types,
        targets_to_valid_field_sets,
        local_environment_name,
        test_subsystem,
    )

    # Each batch must resolve to exactly one execution environment.
    environment_names = await concurrently(
        resolve_single_environment_name(
            SingleEnvironmentNameRequest.from_field_sets(batch.elements, batch.description)
        )
        for batch in test_batches
    )

    if test_subsystem.debug or test_subsystem.debug_adapter:
        return await _run_debug_tests(
            test_batches, environment_names, test_subsystem, debug_adapter
        )

    to_test = list(zip(test_batches, environment_names))
    results = await concurrently(
        run_test_batch(
            **implicitly(
                {
                    batch: TestRequest.Batch,
                    environment_name: EnvironmentName,
                }
            )
        )
        for batch, environment_name in to_test
    )

    # Print summary.
    exit_code = 0
    if results:
        console.print_stderr("")
    if test_subsystem.experimental_report_test_result_info:
        # Accumulates per-result source info; only ever read under this same flag.
        test_result_info: dict[str, dict] = {}
    for result in sorted(results):
        if result.exit_code is None:
            # We end up here, e.g., if we implemented test discovery and found no tests.
            continue
        if result.exit_code != 0:
            exit_code = result.exit_code
        if result.result_metadata is None:
            # We end up here, e.g., if compilation failed during self-implemented test discovery.
            continue
        if test_subsystem.experimental_report_test_result_info:
            # Keyed on the batch's first address; the source says whether the result
            # ran, was cached (locally/remotely), or was memoized.
            test_result_info[result.addresses[0].spec] = {
                "source": result.result_metadata.source(run_id).value
            }
        console.print_stderr(
            _format_test_summary(result, run_id, console, test_subsystem.show_all_batch_targets)
        )

        if result.extra_output and result.extra_output.files:
            path_prefix = str(distdir.relpath / "test" / result.path_safe_description)
            workspace.write_digest(
                result.extra_output.digest,
                path_prefix=path_prefix,
            )
            if result.log_extra_output:
                logger.info(
                    f"Wrote extra output from test `{result.addresses[0]}` to `{path_prefix}`."
                )

    rerun_command = _format_test_rerun_command(results)
    if rerun_command and test_subsystem.show_rerun_command:
        console.print_stderr(f"\n{rerun_command}")

    if test_subsystem.report:
        report_dir = test_subsystem.report_dir(distdir)
        merged_reports = await merge_digests(
            MergeDigests(result.xml_results.digest for result in results if result.xml_results)
        )
        workspace.write_digest(merged_reports, path_prefix=str(report_dir))
        console.print_stderr(f"\nWrote test reports to {report_dir}")

    if test_subsystem.use_coverage:
        # NB: We must pre-sort the data for itertools.groupby() to work properly, using the same
        # key function for both. However, you can't sort by `types`, so we call `str()` on it.
        all_coverage_data = sorted(
            (result.coverage_data for result in results if result.coverage_data is not None),
            key=lambda cov_data: str(type(cov_data)),
        )

        coverage_types_to_collection_types = {
            collection_cls.element_type: collection_cls  # type: ignore[misc]
            for collection_cls in union_membership.get(CoverageDataCollection)
        }
        coverage_collections = []
        for data_cls, data in itertools.groupby(all_coverage_data, lambda data: type(data)):
            collection_cls = coverage_types_to_collection_types[data_cls]  # type: ignore[index]
            coverage_collections.append(collection_cls(data))
        # We can create multiple reports for each coverage data (e.g., console, xml, html)
        coverage_reports_collections = await concurrently(
            create_coverage_report(
                **implicitly(
                    {
                        coverage_collection: CoverageDataCollection,
                        local_environment_name.val: EnvironmentName,
                    }
                )
            )
            for coverage_collection in coverage_collections
        )

        coverage_report_files: list[PurePath] = []
        for coverage_reports in coverage_reports_collections:
            report_files = coverage_reports.materialize(console, workspace)
            coverage_report_files.extend(report_files)

        if coverage_report_files and test_subsystem.open_coverage:
            open_files = await find_open_program(
                OpenFilesRequest(coverage_report_files, error_if_open_not_found=False),
                **implicitly(),
            )
            for process in open_files.processes:
                _ = await run_interactive_process_in_environment(
                    process, local_environment_name.val
                )

        for coverage_reports in coverage_reports_collections:
            if coverage_reports.coverage_insufficient:
                logger.error(
                    softwrap(
                        """
                        Test goal failed due to insufficient coverage.
                        See coverage reports for details.
                        """
                    )
                )
                # coverage.py uses 2 to indicate failure due to insufficient coverage.
                # We may as well follow suit in the general case, for all languages.
                exit_code = 2

    if test_subsystem.experimental_report_test_result_info:
        _save_test_result_info_report_file(run_id, test_result_info)

    return Test(exit_code)
1133

1134

1135
# Human-readable labels for where a test result came from, used by
# `_format_test_summary` when rendering the per-result summary line.
_SOURCE_MAP = {
    ProcessResultMetadata.Source.MEMOIZED: "memoized",
    ProcessResultMetadata.Source.RAN: "ran",
    ProcessResultMetadata.Source.HIT_LOCALLY: "cached locally",
    ProcessResultMetadata.Source.HIT_REMOTELY: "cached remotely",
}
1141

1142

1143
def _format_test_summary(
    result: TestResult,
    run_id: RunId,
    console: Console,
    show_all_batch_targets: bool = False,
) -> str:
    """Format the test summary printed to the console."""
    metadata = result.result_metadata
    assert metadata is not None, (
        "Skipped test results should not be outputted in the test summary"
    )
    attempts = len(result.process_results)
    retried = attempts > 1

    # Pick the status word and the matching console sigil.
    if result.exit_code == 0:
        status = "succeeded"
        sigil = console.sigil_succeeded_with_edits() if retried else console.sigil_succeeded()
    else:
        status = "failed"
        sigil = console.sigil_failed()

    attempt_msg = f" after {attempts} attempts" if retried else ""

    # Describe where the result came from (ran / cached / memoized), naming the
    # execution environment when one was recorded.
    execution_environment = metadata.execution_environment
    environment = execution_environment.name
    environment_type = execution_environment.environment_type
    source = metadata.source(run_id)
    source_str = _SOURCE_MAP[source]
    if environment:
        preposition = "in" if source == ProcessResultMetadata.Source.RAN else "for"
        source_desc = (
            f" ({source_str} {preposition} {environment_type} environment `{environment}`)"
        )
    else:
        source_desc = "" if source == ProcessResultMetadata.Source.RAN else f" ({source_str})"

    total_elapsed_ms = metadata.total_elapsed_ms
    elapsed_print = "" if total_elapsed_ms is None else f"in {total_elapsed_ms / 1000:.2f}s"

    # With `--show-all-batch-targets`, spell out every address in a batched run
    # instead of the (possibly truncated) batch description.
    if show_all_batch_targets and len(result.addresses) > 1:
        description = ", ".join(addr.spec for addr in result.addresses)
    else:
        description = result.description

    return f"{sigil} {description} {status}{attempt_msg} {elapsed_print}{source_desc}."
1197

1198

1199
def _format_test_rerun_command(results: Iterable[TestResult]) -> None | str:
    """Return a copy-pastable command that reruns all failed tests, or None if none failed."""
    failed = [r for r in results if r.exit_code not in (None, 0)]
    if not failed:
        return None

    # format an invocation like `pants test path/to/first:address path/to/second:address ...`
    quoted_addresses = sorted(
        shlex.quote(str(addr)) for result in failed for addr in result.addresses
    )
    invocation = " ".join([f"{bin_name()} {TestSubsystem.name}", *quoted_addresses])

    return f"To rerun the failing tests, use:\n\n    {invocation}"
1210

1211

1212
@dataclass(frozen=True)
class TestExtraEnv:
    """The subset of local environment variables that test processes should inherit.

    Produced by `get_filtered_environment` from `[test].extra_env_vars`.
    """

    env: EnvironmentVars
1215

1216

1217
@rule
async def get_filtered_environment(test_env_aware: TestSubsystem.EnvironmentAware) -> TestExtraEnv:
    """Resolve the `[test].extra_env_vars` specs against the ambient environment."""
    return TestExtraEnv(
        await environment_vars_subset(
            EnvironmentVarsRequest(test_env_aware.extra_env_vars), **implicitly()
        )
    )
1224

1225

1226
@memoized
def _unsupported_debug_rules(cls: type[TestRequest]) -> Iterable:
    """Returns a rule that implements TestDebugRequest by raising an error."""

    # Memoized so each `TestRequest` subclass yields exactly one generated rule; the
    # canonical-name suffix keeps the generated rules' names unique per subclass.
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
    async def get_test_debug_request(request: TestRequest.Batch) -> TestDebugRequest:
        raise NotImplementedError("Testing this target with --debug is not yet supported.")

    return collect_rules(locals())
1235

1236

1237
@memoized
def _unsupported_debug_adapter_rules(cls: type[TestRequest]) -> Iterable:
    """Returns a rule that implements TestDebugAdapterRequest by raising an error."""

    # Mirrors `_unsupported_debug_rules`: one generated fallback rule per subclass.
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
    async def get_test_debug_adapter_request(request: TestRequest.Batch) -> TestDebugAdapterRequest:
        raise NotImplementedError(
            "Testing this target type with a debug adapter is not yet supported."
        )

    return collect_rules(locals())
1248

1249

1250
# -------------------------------------------------------------------------------------------
1251
# `runtime_package_dependencies` field
1252
# -------------------------------------------------------------------------------------------
1253

1254

1255
class RuntimePackageDependenciesField(SpecialCasedDependencies):
    # Special-cased (rather than plain `dependencies`) so the listed targets are
    # built as packages and made available in the test chroot — see
    # `build_runtime_package_dependencies`.
    alias = "runtime_package_dependencies"
    help = help_text(
        f"""
        Addresses to targets that can be built with the `{bin_name()} package` goal and whose
        resulting artifacts should be included in the test run.

        Pants will build the artifacts as if you had run `{bin_name()} package`.
        It will include the results in your test's chroot, using the same name they would normally
        have, but without the `--distdir` prefix (e.g. `dist/`).

        You can include anything that can be built by `{bin_name()} package`, e.g. a `pex_binary`,
        `python_aws_lambda_function`, or an `archive`.
        """
    )
1270

1271

1272
class BuiltPackageDependencies(Collection[BuiltPackage]):
    """The built artifacts of a target's `runtime_package_dependencies`."""

    pass
1274

1275

1276
@dataclass(frozen=True)
class BuildPackageDependenciesRequest:
    """Request to build the packages listed in a `runtime_package_dependencies` field."""

    field: RuntimePackageDependenciesField
1279

1280

1281
@rule(desc="Build runtime package dependencies for tests", level=LogLevel.DEBUG)
async def build_runtime_package_dependencies(
    request: BuildPackageDependenciesRequest,
) -> BuiltPackageDependencies:
    """Build every packageable target referenced by a `runtime_package_dependencies` field."""
    unparsed_addresses = request.field.to_unparsed_address_inputs()
    if not unparsed_addresses:
        return BuiltPackageDependencies()
    tgts = await resolve_targets(**implicitly(unparsed_addresses))
    # Only targets with an applicable `PackageFieldSet` can be packaged.
    field_sets_per_tgt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(PackageFieldSet, tgts), **implicitly()
    )
    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in field_sets_per_tgt.field_sets
    )
    return BuiltPackageDependencies(packages)
1297

1298

1299
def rules():
    """Return this module's rules for registration with the engine."""
    return list(collect_rules())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc