• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19381742489

15 Nov 2025 12:52AM UTC coverage: 49.706% (-30.6%) from 80.29%
19381742489

Pull #22890

github

web-flow
Merge d961abf79 into 42e1ebd41
Pull Request #22890: Updated all python subsystem constraints to 3.14

4 of 5 new or added lines in 5 files covered. (80.0%)

14659 existing lines in 485 files now uncovered.

31583 of 63540 relevant lines covered (49.71%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.7
/src/python/pants/backend/tools/semgrep/rules.py
1
# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
from __future__ import annotations
1✔
4

5
import itertools
1✔
6
import logging
1✔
7
from collections import defaultdict
1✔
8
from collections.abc import Iterable
1✔
9
from dataclasses import dataclass
1✔
10
from pathlib import PurePath
1✔
11

12
from pants.backend.python.util_rules import pex
1✔
13
from pants.backend.python.util_rules.pex import VenvPexProcess, create_venv_pex
1✔
14
from pants.core.goals.lint import LintResult, LintTargetsRequest
1✔
15
from pants.core.util_rules.partitions import Partition, Partitions
1✔
16
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
1✔
17
from pants.engine.addresses import Address
1✔
18
from pants.engine.fs import CreateDigest, FileContent, MergeDigests, PathGlobs, Paths
1✔
19
from pants.engine.intrinsics import (
1✔
20
    create_digest,
21
    digest_to_snapshot,
22
    execute_process,
23
    merge_digests,
24
    path_globs_to_paths,
25
)
26
from pants.engine.process import ProcessCacheScope
1✔
27
from pants.engine.rules import Rule, collect_rules, concurrently, implicitly, rule
1✔
28
from pants.engine.unions import UnionRule
1✔
29
from pants.option.global_options import GlobalOptions
1✔
30
from pants.util.logging import LogLevel
1✔
31
from pants.util.strutil import pluralize
1✔
32

33
from .subsystem import SemgrepFieldSet, SemgrepSubsystem
1✔
34

35
logger = logging.getLogger(__name__)
1✔
36

37

38
_SEMGREPIGNORE_FILE_NAME = ".semgrepignore"
1✔
39
_DEFAULT_SEMGREP_CONFIG_DIR = ".semgrep"
1✔
40

41

42
class SemgrepLintRequest(LintTargetsRequest):
    """Lint request that ties `SemgrepFieldSet` targets to the `SemgrepSubsystem` tool."""

    # The field set type this request operates on.
    field_set_type = SemgrepFieldSet
    # The subsystem that configures the tool for this request.
    tool_subsystem = SemgrepSubsystem  # type: ignore[assignment]
1✔
45

46

47
@dataclass(frozen=True)
class PartitionMetadata:
    """Immutable metadata for a partition: the set of config files associated with it."""

    # Paths of the config files carried by this metadata.
    config_files: frozenset[PurePath]

    @property
    def description(self) -> str:
        """Return the config file paths as a sorted, comma-separated string."""
        rendered_paths = sorted(map(str, self.config_files))
        return ", ".join(rendered_paths)
×
54

55

56
@dataclass
class AllSemgrepConfigs:
    """Config file paths, grouped by the directory they are registered under."""

    # Maps a directory to the set of config file paths registered for it.
    configs_by_dir: dict[PurePath, set[PurePath]]

    def ancestor_configs(self, address: Address) -> Iterable[PurePath]:
        """Yield every config registered for the address's directory or any of its ancestors.

        The walk starts at `address.spec_path` itself and then moves up through its parents;
        directories without an entry in `configs_by_dir` contribute nothing.
        """
        # TODO: introspect the semgrep rules and determine which (if any) apply to the files, e.g. a
        # Python file shouldn't depend on a .semgrep.yml that doesn't have any 'python' or 'generic'
        # rules, and similarly if there's path inclusions/exclusions.
        # TODO: this would be better as actual dependency inference (e.g. allows inspection, manual
        # addition/exclusion), but that can only infer 'full' dependencies and it is wrong (e.g. JVM
        # things break) for real code files to depend on this sort of non-code linter config; requires
        # dependency scopes or similar (https://github.com/pantsbuild/pants/issues/12794)
        target_dir = PurePath(address.spec_path)

        for directory in (target_dir, *target_dir.parents):
            yield from self.configs_by_dir.get(directory, [])
×
72

73

74
def _group_by_semgrep_dir(
    all_config_files: Paths, all_config_dir_files: Paths, config_name: str
) -> AllSemgrepConfigs:
    """Group config file paths by the project directory they should apply to.

    :param all_config_files: standalone config files; each is grouped under its own parent
        directory.
    :param all_config_dir_files: config files living somewhere below a directory named
        `config_name`; each is grouped under the parent of that directory.
    :param config_name: the directory name to look for among a path's ancestors.
    :return: an `AllSemgrepConfigs` wrapping the resulting directory-to-configs mapping.
    """
    grouped: dict[PurePath, set[PurePath]] = {}

    def record(project_dir: PurePath, config: PurePath) -> None:
        # Create the bucket for the directory on first use, then add the config to it.
        grouped.setdefault(project_dir, set()).add(config)

    # Rules like foo/semgrep.yaml should apply to the project at foo/
    for raw_path in all_config_files.files:
        config = PurePath(raw_path)
        record(config.parent, config)

    # Rules like foo/bar/.semgrep/baz.yaml and foo/bar/.semgrep/baz/qux.yaml should apply to the
    # project at foo/bar/
    for raw_path in all_config_dir_files.files:
        config = PurePath(raw_path)
        # `parents` is walked nearest-first, so the closest ancestor named `config_name` is used.
        # `next` raises StopIteration if no ancestor has that name.
        enclosing_config_dirs = (
            ancestor for ancestor in config.parents if ancestor.name == config_name
        )
        project_dir = next(enclosing_config_dirs).parent
        record(project_dir, config)

    return AllSemgrepConfigs(grouped)
×
93

94

95
@rule
async def find_all_semgrep_configs(semgrep: SemgrepSubsystem) -> AllSemgrepConfigs:
    """Glob for every Semgrep config file and group the matches by project directory.

    Which globs are used depends on `semgrep.config_name`:

    - `None`: look for `.semgrep.yml` / `.semgrep.yaml` files, plus any `*.yaml` / `*.yml` file
      below a `.semgrep` directory.
    - a name ending in `.yaml` or `.yml`: treat it as a config file name.
    - anything else: treat it as a config directory name and take the `*.yaml` / `*.yml` files
      below it.
    """
    config_file_globs: tuple[str, ...] = ()
    config_dir_globs: tuple[str, ...] = ()

    if semgrep.config_name is None:
        config_file_globs = ("**/.semgrep.yml", "**/.semgrep.yaml")
        config_dir_globs = (
            f"**/{_DEFAULT_SEMGREP_CONFIG_DIR}/**/*.yaml",
            f"**/{_DEFAULT_SEMGREP_CONFIG_DIR}/**/*.yml",
        )
    elif semgrep.config_name.endswith((".yaml", ".yml")):
        config_file_globs = (f"**/{semgrep.config_name}",)
    else:
        config_dir_globs = (
            f"**/{semgrep.config_name}/**/*.yaml",
            f"**/{semgrep.config_name}/**/*.yml",
        )

    # The two glob expansions do not depend on each other, so request them together rather than
    # awaiting one after the other.
    all_config_files, all_config_dir_files = await concurrently(
        path_globs_to_paths(PathGlobs(config_file_globs)),
        path_globs_to_paths(PathGlobs(config_dir_globs)),
    )
    return _group_by_semgrep_dir(
        all_config_files,
        all_config_dir_files,
        (semgrep.config_name or _DEFAULT_SEMGREP_CONFIG_DIR),
    )
121

122

123
@dataclass(frozen=True)
class RelevantSemgrepConfigsRequest:
    """Immutable request wrapping the single field set whose configs should be looked up."""

    # The field set to resolve configs for.
    field_set: SemgrepFieldSet
1✔
126

127

128
class RelevantSemgrepConfigs(frozenset[PurePath]):
    """A frozenset of config file paths, given its own type name."""
1✔
130

131

132
@rule
async def infer_relevant_semgrep_configs(
    request: RelevantSemgrepConfigsRequest, all_semgrep: AllSemgrepConfigs
) -> RelevantSemgrepConfigs:
    """Collect the configs that `all_semgrep` reports for the request's field set address."""
    target_address = request.field_set.address
    applicable_configs = all_semgrep.ancestor_configs(target_address)
    return RelevantSemgrepConfigs(applicable_configs)
×
137

138

139
@rule
async def partition(
    request: SemgrepLintRequest.PartitionRequest[SemgrepFieldSet],
    semgrep: SemgrepSubsystem,
) -> Partitions:
    """Group the requested field sets into partitions that share the same set of configs.

    Returns an empty `Partitions` when `semgrep.skip` is set. Field sets for which no config
    is found are left out of every partition.
    """
    if semgrep.skip:
        return Partitions()

    field_sets = request.field_sets
    configs_per_field_set = await concurrently(
        infer_relevant_semgrep_configs(RelevantSemgrepConfigsRequest(fs), **implicitly())
        for fs in field_sets
    )

    # partition by the sets of configs that apply to each input
    grouped: dict = {}
    for fs, configs in zip(field_sets, configs_per_field_set):
        if not configs:
            # nothing applies to this field set, so it does not belong to any partition
            continue
        grouped.setdefault(configs, []).append(fs)

    return Partitions(
        Partition(tuple(members), PartitionMetadata(configs))
        for configs, members in grouped.items()
    )
162

163

164
# A fixed settings file is used to work around
# https://github.com/returntocorp/semgrep/issues/7102 and to make runs more cacheable.
# NB. both keys are required.
_DEFAULT_SETTINGS = FileContent(
    path="__semgrep_settings.yaml",
    content=(
        b"anonymous_user_id: 00000000-0000-0000-0000-000000000000\n"
        b"has_shown_metrics_notification: true"
    ),
)
171

172

173
@rule(desc="Lint with Semgrep", level=LogLevel.DEBUG)
async def lint(
    request: SemgrepLintRequest.Batch[SemgrepFieldSet, PartitionMetadata],
    semgrep: SemgrepSubsystem,
    global_options: GlobalOptions,
) -> LintResult:
    """Run `semgrep scan` over one batch and wrap the process result in a `LintResult`.

    The process input is the merge of the batch's source files, the partition's config files,
    the hard-coded settings file and any `.semgrepignore` matched by the glob.
    """
    partition_configs = request.partition_metadata.config_files

    # Gather every independent input in a single concurrent step.
    (
        config_snapshot,
        ignore_snapshot,
        semgrep_pex,
        sources,
        settings_digest,
    ) = await concurrently(
        digest_to_snapshot(
            **implicitly(PathGlobs(str(config) for config in partition_configs))
        ),
        digest_to_snapshot(**implicitly(PathGlobs([_SEMGREPIGNORE_FILE_NAME]))),
        create_venv_pex(**implicitly(semgrep.to_pex_request())),
        determine_source_files(
            SourceFilesRequest(field_set.source for field_set in request.elements)
        ),
        create_digest(CreateDigest([_DEFAULT_SETTINGS])),
    )

    sandbox_digest = await merge_digests(
        MergeDigests(
            (
                sources.snapshot.digest,
                config_snapshot.digest,
                settings_digest,
                ignore_snapshot.digest,
            )
        )
    )

    if semgrep.force:
        cache_scope = ProcessCacheScope.PER_SESSION
    else:
        cache_scope = ProcessCacheScope.SUCCESSFUL

    source_paths = sources.files
    source_count = len(source_paths)

    argv = (
        "scan",
        *(f"--config={f}" for f in config_snapshot.files),
        "--jobs={pants_concurrency}",
        "--error",
        *semgrep.args,
        # we don't pass the target files directly because that overrides .semgrepignore
        # (https://github.com/returntocorp/semgrep/issues/4978), so instead we just tell its
        # traversal to include all the source files in this partition. Unfortunately this
        # include is implicitly unrooted (i.e. as if it was **/path/to/file), and so may
        # pick up other files if the names match. The highest risk of this is within the
        # semgrep PEX.
        *(f"--include={f}" for f in source_paths),
        f"--exclude={semgrep_pex.pex_filename}",
    )

    extra_env = {
        "SEMGREP_FORCE_COLOR": "true",
        # disable various global state/network requests
        "SEMGREP_SETTINGS_FILE": _DEFAULT_SETTINGS.path,
        "SEMGREP_ENABLE_VERSION_CHECK": "0",
        "SEMGREP_SEND_METRICS": "off",
    }

    # TODO: https://github.com/pantsbuild/pants/issues/18430 support running this with --autofix
    # under the fix goal... but not all rules have fixes, so we need to be running with
    # --error/checking exit codes, which FixResult doesn't currently support.
    semgrep_process = VenvPexProcess(
        semgrep_pex,
        argv=argv,
        extra_env=extra_env,
        input_digest=sandbox_digest,
        concurrency_available=source_count,
        description=f"Run Semgrep on {pluralize(source_count, 'file')}.",
        level=LogLevel.DEBUG,
        cache_scope=cache_scope,
    )
    result = await execute_process(**implicitly(semgrep_process))

    return LintResult.create(request, result, output_simplifier=global_options.output_simplifier())
×
243

244

245
def rules() -> Iterable[Rule | UnionRule]:
    """Return this module's collected rules, then the lint request's rules, then the pex rules."""
    registered: list[Rule | UnionRule] = []
    registered.extend(collect_rules())
    registered.extend(SemgrepLintRequest.rules())
    registered.extend(pex.rules())
    return registered
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc