• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 24055979590

06 Apr 2026 11:17PM UTC coverage: 52.37% (-40.5%) from 92.908%
24055979590

Pull #23225

github

web-flow
Merge 67474653c into 542ca048d
Pull Request #23225: Add --test-show-all-batch-targets to expose all targets in batched pytest

6 of 17 new or added lines in 2 files covered. (35.29%)

23030 existing lines in 605 files now uncovered.

31643 of 60422 relevant lines covered (52.37%)

1.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.91
/src/python/pants/core/goals/test.py
1
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import itertools
2✔
7
import json
2✔
8
import logging
2✔
9
import os
2✔
10
import shlex
2✔
11
from abc import ABC, ABCMeta
2✔
12
from collections.abc import Coroutine, Iterable, Sequence
2✔
13
from dataclasses import dataclass, field
2✔
14
from datetime import datetime
2✔
15
from enum import Enum
2✔
16
from pathlib import PurePath
2✔
17
from typing import Any, ClassVar, TypeVar, cast
2✔
18

19
from pants.core.environments.rules import (
2✔
20
    ChosenLocalEnvironmentName,
21
    EnvironmentName,
22
    SingleEnvironmentNameRequest,
23
    resolve_single_environment_name,
24
)
25
from pants.core.goals.multi_tool_goal_helper import SkippableSubsystem
2✔
26
from pants.core.goals.package import (
2✔
27
    BuiltPackage,
28
    EnvironmentAwarePackageRequest,
29
    PackageFieldSet,
30
    environment_aware_package,
31
)
32
from pants.core.subsystems.debug_adapter import DebugAdapterSubsystem
2✔
33
from pants.core.util_rules.distdir import DistDir
2✔
34
from pants.core.util_rules.env_vars import environment_vars_subset
2✔
35
from pants.core.util_rules.partitions import (
2✔
36
    PartitionerType,
37
    PartitionMetadataT,
38
    Partitions,
39
    _BatchBase,
40
    _PartitionFieldSetsRequestBase,
41
)
42
from pants.engine.addresses import Address
2✔
43
from pants.engine.collection import Collection
2✔
44
from pants.engine.console import Console
2✔
45
from pants.engine.desktop import OpenFilesRequest, find_open_program
2✔
46
from pants.engine.engine_aware import EngineAwareReturnType
2✔
47
from pants.engine.env_vars import EXTRA_ENV_VARS_USAGE_HELP, EnvironmentVars, EnvironmentVarsRequest
2✔
48
from pants.engine.fs import EMPTY_FILE_DIGEST, FileDigest, MergeDigests, Snapshot, Workspace
2✔
49
from pants.engine.goal import Goal, GoalSubsystem
2✔
50
from pants.engine.internals.graph import find_valid_field_sets, resolve_targets
2✔
51
from pants.engine.internals.session import RunId
2✔
52
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
2✔
53
from pants.engine.intrinsics import merge_digests, run_interactive_process_in_environment
2✔
54
from pants.engine.process import (
2✔
55
    FallibleProcessResult,
56
    InteractiveProcess,
57
    ProcessCacheScope,
58
    ProcessResultMetadata,
59
)
60
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
2✔
61
from pants.engine.target import (
2✔
62
    FieldSet,
63
    FieldSetsPerTargetRequest,
64
    IntField,
65
    NoApplicableTargetsBehavior,
66
    SourcesField,
67
    SpecialCasedDependencies,
68
    StringField,
69
    StringSequenceField,
70
    TargetRootsToFieldSets,
71
    TargetRootsToFieldSetsRequest,
72
    ValidNumbers,
73
    parse_shard_spec,
74
)
75
from pants.engine.unions import UnionMembership, UnionRule, distinct_union_type_per_subclass, union
2✔
76
from pants.option.option_types import BoolOption, EnumOption, IntOption, StrListOption, StrOption
2✔
77
from pants.util.collections import partition_sequentially
2✔
78
from pants.util.dirutil import safe_open
2✔
79
from pants.util.docutil import bin_name
2✔
80
from pants.util.logging import LogLevel
2✔
81
from pants.util.memo import memoized, memoized_property
2✔
82
from pants.util.meta import classproperty
2✔
83
from pants.util.strutil import Simplifier, help_text, softwrap
2✔
84

85
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
86

87

88
@dataclass(frozen=True)
class TestResult(EngineAwareReturnType):
    """The outcome of running one test batch, rendered by the engine at the end of `test`."""

    # A None exit_code indicates a backend that performs its own test discovery/selection
    # (rather than delegating that to the underlying test tool), and discovered no tests.
    exit_code: int | None
    stdout_bytes: bytes
    stdout_digest: FileDigest
    stderr_bytes: bytes
    stderr_digest: FileDigest
    addresses: tuple[Address, ...]
    output_setting: ShowOutput
    # A None result_metadata indicates a backend that performs its own test discovery/selection
    # and either discovered no tests, or encountered an error, such as a compilation error, in
    # the attempt.
    result_metadata: ProcessResultMetadata | None  # TODO: Merge elapsed MS of all subprocesses
    partition_description: str | None = None

    coverage_data: CoverageData | None = None
    # TODO: Rename this to `reports`. There is no guarantee that every language will produce
    #  XML reports, or only XML reports.
    xml_results: Snapshot | None = None
    # Any extra output (such as from plugins) that the test runner was configured to output.
    extra_output: Snapshot | None = None
    # True if the core test rules should log that extra output was written.
    log_extra_output: bool = False
    # All results including failed attempts
    process_results: tuple[FallibleProcessResult, ...] = field(default_factory=tuple)

    # Applied to stdout/stderr before display (see `_simplified_output`).
    output_simplifier: Simplifier = Simplifier()

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @staticmethod
    def no_tests_found(address: Address, output_setting: ShowOutput) -> TestResult:
        """Used when we do test discovery ourselves, and we didn't find any."""
        return TestResult(
            exit_code=None,
            stdout_bytes=b"",
            stderr_bytes=b"",
            stdout_digest=EMPTY_FILE_DIGEST,
            stderr_digest=EMPTY_FILE_DIGEST,
            addresses=(address,),
            output_setting=output_setting,
            result_metadata=None,
        )

    @staticmethod
    def no_tests_found_in_batch(
        batch: TestRequest.Batch[_TestFieldSetT, Any], output_setting: ShowOutput
    ) -> TestResult:
        """Used when we do test discovery ourselves, and we didn't find any."""
        return TestResult(
            exit_code=None,
            stdout_bytes=b"",
            stderr_bytes=b"",
            stdout_digest=EMPTY_FILE_DIGEST,
            stderr_digest=EMPTY_FILE_DIGEST,
            addresses=tuple(field_set.address for field_set in batch.elements),
            output_setting=output_setting,
            result_metadata=None,
            partition_description=batch.partition_metadata.description,
        )

    @staticmethod
    def from_fallible_process_result(
        process_results: tuple[FallibleProcessResult, ...],
        address: Address,
        output_setting: ShowOutput,
        *,
        coverage_data: CoverageData | None = None,
        xml_results: Snapshot | None = None,
        extra_output: Snapshot | None = None,
        log_extra_output: bool = False,
        output_simplifier: Simplifier = Simplifier(),
    ) -> TestResult:
        """Build a single-address result from process results.

        `process_results` includes every attempt; the last attempt determines exit
        code, output, and metadata.
        """
        process_result = process_results[-1]
        return TestResult(
            exit_code=process_result.exit_code,
            stdout_bytes=process_result.stdout,
            stdout_digest=process_result.stdout_digest,
            stderr_bytes=process_result.stderr,
            stderr_digest=process_result.stderr_digest,
            addresses=(address,),
            output_setting=output_setting,
            result_metadata=process_result.metadata,
            coverage_data=coverage_data,
            xml_results=xml_results,
            extra_output=extra_output,
            log_extra_output=log_extra_output,
            process_results=process_results,
            output_simplifier=output_simplifier,
        )

    @staticmethod
    def from_batched_fallible_process_result(
        process_results: tuple[FallibleProcessResult, ...],
        batch: TestRequest.Batch[_TestFieldSetT, Any],
        output_setting: ShowOutput,
        *,
        coverage_data: CoverageData | None = None,
        xml_results: Snapshot | None = None,
        extra_output: Snapshot | None = None,
        log_extra_output: bool = False,
        output_simplifier: Simplifier = Simplifier(),
    ) -> TestResult:
        """Like `from_fallible_process_result`, but for a whole batch of field sets.

        The addresses and partition description are taken from the batch; the last
        attempt in `process_results` determines exit code, output, and metadata.
        """
        process_result = process_results[-1]
        return TestResult(
            exit_code=process_result.exit_code,
            stdout_bytes=process_result.stdout,
            stdout_digest=process_result.stdout_digest,
            stderr_bytes=process_result.stderr,
            stderr_digest=process_result.stderr_digest,
            addresses=tuple(field_set.address for field_set in batch.elements),
            output_setting=output_setting,
            result_metadata=process_result.metadata,
            coverage_data=coverage_data,
            xml_results=xml_results,
            extra_output=extra_output,
            log_extra_output=log_extra_output,
            output_simplifier=output_simplifier,
            partition_description=batch.partition_metadata.description,
            process_results=process_results,
        )

    @property
    def description(self) -> str:
        """A human-readable identifier: the first address, plus a count of any others."""
        if len(self.addresses) == 1:
            return self.addresses[0].spec

        return f"{self.addresses[0].spec} and {len(self.addresses) - 1} other files"

    @property
    def path_safe_description(self) -> str:
        """Like `description`, but safe for use in file/directory names."""
        if len(self.addresses) == 1:
            return self.addresses[0].path_safe_spec

        return f"{self.addresses[0].path_safe_spec}+{len(self.addresses) - 1}"

    def __lt__(self, other: Any) -> bool:
        """We sort first by exit code, then alphanumerically within each group."""
        if not isinstance(other, TestResult):
            return NotImplemented
        if self.exit_code == other.exit_code:
            return self.description < other.description
        # A None exit code (no tests found) sorts before everything else.
        if self.exit_code is None:
            return True
        if other.exit_code is None:
            return False
        return abs(self.exit_code) < abs(other.exit_code)

    def artifacts(self) -> dict[str, FileDigest | Snapshot] | None:
        """The digests/snapshots associated with this result, keyed by name."""
        output: dict[str, FileDigest | Snapshot] = {
            "stdout": self.stdout_digest,
            "stderr": self.stderr_digest,
        }
        if self.xml_results:
            output["xml_results"] = self.xml_results
        return output

    def level(self) -> LogLevel:
        """DEBUG when no tests were found, INFO on success, ERROR on failure."""
        if self.exit_code is None:
            return LogLevel.DEBUG
        return LogLevel.INFO if self.exit_code == 0 else LogLevel.ERROR

    def _simplified_output(self, v: bytes) -> str:
        """Decode raw output (replacing undecodable bytes) and run it through the simplifier."""
        return self.output_simplifier.simplify(v.decode(errors="replace"))

    @memoized_property
    def stdout_simplified_str(self) -> str:
        return self._simplified_output(self.stdout_bytes)

    @memoized_property
    def stderr_simplified_str(self) -> str:
        return self._simplified_output(self.stderr_bytes)

    def message(self) -> str:
        """Render the status line, plus captured output when `output_setting` calls for it."""
        if self.exit_code is None:
            return "no tests found."
        status = "succeeded" if self.exit_code == 0 else f"failed (exit code {self.exit_code})"
        message = f"{status}."
        if self.partition_description:
            message += f"\nPartition: {self.partition_description}"
        # Suppress output entirely for NONE, and for FAILED when the test passed.
        if self.output_setting == ShowOutput.NONE or (
            self.output_setting == ShowOutput.FAILED and self.exit_code == 0
        ):
            return message
        output = ""
        if self.stdout_bytes:
            output += f"\n{self.stdout_simplified_str}"
        if self.stderr_bytes:
            output += f"\n{self.stderr_simplified_str}"
        if output:
            output = f"{output.rstrip()}\n\n"
        return f"{message}{output}"

    def metadata(self) -> dict[str, Any]:
        """The address specs covered by this result."""
        return {"addresses": [address.spec for address in self.addresses]}

    def cacheable(self) -> bool:
        """Is marked uncacheable to ensure that it always renders."""
        return False
290

291

292
class ShowOutput(Enum):
    """Which tests to emit detailed output for."""

    ALL = "all"  # always show captured stdout/stderr
    FAILED = "failed"  # show output only for failing tests (see TestResult.message)
    NONE = "none"  # never show captured output
298

299

300
@dataclass(frozen=True)
class TestDebugRequest:
    """Wraps the interactive process used to run tests in debug mode."""

    process: InteractiveProcess

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False
306

307

308
class TestDebugAdapterRequest(TestDebugRequest):
    """Like TestDebugRequest, but launches the test process using the relevant Debug Adapter server.

    The process should be launched waiting for the client to connect.
    """

    # Inherits `process` and `__test__` from TestDebugRequest; adds no fields of its own.
313

314

315
@union
@dataclass(frozen=True)
class TestFieldSet(FieldSet, metaclass=ABCMeta):
    """The fields necessary to run tests on a target."""

    sources: SourcesField

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False
323

324

325
# Type variable for generics over a concrete TestFieldSet subclass (used by
# TestRequest.PartitionRequest / TestRequest.Batch below).
_TestFieldSetT = TypeVar("_TestFieldSetT", bound=TestFieldSet)
326

327

328
@union
class TestRequest:
    """Base class for plugin types wanting to be run as part of `test`.

    Plugins should define a new type which subclasses this type, and set the
    appropriate class variables.
    E.g.
        class DryCleaningRequest(TestRequest):
            tool_subsystem = DryCleaningSubsystem
            field_set_type = DryCleaningFieldSet

    Then register the rules which tell Pants about your plugin.
    E.g.
        def rules():
            return [
                *collect_rules(),
                *DryCleaningRequest.rules(),
            ]
    """

    tool_subsystem: ClassVar[type[SkippableSubsystem]]
    field_set_type: ClassVar[type[TestFieldSet]]
    partitioner_type: ClassVar[PartitionerType] = PartitionerType.DEFAULT_ONE_PARTITION_PER_INPUT

    # Whether the runner supports `test --debug` / `test --debug-adapter`; when False,
    # `rules()` registers fallback rules for the corresponding polymorphic requests.
    supports_debug: ClassVar[bool] = False
    supports_debug_adapter: ClassVar[bool] = False

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @classproperty
    def tool_name(cls) -> str:
        """The options scope of this request's tool subsystem."""
        return cls.tool_subsystem.options_scope

    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
    class PartitionRequest(_PartitionFieldSetsRequestBase[_TestFieldSetT]):
        def metadata(self) -> dict[str, Any]:
            """The address specs of the field sets in this request."""
            return {"addresses": [field_set.address.spec for field_set in self.field_sets]}

    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
    class Batch(_BatchBase[_TestFieldSetT, PartitionMetadataT]):
        @property
        def single_element(self) -> _TestFieldSetT:
            """Return the single element of this batch.

            NOTE: Accessing this property will raise a `TypeError` if this `Batch` contains
            >1 elements. It is only safe to be used by test runners utilizing the "default"
            one-input-per-partition partitioner type.
            """

            if len(self.elements) != 1:
                description = ""
                if self.partition_metadata.description:
                    description = f" from partition '{self.partition_metadata.description}'"
                raise TypeError(
                    f"Expected a single element in batch{description}, but found {len(self.elements)}"
                )

            return self.elements[0]

        @property
        def description(self) -> str:
            """A human-readable description of this batch, naming its partition when known."""
            if self.partition_metadata and self.partition_metadata.description:
                return f"test batch from partition '{self.partition_metadata.description}'"
            return "test batch"

        def debug_hint(self) -> str:
            """A short identifier: the first address, plus a count of any other elements."""
            if len(self.elements) == 1:
                return self.elements[0].address.spec

            return f"{self.elements[0].address.spec} and {len(self.elements) - 1} other files"

        def metadata(self) -> dict[str, Any]:
            """The address specs of this batch's elements and its partition description."""
            return {
                "addresses": [field_set.address.spec for field_set in self.elements],
                "partition_description": self.partition_metadata.description,
            }

    @classmethod
    def rules(cls) -> Iterable:
        """Yield the rules that register this request type with the `test` goal machinery."""
        yield from cls.partitioner_type.default_rules(cls, by_file=False)

        yield UnionRule(TestFieldSet, cls.field_set_type)
        yield UnionRule(TestRequest, cls)
        yield UnionRule(TestRequest.PartitionRequest, cls.PartitionRequest)
        yield UnionRule(TestRequest.Batch, cls.Batch)

        # `_unsupported_debug_rules` / `_unsupported_debug_adapter_rules` (defined elsewhere
        # in this file) presumably supply fallback rules for runners without debug support
        # so the polymorphic debug requests still resolve — confirm against their definitions.
        if not cls.supports_debug:
            yield from _unsupported_debug_rules(cls)

        if not cls.supports_debug_adapter:
            yield from _unsupported_debug_adapter_rules(cls)
419

420

421
@rule(polymorphic=True)
async def partition_tests(req: TestRequest.PartitionRequest) -> Partitions:
    """Polymorphic stub: concrete per-plugin rules are registered via TestRequest.rules()."""
    raise NotImplementedError()
424

425

426
@rule(polymorphic=True)
async def test_batch_to_debug_request(batch: TestRequest.Batch) -> TestDebugRequest:
    """Polymorphic stub: concrete per-plugin rules are registered via TestRequest.rules()."""
    raise NotImplementedError()
429

430

431
@rule(polymorphic=True)
async def test_batch_to_debug_adapter_request(batch: TestRequest.Batch) -> TestDebugAdapterRequest:
    """Polymorphic stub: concrete per-plugin rules are registered via TestRequest.rules()."""
    raise NotImplementedError()
434

435

436
@rule(polymorphic=True)
async def run_test_batch(batch: TestRequest.Batch) -> TestResult:
    """Polymorphic stub: concrete per-plugin rules are registered via TestRequest.rules()."""
    raise NotImplementedError()
439

440

441
class CoverageData(ABC):
    """Base class for inputs to a coverage report.

    Subclasses should add whichever fields they require - snapshots of coverage output, XML files,
    etc.
    """

    # Marker base class: intentionally declares no fields of its own.
    """
447

448

449
# Type variable for collections over a concrete CoverageData subclass.
_CD = TypeVar("_CD", bound=CoverageData)
450

451

452
@union(in_scope_types=[EnvironmentName])
class CoverageDataCollection(Collection[_CD]):
    """A homogeneous collection of one backend's CoverageData instances."""

    # The concrete CoverageData subclass this collection holds.
    element_type: ClassVar[type[_CD]]
455

456

457
@dataclass(frozen=True)
class CoverageReport(ABC):
    """Represents a code coverage report that can be materialized to the terminal or disk."""

    # Some coverage systems can determine, based on a configurable threshold, whether coverage
    # was sufficient or not. The test goal will fail the build if coverage was deemed insufficient.
    coverage_insufficient: bool

    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
        """Materialize this code coverage report to the terminal or disk.

        :param console: A handle to the terminal.
        :param workspace: A handle to local disk.
        :return: If a report was materialized to disk, the path of the file in the report one might
                 open first to start examining the report.
        """
        ...

    def get_artifact(self) -> tuple[str, Snapshot] | None:
        """A (name, snapshot) pair for this report's files, or None if it has no files.

        The base implementation returns None; disk-backed subclasses override it.
        """
        return None
477

478

479
@dataclass(frozen=True)
class ConsoleCoverageReport(CoverageReport):
    """Materializes a code coverage report to the terminal."""

    # The full, pre-rendered report text.
    report: str

    def materialize(self, console: Console, workspace: Workspace) -> None:
        """Print the report to stderr; returns None since nothing was written to disk."""
        console.print_stderr(f"\n{self.report}")
        return None
488

489

490
@dataclass(frozen=True)
class FilesystemCoverageReport(CoverageReport):
    """Materializes a code coverage report to disk."""

    # The files making up the report.
    result_snapshot: Snapshot
    # Directory the snapshot is written under (used as the `path_prefix`).
    directory_to_materialize_to: PurePath
    # The file a user would open first to browse the report, if there is one.
    report_file: PurePath | None
    # Short label for the report format; appears in the log message and artifact name.
    report_type: str

    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
        """Write the report files to disk and log their location.

        :return: the entry-point file of the report, if any.
        """
        workspace.write_digest(
            self.result_snapshot.digest, path_prefix=str(self.directory_to_materialize_to)
        )
        console.print_stderr(
            f"\nWrote {self.report_type} coverage report to `{self.directory_to_materialize_to}`"
        )
        return self.report_file

    def get_artifact(self) -> tuple[str, Snapshot] | None:
        """Expose the report's files under the name `coverage_<report_type>`."""
        return f"coverage_{self.report_type}", self.result_snapshot
510

511

512
@dataclass(frozen=True)
class CoverageReports(EngineAwareReturnType):
    """The full set of coverage reports produced by a test run."""

    reports: tuple[CoverageReport, ...]

    @property
    def coverage_insufficient(self) -> bool:
        """Whether to fail the build due to insufficient coverage."""
        for report in self.reports:
            if report.coverage_insufficient:
                return True
        return False

    def materialize(self, console: Console, workspace: Workspace) -> tuple[PurePath, ...]:
        """Materialize every report, returning the entry-point paths of those written to disk."""
        return tuple(
            path
            for report in self.reports
            if (path := report.materialize(console, workspace))
        )

    def artifacts(self) -> dict[str, Snapshot | FileDigest] | None:
        """Collect each report's named artifact; None when no report has one."""
        collected: dict[str, Snapshot | FileDigest] = {}
        for report in self.reports:
            entry = report.get_artifact()
            if entry:
                collected[entry[0]] = entry[1]
        return collected or None
537

538

539
@rule(polymorphic=True)
async def create_coverage_report(req: CoverageDataCollection) -> CoverageReports:
    """Polymorphic stub: concrete rules are expected per CoverageDataCollection union member."""
    raise NotImplementedError()
542

543

544
class TestSubsystem(GoalSubsystem):
    """Options for the `test` goal."""

    name = "test"
    help = "Run tests."

    # Prevent this class from being detected by pytest as a test class.
    __test__ = False

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        # The goal is only active when at least one backend registered a TestRequest.
        return TestRequest in union_membership

    class EnvironmentAware:
        extra_env_vars = StrListOption(
            help=softwrap(
                f"""
                Additional environment variables to include in test processes.

                {EXTRA_ENV_VARS_USAGE_HELP}
                """
            ),
        )

    debug = BoolOption(
        default=False,
        help=softwrap(
            """
            Run tests sequentially in an interactive process. This is necessary, for
            example, when you add breakpoints to your code.
            """
        ),
    )
    # See also `run.py`'s same option
    debug_adapter = BoolOption(
        default=False,
        help=softwrap(
            """
            Run tests sequentially in an interactive process, using a Debug Adapter
            (https://microsoft.github.io/debug-adapter-protocol/) for the language if supported.

            The interactive process used will be immediately blocked waiting for a client before
            continuing.

            This option implies `--debug`.
            """
        ),
    )
    force = BoolOption(
        default=False,
        help="Force the tests to run, even if they could be satisfied from cache.",
    )

    @property
    def default_process_cache_scope(self) -> ProcessCacheScope:
        """PER_SESSION when `--force` is set (so cached results are ignored), else SUCCESSFUL."""
        return ProcessCacheScope.PER_SESSION if self.force else ProcessCacheScope.SUCCESSFUL

    output = EnumOption(
        default=ShowOutput.FAILED,
        help="Show stdout/stderr for these tests.",
    )
    use_coverage = BoolOption(
        default=False,
        help="Generate a coverage report if the test runner supports it.",
    )
    open_coverage = BoolOption(
        default=False,
        help=softwrap(
            """
            If a coverage report file is generated, open it on the local system if the
            system supports this.
            """
        ),
    )
    report = BoolOption(default=False, advanced=True, help="Write test reports to `--report-dir`.")
    # Template default for `_report_dir`; `{distdir}` is substituted in `report_dir()`.
    default_report_path = str(PurePath("{distdir}", "test", "reports"))
    _report_dir = StrOption(
        default=default_report_path,
        advanced=True,
        help="Path to write test reports to. Must be relative to the build root.",
    )
    shard = StrOption(
        default="",
        help=softwrap(
            """
            A shard specification of the form "k/N", where N is a positive integer and k is a
            non-negative integer less than N.

            If set, the request input targets will be deterministically partitioned into N disjoint
            subsets of roughly equal size, and only the k'th subset will be used, with all others
            discarded.

            Useful for splitting large numbers of test files across multiple machines in CI.
            For example, you can run three shards with `--shard=0/3`, `--shard=1/3`, `--shard=2/3`.

            Note that the shards are roughly equal in size as measured by number of files.
            No attempt is made to consider the size of different files, the time they have
            taken to run in the past, or other such sophisticated measures.
            """
        ),
    )
    timeouts = BoolOption(
        default=True,
        help=softwrap(
            """
            Enable test target timeouts. If timeouts are enabled then test targets with a
            `timeout=` parameter set on their target will time out after the given number of
            seconds if not completed. If no timeout is set, then either the default timeout
            is used or no timeout is configured.
            """
        ),
    )
    timeout_default = IntOption(
        default=None,
        advanced=True,
        help=softwrap(
            """
            The default timeout (in seconds) for a test target if the `timeout` field is not
            set on the target.
            """
        ),
    )
    timeout_maximum = IntOption(
        default=None,
        advanced=True,
        help="The maximum timeout (in seconds) that may be used on a test target.",
    )
    # Accessed via the validating `attempts_default` property below.
    _attempts_default = IntOption(
        default=1,
        help=softwrap(
            """
            The number of attempts to run tests, in case of a test failure.
            Tests that were retried will include the number of attempts in the summary output.
            """
        ),
    )

    batch_size = IntOption(
        "--batch-size",
        default=128,
        advanced=True,
        help=softwrap(
            """
            The target maximum number of files to be included in each run of batch-enabled
            test runners.

            Some test runners can execute tests from multiple files in a single run. Test
            implementations will return all tests that _can_ run together as a single group -
            and then this may be further divided into smaller batches, based on this option.
            This is done:

              1. to avoid OS argument length limits (in processes which don't support argument files)
              2. to support more stable cache keys than would be possible if all files were operated \
                 on in a single batch
              3. to allow for parallelism in test runners which don't have internal \
                 parallelism, or -- if they do support internal parallelism -- to improve scheduling \
                 behavior when multiple processes are competing for cores and so internal parallelism \
                 cannot be used perfectly

            In order to improve cache hit rates (see 2.), batches are created at stable boundaries,
            and so this value is only a "target" max batch size (rather than an exact value).

            NOTE: This parameter has no effect on test runners/plugins that do not implement support
            for batched testing.
            """
        ),
    )

    show_rerun_command = BoolOption(
        default="CI" in os.environ,
        advanced=True,
        help=softwrap(
            f"""
            If tests fail, show an appropriate `{bin_name()} {name} ...` invocation to rerun just
            those tests.

            This is to make it easy to run those tests on a new machine (for instance, run tests
            locally if they fail in CI): caching of successful tests means that rerunning the exact
            same command on the same machine will already automatically only rerun the failures.

            This defaults to `True` when running in CI (as determined by the `CI` environment
            variable being set) but `False` elsewhere.
            """
        ),
    )
    experimental_report_test_result_info = BoolOption(
        default=False,
        advanced=True,
        help=softwrap(
            """
            Report information about the test results.

            For now, it reports only the source from where the test results were fetched. When running tests,
            they may be executed locally or remotely, but if there are results of previous runs available,
            they may be retrieved from the local or remote cache, or be memoized. Knowing where the test
            results come from might be useful when evaluating the efficiency of the cache and the nature of
            the changes in the source code that may lead to frequent cache invalidations.
            """
        ),
    )
    show_all_batch_targets = BoolOption(
        default=False,
        help=softwrap(
            """
            When tests are batched via `batch_compatibility_tag`, show all target addresses in
            the batch in test result summaries, workunit descriptions, and warning messages.

            By default, batched test descriptions are truncated to show only the first target
            address (e.g. "path/to:tests and 3 other files"). When this option is enabled, all
            target addresses in the batch are listed (e.g.
            "path/to:tests, path/to:tests2, path/to:tests3, path/to:tests4").

            This is useful for CI environments where you need to know exactly which targets
            are grouped together in each test invocation.
            """
        ),
    )

    def report_dir(self, distdir: DistDir) -> PurePath:
        """Resolve `--report-dir`, substituting `{distdir}` with the actual dist directory."""
        return PurePath(self._report_dir.format(distdir=distdir.relpath))

    @property
    def attempts_default(self) -> int:
        """Validated accessor for `--test-attempts-default`.

        :raises ValueError: if the configured value is less than 1.
        """
        if self._attempts_default < 1:
            raise ValueError(
                "The `--test-attempts-default` option must have a value equal or greater than 1. "
                f"Instead, it was set to {self._attempts_default}."
            )
        return self._attempts_default
771

772

773
class Test(Goal):
    # `__test__ = False` tells pytest not to collect this class as a test,
    # despite its name starting with "Test".
    __test__ = False

    subsystem_cls = TestSubsystem
    # Test batches may run in per-batch (e.g. remote/docker) environments.
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
2✔
778

779

780
class TestTimeoutField(IntField, metaclass=ABCMeta):
    """Abstract `timeout` field for test targets.

    Every test target type that supports timeouts must subclass this field with its
    own concrete field class.
    """

    __test__ = False

    alias = "timeout"
    required = False
    valid_numbers = ValidNumbers.positive_only
    help = help_text(
        """
        A timeout (in seconds) used by each test file belonging to this target.

        If unset, will default to `[test].timeout_default`; if that option is also unset,
        then the test will never time out. Will never exceed `[test].timeout_maximum`. Only
        applies if the option `--test-timeouts` is set to true (the default).
        """
    )

    def calculate_from_global_options(self, test: TestSubsystem) -> int | None:
        """Resolve the effective timeout from this field and the `[test]` options.

        Returns None when timeouts are disabled, or when neither the field nor
        `[test].timeout_default` is set; otherwise the chosen value, capped at
        `[test].timeout_maximum` when that is configured.
        """
        if not test.timeouts:
            return None
        timeout = self.value if self.value is not None else test.timeout_default
        if timeout is None:
            return None
        maximum = test.timeout_maximum
        return timeout if maximum is None else min(timeout, maximum)
×
814

815

816
class TestExtraEnvVarsField(StringSequenceField, metaclass=ABCMeta):
    """Abstract `extra_env_vars` field for test targets."""

    alias = "extra_env_vars"
    help = help_text(
        f"""
        Additional environment variables to include in test processes.

        {EXTRA_ENV_VARS_USAGE_HELP}

        This will be merged with and override values from `[test].extra_env_vars`.
        """
    )

    def sorted(self) -> tuple[str, ...]:
        """Return the configured env var specs in sorted order (empty tuple if unset)."""
        values = self.value or ()
        return tuple(sorted(values))
2✔
830

831

832
class TestsBatchCompatibilityTagField(StringField, metaclass=ABCMeta):
    """Abstract `batch_compatibility_tag` field marking test targets as batchable together."""

    alias = "batch_compatibility_tag"

    @classmethod
    def format_help(cls, target_name: str, test_runner_name: str) -> str:
        """Render the shared help text with the concrete target and runner names substituted.

        NOTE(review): the returned template keeps the literal's leading indentation;
        presumably callers pass it through `help_text`/`softwrap` — confirm at use sites.
        """
        return f"""
        An arbitrary value used to mark the test files belonging to this target as valid for
        batched execution.

        It's _sometimes_ safe to run multiple `{target_name}`s within a single test runner process,
        and doing so can give significant wins by allowing reuse of expensive test setup /
        teardown logic. To opt into this behavior, set this field to an arbitrary non-empty
        string on all the `{target_name}` targets that are safe/compatible to run in the same
        process.

        If this field is left unset on a target, the target is assumed to be incompatible with
        all others and will run in a dedicated `{test_runner_name}` process.

        If this field is set on a target, and its value is different from the value on some
        other test `{target_name}`, then the two targets are explicitly incompatible and are guaranteed
        to not run in the same `{test_runner_name}` process.

        If this field is set on a target, and its value is the same as the value on some other
        `{target_name}`, then the two targets are explicitly compatible and _may_ run in the same
        test runner process. Compatible tests may not end up in the same test runner batch if:

          * There are "too many" compatible tests in a partition, as determined by the \
            `[test].batch_size` config parameter, or
          * Compatible tests have some incompatibility in Pants metadata (i.e. different \
            `resolve`s or `extra_env_vars`).

        When tests with the same `batch_compatibility_tag` have incompatibilities in some other
        Pants metadata, they will be automatically split into separate batches. This way you can
        set a high-level `batch_compatibility_tag` using `__defaults__` and then have tests
        continue to work as you tweak BUILD metadata on specific targets.
        """
868

869

870
async def _get_test_batches(
    core_request_types: Iterable[type[TestRequest]],
    targets_to_field_sets: TargetRootsToFieldSets,
    local_environment_name: ChosenLocalEnvironmentName,
    test_subsystem: TestSubsystem,
) -> list[TestRequest.Batch]:
    """Partition the applicable field sets per test request type, then slice each
    partition into size-bounded batches.

    Batches target `[test].batch_size` elements and are capped at twice that size.
    """

    def partitions_call(request_type: type[TestRequest]) -> Coroutine[Any, Any, Partitions]:
        # NOTE(review): this casts the request *class* to a `TestRequest` instance so
        # that class-level attributes (`field_set_type`, `PartitionRequest`) type-check.
        partition_type = cast(TestRequest, request_type)
        field_set_type = partition_type.field_set_type
        applicable_field_sets: list[TestFieldSet] = []
        for target, field_sets in targets_to_field_sets.mapping.items():
            if field_set_type.is_applicable(target):
                applicable_field_sets.extend(field_sets)

        # Partitioning itself runs in the local environment; per-batch environments
        # are resolved later by the caller.
        partition_request = partition_type.PartitionRequest(tuple(applicable_field_sets))
        return partition_tests(
            **implicitly(
                {
                    partition_request: TestRequest.PartitionRequest,
                    local_environment_name.val: EnvironmentName,
                },
            )
        )

    # Partition every request type concurrently.
    all_partitions = await concurrently(
        partitions_call(request_type) for request_type in core_request_types
    )

    # Flatten: one Batch per `partition_sequentially` slice of each partition, keyed
    # deterministically by address (or string form for non-FieldSet elements).
    return [
        request_type.Batch(
            cast(TestRequest, request_type).tool_name, tuple(batch), partition.metadata
        )
        for request_type, partitions in zip(core_request_types, all_partitions)
        for partition in partitions
        for batch in partition_sequentially(
            partition.elements,
            key=lambda x: str(x.address) if isinstance(x, FieldSet) else str(x),
            size_target=test_subsystem.batch_size,
            size_max=2 * test_subsystem.batch_size,
        )
    ]
911

912

913
async def _run_debug_tests(
    batches: Iterable[TestRequest.Batch],
    environment_names: Sequence[EnvironmentName],
    test_subsystem: TestSubsystem,
    debug_adapter: DebugAdapterSubsystem,
) -> Test:
    """Run each test batch interactively under `--debug` or `--debug-adapter`.

    The debug requests are prepared concurrently, but the interactive processes run
    one at a time. Returns a `Test` whose exit code is the last non-zero batch exit
    code seen, or 0 if every batch succeeded.
    """
    debug_requests = await concurrently(
        (
            test_batch_to_debug_request(
                **implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName})
            )
            if not test_subsystem.debug_adapter
            else test_batch_to_debug_adapter_request(
                **implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName})
            )
        )
        for batch, environment_name in zip(batches, environment_names)
    )
    exit_code = 0
    for debug_request, environment_name in zip(debug_requests, environment_names):
        if test_subsystem.debug_adapter:
            # The adapter blocks until a debug client attaches, so tell the user.
            logger.info(
                softwrap(
                    f"""
                    Launching debug adapter at '{debug_adapter.host}:{debug_adapter.port}',
                    which will wait for a client connection...
                    """
                )
            )

        debug_result = await run_interactive_process_in_environment(
            debug_request.process, environment_name
        )
        if debug_result.exit_code != 0:
            exit_code = debug_result.exit_code
    return Test(exit_code)
×
949

950

951
def _save_test_result_info_report_file(run_id: RunId, results: dict[str, dict]) -> None:
    """Save a JSON file with the information about the test results."""
    now = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    report_path = f"test_result_info_report_runid{run_id}_{now}.json"
    payload = json.dumps({"timestamp": now, "run_id": run_id, "info": results})
    with safe_open(report_path, "w") as fh:
        fh.write(payload)
×
957

958

959
@goal_rule
async def run_tests(
    console: Console,
    test_subsystem: TestSubsystem,
    debug_adapter: DebugAdapterSubsystem,
    workspace: Workspace,
    union_membership: UnionMembership,
    distdir: DistDir,
    run_id: RunId,
    local_environment_name: ChosenLocalEnvironmentName,
) -> Test:
    """The `test` goal: partition and batch matching tests, run them, and report.

    Also handles the interactive `--debug`/`--debug-adapter` modes, per-result
    summaries, extra-output materialization, XML report merging, coverage
    reporting, and the experimental test-result-info report.
    """
    # Interactive debug modes require explicit targets, so error (not warn) when
    # nothing matches.
    if test_subsystem.debug_adapter:
        goal_description = f"`{test_subsystem.name} --debug-adapter`"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
    elif test_subsystem.debug:
        goal_description = f"`{test_subsystem.name} --debug`"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
    else:
        goal_description = f"The `{test_subsystem.name}` goal"
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.warn

    # `[test].shard` (e.g. "0/4") restricts which slice of the target roots we run.
    shard, num_shards = parse_shard_spec(test_subsystem.shard, "the [test].shard option")
    targets_to_valid_field_sets = await find_valid_field_sets_for_target_roots(
        TargetRootsToFieldSetsRequest(
            TestFieldSet,
            goal_description=goal_description,
            no_applicable_targets_behavior=no_applicable_targets_behavior,
            shard=shard,
            num_shards=num_shards,
        ),
        **implicitly(),
    )

    request_types = union_membership.get(TestRequest)
    test_batches = await _get_test_batches(
        request_types,
        targets_to_valid_field_sets,
        local_environment_name,
        test_subsystem,
    )

    # Each batch may declare its own (single) execution environment.
    environment_names = await concurrently(
        resolve_single_environment_name(
            SingleEnvironmentNameRequest.from_field_sets(batch.elements, batch.description)
        )
        for batch in test_batches
    )

    if test_subsystem.debug or test_subsystem.debug_adapter:
        return await _run_debug_tests(
            test_batches, environment_names, test_subsystem, debug_adapter
        )

    to_test = list(zip(test_batches, environment_names))
    results = await concurrently(
        run_test_batch(
            **implicitly(
                {
                    batch: TestRequest.Batch,
                    environment_name: EnvironmentName,
                }
            )
        )
        for batch, environment_name in to_test
    )

    # Print summary.
    exit_code = 0
    if results:
        console.print_stderr("")
    if test_subsystem.experimental_report_test_result_info:
        test_result_info = {}
    for result in sorted(results):
        if result.exit_code is None:
            # We end up here, e.g., if we implemented test discovery and found no tests.
            continue
        if result.exit_code != 0:
            # The last non-zero exit code seen becomes the goal's exit code.
            exit_code = result.exit_code
        if result.result_metadata is None:
            # We end up here, e.g., if compilation failed during self-implemented test discovery.
            continue
        if test_subsystem.experimental_report_test_result_info:
            # Keyed by the batch's first address only.
            test_result_info[result.addresses[0].spec] = {
                "source": result.result_metadata.source(run_id).value
            }
        console.print_stderr(
            _format_test_summary(
                result, run_id, console, test_subsystem.show_all_batch_targets
            )
        )

        if result.extra_output and result.extra_output.files:
            path_prefix = str(distdir.relpath / "test" / result.path_safe_description)
            workspace.write_digest(
                result.extra_output.digest,
                path_prefix=path_prefix,
            )
            if result.log_extra_output:
                logger.info(
                    f"Wrote extra output from test `{result.addresses[0]}` to `{path_prefix}`."
                )

    rerun_command = _format_test_rerun_command(results)
    if rerun_command and test_subsystem.show_rerun_command:
        console.print_stderr(f"\n{rerun_command}")

    if test_subsystem.report:
        # Merge every batch's XML results into `[test].report_dir`.
        report_dir = test_subsystem.report_dir(distdir)
        merged_reports = await merge_digests(
            MergeDigests(result.xml_results.digest for result in results if result.xml_results)
        )
        workspace.write_digest(merged_reports, path_prefix=str(report_dir))
        console.print_stderr(f"\nWrote test reports to {report_dir}")

    if test_subsystem.use_coverage:
        # NB: We must pre-sort the data for itertools.groupby() to work properly, using the same
        # key function for both. However, you can't sort by `types`, so we call `str()` on it.
        all_coverage_data = sorted(
            (result.coverage_data for result in results if result.coverage_data is not None),
            key=lambda cov_data: str(type(cov_data)),
        )

        coverage_types_to_collection_types = {
            collection_cls.element_type: collection_cls  # type: ignore[misc]
            for collection_cls in union_membership.get(CoverageDataCollection)
        }
        coverage_collections = []
        for data_cls, data in itertools.groupby(all_coverage_data, lambda data: type(data)):
            collection_cls = coverage_types_to_collection_types[data_cls]  # type: ignore[index]
            coverage_collections.append(collection_cls(data))
        # We can create multiple reports for each coverage data (e.g., console, xml, html)
        coverage_reports_collections = await concurrently(
            create_coverage_report(
                **implicitly(
                    {
                        coverage_collection: CoverageDataCollection,
                        local_environment_name.val: EnvironmentName,
                    }
                )
            )
            for coverage_collection in coverage_collections
        )

        coverage_report_files: list[PurePath] = []
        for coverage_reports in coverage_reports_collections:
            report_files = coverage_reports.materialize(console, workspace)
            coverage_report_files.extend(report_files)

        if coverage_report_files and test_subsystem.open_coverage:
            open_files = await find_open_program(
                OpenFilesRequest(coverage_report_files, error_if_open_not_found=False),
                **implicitly(),
            )
            for process in open_files.processes:
                _ = await run_interactive_process_in_environment(
                    process, local_environment_name.val
                )

        for coverage_reports in coverage_reports_collections:
            if coverage_reports.coverage_insufficient:
                logger.error(
                    softwrap(
                        """
                        Test goal failed due to insufficient coverage.
                        See coverage reports for details.
                        """
                    )
                )
                # coverage.py uses 2 to indicate failure due to insufficient coverage.
                # We may as well follow suit in the general case, for all languages.
                exit_code = 2

    if test_subsystem.experimental_report_test_result_info:
        _save_test_result_info_report_file(run_id, test_result_info)

    return Test(exit_code)
×
1135

1136

1137
# Human-readable labels for where a test result came from (ran vs. cached vs.
# memoized), used when formatting the per-result console summary.
_SOURCE_MAP = {
    ProcessResultMetadata.Source.MEMOIZED: "memoized",
    ProcessResultMetadata.Source.RAN: "ran",
    ProcessResultMetadata.Source.HIT_LOCALLY: "cached locally",
    ProcessResultMetadata.Source.HIT_REMOTELY: "cached remotely",
}
1143

1144

1145
def _format_test_summary(
    result: TestResult,
    run_id: RunId,
    console: Console,
    show_all_batch_targets: bool = False,
) -> str:
    """Format the test summary printed to the console.

    When `show_all_batch_targets` is set and the result covers multiple addresses,
    every address is listed instead of the result's (truncated) description.
    """
    assert result.result_metadata is not None, (
        "Skipped test results should not be outputted in the test summary"
    )
    metadata = result.result_metadata
    attempts = len(result.process_results)
    retried = attempts > 1

    if result.exit_code == 0:
        status = "succeeded"
        sigil = console.sigil_succeeded_with_edits() if retried else console.sigil_succeeded()
    else:
        status = "failed"
        sigil = console.sigil_failed()

    attempt_msg = f" after {attempts} attempts" if retried else ""

    source = metadata.source(run_id)
    source_str = _SOURCE_MAP[source]
    environment = metadata.execution_environment.name
    if environment:
        preposition = "in" if source == ProcessResultMetadata.Source.RAN else "for"
        environment_type = metadata.execution_environment.environment_type
        source_desc = (
            f" ({source_str} {preposition} {environment_type} environment `{environment}`)"
        )
    elif source == ProcessResultMetadata.Source.RAN:
        source_desc = ""
    else:
        source_desc = f" ({source_str})"

    total_elapsed_ms = metadata.total_elapsed_ms
    elapsed_print = "" if total_elapsed_ms is None else f"in {total_elapsed_ms / 1000:.2f}s"

    if show_all_batch_targets and len(result.addresses) > 1:
        description = ", ".join(addr.spec for addr in result.addresses)
    else:
        description = result.description

    return f"{sigil} {description} {status}{attempt_msg} {elapsed_print}{source_desc}."
×
1199

1200

1201
def _format_test_rerun_command(results: Iterable[TestResult]) -> None | str:
    """Build a copy-pasteable command rerunning only the failed tests, or None if none failed."""
    failing = [result for result in results if result.exit_code not in (None, 0)]
    if not failing:
        return None

    # format an invocation like `pants test path/to/first:address path/to/second:address ...`
    quoted = sorted(shlex.quote(str(addr)) for result in failing for addr in result.addresses)
    invocation = " ".join([f"{bin_name()} {TestSubsystem.name}", *quoted])

    return f"To rerun the failing tests, use:\n\n    {invocation}"
×
1212

1213

1214
@dataclass(frozen=True)
class TestExtraEnv:
    """The subset of the environment requested via `[test].extra_env_vars`."""

    env: EnvironmentVars
2✔
1217

1218

1219
@rule
async def get_filtered_environment(test_env_aware: TestSubsystem.EnvironmentAware) -> TestExtraEnv:
    """Capture the env vars named by `[test].extra_env_vars` for use in test processes."""
    return TestExtraEnv(
        await environment_vars_subset(
            EnvironmentVarsRequest(test_env_aware.extra_env_vars), **implicitly()
        )
    )
1226

1227

1228
@memoized
def _unsupported_debug_rules(cls: type[TestRequest]) -> Iterable:
    """Returns a rule that implements TestDebugRequest by raising an error."""

    # `canonical_name_suffix` keeps the generated rule's name unique per request type,
    # since this factory runs once per `TestRequest` subclass lacking debug support.
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
    async def get_test_debug_request(request: TestRequest.Batch) -> TestDebugRequest:
        raise NotImplementedError("Testing this target with --debug is not yet supported.")

    return collect_rules(locals())
×
1237

1238

1239
@memoized
def _unsupported_debug_adapter_rules(cls: type[TestRequest]) -> Iterable:
    """Returns a rule that implements TestDebugAdapterRequest by raising an error."""

    # `canonical_name_suffix` keeps the generated rule's name unique per request type,
    # mirroring `_unsupported_debug_rules`.
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
    async def get_test_debug_adapter_request(request: TestRequest.Batch) -> TestDebugAdapterRequest:
        raise NotImplementedError(
            "Testing this target type with a debug adapter is not yet supported."
        )

    return collect_rules(locals())
2✔
1250

1251

1252
# -------------------------------------------------------------------------------------------
1253
# `runtime_package_dependencies` field
1254
# -------------------------------------------------------------------------------------------
1255

1256

1257
class RuntimePackageDependenciesField(SpecialCasedDependencies):
    """A `runtime_package_dependencies` field: packageable targets whose built artifacts
    are materialized into the test's chroot."""

    alias = "runtime_package_dependencies"
    help = help_text(
        f"""
        Addresses to targets that can be built with the `{bin_name()} package` goal and whose
        resulting artifacts should be included in the test run.

        Pants will build the artifacts as if you had run `{bin_name()} package`.
        It will include the results in your test's chroot, using the same name they would normally
        have, but without the `--distdir` prefix (e.g. `dist/`).

        You can include anything that can be built by `{bin_name()} package`, e.g. a `pex_binary`,
        `python_aws_lambda_function`, or an `archive`.
        """
    )
1272

1273

1274
class BuiltPackageDependencies(Collection[BuiltPackage]):
    """The built artifacts for a target's `runtime_package_dependencies`."""

    pass
2✔
1276

1277

1278
@dataclass(frozen=True)
class BuildPackageDependenciesRequest:
    """Request to build the packages referenced by a `runtime_package_dependencies` field."""

    field: RuntimePackageDependenciesField
2✔
1281

1282

1283
@rule(desc="Build runtime package dependencies for tests", level=LogLevel.DEBUG)
async def build_runtime_package_dependencies(
    request: BuildPackageDependenciesRequest,
) -> BuiltPackageDependencies:
    """Resolve and package every target referenced by `runtime_package_dependencies`."""
    unparsed_addresses = request.field.to_unparsed_address_inputs()
    if not unparsed_addresses:
        # Nothing to build: the field was unset or empty.
        return BuiltPackageDependencies()
    tgts = await resolve_targets(**implicitly(unparsed_addresses))
    field_sets_per_tgt = await find_valid_field_sets(
        FieldSetsPerTargetRequest(PackageFieldSet, tgts), **implicitly()
    )
    # Build all packages concurrently, via the environment-aware packaging entry point.
    packages = await concurrently(
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
        for field_set in field_sets_per_tgt.field_sets
    )
    return BuiltPackageDependencies(packages)
×
1299

1300

1301
def rules():
    """Return this module's rules for registration with the engine."""
    return list(collect_rules())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc