• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18252174847

05 Oct 2025 01:36AM UTC coverage: 43.382% (-36.9%) from 80.261%
18252174847

push

github

web-flow
run tests on mac arm (#22717)

Just doing the minimal to pull forward the x86_64 pattern.

ref #20993

25776 of 59416 relevant lines covered (43.38%)

1.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.7
/src/python/pants/backend/tools/semgrep/rules.py
1
# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
from __future__ import annotations
3✔
4

5
import itertools
3✔
6
import logging
3✔
7
from collections import defaultdict
3✔
8
from collections.abc import Iterable
3✔
9
from dataclasses import dataclass
3✔
10
from pathlib import PurePath
3✔
11

12
from pants.backend.python.util_rules import pex
3✔
13
from pants.backend.python.util_rules.pex import VenvPexProcess, create_venv_pex
3✔
14
from pants.core.goals.lint import LintResult, LintTargetsRequest
3✔
15
from pants.core.util_rules.partitions import Partition, Partitions
3✔
16
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
3✔
17
from pants.engine.addresses import Address
3✔
18
from pants.engine.fs import CreateDigest, FileContent, MergeDigests, PathGlobs, Paths
3✔
19
from pants.engine.intrinsics import (
3✔
20
    create_digest,
21
    digest_to_snapshot,
22
    execute_process,
23
    merge_digests,
24
    path_globs_to_paths,
25
)
26
from pants.engine.process import ProcessCacheScope
3✔
27
from pants.engine.rules import Rule, collect_rules, concurrently, implicitly, rule
3✔
28
from pants.engine.unions import UnionRule
3✔
29
from pants.option.global_options import GlobalOptions
3✔
30
from pants.util.logging import LogLevel
3✔
31
from pants.util.strutil import pluralize
3✔
32

33
from .subsystem import SemgrepFieldSet, SemgrepSubsystem
3✔
34

35
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the semgrep ignore file; `lint` globs for it and merges it into the process input.
_SEMGREPIGNORE_FILE_NAME = ".semgrepignore"
# Config directory name that is searched for when the subsystem's `config_name` is unset.
_DEFAULT_SEMGREP_CONFIG_DIR = ".semgrep"
3✔
40

41

42
class SemgrepLintRequest(LintTargetsRequest):
    """Lint request type that ties the Semgrep subsystem to the field sets it can lint."""

    tool_subsystem = SemgrepSubsystem
    field_set_type = SemgrepFieldSet
3✔
45

46

47
@dataclass(frozen=True)
class PartitionMetadata:
    """Metadata for one lint partition: the semgrep config files its members share."""

    config_files: frozenset[PurePath]

    @property
    def description(self) -> str:
        """Return the config file paths as sorted, comma-separated text."""
        rendered_paths = sorted(map(str, self.config_files))
        return ", ".join(rendered_paths)
×
54

55

56
@dataclass
class AllSemgrepConfigs:
    """All discovered semgrep config files, keyed by the directory they apply to."""

    configs_by_dir: dict[PurePath, set[PurePath]]

    def ancestor_configs(self, address: Address) -> Iterable[PurePath]:
        """Yield the configs registered for the address's directory and for each ancestor.

        Directories are visited starting at `address.spec_path` and moving upwards;
        a directory with no registered configs contributes nothing.
        """
        # TODO: introspect the semgrep rules and determine which (if any) apply to the files, e.g. a
        # Python file shouldn't depend on a .semgrep.yml that doesn't have any 'python' or 'generic'
        # rules, and similarly if there's path inclusions/exclusions.
        # TODO: this would be better as actual dependency inference (e.g. allows inspection, manual
        # addition/exclusion), but that can only infer 'full' dependencies and it is wrong (e.g. JVM
        # things break) for real code files to depend on this sort of non-code linter config; requires
        # dependency scopes or similar (https://github.com/pantsbuild/pants/issues/12794)
        start = PurePath(address.spec_path)
        for directory in (start, *start.parents):
            yield from self.configs_by_dir.get(directory, ())
×
72

73

74
def _group_by_semgrep_dir(
    all_config_files: Paths, all_config_dir_files: Paths, config_name: str
) -> AllSemgrepConfigs:
    """Group semgrep config files by the directory whose code they apply to.

    Args:
        all_config_files: Standalone config files; each applies to its own parent directory.
        all_config_dir_files: Files located beneath a directory named `config_name`; each
            applies to the directory that contains that config directory.
        config_name: Directory name to look for among the ancestors of each path in
            `all_config_dir_files`.

    Returns:
        An `AllSemgrepConfigs` mapping each directory to the config files that apply to it.

    Raises:
        ValueError: If a path in `all_config_dir_files` has no ancestor directory named
            `config_name`.
    """
    configs_by_dir: dict[PurePath, set[PurePath]] = {}
    for config_path in all_config_files.files:
        # Rules like foo/semgrep.yaml should apply to the project at foo/
        path = PurePath(config_path)
        configs_by_dir.setdefault(path.parent, set()).add(path)

    for config_path in all_config_dir_files.files:
        # Rules like foo/bar/.semgrep/baz.yaml and foo/bar/.semgrep/baz/qux.yaml should apply to the
        # project at foo/bar/
        path = PurePath(config_path)
        # `parents` is ordered nearest-first, so the innermost matching directory wins.
        config_directory = next(
            (parent.parent for parent in path.parents if parent.name == config_name),
            None,
        )
        if config_directory is None:
            # Without a default, `next` would raise a bare StopIteration here, which gives no
            # hint about the offending path and is easily misreported once it escapes into the
            # calling coroutine.
            raise ValueError(
                f"Semgrep config file {config_path!r} is not inside a directory named "
                f"{config_name!r}."
            )
        configs_by_dir.setdefault(config_directory, set()).add(path)

    return AllSemgrepConfigs(configs_by_dir)
×
93

94

95
@rule
async def find_all_semgrep_configs(semgrep: SemgrepSubsystem) -> AllSemgrepConfigs:
    """Glob for semgrep config files and group them by the directory they apply to.

    Which globs are used depends on `semgrep.config_name`:

    * unset: `.semgrep.yml` / `.semgrep.yaml` files, plus YAML files beneath any
      `.semgrep` directory;
    * a name ending in `.yaml` / `.yml`: files with exactly that name;
    * anything else: YAML files beneath any directory with that name.

    All globs start with `**/`, so they match at any depth.

    Returns:
        The discovered configs, grouped by `_group_by_semgrep_dir`.
    """
    config_name = semgrep.config_name
    config_file_globs: tuple[str, ...] = ()
    config_dir_globs: tuple[str, ...] = ()

    if config_name is None:
        config_file_globs = ("**/.semgrep.yml", "**/.semgrep.yaml")
        config_dir_globs = (
            f"**/{_DEFAULT_SEMGREP_CONFIG_DIR}/**/*.yaml",
            f"**/{_DEFAULT_SEMGREP_CONFIG_DIR}/**/*.yml",
        )
    elif config_name.endswith((".yaml", ".yml")):
        config_file_globs = (f"**/{config_name}",)
    else:
        config_dir_globs = (
            f"**/{config_name}/**/*.yaml",
            f"**/{config_name}/**/*.yml",
        )

    # The two glob expansions do not depend on each other, so request them together instead of
    # awaiting one after the other.
    all_config_files, all_config_dir_files = await concurrently(
        path_globs_to_paths(PathGlobs(config_file_globs)),
        path_globs_to_paths(PathGlobs(config_dir_globs)),
    )
    return _group_by_semgrep_dir(
        all_config_files,
        all_config_dir_files,
        (config_name or _DEFAULT_SEMGREP_CONFIG_DIR),
    )
121

122

123
@dataclass(frozen=True)
class RelevantSemgrepConfigsRequest:
    """Request for the semgrep config files that apply to a single field set."""

    field_set: SemgrepFieldSet
3✔
126

127

128
class RelevantSemgrepConfigs(frozenset[PurePath]):
    """Immutable set of the semgrep config file paths that apply to one field set."""
3✔
130

131

132
@rule
async def infer_relevant_semgrep_configs(
    request: RelevantSemgrepConfigsRequest, all_semgrep: AllSemgrepConfigs
) -> RelevantSemgrepConfigs:
    """Collect the configs found in the field set's directory and in each ancestor directory."""
    target_address = request.field_set.address
    applicable = all_semgrep.ancestor_configs(target_address)
    return RelevantSemgrepConfigs(applicable)
×
137

138

139
@rule
async def partition(
    request: SemgrepLintRequest.PartitionRequest[SemgrepFieldSet],
    semgrep: SemgrepSubsystem,
) -> Partitions:
    """Split the requested field sets into partitions that share the same semgrep configs.

    Returns no partitions when semgrep is skipped. Field sets with no applicable config are
    left out of every partition.
    """
    if semgrep.skip:
        return Partitions()

    all_configs = await concurrently(
        infer_relevant_semgrep_configs(RelevantSemgrepConfigsRequest(field_set), **implicitly())
        for field_set in request.field_sets
    )

    # Field sets that see exactly the same set of configs end up in the same partition.
    grouped: dict[RelevantSemgrepConfigs, list[SemgrepFieldSet]] = {}
    for member, member_configs in zip(request.field_sets, all_configs):
        if not member_configs:
            # No config applies to this field set, so it is not linted.
            continue
        grouped.setdefault(member_configs, []).append(member)

    return Partitions(
        Partition(tuple(members), PartitionMetadata(shared_configs))
        for shared_configs, members in grouped.items()
    )
162

163

164
# We ship a hard-coded settings file to side-step
# https://github.com/returntocorp/semgrep/issues/7102 and to provide more cacheability.
# NB: both keys are required.
_DEFAULT_SETTINGS = FileContent(
    path="__semgrep_settings.yaml",
    content=(
        b"anonymous_user_id: 00000000-0000-0000-0000-000000000000\n"
        b"has_shown_metrics_notification: true"
    ),
)
171

172

173
@rule(desc="Lint with Semgrep", level=LogLevel.DEBUG)
async def lint(
    request: SemgrepLintRequest.Batch[SemgrepFieldSet, PartitionMetadata],
    semgrep: SemgrepSubsystem,
    global_options: GlobalOptions,
) -> LintResult:
    """Run `semgrep scan` over one batch of source files with the batch's config files.

    The process input is the merge of the batch's sources, its config files, the hard-coded
    settings file and any `.semgrepignore` file that was found.

    Returns:
        A `LintResult` created from the semgrep process result.
    """
    config_globs = PathGlobs(str(path) for path in request.partition_metadata.config_files)
    sources_request = SourceFilesRequest(member.source for member in request.elements)

    config_files, ignore_files, semgrep_pex, input_files, settings = await concurrently(
        digest_to_snapshot(**implicitly(config_globs)),
        digest_to_snapshot(**implicitly(PathGlobs([_SEMGREPIGNORE_FILE_NAME]))),
        create_venv_pex(**implicitly(semgrep.to_pex_request())),
        determine_source_files(sources_request),
        create_digest(CreateDigest([_DEFAULT_SETTINGS])),
    )

    digests_to_merge = (
        input_files.snapshot.digest,
        config_files.digest,
        settings,
        ignore_files.digest,
    )
    input_digest = await merge_digests(MergeDigests(digests_to_merge))

    if semgrep.force:
        cache_scope = ProcessCacheScope.PER_SESSION
    else:
        cache_scope = ProcessCacheScope.SUCCESSFUL

    source_paths = input_files.files
    source_count = len(source_paths)

    argv = (
        "scan",
        *(f"--config={path}" for path in config_files.files),
        "--jobs={pants_concurrency}",
        "--error",
        *semgrep.args,
        # we don't pass the target files directly because that overrides .semgrepignore
        # (https://github.com/returntocorp/semgrep/issues/4978), so instead we just tell its
        # traversal to include all the source files in this partition. Unfortunately this
        # include is implicitly unrooted (i.e. as if it was **/path/to/file), and so may
        # pick up other files if the names match. The highest risk of this is within the
        # semgrep PEX.
        *(f"--include={path}" for path in source_paths),
        f"--exclude={semgrep_pex.pex_filename}",
    )
    extra_env = {
        "SEMGREP_FORCE_COLOR": "true",
        # disable various global state/network requests
        "SEMGREP_SETTINGS_FILE": _DEFAULT_SETTINGS.path,
        "SEMGREP_ENABLE_VERSION_CHECK": "0",
        "SEMGREP_SEND_METRICS": "off",
    }

    # TODO: https://github.com/pantsbuild/pants/issues/18430 support running this with --autofix
    # under the fix goal... but not all rules have fixes, so we need to be running with
    # --error/checking exit codes, which FixResult doesn't currently support.
    result = await execute_process(
        **implicitly(
            VenvPexProcess(
                semgrep_pex,
                argv=argv,
                extra_env=extra_env,
                input_digest=input_digest,
                concurrency_available=source_count,
                description=f"Run Semgrep on {pluralize(source_count, 'file')}.",
                level=LogLevel.DEBUG,
                cache_scope=cache_scope,
            )
        )
    )

    return LintResult.create(request, result, output_simplifier=global_options.output_simplifier())
×
243

244

245
def rules() -> Iterable[Rule | UnionRule]:
    """Return this module's rules plus those of `SemgrepLintRequest` and the `pex` util rules."""
    registered: list[Rule | UnionRule] = []
    registered.extend(collect_rules())
    registered.extend(SemgrepLintRequest.rules())
    registered.extend(pex.rules())
    return registered
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc