• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25441711719

06 May 2026 02:31PM UTC coverage: 92.915%. Remained the same
25441711719

push

github

web-flow
use sha pin (with comment) format for generated actions (#23312)

Per the GitHub Action best practices we recently enabled at #23249, we
should pin each action to a SHA so that the reference is actually
immutable.

This will -- I hope -- knock out a large chunk of the 421 alerts we
currently get from zizmor. The next followup would then be upgrades and
harmonizing the generated and non-generated pins.

Notice: This idea was suggested by Claude while going over pinact output
and I was surprised to see that post processing the yaml wasn't too
gross.

92206 of 99237 relevant lines covered (92.91%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/src/python/pants/core/goals/test.py
1
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import itertools
12✔
7
import json
12✔
8
import logging
12✔
9
import os
12✔
10
import shlex
12✔
11
from abc import ABC, ABCMeta
12✔
12
from collections.abc import Coroutine, Iterable, Sequence
12✔
13
from dataclasses import dataclass, field
12✔
14
from datetime import datetime
12✔
15
from enum import Enum
12✔
16
from pathlib import PurePath
12✔
17
from typing import Any, ClassVar, TypeVar, cast
12✔
18

19
from pants.core.environments.rules import (
12✔
20
    ChosenLocalEnvironmentName,
21
    EnvironmentName,
22
    SingleEnvironmentNameRequest,
23
    resolve_single_environment_name,
24
)
25
from pants.core.goals.multi_tool_goal_helper import SkippableSubsystem
12✔
26
from pants.core.goals.package import (
12✔
27
    BuiltPackage,
28
    EnvironmentAwarePackageRequest,
29
    PackageFieldSet,
30
    environment_aware_package,
31
)
32
from pants.core.subsystems.debug_adapter import DebugAdapterSubsystem
12✔
33
from pants.core.target_types import GenericTarget
12✔
34
from pants.core.util_rules.distdir import DistDir
12✔
35
from pants.core.util_rules.env_vars import environment_vars_subset
12✔
36
from pants.core.util_rules.partitions import (
12✔
37
    PartitionerType,
38
    PartitionMetadataT,
39
    Partitions,
40
    _BatchBase,
41
    _PartitionFieldSetsRequestBase,
42
)
43
from pants.engine.addresses import Address
12✔
44
from pants.engine.collection import Collection
12✔
45
from pants.engine.console import Console
12✔
46
from pants.engine.desktop import OpenFilesRequest, find_open_program
12✔
47
from pants.engine.engine_aware import EngineAwareReturnType
12✔
48
from pants.engine.env_vars import EXTRA_ENV_VARS_USAGE_HELP, EnvironmentVars, EnvironmentVarsRequest
12✔
49
from pants.engine.fs import EMPTY_FILE_DIGEST, FileDigest, MergeDigests, Snapshot, Workspace
12✔
50
from pants.engine.goal import Goal, GoalSubsystem
12✔
51
from pants.engine.internals.graph import find_valid_field_sets, resolve_targets, transitive_targets
12✔
52
from pants.engine.internals.session import RunId
12✔
53
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
12✔
54
from pants.engine.intrinsics import merge_digests, run_interactive_process_in_environment
12✔
55
from pants.engine.process import (
12✔
56
    FallibleProcessResult,
57
    InteractiveProcess,
58
    ProcessCacheScope,
59
    ProcessResultMetadata,
60
)
61
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
12✔
62
from pants.engine.target import (
12✔
63
    Dependencies,
64
    DepsTraversalBehavior,
65
    FieldSet,
66
    FieldSetsPerTargetRequest,
67
    IntField,
68
    NoApplicableTargetsBehavior,
69
    ShouldTraverseDepsPredicate,
70
    SourcesField,
71
    SpecialCasedDependencies,
72
    StringField,
73
    StringSequenceField,
74
    Target,
75
    TargetRootsToFieldSets,
76
    TargetRootsToFieldSetsRequest,
77
    TransitiveTargetsRequest,
78
    ValidNumbers,
79
    parse_shard_spec,
80
)
81
from pants.engine.unions import UnionMembership, UnionRule, distinct_union_type_per_subclass, union
12✔
82
from pants.option.option_types import BoolOption, EnumOption, IntOption, StrListOption, StrOption
12✔
83
from pants.util.collections import partition_sequentially
12✔
84
from pants.util.dirutil import safe_open
12✔
85
from pants.util.docutil import bin_name
12✔
86
from pants.util.logging import LogLevel
12✔
87
from pants.util.memo import memoized, memoized_property
12✔
88
from pants.util.meta import classproperty
12✔
89
from pants.util.strutil import Simplifier, help_text, softwrap
12✔
90

91
# Module-level logger, following the standard `logging.getLogger(__name__)` pattern.
logger = logging.getLogger(__name__)
92

93

94
@dataclass(frozen=True)
class TestResult(EngineAwareReturnType):
    """The result of running a batch of tests: exit status, captured output, and any reports.

    Rendered by the engine via the `EngineAwareReturnType` hooks below (`level`, `message`,
    `artifacts`, `metadata`, `cacheable`).
    """

    # A None exit_code indicates a backend that performs its own test discovery/selection
    # (rather than delegating that to the underlying test tool), and discovered no tests.
    exit_code: int | None
    stdout_bytes: bytes
    stdout_digest: FileDigest
    stderr_bytes: bytes
    stderr_digest: FileDigest
    addresses: tuple[Address, ...]
    output_setting: ShowOutput
    # A None result_metadata indicates a backend that performs its own test discovery/selection
    # and either discovered no tests, or encountered an error, such as a compilation error, in
    # the attempt.
    result_metadata: ProcessResultMetadata | None  # TODO: Merge elapsed MS of all subprocesses
    partition_description: str | None = None

    coverage_data: CoverageData | None = None
    # TODO: Rename this to `reports`. There is no guarantee that every language will produce
    #  XML reports, or only XML reports.
    xml_results: Snapshot | None = None
    # Any extra output (such as from plugins) that the test runner was configured to output.
    extra_output: Snapshot | None = None
    # True if the core test rules should log that extra output was written.
    log_extra_output: bool = False
    # All results including failed attempts
    process_results: tuple[FallibleProcessResult, ...] = field(default_factory=tuple)

    output_simplifier: Simplifier = Simplifier()

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @staticmethod
    def no_tests_found(address: Address, output_setting: ShowOutput) -> TestResult:
        """Used when we do test discovery ourselves, and we didn't find any."""
        return TestResult(
            exit_code=None,
            stdout_bytes=b"",
            stderr_bytes=b"",
            stdout_digest=EMPTY_FILE_DIGEST,
            stderr_digest=EMPTY_FILE_DIGEST,
            addresses=(address,),
            output_setting=output_setting,
            result_metadata=None,
        )

    @staticmethod
    def no_tests_found_in_batch(
        batch: TestRequest.Batch[_TestFieldSetT, Any], output_setting: ShowOutput
    ) -> TestResult:
        """Used when we do test discovery ourselves, and we didn't find any."""
        return TestResult(
            exit_code=None,
            stdout_bytes=b"",
            stderr_bytes=b"",
            stdout_digest=EMPTY_FILE_DIGEST,
            stderr_digest=EMPTY_FILE_DIGEST,
            addresses=tuple(field_set.address for field_set in batch.elements),
            output_setting=output_setting,
            result_metadata=None,
            partition_description=batch.partition_metadata.description,
        )

    @staticmethod
    def from_fallible_process_result(
        process_results: tuple[FallibleProcessResult, ...],
        address: Address,
        output_setting: ShowOutput,
        *,
        coverage_data: CoverageData | None = None,
        xml_results: Snapshot | None = None,
        extra_output: Snapshot | None = None,
        log_extra_output: bool = False,
        output_simplifier: Simplifier = Simplifier(),
    ) -> TestResult:
        """Build a result for a single address from (possibly retried) process executions.

        The last entry of `process_results` is treated as the authoritative outcome; all
        entries (including failed attempts) are preserved in `process_results`.
        """
        process_result = process_results[-1]
        return TestResult(
            exit_code=process_result.exit_code,
            stdout_bytes=process_result.stdout,
            stdout_digest=process_result.stdout_digest,
            stderr_bytes=process_result.stderr,
            stderr_digest=process_result.stderr_digest,
            addresses=(address,),
            output_setting=output_setting,
            result_metadata=process_result.metadata,
            coverage_data=coverage_data,
            xml_results=xml_results,
            extra_output=extra_output,
            log_extra_output=log_extra_output,
            process_results=process_results,
            output_simplifier=output_simplifier,
        )

    @staticmethod
    def from_batched_fallible_process_result(
        process_results: tuple[FallibleProcessResult, ...],
        batch: TestRequest.Batch[_TestFieldSetT, Any],
        output_setting: ShowOutput,
        *,
        coverage_data: CoverageData | None = None,
        xml_results: Snapshot | None = None,
        extra_output: Snapshot | None = None,
        log_extra_output: bool = False,
        output_simplifier: Simplifier = Simplifier(),
    ) -> TestResult:
        """Build a result for a whole batch from (possibly retried) process executions.

        Like `from_fallible_process_result`, but the addresses and partition description
        are taken from the batch.
        """
        process_result = process_results[-1]
        return TestResult(
            exit_code=process_result.exit_code,
            stdout_bytes=process_result.stdout,
            stdout_digest=process_result.stdout_digest,
            stderr_bytes=process_result.stderr,
            stderr_digest=process_result.stderr_digest,
            addresses=tuple(field_set.address for field_set in batch.elements),
            output_setting=output_setting,
            result_metadata=process_result.metadata,
            coverage_data=coverage_data,
            xml_results=xml_results,
            extra_output=extra_output,
            log_extra_output=log_extra_output,
            output_simplifier=output_simplifier,
            partition_description=batch.partition_metadata.description,
            process_results=process_results,
        )

    @property
    def description(self) -> str:
        # Human-readable identifier: the first address, plus a count of any others.
        if len(self.addresses) == 1:
            return self.addresses[0].spec

        return f"{self.addresses[0].spec} and {len(self.addresses) - 1} other files"

    @property
    def path_safe_description(self) -> str:
        # Like `description`, but safe for use in filesystem paths.
        if len(self.addresses) == 1:
            return self.addresses[0].path_safe_spec

        return f"{self.addresses[0].path_safe_spec}+{len(self.addresses) - 1}"

    def __lt__(self, other: Any) -> bool:
        """We sort first by exit code, then alphanumerically within each group."""
        if not isinstance(other, TestResult):
            return NotImplemented
        if self.exit_code == other.exit_code:
            return self.description < other.description
        # A None exit code ("no tests found") sorts before every numeric exit code.
        if self.exit_code is None:
            return True
        if other.exit_code is None:
            return False
        return abs(self.exit_code) < abs(other.exit_code)

    def artifacts(self) -> dict[str, FileDigest | Snapshot] | None:
        """Expose stdout/stderr digests (and XML results, when present) as engine artifacts."""
        output: dict[str, FileDigest | Snapshot] = {
            "stdout": self.stdout_digest,
            "stderr": self.stderr_digest,
        }
        if self.xml_results:
            output["xml_results"] = self.xml_results
        return output

    def level(self) -> LogLevel:
        """DEBUG when no tests were found, INFO on success, ERROR on failure."""
        if self.exit_code is None:
            return LogLevel.DEBUG
        return LogLevel.INFO if self.exit_code == 0 else LogLevel.ERROR

    def _simplified_output(self, v: bytes) -> str:
        # Decode permissively (replacing invalid bytes), then clean up via the
        # configured `Simplifier`.
        return self.output_simplifier.simplify(v.decode(errors="replace"))

    @memoized_property
    def stdout_simplified_str(self) -> str:
        return self._simplified_output(self.stdout_bytes)

    @memoized_property
    def stderr_simplified_str(self) -> str:
        return self._simplified_output(self.stderr_bytes)

    def message(self) -> str:
        """The summary line rendered by the engine, plus captured output per `output_setting`."""
        if self.exit_code is None:
            return "no tests found."
        status = "succeeded" if self.exit_code == 0 else f"failed (exit code {self.exit_code})"
        message = f"{status}."
        if self.partition_description:
            message += f"\nPartition: {self.partition_description}"
        # Suppress captured output when configured off, or when only failures should show
        # and this run succeeded.
        if self.output_setting == ShowOutput.NONE or (
            self.output_setting == ShowOutput.FAILED and self.exit_code == 0
        ):
            return message
        output = ""
        if self.stdout_bytes:
            output += f"\n{self.stdout_simplified_str}"
        if self.stderr_bytes:
            output += f"\n{self.stderr_simplified_str}"
        if output:
            output = f"{output.rstrip()}\n\n"
        return f"{message}{output}"

    def metadata(self) -> dict[str, Any]:
        """Engine metadata: the addresses this result covers."""
        return {"addresses": [address.spec for address in self.addresses]}

    def cacheable(self) -> bool:
        """Is marked uncacheable to ensure that it always renders."""
        return False
296

297

298
class ShowOutput(Enum):
    """Which tests to emit detailed output for.

    Used as the value type of `TestSubsystem.output` and consulted in `TestResult.message`.
    """

    ALL = "all"
    FAILED = "failed"
    NONE = "none"
304

305

306
@dataclass(frozen=True)
class TestDebugRequest:
    """Holds the `InteractiveProcess` to launch when running a test interactively."""

    process: InteractiveProcess

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False
312

313

314
class TestDebugAdapterRequest(TestDebugRequest):
    """Like TestDebugRequest, but launches the test process using the relevant Debug Adapter server.

    The process should be launched waiting for the client to connect.
    """

    # Inherits the `process` field (and `__test__ = False`) from `TestDebugRequest`.
319

320

321
@union
@dataclass(frozen=True)
class TestFieldSet(FieldSet, metaclass=ABCMeta):
    """The fields necessary to run tests on a target."""

    sources: SourcesField

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False
329

330

331
# Type variable bound to TestFieldSet, used to parameterize `TestRequest.PartitionRequest`
# and `TestRequest.Batch`.
_TestFieldSetT = TypeVar("_TestFieldSetT", bound=TestFieldSet)
332

333

334
@union
class TestRequest:
    """Base class for plugin types wanting to be run as part of `test`.

    Plugins should define a new type which subclasses this type, and set the
    appropriate class variables.
    E.g.
        class DryCleaningRequest(TestRequest):
            tool_subsystem = DryCleaningSubsystem
            field_set_type = DryCleaningFieldSet

    Then register the rules which tell Pants about your plugin.
    E.g.
        def rules():
            return [
                *collect_rules(),
                *DryCleaningRequest.rules(),
            ]
    """

    tool_subsystem: ClassVar[type[SkippableSubsystem]]
    field_set_type: ClassVar[type[TestFieldSet]]
    partitioner_type: ClassVar[PartitionerType] = PartitionerType.DEFAULT_ONE_PARTITION_PER_INPUT

    # Whether this plugin supports `test --debug` / `test --debug-adapter`; consulted
    # in `rules()` below to decide which fallback rules to register.
    supports_debug: ClassVar[bool] = False
    supports_debug_adapter: ClassVar[bool] = False

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @classproperty
    def tool_name(cls) -> str:
        """The options scope of this plugin's tool subsystem."""
        return cls.tool_subsystem.options_scope

    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
    class PartitionRequest(_PartitionFieldSetsRequestBase[_TestFieldSetT]):
        def metadata(self) -> dict[str, Any]:
            """Engine metadata: the addresses of the field sets being partitioned."""
            return {"addresses": [field_set.address.spec for field_set in self.field_sets]}

    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
    class Batch(_BatchBase[_TestFieldSetT, PartitionMetadataT]):
        @property
        def single_element(self) -> _TestFieldSetT:
            """Return the single element of this batch.

            NOTE: Accessing this property will raise a `TypeError` if this `Batch` contains
            >1 elements. It is only safe to be used by test runners utilizing the "default"
            one-input-per-partition partitioner type.
            """

            if len(self.elements) != 1:
                description = ""
                if self.partition_metadata.description:
                    description = f" from partition '{self.partition_metadata.description}'"
                raise TypeError(
                    f"Expected a single element in batch{description}, but found {len(self.elements)}"
                )

            return self.elements[0]

        @property
        def description(self) -> str:
            """A human-readable description of this batch, naming its partition when set."""
            if self.partition_metadata and self.partition_metadata.description:
                return f"test batch from partition '{self.partition_metadata.description}'"
            return "test batch"

        def debug_hint(self) -> str:
            """Identify the batch by its first address, plus a count of any others."""
            if len(self.elements) == 1:
                return self.elements[0].address.spec

            return f"{self.elements[0].address.spec} and {len(self.elements) - 1} other files"

        def metadata(self) -> dict[str, Any]:
            """Engine metadata: the batch's addresses and its partition description."""
            return {
                "addresses": [field_set.address.spec for field_set in self.elements],
                "partition_description": self.partition_metadata.description,
            }

    @classmethod
    def rules(cls) -> Iterable:
        """Yield the rules and union registrations needed to activate this plugin."""
        yield from cls.partitioner_type.default_rules(cls, by_file=False)

        yield UnionRule(TestFieldSet, cls.field_set_type)
        yield UnionRule(TestRequest, cls)
        yield UnionRule(TestRequest.PartitionRequest, cls.PartitionRequest)
        yield UnionRule(TestRequest.Batch, cls.Batch)

        # Fallback rules for plugins without debug(-adapter) support; presumably these
        # surface an error when `--debug`/`--debug-adapter` is requested — see the
        # `_unsupported_debug*` helpers defined elsewhere in this file.
        if not cls.supports_debug:
            yield from _unsupported_debug_rules(cls)

        if not cls.supports_debug_adapter:
            yield from _unsupported_debug_adapter_rules(cls)
425

426

427
@rule(polymorphic=True)
async def partition_tests(req: TestRequest.PartitionRequest) -> Partitions:
    # Polymorphic stub: the engine dispatches to the per-plugin `PartitionRequest`
    # union member, so this body is not expected to run.
    raise NotImplementedError()
×
430

431

432
@rule(polymorphic=True)
async def test_batch_to_debug_request(batch: TestRequest.Batch) -> TestDebugRequest:
    # Polymorphic stub: the engine dispatches to the per-plugin `Batch` union member,
    # so this body is not expected to run.
    raise NotImplementedError()
435

436

437
@rule(polymorphic=True)
async def test_batch_to_debug_adapter_request(batch: TestRequest.Batch) -> TestDebugAdapterRequest:
    # Polymorphic stub: the engine dispatches to the per-plugin `Batch` union member,
    # so this body is not expected to run.
    raise NotImplementedError()
440

441

442
@rule(polymorphic=True)
async def run_test_batch(batch: TestRequest.Batch) -> TestResult:
    # Polymorphic stub: the engine dispatches to the per-plugin `Batch` union member,
    # so this body is not expected to run.
    raise NotImplementedError()
445

446

447
class CoverageData(ABC):
    """Base class for inputs to a coverage report.

    Subclasses should add whichever fields they require - snapshots of coverage output, XML files,
    etc.
    """

    # Deliberately empty: subclasses declare their own fields.
453

454

455
# Type variable bound to CoverageData, used to parameterize `CoverageDataCollection`.
_CD = TypeVar("_CD", bound=CoverageData)
456

457

458
@union(in_scope_types=[EnvironmentName])
class CoverageDataCollection(Collection[_CD]):
    """A homogeneous collection of `CoverageData` elements, consumed by `create_coverage_report`."""

    # The concrete `CoverageData` subclass this collection holds.
    element_type: ClassVar[type[_CD]]
461

462

463
@dataclass(frozen=True)
class CoverageReport(ABC):
    """Represents a code coverage report that can be materialized to the terminal or disk."""

    # Some coverage systems can determine, based on a configurable threshold, whether coverage
    # was sufficient or not. The test goal will fail the build if coverage was deemed insufficient.
    coverage_insufficient: bool

    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
        """Materialize this code coverage report to the terminal or disk.

        :param console: A handle to the terminal.
        :param workspace: A handle to local disk.
        :return: If a report was materialized to disk, the path of the file in the report one might
                 open first to start examining the report.
        """
        ...

    def get_artifact(self) -> tuple[str, Snapshot] | None:
        """Return a (name, snapshot) pair for engine artifact reporting.

        Defaults to None (no artifact); subclasses that produce files override this.
        """
        return None
483

484

485
@dataclass(frozen=True)
class ConsoleCoverageReport(CoverageReport):
    """Materializes a code coverage report to the terminal."""

    # The full textual report to print.
    report: str

    def materialize(self, console: Console, workspace: Workspace) -> None:
        """Print the report to the console's stderr; no file is produced, so return None."""
        console.print_stderr(f"\n{self.report}")
        return None
494

495

496
@dataclass(frozen=True)
class FilesystemCoverageReport(CoverageReport):
    """Materializes a code coverage report to disk."""

    result_snapshot: Snapshot
    directory_to_materialize_to: PurePath
    # The file within the materialized report one might open first, if any.
    report_file: PurePath | None
    report_type: str

    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
        """Write the report files under `directory_to_materialize_to` and log where they went."""
        workspace.write_digest(
            self.result_snapshot.digest, path_prefix=str(self.directory_to_materialize_to)
        )
        console.print_stderr(
            f"\nWrote {self.report_type} coverage report to `{self.directory_to_materialize_to}`"
        )
        return self.report_file

    def get_artifact(self) -> tuple[str, Snapshot] | None:
        """Expose the report snapshot as an engine artifact named after the report type."""
        return f"coverage_{self.report_type}", self.result_snapshot
516

517

518
@dataclass(frozen=True)
class CoverageReports(EngineAwareReturnType):
    """The full set of coverage reports produced for a test run."""

    reports: tuple[CoverageReport, ...]

    @property
    def coverage_insufficient(self) -> bool:
        """Whether to fail the build due to insufficient coverage."""
        for single_report in self.reports:
            if single_report.coverage_insufficient:
                return True
        return False

    def materialize(self, console: Console, workspace: Workspace) -> tuple[PurePath, ...]:
        """Materialize every report, returning the paths of those written to disk."""
        written_paths = [
            path
            for path in (single_report.materialize(console, workspace) for single_report in self.reports)
            if path
        ]
        return tuple(written_paths)

    def artifacts(self) -> dict[str, Snapshot | FileDigest] | None:
        """Collect the (name, snapshot) artifacts exposed by the individual reports."""
        collected: dict[str, Snapshot | FileDigest] = {
            pair[0]: pair[1]
            for pair in (single_report.get_artifact() for single_report in self.reports)
            if pair
        }
        return collected or None
543

544

545
@rule(polymorphic=True)
async def create_coverage_report(req: CoverageDataCollection) -> CoverageReports:
    # Polymorphic stub: the engine dispatches to the per-backend `CoverageDataCollection`
    # union member, so this body is not expected to run.
    raise NotImplementedError()
548

549

550
class TestSubsystem(GoalSubsystem):
    """Options for the `test` goal."""

    name = "test"
    help = "Run tests."

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        # The goal is only active when at least one `TestRequest` implementation is registered.
        return TestRequest in union_membership

    class EnvironmentAware:
        extra_env_vars = StrListOption(
            help=softwrap(
                f"""
                Additional environment variables to include in test processes.

                {EXTRA_ENV_VARS_USAGE_HELP}
                """
            ),
        )

    debug = BoolOption(
        default=False,
        help=softwrap(
            """
            Run tests sequentially in an interactive process. This is necessary, for
            example, when you add breakpoints to your code.
            """
        ),
    )
    # See also `run.py`'s same option
    debug_adapter = BoolOption(
        default=False,
        help=softwrap(
            """
            Run tests sequentially in an interactive process, using a Debug Adapter
            (https://microsoft.github.io/debug-adapter-protocol/) for the language if supported.

            The interactive process used will be immediately blocked waiting for a client before
            continuing.

            This option implies `--debug`.
            """
        ),
    )
    force = BoolOption(
        default=False,
        help="Force the tests to run, even if they could be satisfied from cache.",
    )

    @property
    def default_process_cache_scope(self) -> ProcessCacheScope:
        # `--force` scopes caching to the session rather than disabling it entirely.
        return ProcessCacheScope.PER_SESSION if self.force else ProcessCacheScope.SUCCESSFUL

    output = EnumOption(
        default=ShowOutput.FAILED,
        help="Show stdout/stderr for these tests.",
    )
    use_coverage = BoolOption(
        default=False,
        help="Generate a coverage report if the test runner supports it.",
    )
    open_coverage = BoolOption(
        default=False,
        help=softwrap(
            """
            If a coverage report file is generated, open it on the local system if the
            system supports this.
            """
        ),
    )
    report = BoolOption(default=False, advanced=True, help="Write test reports to `--report-dir`.")
    default_report_path = str(PurePath("{distdir}", "test", "reports"))
    _report_dir = StrOption(
        default=default_report_path,
        advanced=True,
        help="Path to write test reports to. Must be relative to the build root.",
    )
    shard = StrOption(
        default="",
        help=softwrap(
            """
            A shard specification of the form "k/N", where N is a positive integer and k is a
            non-negative integer less than N.

            If set, the request input targets will be deterministically partitioned into N disjoint
            subsets of roughly equal size, and only the k'th subset will be used, with all others
            discarded.

            Useful for splitting large numbers of test files across multiple machines in CI.
            For example, you can run three shards with `--shard=0/3`, `--shard=1/3`, `--shard=2/3`.

            Note that the shards are roughly equal in size as measured by number of files.
            No attempt is made to consider the size of different files, the time they have
            taken to run in the past, or other such sophisticated measures.
            """
        ),
    )
    timeouts = BoolOption(
        default=True,
        help=softwrap(
            """
            Enable test target timeouts. If timeouts are enabled then test targets with a
            `timeout=` parameter set on their target will time out after the given number of
            seconds if not completed. If no timeout is set, then either the default timeout
            is used or no timeout is configured.
            """
        ),
    )
    timeout_default = IntOption(
        default=None,
        advanced=True,
        help=softwrap(
            """
            The default timeout (in seconds) for a test target if the `timeout` field is not
            set on the target.
            """
        ),
    )
    timeout_maximum = IntOption(
        default=None,
        advanced=True,
        help="The maximum timeout (in seconds) that may be used on a test target.",
    )
    _attempts_default = IntOption(
        default=1,
        help=softwrap(
            """
            The number of attempts to run tests, in case of a test failure.
            Tests that were retried will include the number of attempts in the summary output.
            """
        ),
    )

    batch_size = IntOption(
        "--batch-size",
        default=128,
        advanced=True,
        help=softwrap(
            """
            The target maximum number of files to be included in each run of batch-enabled
            test runners.

            Some test runners can execute tests from multiple files in a single run. Test
            implementations will return all tests that _can_ run together as a single group -
            and then this may be further divided into smaller batches, based on this option.
            This is done:

              1. to avoid OS argument length limits (in processes which don't support argument files)
              2. to support more stable cache keys than would be possible if all files were operated \
                 on in a single batch
              3. to allow for parallelism in test runners which don't have internal \
                 parallelism, or -- if they do support internal parallelism -- to improve scheduling \
                 behavior when multiple processes are competing for cores and so internal parallelism \
                 cannot be used perfectly

            In order to improve cache hit rates (see 2.), batches are created at stable boundaries,
            and so this value is only a "target" max batch size (rather than an exact value).

            NOTE: This parameter has no effect on test runners/plugins that do not implement support
            for batched testing.
            """
        ),
    )

    show_rerun_command = BoolOption(
        default="CI" in os.environ,
        advanced=True,
        help=softwrap(
            f"""
            If tests fail, show an appropriate `{bin_name()} {name} ...` invocation to rerun just
            those tests.

            This is to make it easy to run those tests on a new machine (for instance, run tests
            locally if they fail in CI): caching of successful tests means that rerunning the exact
            same command on the same machine will already automatically only rerun the failures.

            This defaults to `True` when running in CI (as determined by the `CI` environment
            variable being set) but `False` elsewhere.
            """
        ),
    )
    experimental_report_test_result_info = BoolOption(
        default=False,
        advanced=True,
        help=softwrap(
            """
            Report information about the test results.

            For now, it reports only the source from where the test results were fetched. When running tests,
            they may be executed locally or remotely, but if there are results of previous runs available,
            they may be retrieved from the local or remote cache, or be memoized. Knowing where the test
            results come from might be useful when evaluating the efficiency of the cache and the nature of
            the changes in the source code that may lead to frequent cache invalidations.
            """
        ),
    )

    def report_dir(self, distdir: DistDir) -> PurePath:
        """Resolve `--report-dir`, substituting the `{distdir}` placeholder."""
        return PurePath(self._report_dir.format(distdir=distdir.relpath))

    @property
    def attempts_default(self) -> int:
        """Validated accessor for `--test-attempts-default`; raises ValueError if < 1."""
        if self._attempts_default < 1:
            raise ValueError(
                "The `--test-attempts-default` option must have a value equal or greater than 1. "
                f"Instead, it was set to {self._attempts_default}."
            )
        return self._attempts_default
760

761

762
class Test(Goal):
    """The `test` goal."""

    # Prevent pytest from collecting this class as a test case (name starts with "Test").
    __test__ = False

    subsystem_cls = TestSubsystem
    # Test batches may resolve to and run in non-local environments (see `run_tests`).
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
12✔
767

768

769
class TestTimeoutField(IntField, metaclass=ABCMeta):
    """Base field class for implementing timeouts for test targets.

    Each test target that wants to implement a timeout needs to provide with its own concrete field
    class extending this one.
    """

    __test__ = False

    alias = "timeout"
    required = False
    valid_numbers = ValidNumbers.positive_only
    help = help_text(
        """
        A timeout (in seconds) used by each test file belonging to this target.

        If unset, will default to `[test].timeout_default`; if that option is also unset,
        then the test will never time out. Will never exceed `[test].timeout_maximum`. Only
        applies if the option `--test-timeouts` is set to true (the default).
        """
    )

    def calculate_from_global_options(self, test: TestSubsystem) -> int | None:
        """Resolve the effective timeout from this field plus the global `[test]` options.

        Returns `None` when timeouts are disabled globally, or when neither this field
        nor `[test].timeout_default` provides a value.
        """
        if not test.timeouts:
            # Timeouts are globally disabled via `--test-timeouts=false`.
            return None
        # The per-target field value wins; otherwise fall back to the global default.
        chosen = self.value if self.value is not None else test.timeout_default
        if chosen is None:
            return None
        maximum = test.timeout_maximum
        # Clamp to the global maximum when one is configured.
        return chosen if maximum is None else min(chosen, maximum)
1✔
803

804

805
class TestExtraEnvVarsField(StringSequenceField, metaclass=ABCMeta):
    """Base field for per-target extra environment variables in test processes."""

    alias = "extra_env_vars"
    help = help_text(
        f"""
        Additional environment variables to include in test processes.

        {EXTRA_ENV_VARS_USAGE_HELP}

        This will be merged with and override values from `[test].extra_env_vars`.
        """
    )

    def sorted(self) -> tuple[str, ...]:
        """Return the configured env var specs in sorted order (empty tuple if unset)."""
        specs = self.value or ()
        return tuple(sorted(specs))
3✔
819

820

821
class TestsBatchCompatibilityTagField(StringField, metaclass=ABCMeta):
    """Base field for opting test targets into batched execution.

    Concrete test backends subclass this and use `format_help` to build the help text
    with their own target-type and test-runner names interpolated.
    """

    alias = "batch_compatibility_tag"

    @classmethod
    def format_help(cls, target_name: str, test_runner_name: str) -> str:
        # Returns the user-facing help string; the text itself is runtime output.
        return f"""
        An arbitrary value used to mark the test files belonging to this target as valid for
        batched execution.

        It's _sometimes_ safe to run multiple `{target_name}`s within a single test runner process,
        and doing so can give significant wins by allowing reuse of expensive test setup /
        teardown logic. To opt into this behavior, set this field to an arbitrary non-empty
        string on all the `{target_name}` targets that are safe/compatible to run in the same
        process.

        If this field is left unset on a target, the target is assumed to be incompatible with
        all others and will run in a dedicated `{test_runner_name}` process.

        If this field is set on a target, and its value is different from the value on some
        other test `{target_name}`, then the two targets are explicitly incompatible and are guaranteed
        to not run in the same `{test_runner_name}` process.

        If this field is set on a target, and its value is the same as the value on some other
        `{target_name}`, then the two targets are explicitly compatible and _may_ run in the same
        test runner process. Compatible tests may not end up in the same test runner batch if:

          * There are "too many" compatible tests in a partition, as determined by the \
            `[test].batch_size` config parameter, or
          * Compatible tests have some incompatibility in Pants metadata (i.e. different \
            `resolve`s or `extra_env_vars`).

        When tests with the same `batch_compatibility_tag` have incompatibilities in some other
        Pants metadata, they will be automatically split into separate batches. This way you can
        set a high-level `batch_compatibility_tag` using `__defaults__` and then have tests
        continue to work as you tweak BUILD metadata on specific targets.
        """
857

858

859
async def _get_test_batches(
    core_request_types: Iterable[type[TestRequest]],
    targets_to_field_sets: TargetRootsToFieldSets,
    local_environment_name: ChosenLocalEnvironmentName,
    test_subsystem: TestSubsystem,
) -> list[TestRequest.Batch]:
    """Partition the applicable field sets of each test request type into runnable batches.

    For every registered `TestRequest` type, collects the field sets applicable to the
    requested targets, asks the backend to partition them, then splits each partition
    into batches sized by `[test].batch_size`.

    NOTE(review): `core_request_types` is iterated twice (once for the partition calls
    and once in the final zip) — callers must pass a re-iterable collection, not a
    one-shot generator; confirm against call sites.
    """

    def partitions_call(request_type: type[TestRequest]) -> Coroutine[Any, Any, Partitions]:
        # The cast treats the request *class* as a `TestRequest` purely so class-level
        # attributes (`field_set_type`, `PartitionRequest`) type-check below.
        partition_type = cast(TestRequest, request_type)
        field_set_type = partition_type.field_set_type
        applicable_field_sets: list[TestFieldSet] = []
        for target, field_sets in targets_to_field_sets.mapping.items():
            if field_set_type.is_applicable(target):
                applicable_field_sets.extend(field_sets)

        # Partitioning runs in the chosen local environment.
        partition_request = partition_type.PartitionRequest(tuple(applicable_field_sets))
        return partition_tests(
            **implicitly(
                {
                    partition_request: TestRequest.PartitionRequest,
                    local_environment_name.val: EnvironmentName,
                },
            )
        )

    # One partition request per test backend, resolved concurrently; `concurrently`
    # preserves input order, so `all_partitions[i]` pairs with the i-th request type.
    all_partitions = await concurrently(
        partitions_call(request_type) for request_type in core_request_types
    )

    return [
        request_type.Batch(
            cast(TestRequest, request_type).tool_name, tuple(batch), partition.metadata
        )
        for request_type, partitions in zip(core_request_types, all_partitions)
        for partition in partitions
        # Deterministic batching: elements are keyed by address (or str fallback) and
        # split at ~batch_size, never exceeding twice that.
        for batch in partition_sequentially(
            partition.elements,
            key=lambda x: str(x.address) if isinstance(x, FieldSet) else str(x),
            size_target=test_subsystem.batch_size,
            size_max=2 * test_subsystem.batch_size,
        )
    ]
900

901

902
async def _run_debug_tests(
    batches: Iterable[TestRequest.Batch],
    environment_names: Sequence[EnvironmentName],
    test_subsystem: TestSubsystem,
    debug_adapter: DebugAdapterSubsystem,
) -> Test:
    """Run every batch interactively in `--debug` or `--debug-adapter` mode.

    Returns a `Test` carrying 0 on success, or the exit code of a failing batch.
    """
    use_adapter = test_subsystem.debug_adapter
    # Pick which request builder to use once, rather than per batch.
    make_request = (
        test_batch_to_debug_adapter_request if use_adapter else test_batch_to_debug_request
    )
    debug_requests = await concurrently(
        make_request(**implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName}))
        for batch, environment_name in zip(batches, environment_names)
    )

    exit_code = 0
    # Debug sessions are interactive, so they run one after another.
    for debug_request, environment_name in zip(debug_requests, environment_names):
        if use_adapter:
            logger.info(
                softwrap(
                    f"""
                    Launching debug adapter at '{debug_adapter.host}:{debug_adapter.port}',
                    which will wait for a client connection...
                    """
                )
            )

        debug_result = await run_interactive_process_in_environment(
            debug_request.process, environment_name
        )
        if debug_result.exit_code != 0:
            exit_code = debug_result.exit_code
    return Test(exit_code)
1✔
938

939

940
def _save_test_result_info_report_file(run_id: RunId, results: dict[str, dict]) -> None:
    """Save a JSON file with the information about the test results."""
    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    payload = {"timestamp": timestamp, "run_id": run_id, "info": results}
    # The run id and timestamp in the filename keep successive reports from colliding.
    filename = f"test_result_info_report_runid{run_id}_{timestamp}.json"
    with safe_open(filename, "w") as fh:
        fh.write(json.dumps(payload))
×
946

947

948
@goal_rule
async def run_tests(
    console: Console,
    test_subsystem: TestSubsystem,
    debug_adapter: DebugAdapterSubsystem,
    workspace: Workspace,
    union_membership: UnionMembership,
    distdir: DistDir,
    run_id: RunId,
    local_environment_name: ChosenLocalEnvironmentName,
) -> Test:
    """Run the `test` goal: batch applicable targets, execute them, and report results.

    Returns a `Test` whose exit code is 0 on success, a failing test's exit code
    otherwise, or 2 when coverage is enabled and found insufficient.
    """
    # Debug modes run interactively, so matching no targets is an error rather
    # than a warning.
    if test_subsystem.debug_adapter:
        goal_description = f"`{test_subsystem.name} --debug-adapter`"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
    elif test_subsystem.debug:
        goal_description = f"`{test_subsystem.name} --debug`"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
    else:
        goal_description = f"The `{test_subsystem.name}` goal"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.warn

    shard, num_shards = parse_shard_spec(test_subsystem.shard, "the [test].shard option")
    targets_to_valid_field_sets = await find_valid_field_sets_for_target_roots(
        TargetRootsToFieldSetsRequest(
            TestFieldSet,
            goal_description=goal_description,
            no_applicable_targets_behavior=no_applicable_targets_behavior,
            shard=shard,
            num_shards=num_shards,
        ),
        **implicitly(),
    )

    # Group field sets into batches per registered test backend.
    request_types = union_membership.get(TestRequest)
    test_batches = await _get_test_batches(
        request_types,
        targets_to_valid_field_sets,
        local_environment_name,
        test_subsystem,
    )

    # Each batch must resolve to exactly one execution environment.
    environment_names = await concurrently(
        resolve_single_environment_name(
            SingleEnvironmentNameRequest.from_field_sets(batch.elements, batch.description)
        )
        for batch in test_batches
    )

    if test_subsystem.debug or test_subsystem.debug_adapter:
        return await _run_debug_tests(
            test_batches, environment_names, test_subsystem, debug_adapter
        )

    to_test = list(zip(test_batches, environment_names))
    results = await concurrently(
        run_test_batch(
            **implicitly(
                {
                    batch: TestRequest.Batch,
                    environment_name: EnvironmentName,
                }
            )
        )
        for batch, environment_name in to_test
    )

    # Print summary.
    exit_code = 0
    if results:
        console.print_stderr("")
    # `test_result_info` only exists when the experimental flag is on; every later
    # use is guarded by the same flag.
    if test_subsystem.experimental_report_test_result_info:
        test_result_info = {}
    for result in sorted(results):
        if result.exit_code is None:
            # We end up here, e.g., if we implemented test discovery and found no tests.
            continue
        if result.exit_code != 0:
            exit_code = result.exit_code
        if result.result_metadata is None:
            # We end up here, e.g., if compilation failed during self-implemented test discovery.
            continue
        if test_subsystem.experimental_report_test_result_info:
            # NOTE(review): keyed by the batch's first address only — a multi-address
            # batch collapses to one entry; confirm this is intended.
            test_result_info[result.addresses[0].spec] = {
                "source": result.result_metadata.source(run_id).value
            }
        console.print_stderr(_format_test_summary(result, run_id, console))

        # Materialize any extra output (e.g. artifacts) under dist/test/<description>.
        if result.extra_output and result.extra_output.files:
            path_prefix = str(distdir.relpath / "test" / result.path_safe_description)
            workspace.write_digest(
                result.extra_output.digest,
                path_prefix=path_prefix,
            )
            if result.log_extra_output:
                logger.info(
                    f"Wrote extra output from test `{result.addresses[0]}` to `{path_prefix}`."
                )

    rerun_command = _format_test_rerun_command(results)
    if rerun_command and test_subsystem.show_rerun_command:
        console.print_stderr(f"\n{rerun_command}")

    # Materialize XML reports (e.g. JUnit) when `--test-report` is enabled.
    if test_subsystem.report:
        report_dir = test_subsystem.report_dir(distdir)
        merged_reports = await merge_digests(
            MergeDigests(result.xml_results.digest for result in results if result.xml_results)
        )
        workspace.write_digest(merged_reports, path_prefix=str(report_dir))
        console.print_stderr(f"\nWrote test reports to {report_dir}")

    if test_subsystem.use_coverage:
        # NB: We must pre-sort the data for itertools.groupby() to work properly, using the same
        # key function for both. However, you can't sort by `types`, so we call `str()` on it.
        all_coverage_data = sorted(
            (result.coverage_data for result in results if result.coverage_data is not None),
            key=lambda cov_data: str(type(cov_data)),
        )

        # Map each coverage-data type to the union's collection type that wraps it.
        coverage_types_to_collection_types = {
            collection_cls.element_type: collection_cls  # type: ignore[misc]
            for collection_cls in union_membership.get(CoverageDataCollection)
        }
        coverage_collections = []
        for data_cls, data in itertools.groupby(all_coverage_data, lambda data: type(data)):
            collection_cls = coverage_types_to_collection_types[data_cls]  # type: ignore[index]
            coverage_collections.append(collection_cls(data))
        # We can create multiple reports for each coverage data (e.g., console, xml, html)
        coverage_reports_collections = await concurrently(
            create_coverage_report(
                **implicitly(
                    {
                        coverage_collection: CoverageDataCollection,
                        local_environment_name.val: EnvironmentName,
                    }
                )
            )
            for coverage_collection in coverage_collections
        )

        coverage_report_files: list[PurePath] = []
        for coverage_reports in coverage_reports_collections:
            report_files = coverage_reports.materialize(console, workspace)
            coverage_report_files.extend(report_files)

        # Optionally open the materialized reports with the platform's opener.
        if coverage_report_files and test_subsystem.open_coverage:
            open_files = await find_open_program(
                OpenFilesRequest(coverage_report_files, error_if_open_not_found=False),
                **implicitly(),
            )
            for process in open_files.processes:
                _ = await run_interactive_process_in_environment(
                    process, local_environment_name.val
                )

        for coverage_reports in coverage_reports_collections:
            if coverage_reports.coverage_insufficient:
                logger.error(
                    softwrap(
                        """
                        Test goal failed due to insufficient coverage.
                        See coverage reports for details.
                        """
                    )
                )
                # coverage.py uses 2 to indicate failure due to insufficient coverage.
                # We may as well follow suit in the general case, for all languages.
                exit_code = 2

    if test_subsystem.experimental_report_test_result_info:
        _save_test_result_info_report_file(run_id, test_result_info)

    return Test(exit_code)
1✔
1120

1121

1122
# Human-readable labels for each origin a test's process result can have, used by
# `_format_test_summary` when building the per-test summary line.
_SOURCE_MAP = {
    ProcessResultMetadata.Source.MEMOIZED: "memoized",
    ProcessResultMetadata.Source.RAN: "ran",
    ProcessResultMetadata.Source.HIT_LOCALLY: "cached locally",
    ProcessResultMetadata.Source.HIT_REMOTELY: "cached remotely",
}
1128

1129

1130
def _format_test_summary(result: TestResult, run_id: RunId, console: Console) -> str:
    """Format the test summary printed to the console."""
    metadata = result.result_metadata
    assert metadata is not None, (
        "Skipped test results should not be outputted in the test summary"
    )
    succeeded = result.exit_code == 0
    retried = len(result.process_results) > 1

    # Sigil and status reflect success/failure, with a distinct sigil when the
    # test only passed after retries.
    if not succeeded:
        sigil, status = console.sigil_failed(), "failed"
    elif retried:
        sigil, status = console.sigil_succeeded_with_edits(), "succeeded"
    else:
        sigil, status = console.sigil_succeeded(), "succeeded"

    attempt_msg = f" after {len(result.process_results)} attempts" if retried else ""

    environment = metadata.execution_environment.name
    environment_type = metadata.execution_environment.environment_type
    source = metadata.source(run_id)
    source_str = _SOURCE_MAP[source]
    # Describe where the result came from; omit entirely for a plain local run.
    if environment:
        preposition = "in" if source == ProcessResultMetadata.Source.RAN else "for"
        source_desc = (
            f" ({source_str} {preposition} {environment_type} environment `{environment}`)"
        )
    elif source == ProcessResultMetadata.Source.RAN:
        source_desc = ""
    else:
        source_desc = f" ({source_str})"

    total_elapsed_ms = metadata.total_elapsed_ms
    elapsed_print = "" if total_elapsed_ms is None else f"in {total_elapsed_ms / 1000:.2f}s"

    return f"{sigil} {result.description} {status}{attempt_msg} {elapsed_print}{source_desc}."
1✔
1174

1175

1176
def _format_test_rerun_command(results: Iterable[TestResult]) -> None | str:
12✔
1177
    failures = [result for result in results if result.exit_code not in (None, 0)]
1✔
1178
    if not failures:
1✔
1179
        return None
1✔
1180

1181
    # format an invocation like `pants test path/to/first:address path/to/second:address ...`
1182
    addresses = sorted(shlex.quote(str(addr)) for result in failures for addr in result.addresses)
1✔
1183
    goal = f"{bin_name()} {TestSubsystem.name}"
1✔
1184
    invocation = " ".join([goal, *addresses])
1✔
1185

1186
    return f"To rerun the failing tests, use:\n\n    {invocation}"
1✔
1187

1188

1189
@dataclass(frozen=True)
class TestExtraEnv:
    """The resolved subset of environment variables to expose to test processes.

    Built by `get_filtered_environment` from `[test].extra_env_vars`.
    """

    env: EnvironmentVars
12✔
1192

1193

1194
@rule
async def get_filtered_environment(test_env_aware: TestSubsystem.EnvironmentAware) -> TestExtraEnv:
    """Resolve `[test].extra_env_vars` into the env vars exposed to test processes."""
    env_vars = await environment_vars_subset(
        EnvironmentVarsRequest(test_env_aware.extra_env_vars), **implicitly()
    )
    return TestExtraEnv(env_vars)
1201

1202

1203
@memoized
def _unsupported_debug_rules(cls: type[TestRequest]) -> Iterable:
    """Returns a rule that implements TestDebugRequest by raising an error."""

    # One rule is generated per test-request class; `canonical_name_suffix` keeps the
    # generated rule names unique across backends. `collect_rules(locals())` picks the
    # rule up by name, so the inner function's name is load-bearing.
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
    async def get_test_debug_request(request: TestRequest.Batch) -> TestDebugRequest:
        raise NotImplementedError("Testing this target with --debug is not yet supported.")

    return collect_rules(locals())
6✔
1212

1213

1214
@memoized
def _unsupported_debug_adapter_rules(cls: type[TestRequest]) -> Iterable:
    """Returns a rule that implements TestDebugAdapterRequest by raising an error."""

    # Mirrors `_unsupported_debug_rules`: a per-class generated rule, uniquely named via
    # `canonical_name_suffix` and discovered by `collect_rules(locals())`.
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
    async def get_test_debug_adapter_request(request: TestRequest.Batch) -> TestDebugAdapterRequest:
        raise NotImplementedError(
            "Testing this target type with a debug adapter is not yet supported."
        )

    return collect_rules(locals())
9✔
1225

1226

1227
# -------------------------------------------------------------------------------------------
1228
# `runtime_package_dependencies` field
1229
# -------------------------------------------------------------------------------------------
1230

1231

1232
class RuntimePackageDependenciesField(SpecialCasedDependencies):
    """BUILD field listing packageable targets whose built artifacts are staged into the
    test chroot (see `build_runtime_package_dependencies`)."""

    alias = "runtime_package_dependencies"
    help = help_text(
        f"""
        Addresses to targets that can be built with the `{bin_name()} package` goal and whose
        resulting artifacts should be included in the test run.

        Pants will build the artifacts as if you had run `{bin_name()} package`.
        It will include the results in your test's chroot, using the same name they would normally
        have, but without the `--distdir` prefix (e.g. `dist/`).

        You can include anything that can be built by `{bin_name()} package`, e.g. a `pex_binary`,
        `python_aws_lambda_function`, or an `archive`.
        """
    )
1247

1248

1249
class TraverseGenericTargetDepsOnly(ShouldTraverseDepsPredicate):
    """Traverses deps of `target()` (GenericTarget) entries, stops at all other target types.

    Used to unwrap a `target()` alias that groups packageable targets, so that
    `runtime_package_dependencies` can reference the alias instead of each target individually.
    """

    def __call__(
        self, target: Target, field: Dependencies | SpecialCasedDependencies
    ) -> DepsTraversalBehavior:
        # Only recurse through plain `dependencies` fields of generic `target()` aliases.
        should_recurse = isinstance(target, GenericTarget) and isinstance(field, Dependencies)
        return (
            DepsTraversalBehavior.INCLUDE if should_recurse else DepsTraversalBehavior.EXCLUDE
        )
3✔
1262

1263

1264
class BuiltPackageDependencies(Collection[BuiltPackage]):
    """The built artifacts for a `runtime_package_dependencies` field."""

    pass
12✔
1266

1267

1268
@dataclass(frozen=True)
class BuildPackageDependenciesRequest:
    """Request to build the packages referenced by a `runtime_package_dependencies` field."""

    field: RuntimePackageDependenciesField
12✔
1271

1272

1273
@rule(desc="Build runtime package dependencies for tests", level=LogLevel.DEBUG)
async def build_runtime_package_dependencies(
    request: BuildPackageDependenciesRequest,
) -> BuiltPackageDependencies:
    """Build every package referenced (directly or via a `target()` alias) by the field."""
    unparsed_addresses = request.field.to_unparsed_address_inputs()
    if not unparsed_addresses:
        return BuiltPackageDependencies()
    root_targets = await resolve_targets(**implicitly(unparsed_addresses))

    # Unwrap GenericTarget ("target()") entries by traversing their deps transitively,
    # stopping at non-GenericTarget targets. This lets callers group packageable targets
    # under a single `target()` alias and reference that alias in
    # runtime_package_dependencies, rather than listing each packageable target individually.
    closure = await transitive_targets(
        TransitiveTargetsRequest(
            [target.address for target in root_targets],
            should_traverse_deps_predicate=TraverseGenericTargetDepsOnly(),
        ),
        **implicitly(),
    )
    packageable = [
        target for target in closure.closure if not isinstance(target, GenericTarget)
    ]

    field_sets_per_target = await find_valid_field_sets(
        FieldSetsPerTargetRequest(PackageFieldSet, packageable), **implicitly()
    )
    built = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in field_sets_per_target.field_sets
    )
    return BuiltPackageDependencies(built)
3✔
1303

1304

1305
def rules():
    """Return all rules defined in this module for registration with the engine."""
    return list(collect_rules())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc