pantsbuild / pants / 20332790708

18 Dec 2025 09:48AM UTC coverage: 64.992% (-15.3%) from 80.295%

Pull Request #22949: Add experimental uv resolver for Python lockfiles
Merge f730a56cd into 407284c67

54 of 97 new or added lines in 5 files covered (55.67%).

8270 existing lines in 295 files now uncovered.

48990 of 75379 relevant lines covered (64.99%).

1.81 hits per line
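
As a quick sanity check, the headline percentages follow directly from the line counts above (a minimal sketch in Python; the counts are copied verbatim from this report):

    # Recompute the headline coverage figures from the counts shown above.
    new_covered, new_total = 54, 97                    # new or added lines in this PR
    relevant_covered, relevant_total = 48990, 75379    # all relevant lines

    print(f"{100 * new_covered / new_total:.2f}%")              # 55.67%
    print(f"{100 * relevant_covered / relevant_total:.2f}%")    # 64.99%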

Source File

/src/python/pants/core/goals/test.py (51.5% covered)
1
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
5✔
5

6
import itertools
5✔
7
import json
5✔
8
import logging
5✔
9
import os
5✔
10
import shlex
5✔
11
from abc import ABC, ABCMeta
5✔
12
from collections.abc import Coroutine, Iterable, Sequence
5✔
13
from dataclasses import dataclass, field
5✔
14
from datetime import datetime
5✔
15
from enum import Enum
5✔
16
from pathlib import PurePath
5✔
17
from typing import Any, ClassVar, TypeVar, cast
5✔
18

19
from pants.core.environments.rules import (
5✔
20
    ChosenLocalEnvironmentName,
21
    EnvironmentName,
22
    SingleEnvironmentNameRequest,
23
    resolve_single_environment_name,
24
)
25
from pants.core.goals.multi_tool_goal_helper import SkippableSubsystem
5✔
26
from pants.core.goals.package import (
5✔
27
    BuiltPackage,
28
    EnvironmentAwarePackageRequest,
29
    PackageFieldSet,
30
    environment_aware_package,
31
)
32
from pants.core.subsystems.debug_adapter import DebugAdapterSubsystem
5✔
33
from pants.core.util_rules.distdir import DistDir
5✔
34
from pants.core.util_rules.env_vars import environment_vars_subset
5✔
35
from pants.core.util_rules.partitions import (
5✔
36
    PartitionerType,
37
    PartitionMetadataT,
38
    Partitions,
39
    _BatchBase,
40
    _PartitionFieldSetsRequestBase,
41
)
42
from pants.engine.addresses import Address
5✔
43
from pants.engine.collection import Collection
5✔
44
from pants.engine.console import Console
5✔
45
from pants.engine.desktop import OpenFilesRequest, find_open_program
5✔
46
from pants.engine.engine_aware import EngineAwareReturnType
5✔
47
from pants.engine.env_vars import EXTRA_ENV_VARS_USAGE_HELP, EnvironmentVars, EnvironmentVarsRequest
5✔
48
from pants.engine.fs import EMPTY_FILE_DIGEST, FileDigest, MergeDigests, Snapshot, Workspace
5✔
49
from pants.engine.goal import Goal, GoalSubsystem
5✔
50
from pants.engine.internals.graph import find_valid_field_sets, resolve_targets
5✔
51
from pants.engine.internals.session import RunId
5✔
52
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
5✔
53
from pants.engine.intrinsics import merge_digests, run_interactive_process_in_environment
5✔
54
from pants.engine.process import FallibleProcessResult, InteractiveProcess, ProcessResultMetadata
5✔
55
from pants.engine.rules import collect_rules, concurrently, goal_rule, implicitly, rule
5✔
56
from pants.engine.target import (
5✔
57
    FieldSet,
58
    FieldSetsPerTargetRequest,
59
    IntField,
60
    NoApplicableTargetsBehavior,
61
    SourcesField,
62
    SpecialCasedDependencies,
63
    StringField,
64
    StringSequenceField,
65
    TargetRootsToFieldSets,
66
    TargetRootsToFieldSetsRequest,
67
    ValidNumbers,
68
    parse_shard_spec,
69
)
70
from pants.engine.unions import UnionMembership, UnionRule, distinct_union_type_per_subclass, union
5✔
71
from pants.option.option_types import BoolOption, EnumOption, IntOption, StrListOption, StrOption
5✔
72
from pants.util.collections import partition_sequentially
5✔
73
from pants.util.dirutil import safe_open
5✔
74
from pants.util.docutil import bin_name
5✔
75
from pants.util.logging import LogLevel
5✔
76
from pants.util.memo import memoized, memoized_property
5✔
77
from pants.util.meta import classproperty
5✔
78
from pants.util.strutil import Simplifier, help_text, softwrap
5✔
79

80
logger = logging.getLogger(__name__)
5✔
81

82

83
@dataclass(frozen=True)
5✔
84
class TestResult(EngineAwareReturnType):
5✔
85
    # A None exit_code indicates a backend that performs its own test discovery/selection
86
    # (rather than delegating that to the underlying test tool), and discovered no tests.
87
    exit_code: int | None
5✔
88
    stdout_bytes: bytes
5✔
89
    stdout_digest: FileDigest
5✔
90
    stderr_bytes: bytes
5✔
91
    stderr_digest: FileDigest
5✔
92
    addresses: tuple[Address, ...]
5✔
93
    output_setting: ShowOutput
5✔
94
    # A None result_metadata indicates a backend that performs its own test discovery/selection
95
    # and either discovered no tests, or encountered an error, such as a compilation error, in
96
    # the attempt.
97
    result_metadata: ProcessResultMetadata | None  # TODO: Merge elapsed MS of all subprocesses
5✔
98
    partition_description: str | None = None
5✔
99

100
    coverage_data: CoverageData | None = None
5✔
101
    # TODO: Rename this to `reports`. There is no guarantee that every language will produce
102
    #  XML reports, or only XML reports.
103
    xml_results: Snapshot | None = None
5✔
104
    # Any extra output (such as from plugins) that the test runner was configured to output.
105
    extra_output: Snapshot | None = None
5✔
106
    # True if the core test rules should log that extra output was written.
107
    log_extra_output: bool = False
5✔
108
    # All results including failed attempts
109
    process_results: tuple[FallibleProcessResult, ...] = field(default_factory=tuple)
5✔
110

111
    output_simplifier: Simplifier = Simplifier()
5✔
112

113
    # Prevent this class from being detected by pytest as a test class.
114
    __test__ = False
5✔
115

116
    @staticmethod
5✔
117
    def no_tests_found(address: Address, output_setting: ShowOutput) -> TestResult:
5✔
118
        """Used when we do test discovery ourselves, and we didn't find any."""
119
        return TestResult(
×
120
            exit_code=None,
121
            stdout_bytes=b"",
122
            stderr_bytes=b"",
123
            stdout_digest=EMPTY_FILE_DIGEST,
124
            stderr_digest=EMPTY_FILE_DIGEST,
125
            addresses=(address,),
126
            output_setting=output_setting,
127
            result_metadata=None,
128
        )
129

130
    @staticmethod
5✔
131
    def no_tests_found_in_batch(
5✔
132
        batch: TestRequest.Batch[_TestFieldSetT, Any], output_setting: ShowOutput
133
    ) -> TestResult:
134
        """Used when we do test discovery ourselves, and we didn't find any."""
135
        return TestResult(
×
136
            exit_code=None,
137
            stdout_bytes=b"",
138
            stderr_bytes=b"",
139
            stdout_digest=EMPTY_FILE_DIGEST,
140
            stderr_digest=EMPTY_FILE_DIGEST,
141
            addresses=tuple(field_set.address for field_set in batch.elements),
142
            output_setting=output_setting,
143
            result_metadata=None,
144
            partition_description=batch.partition_metadata.description,
145
        )
146

147
    @staticmethod
5✔
148
    def from_fallible_process_result(
5✔
149
        process_results: tuple[FallibleProcessResult, ...],
150
        address: Address,
151
        output_setting: ShowOutput,
152
        *,
153
        coverage_data: CoverageData | None = None,
154
        xml_results: Snapshot | None = None,
155
        extra_output: Snapshot | None = None,
156
        log_extra_output: bool = False,
157
        output_simplifier: Simplifier = Simplifier(),
158
    ) -> TestResult:
159
        process_result = process_results[-1]
×
160
        return TestResult(
×
161
            exit_code=process_result.exit_code,
162
            stdout_bytes=process_result.stdout,
163
            stdout_digest=process_result.stdout_digest,
164
            stderr_bytes=process_result.stderr,
165
            stderr_digest=process_result.stderr_digest,
166
            addresses=(address,),
167
            output_setting=output_setting,
168
            result_metadata=process_result.metadata,
169
            coverage_data=coverage_data,
170
            xml_results=xml_results,
171
            extra_output=extra_output,
172
            log_extra_output=log_extra_output,
173
            process_results=process_results,
174
            output_simplifier=output_simplifier,
175
        )
176

177
    @staticmethod
5✔
178
    def from_batched_fallible_process_result(
5✔
179
        process_results: tuple[FallibleProcessResult, ...],
180
        batch: TestRequest.Batch[_TestFieldSetT, Any],
181
        output_setting: ShowOutput,
182
        *,
183
        coverage_data: CoverageData | None = None,
184
        xml_results: Snapshot | None = None,
185
        extra_output: Snapshot | None = None,
186
        log_extra_output: bool = False,
187
        output_simplifier: Simplifier = Simplifier(),
188
    ) -> TestResult:
189
        process_result = process_results[-1]
×
190
        return TestResult(
×
191
            exit_code=process_result.exit_code,
192
            stdout_bytes=process_result.stdout,
193
            stdout_digest=process_result.stdout_digest,
194
            stderr_bytes=process_result.stderr,
195
            stderr_digest=process_result.stderr_digest,
196
            addresses=tuple(field_set.address for field_set in batch.elements),
197
            output_setting=output_setting,
198
            result_metadata=process_result.metadata,
199
            coverage_data=coverage_data,
200
            xml_results=xml_results,
201
            extra_output=extra_output,
202
            log_extra_output=log_extra_output,
203
            output_simplifier=output_simplifier,
204
            partition_description=batch.partition_metadata.description,
205
            process_results=process_results,
206
        )
207

208
    @property
5✔
209
    def description(self) -> str:
5✔
UNCOV
210
        if len(self.addresses) == 1:
×
UNCOV
211
            return self.addresses[0].spec
×
212

213
        return f"{self.addresses[0].spec} and {len(self.addresses) - 1} other files"
×
214

215
    @property
5✔
216
    def path_safe_description(self) -> str:
5✔
217
        if len(self.addresses) == 1:
×
218
            return self.addresses[0].path_safe_spec
×
219

220
        return f"{self.addresses[0].path_safe_spec}+{len(self.addresses) - 1}"
×
221

222
    def __lt__(self, other: Any) -> bool:
5✔
223
        """We sort first by exit code, then alphanumerically within each group."""
UNCOV
224
        if not isinstance(other, TestResult):
×
225
            return NotImplemented
×
UNCOV
226
        if self.exit_code == other.exit_code:
×
UNCOV
227
            return self.description < other.description
×
UNCOV
228
        if self.exit_code is None:
×
229
            return True
×
UNCOV
230
        if other.exit_code is None:
×
231
            return False
×
UNCOV
232
        return abs(self.exit_code) < abs(other.exit_code)
×
233

234
    def artifacts(self) -> dict[str, FileDigest | Snapshot] | None:
5✔
235
        output: dict[str, FileDigest | Snapshot] = {
×
236
            "stdout": self.stdout_digest,
237
            "stderr": self.stderr_digest,
238
        }
239
        if self.xml_results:
×
240
            output["xml_results"] = self.xml_results
×
241
        return output
×
242

243
    def level(self) -> LogLevel:
5✔
UNCOV
244
        if self.exit_code is None:
×
UNCOV
245
            return LogLevel.DEBUG
×
UNCOV
246
        return LogLevel.INFO if self.exit_code == 0 else LogLevel.ERROR
×
247

248
    def _simplified_output(self, v: bytes) -> str:
5✔
249
        return self.output_simplifier.simplify(v.decode(errors="replace"))
2✔
250

251
    @memoized_property
5✔
252
    def stdout_simplified_str(self) -> str:
5✔
253
        return self._simplified_output(self.stdout_bytes)
2✔
254

255
    @memoized_property
5✔
256
    def stderr_simplified_str(self) -> str:
5✔
UNCOV
257
        return self._simplified_output(self.stderr_bytes)
×
258

259
    def message(self) -> str:
5✔
UNCOV
260
        if self.exit_code is None:
×
UNCOV
261
            return "no tests found."
×
UNCOV
262
        status = "succeeded" if self.exit_code == 0 else f"failed (exit code {self.exit_code})"
×
UNCOV
263
        message = f"{status}."
×
UNCOV
264
        if self.partition_description:
×
265
            message += f"\nPartition: {self.partition_description}"
×
UNCOV
266
        if self.output_setting == ShowOutput.NONE or (
×
267
            self.output_setting == ShowOutput.FAILED and self.exit_code == 0
268
        ):
UNCOV
269
            return message
×
UNCOV
270
        output = ""
×
UNCOV
271
        if self.stdout_bytes:
×
UNCOV
272
            output += f"\n{self.stdout_simplified_str}"
×
UNCOV
273
        if self.stderr_bytes:
×
UNCOV
274
            output += f"\n{self.stderr_simplified_str}"
×
UNCOV
275
        if output:
×
UNCOV
276
            output = f"{output.rstrip()}\n\n"
×
UNCOV
277
        return f"{message}{output}"
×
278

279
    def metadata(self) -> dict[str, Any]:
5✔
280
        return {"addresses": [address.spec for address in self.addresses]}
×
281

282
    def cacheable(self) -> bool:
5✔
283
        """Is marked uncacheable to ensure that it always renders."""
284
        return False
×
285

286

287
class ShowOutput(Enum):
5✔
288
    """Which tests to emit detailed output for."""
289

290
    ALL = "all"
5✔
291
    FAILED = "failed"
5✔
292
    NONE = "none"
5✔
293

294

295
@dataclass(frozen=True)
5✔
296
class TestDebugRequest:
5✔
297
    process: InteractiveProcess
5✔
298

299
    # Prevent this class from being detected by pytest as a test class.
300
    __test__ = False
5✔
301

302

303
class TestDebugAdapterRequest(TestDebugRequest):
5✔
304
    """Like TestDebugRequest, but launches the test process using the relevant Debug Adapter server.
305

306
    The process should be launched waiting for the client to connect.
307
    """
308

309

310
@union
5✔
311
@dataclass(frozen=True)
5✔
312
class TestFieldSet(FieldSet, metaclass=ABCMeta):
5✔
313
    """The fields necessary to run tests on a target."""
314

315
    sources: SourcesField
5✔
316

317
    __test__ = False
5✔
318

319

320
_TestFieldSetT = TypeVar("_TestFieldSetT", bound=TestFieldSet)
5✔
321

322

323
@union
5✔
324
class TestRequest:
5✔
325
    """Base class for plugin types wanting to be run as part of `test`.
326

327
    Plugins should define a new type which subclasses this type, and set the
328
    appropriate class variables.
329
    E.g.
330
        class DryCleaningRequest(TestRequest):
331
            tool_subsystem = DryCleaningSubsystem
332
            field_set_type = DryCleaningFieldSet
333

334
    Then register the rules which tell Pants about your plugin.
335
    E.g.
336
        def rules():
337
            return [
338
                *collect_rules(),
339
                *DryCleaningRequest.rules(),
340
            ]
341
    """
342

343
    tool_subsystem: ClassVar[type[SkippableSubsystem]]
5✔
344
    field_set_type: ClassVar[type[TestFieldSet]]
5✔
345
    partitioner_type: ClassVar[PartitionerType] = PartitionerType.DEFAULT_ONE_PARTITION_PER_INPUT
5✔
346

347
    supports_debug: ClassVar[bool] = False
5✔
348
    supports_debug_adapter: ClassVar[bool] = False
5✔
349

350
    __test__ = False
5✔
351

352
    @classproperty
5✔
353
    def tool_name(cls) -> str:
5✔
UNCOV
354
        return cls.tool_subsystem.options_scope
×
355

356
    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
5✔
357
    class PartitionRequest(_PartitionFieldSetsRequestBase[_TestFieldSetT]):
5✔
358
        def metadata(self) -> dict[str, Any]:
5✔
359
            return {"addresses": [field_set.address.spec for field_set in self.field_sets]}
×
360

361
    @distinct_union_type_per_subclass(in_scope_types=[EnvironmentName])
5✔
362
    class Batch(_BatchBase[_TestFieldSetT, PartitionMetadataT]):
5✔
363
        @property
5✔
364
        def single_element(self) -> _TestFieldSetT:
5✔
365
            """Return the single element of this batch.
366

367
            NOTE: Accessing this property will raise a `TypeError` if this `Batch` contains
368
            more than one element. It is only safe to use with test runners utilizing the "default"
369
            one-input-per-partition partitioner type.
370
            """
371

372
            if len(self.elements) != 1:
×
373
                description = ""
×
374
                if self.partition_metadata.description:
×
375
                    description = f" from partition '{self.partition_metadata.description}'"
×
376
                raise TypeError(
×
377
                    f"Expected a single element in batch{description}, but found {len(self.elements)}"
378
                )
379

380
            return self.elements[0]
×
381

382
        @property
5✔
383
        def description(self) -> str:
5✔
UNCOV
384
            if self.partition_metadata and self.partition_metadata.description:
×
385
                return f"test batch from partition '{self.partition_metadata.description}'"
×
UNCOV
386
            return "test batch"
×
387

388
        def debug_hint(self) -> str:
5✔
389
            if len(self.elements) == 1:
×
390
                return self.elements[0].address.spec
×
391

392
            return f"{self.elements[0].address.spec} and {len(self.elements) - 1} other files"
×
393

394
        def metadata(self) -> dict[str, Any]:
5✔
395
            return {
×
396
                "addresses": [field_set.address.spec for field_set in self.elements],
397
                "partition_description": self.partition_metadata.description,
398
            }
399

400
    @classmethod
5✔
401
    def rules(cls) -> Iterable:
5✔
402
        yield from cls.partitioner_type.default_rules(cls, by_file=False)
5✔
403

404
        yield UnionRule(TestFieldSet, cls.field_set_type)
5✔
405
        yield UnionRule(TestRequest, cls)
5✔
406
        yield UnionRule(TestRequest.PartitionRequest, cls.PartitionRequest)
5✔
407
        yield UnionRule(TestRequest.Batch, cls.Batch)
5✔
408

409
        if not cls.supports_debug:
5✔
410
            yield from _unsupported_debug_rules(cls)
4✔
411

412
        if not cls.supports_debug_adapter:
5✔
413
            yield from _unsupported_debug_adapter_rules(cls)
5✔
414

415

416
@rule(polymorphic=True)
5✔
417
async def partition_tests(req: TestRequest.PartitionRequest) -> Partitions:
5✔
418
    raise NotImplementedError()
×
419

420

421
@rule(polymorphic=True)
5✔
422
async def test_batch_to_debug_request(batch: TestRequest.Batch) -> TestDebugRequest:
5✔
423
    raise NotImplementedError()
×
424

425

426
@rule(polymorphic=True)
5✔
427
async def test_batch_to_debug_adapter_request(batch: TestRequest.Batch) -> TestDebugAdapterRequest:
5✔
428
    raise NotImplementedError()
×
429

430

431
@rule(polymorphic=True)
5✔
432
async def run_test_batch(batch: TestRequest.Batch) -> TestResult:
5✔
433
    raise NotImplementedError()
×
434

435

436
class CoverageData(ABC):
5✔
437
    """Base class for inputs to a coverage report.
438

439
    Subclasses should add whichever fields they require - snapshots of coverage output, XML files,
440
    etc.
441
    """
442

443

444
_CD = TypeVar("_CD", bound=CoverageData)
5✔
445

446

447
@union(in_scope_types=[EnvironmentName])
5✔
448
class CoverageDataCollection(Collection[_CD]):
5✔
449
    element_type: ClassVar[type[_CD]]
5✔
450

451

452
@dataclass(frozen=True)
5✔
453
class CoverageReport(ABC):
5✔
454
    """Represents a code coverage report that can be materialized to the terminal or disk."""
455

456
    # Some coverage systems can determine, based on a configurable threshold, whether coverage
457
    # was sufficient or not. The test goal will fail the build if coverage was deemed insufficient.
458
    coverage_insufficient: bool
5✔
459

460
    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
5✔
461
        """Materialize this code coverage report to the terminal or disk.
462

463
        :param console: A handle to the terminal.
464
        :param workspace: A handle to local disk.
465
        :return: If a report was materialized to disk, the path of the file in the report one might
466
                 open first to start examining the report.
467
        """
468
        ...
469

470
    def get_artifact(self) -> tuple[str, Snapshot] | None:
5✔
471
        return None
×
472

473

474
@dataclass(frozen=True)
5✔
475
class ConsoleCoverageReport(CoverageReport):
5✔
476
    """Materializes a code coverage report to the terminal."""
477

478
    report: str
5✔
479

480
    def materialize(self, console: Console, workspace: Workspace) -> None:
5✔
UNCOV
481
        console.print_stderr(f"\n{self.report}")
×
UNCOV
482
        return None
×
483

484

485
@dataclass(frozen=True)
5✔
486
class FilesystemCoverageReport(CoverageReport):
5✔
487
    """Materializes a code coverage report to disk."""
488

489
    result_snapshot: Snapshot
5✔
490
    directory_to_materialize_to: PurePath
5✔
491
    report_file: PurePath | None
5✔
492
    report_type: str
5✔
493

494
    def materialize(self, console: Console, workspace: Workspace) -> PurePath | None:
5✔
495
        workspace.write_digest(
×
496
            self.result_snapshot.digest, path_prefix=str(self.directory_to_materialize_to)
497
        )
498
        console.print_stderr(
×
499
            f"\nWrote {self.report_type} coverage report to `{self.directory_to_materialize_to}`"
500
        )
501
        return self.report_file
×
502

503
    def get_artifact(self) -> tuple[str, Snapshot] | None:
5✔
504
        return f"coverage_{self.report_type}", self.result_snapshot
×
505

506

507
@dataclass(frozen=True)
5✔
508
class CoverageReports(EngineAwareReturnType):
5✔
509
    reports: tuple[CoverageReport, ...]
5✔
510

511
    @property
5✔
512
    def coverage_insufficient(self) -> bool:
5✔
513
        """Whether to fail the build due to insufficient coverage."""
UNCOV
514
        return any(report.coverage_insufficient for report in self.reports)
×
515

516
    def materialize(self, console: Console, workspace: Workspace) -> tuple[PurePath, ...]:
5✔
UNCOV
517
        report_paths = []
×
UNCOV
518
        for report in self.reports:
×
UNCOV
519
            report_path = report.materialize(console, workspace)
×
UNCOV
520
            if report_path:
×
521
                report_paths.append(report_path)
×
UNCOV
522
        return tuple(report_paths)
×
523

524
    def artifacts(self) -> dict[str, Snapshot | FileDigest] | None:
5✔
525
        artifacts: dict[str, Snapshot | FileDigest] = {}
×
526
        for report in self.reports:
×
527
            artifact = report.get_artifact()
×
528
            if not artifact:
×
529
                continue
×
530
            artifacts[artifact[0]] = artifact[1]
×
531
        return artifacts or None
×
532

533

534
@rule(polymorphic=True)
5✔
535
async def create_coverage_report(req: CoverageDataCollection) -> CoverageReports:
5✔
536
    raise NotImplementedError()
×
537

538

539
class TestSubsystem(GoalSubsystem):
5✔
540
    name = "test"
5✔
541
    help = "Run tests."
5✔
542

543
    # Prevent this class from being detected by pytest as a test class.
544
    __test__ = False
5✔
545

546
    @classmethod
5✔
547
    def activated(cls, union_membership: UnionMembership) -> bool:
5✔
548
        return TestRequest in union_membership
×
549

550
    class EnvironmentAware:
5✔
551
        extra_env_vars = StrListOption(
5✔
552
            help=softwrap(
553
                f"""
554
                Additional environment variables to include in test processes.
555

556
                {EXTRA_ENV_VARS_USAGE_HELP}
557
                """
558
            ),
559
        )
560

561
    debug = BoolOption(
5✔
562
        default=False,
563
        help=softwrap(
564
            """
565
            Run tests sequentially in an interactive process. This is necessary, for
566
            example, when you add breakpoints to your code.
567
            """
568
        ),
569
    )
570
    # See also `run.py`'s same option
571
    debug_adapter = BoolOption(
5✔
572
        default=False,
573
        help=softwrap(
574
            """
575
            Run tests sequentially in an interactive process, using a Debug Adapter
576
            (https://microsoft.github.io/debug-adapter-protocol/) for the language if supported.
577

578
            The interactive process used will be immediately blocked waiting for a client before
579
            continuing.
580

581
            This option implies `--debug`.
582
            """
583
        ),
584
    )
585
    force = BoolOption(
5✔
586
        default=False,
587
        help="Force the tests to run, even if they could be satisfied from cache.",
588
    )
589
    output = EnumOption(
5✔
590
        default=ShowOutput.FAILED,
591
        help="Show stdout/stderr for these tests.",
592
    )
593
    use_coverage = BoolOption(
5✔
594
        default=False,
595
        help="Generate a coverage report if the test runner supports it.",
596
    )
597
    open_coverage = BoolOption(
5✔
598
        default=False,
599
        help=softwrap(
600
            """
601
            If a coverage report file is generated, open it on the local system if the
602
            system supports this.
603
            """
604
        ),
605
    )
606
    report = BoolOption(default=False, advanced=True, help="Write test reports to `--report-dir`.")
5✔
607
    default_report_path = str(PurePath("{distdir}", "test", "reports"))
5✔
608
    _report_dir = StrOption(
5✔
609
        default=default_report_path,
610
        advanced=True,
611
        help="Path to write test reports to. Must be relative to the build root.",
612
    )
613
    shard = StrOption(
5✔
614
        default="",
615
        help=softwrap(
616
            """
617
            A shard specification of the form "k/N", where N is a positive integer and k is a
618
            non-negative integer less than N.
619

620
            If set, the request input targets will be deterministically partitioned into N disjoint
621
            subsets of roughly equal size, and only the k'th subset will be used, with all others
622
            discarded.
623

624
            Useful for splitting large numbers of test files across multiple machines in CI.
625
            For example, you can run three shards with `--shard=0/3`, `--shard=1/3`, `--shard=2/3`.
626

627
            Note that the shards are roughly equal in size as measured by number of files.
628
            No attempt is made to consider the size of different files, the time they have
629
            taken to run in the past, or other such sophisticated measures.
630
            """
631
        ),
632
    )
633
    timeouts = BoolOption(
5✔
634
        default=True,
635
        help=softwrap(
636
            """
637
            Enable test target timeouts. If timeouts are enabled then test targets with a
638
            `timeout=` field set will time out after the given number of
639
            seconds if not completed. If no timeout is set, then either the default timeout
640
            is used or no timeout is configured.
641
            """
642
        ),
643
    )
644
    timeout_default = IntOption(
5✔
645
        default=None,
646
        advanced=True,
647
        help=softwrap(
648
            """
649
            The default timeout (in seconds) for a test target if the `timeout` field is not
650
            set on the target.
651
            """
652
        ),
653
    )
654
    timeout_maximum = IntOption(
5✔
655
        default=None,
656
        advanced=True,
657
        help="The maximum timeout (in seconds) that may be used on a test target.",
658
    )
659
    _attempts_default = IntOption(
5✔
660
        default=1,
661
        help=softwrap(
662
            """
663
            The number of attempts to run tests, in case of a test failure.
664
            Tests that were retried will include the number of attempts in the summary output.
665
            """
666
        ),
667
    )
668

669
    batch_size = IntOption(
5✔
670
        "--batch-size",
671
        default=128,
672
        advanced=True,
673
        help=softwrap(
674
            """
675
            The target maximum number of files to be included in each run of batch-enabled
676
            test runners.
677

678
            Some test runners can execute tests from multiple files in a single run. Test
679
            implementations will return all tests that _can_ run together as a single group -
680
            and then this may be further divided into smaller batches, based on this option.
681
            This is done:
682

683
              1. to avoid OS argument length limits (in processes which don't support argument files)
684
              2. to support more stable cache keys than would be possible if all files were operated \
685
                 on in a single batch
686
              3. to allow for parallelism in test runners which don't have internal \
687
                 parallelism, or -- if they do support internal parallelism -- to improve scheduling \
688
                 behavior when multiple processes are competing for cores and so internal parallelism \
689
                 cannot be used perfectly
690

691
            In order to improve cache hit rates (see 2.), batches are created at stable boundaries,
692
            and so this value is only a "target" max batch size (rather than an exact value).
693

694
            NOTE: This parameter has no effect on test runners/plugins that do not implement support
695
            for batched testing.
696
            """
697
        ),
698
    )
699

700
    show_rerun_command = BoolOption(
5✔
701
        default="CI" in os.environ,
702
        advanced=True,
703
        help=softwrap(
704
            f"""
705
            If tests fail, show an appropriate `{bin_name()} {name} ...` invocation to rerun just
706
            those tests.
707

708
            This is to make it easy to run those tests on a new machine (for instance, run tests
709
            locally if they fail in CI): caching of successful tests means that rerunning the exact
710
            same command on the same machine will already automatically only rerun the failures.
711

712
            This defaults to `True` when running in CI (as determined by the `CI` environment
713
            variable being set) but `False` elsewhere.
714
            """
715
        ),
716
    )
717
    experimental_report_test_result_info = BoolOption(
5✔
718
        default=False,
719
        advanced=True,
720
        help=softwrap(
721
            """
722
            Report information about the test results.
723

724
            For now, it reports only the source from which the test results were fetched. When running tests,
725
            they may be executed locally or remotely, but if there are results of previous runs available,
726
            they may be retrieved from the local or remote cache, or be memoized. Knowing where the test
727
            results come from might be useful when evaluating the efficiency of the cache and the nature of
728
            the changes in the source code that may lead to frequent cache invalidations.
729
            """
730
        ),
731
    )
732

733
    def report_dir(self, distdir: DistDir) -> PurePath:
5✔
UNCOV
734
        return PurePath(self._report_dir.format(distdir=distdir.relpath))
×
735

736
    @property
5✔
737
    def attempts_default(self):
5✔
738
        if self._attempts_default < 1:
×
739
            raise ValueError(
×
740
                "The `--test-attempts-default` option must have a value equal or greater than 1. "
741
                f"Instead, it was set to {self._attempts_default}."
742
            )
743
        return self._attempts_default
×
744

745

746
class Test(Goal):
5✔
747
    __test__ = False
5✔
748

749
    subsystem_cls = TestSubsystem
5✔
750
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
5✔
751

752

753
class TestTimeoutField(IntField, metaclass=ABCMeta):
5✔
754
    """Base field class for implementing timeouts for test targets.
755

756
    Each test target that wants to implement a timeout needs to provide its own concrete field
757
    class extending this one.
758
    """
759

760
    __test__ = False
5✔
761

762
    alias = "timeout"
5✔
763
    required = False
5✔
764
    valid_numbers = ValidNumbers.positive_only
5✔
765
    help = help_text(
5✔
766
        """
767
        A timeout (in seconds) used by each test file belonging to this target.
768

769
        If unset, will default to `[test].timeout_default`; if that option is also unset,
770
        then the test will never time out. Will never exceed `[test].timeout_maximum`. Only
771
        applies if the option `--test-timeouts` is set to true (the default).
772
        """
773
    )
774

775
    def calculate_from_global_options(self, test: TestSubsystem) -> int | None:
5✔
UNCOV
776
        if not test.timeouts:
×
UNCOV
777
            return None
×
UNCOV
778
        if self.value is None:
×
UNCOV
779
            if test.timeout_default is None:
×
UNCOV
780
                return None
×
UNCOV
781
            result = test.timeout_default
×
782
        else:
UNCOV
783
            result = self.value
×
UNCOV
784
        if test.timeout_maximum is not None:
×
UNCOV
785
            return min(result, test.timeout_maximum)
×
UNCOV
786
        return result
×
787

788

789
class TestExtraEnvVarsField(StringSequenceField, metaclass=ABCMeta):
5✔
790
    alias = "extra_env_vars"
5✔
791
    help = help_text(
5✔
792
        f"""
793
        Additional environment variables to include in test processes.
794

795
        {EXTRA_ENV_VARS_USAGE_HELP}
796

797
        This will be merged with and override values from `[test].extra_env_vars`.
798
        """
799
    )
800

801
    def sorted(self) -> tuple[str, ...]:
5✔
802
        return tuple(sorted(self.value or ()))
×
803

804

805
class TestsBatchCompatibilityTagField(StringField, metaclass=ABCMeta):
5✔
806
    alias = "batch_compatibility_tag"
5✔
807

808
    @classmethod
5✔
809
    def format_help(cls, target_name: str, test_runner_name: str) -> str:
5✔
810
        return f"""
5✔
811
        An arbitrary value used to mark the test files belonging to this target as valid for
812
        batched execution.
813

814
        It's _sometimes_ safe to run multiple `{target_name}`s within a single test runner process,
815
        and doing so can give significant wins by allowing reuse of expensive test setup /
816
        teardown logic. To opt into this behavior, set this field to an arbitrary non-empty
817
        string on all the `{target_name}` targets that are safe/compatible to run in the same
818
        process.
819

820
        If this field is left unset on a target, the target is assumed to be incompatible with
821
        all others and will run in a dedicated `{test_runner_name}` process.
822

823
        If this field is set on a target, and its value is different from the value on some
824
        other test `{target_name}`, then the two targets are explicitly incompatible and are guaranteed
825
        to not run in the same `{test_runner_name}` process.
826

827
        If this field is set on a target, and its value is the same as the value on some other
828
        `{target_name}`, then the two targets are explicitly compatible and _may_ run in the same
829
        test runner process. Compatible tests may not end up in the same test runner batch if:
830

831
          * There are "too many" compatible tests in a partition, as determined by the \
832
            `[test].batch_size` config parameter, or
833
          * Compatible tests have some incompatibility in Pants metadata (i.e. different \
834
            `resolve`s or `extra_env_vars`).
835

836
        When tests with the same `batch_compatibility_tag` have incompatibilities in some other
837
        Pants metadata, they will be automatically split into separate batches. This way you can
838
        set a high-level `batch_compatibility_tag` using `__defaults__` and then have tests
839
        continue to work as you tweak BUILD metadata on specific targets.
840
        """
841

842

843
async def _get_test_batches(
5✔
844
    core_request_types: Iterable[type[TestRequest]],
845
    targets_to_field_sets: TargetRootsToFieldSets,
846
    local_environment_name: ChosenLocalEnvironmentName,
847
    test_subsystem: TestSubsystem,
848
) -> list[TestRequest.Batch]:
UNCOV
849
    def partitions_call(request_type: type[TestRequest]) -> Coroutine[Any, Any, Partitions]:
×
UNCOV
850
        partition_type = cast(TestRequest, request_type)
×
UNCOV
851
        field_set_type = partition_type.field_set_type
×
UNCOV
852
        applicable_field_sets: list[TestFieldSet] = []
×
UNCOV
853
        for target, field_sets in targets_to_field_sets.mapping.items():
×
UNCOV
854
            if field_set_type.is_applicable(target):
×
UNCOV
855
                applicable_field_sets.extend(field_sets)
×
856

UNCOV
857
        partition_request = partition_type.PartitionRequest(tuple(applicable_field_sets))
×
UNCOV
858
        return partition_tests(
×
859
            **implicitly(
860
                {
861
                    partition_request: TestRequest.PartitionRequest,
862
                    local_environment_name.val: EnvironmentName,
863
                },
864
            )
865
        )
866

UNCOV
867
    all_partitions = await concurrently(
×
868
        partitions_call(request_type) for request_type in core_request_types
869
    )
870

UNCOV
871
    return [
×
872
        request_type.Batch(
873
            cast(TestRequest, request_type).tool_name, tuple(batch), partition.metadata
874
        )
875
        for request_type, partitions in zip(core_request_types, all_partitions)
876
        for partition in partitions
877
        for batch in partition_sequentially(
878
            partition.elements,
879
            key=lambda x: str(x.address) if isinstance(x, FieldSet) else str(x),
880
            size_target=test_subsystem.batch_size,
881
            size_max=2 * test_subsystem.batch_size,
882
        )
883
    ]
884

885

886
async def _run_debug_tests(
5✔
887
    batches: Iterable[TestRequest.Batch],
888
    environment_names: Sequence[EnvironmentName],
889
    test_subsystem: TestSubsystem,
890
    debug_adapter: DebugAdapterSubsystem,
891
) -> Test:
UNCOV
892
    debug_requests = await concurrently(
×
893
        (
894
            test_batch_to_debug_request(
895
                **implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName})
896
            )
897
            if not test_subsystem.debug_adapter
898
            else test_batch_to_debug_adapter_request(
899
                **implicitly({batch: TestRequest.Batch, environment_name: EnvironmentName})
900
            )
901
        )
902
        for batch, environment_name in zip(batches, environment_names)
903
    )
UNCOV
904
    exit_code = 0
×
UNCOV
905
    for debug_request, environment_name in zip(debug_requests, environment_names):
×
UNCOV
906
        if test_subsystem.debug_adapter:
×
907
            logger.info(
×
908
                softwrap(
909
                    f"""
910
                    Launching debug adapter at '{debug_adapter.host}:{debug_adapter.port}',
911
                    which will wait for a client connection...
912
                    """
913
                )
914
            )
915

UNCOV
916
        debug_result = await run_interactive_process_in_environment(
×
917
            debug_request.process, environment_name
918
        )
UNCOV
919
        if debug_result.exit_code != 0:
×
920
            exit_code = debug_result.exit_code
×
UNCOV
921
    return Test(exit_code)
×
922

923

924
def _save_test_result_info_report_file(run_id: RunId, results: dict[str, dict]) -> None:
5✔
925
    """Save a JSON file with the information about the test results."""
926
    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
×
927
    obj = json.dumps({"timestamp": timestamp, "run_id": run_id, "info": results})
×
928
    with safe_open(f"test_result_info_report_runid{run_id}_{timestamp}.json", "w") as fh:
×
929
        fh.write(obj)
×
930

931

932
@goal_rule
5✔
933
async def run_tests(
5✔
934
    console: Console,
935
    test_subsystem: TestSubsystem,
936
    debug_adapter: DebugAdapterSubsystem,
937
    workspace: Workspace,
938
    union_membership: UnionMembership,
939
    distdir: DistDir,
940
    run_id: RunId,
941
    local_environment_name: ChosenLocalEnvironmentName,
942
) -> Test:
UNCOV
943
    if test_subsystem.debug_adapter:
×
944
        goal_description = f"`{test_subsystem.name} --debug-adapter`"
×
945
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
×
UNCOV
946
    elif test_subsystem.debug:
×
UNCOV
947
        goal_description = f"`{test_subsystem.name} --debug`"
×
UNCOV
948
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.error
×
949
    else:
UNCOV
950
        goal_description = f"The `{test_subsystem.name}` goal"
×
UNCOV
951
        no_applicable_targets_behavior = NoApplicableTargetsBehavior.warn
×
952

UNCOV
953
    shard, num_shards = parse_shard_spec(test_subsystem.shard, "the [test].shard option")
×
UNCOV
954
    targets_to_valid_field_sets = await find_valid_field_sets_for_target_roots(
×
955
        TargetRootsToFieldSetsRequest(
956
            TestFieldSet,
957
            goal_description=goal_description,
958
            no_applicable_targets_behavior=no_applicable_targets_behavior,
959
            shard=shard,
960
            num_shards=num_shards,
961
        ),
962
        **implicitly(),
963
    )
964

UNCOV
965
    request_types = union_membership.get(TestRequest)
×
UNCOV
966
    test_batches = await _get_test_batches(
×
967
        request_types,
968
        targets_to_valid_field_sets,
969
        local_environment_name,
970
        test_subsystem,
971
    )
972

UNCOV
973
    environment_names = await concurrently(
×
974
        resolve_single_environment_name(
975
            SingleEnvironmentNameRequest.from_field_sets(batch.elements, batch.description)
976
        )
977
        for batch in test_batches
978
    )
979

UNCOV
980
    if test_subsystem.debug or test_subsystem.debug_adapter:
×
UNCOV
981
        return await _run_debug_tests(
×
982
            test_batches, environment_names, test_subsystem, debug_adapter
983
        )
984

UNCOV
985
    to_test = list(zip(test_batches, environment_names))
×
UNCOV
986
    results = await concurrently(
×
987
        run_test_batch(
988
            **implicitly(
989
                {
990
                    batch: TestRequest.Batch,
991
                    environment_name: EnvironmentName,
992
                }
993
            )
994
        )
995
        for batch, environment_name in to_test
996
    )
997

998
    # Print summary.
UNCOV
999
    exit_code = 0
×
UNCOV
1000
    if results:
×
UNCOV
1001
        console.print_stderr("")
×
UNCOV
1002
    if test_subsystem.experimental_report_test_result_info:
×
1003
        test_result_info = {}
×
UNCOV
1004
    for result in sorted(results):
×
UNCOV
1005
        if result.exit_code is None:
×
1006
            # We end up here, e.g., if we implemented test discovery and found no tests.
1007
            continue
×
UNCOV
1008
        if result.exit_code != 0:
×
UNCOV
1009
            exit_code = result.exit_code
×
UNCOV
1010
        if result.result_metadata is None:
×
1011
            # We end up here, e.g., if compilation failed during self-implemented test discovery.
UNCOV
1012
            continue
×
UNCOV
1013
        if test_subsystem.experimental_report_test_result_info:
×
1014
            test_result_info[result.addresses[0].spec] = {
×
1015
                "source": result.result_metadata.source(run_id).value
1016
            }
UNCOV
1017
        console.print_stderr(_format_test_summary(result, run_id, console))
×
1018

UNCOV
1019
        if result.extra_output and result.extra_output.files:
×
1020
            path_prefix = str(distdir.relpath / "test" / result.path_safe_description)
×
1021
            workspace.write_digest(
×
1022
                result.extra_output.digest,
1023
                path_prefix=path_prefix,
1024
            )
1025
            if result.log_extra_output:
×
1026
                logger.info(
×
1027
                    f"Wrote extra output from test `{result.addresses[0]}` to `{path_prefix}`."
1028
                )
1029

UNCOV
1030
    rerun_command = _format_test_rerun_command(results)
×
UNCOV
1031
    if rerun_command and test_subsystem.show_rerun_command:
×
UNCOV
1032
        console.print_stderr(f"\n{rerun_command}")
×
1033

UNCOV
1034
    if test_subsystem.report:
×
UNCOV
1035
        report_dir = test_subsystem.report_dir(distdir)
×
UNCOV
1036
        merged_reports = await merge_digests(
×
1037
            MergeDigests(result.xml_results.digest for result in results if result.xml_results)
1038
        )
UNCOV
1039
        workspace.write_digest(merged_reports, path_prefix=str(report_dir))
×
UNCOV
1040
        console.print_stderr(f"\nWrote test reports to {report_dir}")
×
1041

UNCOV
1042
    if test_subsystem.use_coverage:
×
1043
        # NB: We must pre-sort the data for itertools.groupby() to work properly, using the same
1044
        # key function for both. However, `type` objects aren't orderable, so we call `str()` on them.
UNCOV
1045
        all_coverage_data = sorted(
×
1046
            (result.coverage_data for result in results if result.coverage_data is not None),
1047
            key=lambda cov_data: str(type(cov_data)),
1048
        )
1049

UNCOV
1050
        coverage_types_to_collection_types = {
×
1051
            collection_cls.element_type: collection_cls  # type: ignore[misc]
1052
            for collection_cls in union_membership.get(CoverageDataCollection)
1053
        }
UNCOV
1054
        coverage_collections = []
×
UNCOV
1055
        for data_cls, data in itertools.groupby(all_coverage_data, lambda data: type(data)):
×
UNCOV
1056
            collection_cls = coverage_types_to_collection_types[data_cls]  # type: ignore[index]
×
UNCOV
1057
            coverage_collections.append(collection_cls(data))
×
1058
        # We can create multiple reports for each coverage data (e.g., console, xml, html)
UNCOV
1059
        coverage_reports_collections = await concurrently(
×
1060
            create_coverage_report(
1061
                **implicitly(
1062
                    {
1063
                        coverage_collection: CoverageDataCollection,
1064
                        local_environment_name.val: EnvironmentName,
1065
                    }
1066
                )
1067
            )
1068
            for coverage_collection in coverage_collections
1069
        )
1070

UNCOV
1071
        coverage_report_files: list[PurePath] = []
×
UNCOV
1072
        for coverage_reports in coverage_reports_collections:
×
UNCOV
1073
            report_files = coverage_reports.materialize(console, workspace)
×
UNCOV
1074
            coverage_report_files.extend(report_files)
×
1075

UNCOV
1076
        if coverage_report_files and test_subsystem.open_coverage:
×
1077
            open_files = await find_open_program(
×
1078
                OpenFilesRequest(coverage_report_files, error_if_open_not_found=False),
1079
                **implicitly(),
1080
            )
1081
            for process in open_files.processes:
×
1082
                _ = await run_interactive_process_in_environment(
×
1083
                    process, local_environment_name.val
1084
                )
1085

UNCOV
1086
        for coverage_reports in coverage_reports_collections:
×
UNCOV
1087
            if coverage_reports.coverage_insufficient:
×
1088
                logger.error(
×
1089
                    softwrap(
1090
                        """
1091
                        Test goal failed due to insufficient coverage.
1092
                        See coverage reports for details.
1093
                        """
1094
                    )
1095
                )
1096
                # coverage.py uses 2 to indicate failure due to insufficient coverage.
1097
                # We may as well follow suit in the general case, for all languages.
1098
                exit_code = 2
×
1099

UNCOV
1100
    if test_subsystem.experimental_report_test_result_info:
×
1101
        _save_test_result_info_report_file(run_id, test_result_info)
×
1102

UNCOV
1103
    return Test(exit_code)
×
1104

1105

1106
_SOURCE_MAP = {
5✔
1107
    ProcessResultMetadata.Source.MEMOIZED: "memoized",
1108
    ProcessResultMetadata.Source.RAN: "ran",
1109
    ProcessResultMetadata.Source.HIT_LOCALLY: "cached locally",
1110
    ProcessResultMetadata.Source.HIT_REMOTELY: "cached remotely",
1111
}
1112

1113

1114
def _format_test_summary(result: TestResult, run_id: RunId, console: Console) -> str:
5✔
1115
    """Format the test summary printed to the console."""
UNCOV
1116
    assert result.result_metadata is not None, (
×
1117
        "Skipped test results should not be outputted in the test summary"
1118
    )
UNCOV
1119
    succeeded = result.exit_code == 0
×
UNCOV
1120
    retried = len(result.process_results) > 1
×
1121

UNCOV
1122
    if succeeded:
×
UNCOV
1123
        if not retried:
×
UNCOV
1124
            sigil = console.sigil_succeeded()
×
1125
        else:
1126
            sigil = console.sigil_succeeded_with_edits()
×
UNCOV
1127
        status = "succeeded"
×
1128
    else:
UNCOV
1129
        sigil = console.sigil_failed()
×
UNCOV
1130
        status = "failed"
×
1131

UNCOV
1132
    if retried:
×
1133
        attempt_msg = f" after {len(result.process_results)} attempts"
×
1134
    else:
UNCOV
1135
        attempt_msg = ""
×
1136

UNCOV
1137
    environment = result.result_metadata.execution_environment.name
×
UNCOV
1138
    environment_type = result.result_metadata.execution_environment.environment_type
×
UNCOV
1139
    source = result.result_metadata.source(run_id)
×
UNCOV
1140
    source_str = _SOURCE_MAP[source]
×
UNCOV
1141
    if environment:
×
UNCOV
1142
        preposition = "in" if source == ProcessResultMetadata.Source.RAN else "for"
×
UNCOV
1143
        source_desc = (
×
1144
            f" ({source_str} {preposition} {environment_type} environment `{environment}`)"
1145
        )
UNCOV
1146
    elif source == ProcessResultMetadata.Source.RAN:
×
UNCOV
1147
        source_desc = ""
×
1148
    else:
UNCOV
1149
        source_desc = f" ({source_str})"
×
1150

UNCOV
1151
    elapsed_print = ""
×
UNCOV
1152
    total_elapsed_ms = result.result_metadata.total_elapsed_ms
×
UNCOV
1153
    if total_elapsed_ms is not None:
×
UNCOV
1154
        elapsed_secs = total_elapsed_ms / 1000
×
UNCOV
1155
        elapsed_print = f"in {elapsed_secs:.2f}s"
×
1156

UNCOV
1157
    return f"{sigil} {result.description} {status}{attempt_msg} {elapsed_print}{source_desc}."
×
1158

1159

1160
def _format_test_rerun_command(results: Iterable[TestResult]) -> None | str:
5✔
UNCOV
1161
    failures = [result for result in results if result.exit_code not in (None, 0)]
×
UNCOV
1162
    if not failures:
×
UNCOV
1163
        return None
×
1164

1165
    # format an invocation like `pants test path/to/first:address path/to/second:address ...`
UNCOV
1166
    addresses = sorted(shlex.quote(str(addr)) for result in failures for addr in result.addresses)
×
UNCOV
1167
    goal = f"{bin_name()} {TestSubsystem.name}"
×
UNCOV
1168
    invocation = " ".join([goal, *addresses])
×
1169

UNCOV
1170
    return f"To rerun the failing tests, use:\n\n    {invocation}"
×
1171

1172

1173
@dataclass(frozen=True)
5✔
1174
class TestExtraEnv:
5✔
1175
    env: EnvironmentVars
5✔
1176

1177

1178
@rule
5✔
1179
async def get_filtered_environment(test_env_aware: TestSubsystem.EnvironmentAware) -> TestExtraEnv:
5✔
1180
    return TestExtraEnv(
×
1181
        await environment_vars_subset(
1182
            EnvironmentVarsRequest(test_env_aware.extra_env_vars), **implicitly()
1183
        )
1184
    )
1185

1186

1187
@memoized
5✔
1188
def _unsupported_debug_rules(cls: type[TestRequest]) -> Iterable:
5✔
1189
    """Returns a rule that implements TestDebugRequest by raising an error."""
1190

1191
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
4✔
1192
    async def get_test_debug_request(request: TestRequest.Batch) -> TestDebugRequest:
4✔
1193
        raise NotImplementedError("Testing this target with --debug is not yet supported.")
×
1194

1195
    return collect_rules(locals())
4✔
1196

1197

1198
@memoized
5✔
1199
def _unsupported_debug_adapter_rules(cls: type[TestRequest]) -> Iterable:
5✔
1200
    """Returns a rule that implements TestDebugAdapterRequest by raising an error."""
1201

1202
    @rule(canonical_name_suffix=cls.__name__, _param_type_overrides={"request": cls.Batch})
5✔
1203
    async def get_test_debug_adapter_request(request: TestRequest.Batch) -> TestDebugAdapterRequest:
5✔
1204
        raise NotImplementedError(
×
1205
            "Testing this target type with a debug adapter is not yet supported."
1206
        )
1207

1208
    return collect_rules(locals())
5✔
1209

1210

1211
# -------------------------------------------------------------------------------------------
1212
# `runtime_package_dependencies` field
1213
# -------------------------------------------------------------------------------------------
1214

1215

1216
class RuntimePackageDependenciesField(SpecialCasedDependencies):
5✔
1217
    alias = "runtime_package_dependencies"
5✔
1218
    help = help_text(
5✔
1219
        f"""
1220
        Addresses to targets that can be built with the `{bin_name()} package` goal and whose
1221
        resulting artifacts should be included in the test run.
1222

1223
        Pants will build the artifacts as if you had run `{bin_name()} package`.
1224
        It will include the results in your test's chroot, using the same name they would normally
1225
        have, but without the `--distdir` prefix (e.g. `dist/`).
1226

1227
        You can include anything that can be built by `{bin_name()} package`, e.g. a `pex_binary`,
1228
        `python_aws_lambda_function`, or an `archive`.
1229
        """
1230
    )
1231

1232

1233
class BuiltPackageDependencies(Collection[BuiltPackage]):
5✔
1234
    pass
5✔
1235

1236

1237
@dataclass(frozen=True)
5✔
1238
class BuildPackageDependenciesRequest:
5✔
1239
    field: RuntimePackageDependenciesField
5✔
1240

1241

1242
@rule(desc="Build runtime package dependencies for tests", level=LogLevel.DEBUG)
5✔
1243
async def build_runtime_package_dependencies(
5✔
1244
    request: BuildPackageDependenciesRequest,
1245
) -> BuiltPackageDependencies:
1246
    unparsed_addresses = request.field.to_unparsed_address_inputs()
×
1247
    if not unparsed_addresses:
×
1248
        return BuiltPackageDependencies()
×
1249
    tgts = await resolve_targets(**implicitly(unparsed_addresses))
×
1250
    field_sets_per_tgt = await find_valid_field_sets(
×
1251
        FieldSetsPerTargetRequest(PackageFieldSet, tgts), **implicitly()
1252
    )
1253
    packages = await concurrently(
×
1254
        environment_aware_package(EnvironmentAwarePackageRequest(field_set))
1255
        for field_set in field_sets_per_tgt.field_sets
1256
    )
1257
    return BuiltPackageDependencies(packages)
×
1258

1259

1260
def rules():
5✔
1261
    return [
2✔
1262
        *collect_rules(),
1263
    ]