• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20328535594

18 Dec 2025 06:46AM UTC coverage: 57.969% (-22.3%) from 80.295%
20328535594

Pull #22954

github

web-flow
Merge ccc9c5409 into 407284c67
Pull Request #22954: free up disk space in runner image

39083 of 67421 relevant lines covered (57.97%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/codegen/protobuf/lint/buf/lint_rules.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
from dataclasses import dataclass
×
4
from typing import Any
×
5

6
from pants.backend.codegen.protobuf.lint.buf.skip_field import SkipBufLintField
×
7
from pants.backend.codegen.protobuf.lint.buf.subsystem import BufSubsystem
×
8
from pants.backend.codegen.protobuf.target_types import (
×
9
    ProtobufDependenciesField,
10
    ProtobufSourceField,
11
)
12
from pants.core.goals.lint import LintResult, LintTargetsRequest, Partitions
×
13
from pants.core.util_rules.config_files import find_config_file
×
14
from pants.core.util_rules.external_tool import download_external_tool
×
15
from pants.core.util_rules.source_files import SourceFilesRequest
×
16
from pants.core.util_rules.stripped_source_files import strip_source_roots
×
17
from pants.engine.fs import MergeDigests
×
18
from pants.engine.internals.graph import transitive_targets as transitive_targets_get
×
19
from pants.engine.intrinsics import execute_process, merge_digests
×
20
from pants.engine.platform import Platform
×
21
from pants.engine.process import Process
×
22
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
×
23
from pants.engine.target import FieldSet, Target, TransitiveTargetsRequest
×
24
from pants.util.logging import LogLevel
×
25
from pants.util.meta import classproperty
×
26
from pants.util.strutil import pluralize
×
27

28

29
@dataclass(frozen=True)
class BufFieldSet(FieldSet):
    """The target fields used when linting Protobuf sources with `buf lint`.

    Only `ProtobufSourceField` is required for a target to match this field set.
    """

    required_fields = (ProtobufSourceField,)

    # The Protobuf source file(s) owned by the target.
    sources: ProtobufSourceField
    # The target's declared dependencies.
    dependencies: ProtobufDependenciesField

    @classmethod
    def opt_out(cls, tgt: Target) -> bool:
        """Return the value of the target's `SkipBufLintField`."""
        skip_field = tgt.get(SkipBufLintField)
        return skip_field.value
×
39

40

41
class BufLintRequest(LintTargetsRequest):
    """Lint request that binds `BufFieldSet` targets to the `BufSubsystem` tool."""

    field_set_type = BufFieldSet
    tool_subsystem = BufSubsystem  # type: ignore[assignment]

    @classproperty
    def tool_name(cls) -> str:
        """Display name of the tool."""
        return "buf lint"

    @classproperty
    def tool_id(cls) -> str:
        """Identifier of the tool."""
        return "buf-lint"
×
52

53

54
@rule
async def partition_buf(
    request: BufLintRequest.PartitionRequest[BufFieldSet], buf: BufSubsystem
) -> Partitions[BufFieldSet, Any]:
    """Group every requested field set into one partition.

    Returns an empty `Partitions` when `buf.lint_skip` is set.
    """
    if buf.lint_skip:
        return Partitions()
    return Partitions.single_partition(request.field_sets)
×
59

60

61
@rule(desc="Lint with buf lint", level=LogLevel.DEBUG)
async def run_buf(
    request: BufLintRequest.Batch[BufFieldSet, Any], buf: BufSubsystem, platform: Platform
) -> LintResult:
    """Run `buf lint` over the batch's Protobuf sources.

    The process sandbox contains the stripped sources of the batch, the stripped
    Protobuf sources of the batch's transitive closure, the downloaded buf tool,
    and any discovered config file. Only the batch's own files are passed to
    `--path`.
    """
    closure_result = await transitive_targets_get(
        TransitiveTargetsRequest(field_set.address for field_set in request.elements),
        **implicitly(),
    )

    # Resolve the four independent inputs concurrently. The request objects are
    # constructed inline inside `implicitly(...)`, as in the original call sites.
    (
        linted_sources,
        closure_sources,
        buf_tool,
        buf_config_files,
    ) = await concurrently(
        strip_source_roots(
            **implicitly(
                SourceFilesRequest(
                    (field_set.sources for field_set in request.elements),
                    for_sources_types=(ProtobufSourceField,),
                    enable_codegen=True,
                )
            )
        ),
        strip_source_roots(
            **implicitly(
                SourceFilesRequest(
                    tgt[ProtobufSourceField]
                    for tgt in closure_result.closure
                    if tgt.has_field(ProtobufSourceField)
                )
            )
        ),
        download_external_tool(buf.get_request(platform)),
        find_config_file(buf.config_request),
    )

    sandbox_digest = await merge_digests(
        MergeDigests(
            (
                linted_sources.snapshot.digest,
                closure_sources.snapshot.digest,
                buf_tool.digest,
                buf_config_files.snapshot.digest,
            )
        )
    )

    # Assemble the command line: tool, subcommand, optional config, user args,
    # then the comma-separated list of files to lint.
    argv = [buf_tool.exe, "lint"]
    if buf.config:
        argv += ["--config", buf.config]
    argv += [*buf.lint_args, "--path", ",".join(linted_sources.snapshot.files)]

    lint_process_result = await execute_process(
        Process(
            argv=argv,
            input_digest=sandbox_digest,
            description=f"Run buf lint on {pluralize(len(request.elements), 'file')}.",
            level=LogLevel.DEBUG,
        ),
        **implicitly(),
    )
    return LintResult.create(request, lint_process_result)
×
135

136

137
def rules():
    """Return this module's collected rules followed by `BufLintRequest`'s rules."""
    registered = [*collect_rules()]
    registered.extend(BufLintRequest.rules())
    return registered
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc