• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26689585807

30 May 2026 04:55PM UTC coverage: 92.742% (-0.05%) from 92.792%
26689585807

Pull #23343

github

web-flow
Merge c42efe377 into c8127c1f4
Pull Request #23343: Add buf as an alternate Python protobuf code generator

767 of 807 new or added lines in 17 files covered. (95.04%)

69 existing lines in 3 files now uncovered.

93753 of 101090 relevant lines covered (92.74%)

4.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.98
/src/python/pants/backend/codegen/protobuf/buf/config.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
"""Shared, language-agnostic helpers for working with `buf.yaml` and `buf.gen.yaml`.
5

6
These primitives are used by the per-language buf integrations so each language
7
only owns its own suffix conventions and module-name math, not yaml parsing
8
or plugin matching.
9
"""
10

11
from __future__ import annotations
8✔
12

13
import os
8✔
14
from collections.abc import Mapping, Sequence
8✔
15
from dataclasses import dataclass
8✔
16

17
import yaml
8✔
18

19
from pants.backend.codegen.protobuf.buf.fields import BufGenTemplateField
8✔
20
from pants.backend.codegen.protobuf.buf.subsystem import BufSubsystem
8✔
21
from pants.core.util_rules.config_files import ConfigFilesRequest, find_config_file
8✔
22
from pants.engine.intrinsics import get_digest_contents
8✔
23
from pants.engine.rules import concurrently
8✔
24
from pants.engine.target import Target
8✔
25

26
# ---- Errors ----------------------------------------------------------------
27

28

29
class UnpinnedBufPluginError(Exception):
    """Raised when a `remote:` plugin entry is missing a version+revision pin and
    no pin registry entry (built-in `DEFAULT_PLUGIN_PINS` or user-supplied extra
    pins) exists to fill the gap."""
32

33

34
class MissingBufLockError(Exception):
    """Raised when `buf.yaml` declares `deps:` but no sibling `buf.lock` exists."""
36

37

38
# ---- Default plugin-pin registry -------------------------------------------
39

40

41
# Default `(version, revision)` pin Pants will fill in for known `remote:`
42
# plugins that the user wrote without a pin or revision. The synthesized
43
# `buf.gen.yaml` entry has the form:
44
#
45
#     - remote: <id>:<version>
46
#       revision: <revision>
47
#       out: ...
48
#
49
# These values are the latest as of writing, fetched from the BSR's
50
# `PluginCurationService.GetLatestCuratedPlugin` endpoint. To refresh:
51
#
52
#     curl -X POST \
53
#       https://buf.build/buf.alpha.registry.v1alpha1.PluginCurationService/GetLatestCuratedPlugin \
54
#       -H 'Content-Type: application/json' -H 'Connect-Protocol-Version: 1' \
55
#       -d '{"owner":"<owner>","name":"<plugin>"}'
56
#
57
# The browseable equivalent is https://buf.build/<owner>/<plugin> — each
58
# plugin's "Versions" tab lists every `(version, revision)` pair available.
59
# Bumping a pin is a one-line change here; cache invalidation is automatic
60
# via the synthesized `buf.gen.yaml`'s content hash.
61
# Keys are unversioned `remote:` plugin ids; values are `(version, revision)`.
DEFAULT_PLUGIN_PINS: Mapping[str, tuple[str, int]] = {
    "buf.build/protocolbuffers/python": ("v34.1", 1),
    "buf.build/protocolbuffers/pyi": ("v34.1", 1),
    "buf.build/connectrpc/python": ("v0.10.0", 1),
    "buf.build/grpc/python": ("v1.80.0", 1),
}
67

68

69
# ---- buf.yaml parsers ------------------------------------------------------
70

71

72
def parse_buf_yaml_module_paths(content: bytes) -> tuple[str, ...]:
    """Extract the `modules[].path` entries from a v2 `buf.yaml`.

    Paths are returned exactly as written, i.e. relative to the `buf.yaml` file.
    Unparseable yaml, a non-mapping document, or a missing/non-list `modules:`
    key (as in a v1 `buf.yaml`) all yield `()`; callers should then treat the
    file's own directory as the implicit module root.
    """
    try:
        document = yaml.safe_load(content)
    except yaml.YAMLError:
        return ()
    module_entries = document.get("modules") if isinstance(document, dict) else None
    if not isinstance(module_entries, list):
        return ()
    # Non-mapping entries and entries without a non-empty string `path` are skipped.
    declared = (entry.get("path") for entry in module_entries if isinstance(entry, dict))
    return tuple(path for path in declared if isinstance(path, str) and path)
94

95

96
def parse_buf_yaml_deps(content: bytes) -> tuple[str, ...]:
    """Return the BSR module IDs listed under `deps:` in a `buf.yaml`.

    Yields `()` when the yaml can't be parsed, the document isn't a mapping, or
    `deps:` is absent or not a list. Non-string and empty entries are dropped.
    """
    try:
        document = yaml.safe_load(content)
    except yaml.YAMLError:
        return ()
    declared = document.get("deps") if isinstance(document, dict) else None
    if not isinstance(declared, list):
        return ()
    module_ids: list[str] = []
    for dep in declared:
        if isinstance(dep, str) and dep:
            module_ids.append(dep)
    return tuple(module_ids)
108

109

110
def resolve_buf_module_root(
    proto_path: str, buf_yaml_dir: str, module_paths: tuple[str, ...]
) -> str:
    """Resolve the buf module root for a proto file.

    `buf_yaml_dir` is the directory holding `buf.yaml`. `module_paths` are the
    entries from its `modules:` list (relative to that directory). The returned
    root is the most specific configured module path that contains the proto,
    normalized to the repo root (`""` denotes the repo root itself).

    With no `module_paths`, `buf_yaml_dir` is returned as-is. If no configured
    module contains `proto_path`, the most specific candidate is returned as a
    fallback.
    """
    if not module_paths:
        return buf_yaml_dir

    def specificity(root: str) -> int:
        # The repo root (`.` after normpath) contains every proto, so it must rank
        # as the least specific candidate. Ranking by raw string length would tie
        # it with any one-character directory and could let it shadow that module.
        return 0 if root in (".", "") else len(root)

    candidates = sorted(
        (os.path.normpath(os.path.join(buf_yaml_dir, p)) for p in module_paths),
        key=specificity,
        reverse=True,
    )
    for root in candidates:
        if root in (".", ""):
            # Catch-all: reached only after every deeper module failed to match.
            return ""
        if proto_path == root or proto_path.startswith(root + os.sep):
            return root
    return candidates[0]
133

134

135
# ---- buf.gen.yaml parsers --------------------------------------------------
136

137

138
def _plugin_identifier(plugin: dict) -> str | None:
8✔
139
    """Return the registry key (`"<kind>:<ident>"`) for a `buf.gen.yaml` plugin entry.
140

141
    `kind` is one of `protoc_builtin`, `local`, or `remote` — matching the field
142
    name in the entry. `ident` strips any `:vX.Y` version pin from `remote:` so
143
    callers can match against the registry without knowing the version. Returns
144
    `None` if the entry declares none of these fields.
145
    """
146
    for key in ("protoc_builtin", "local", "remote"):
3✔
147
        val = plugin.get(key)
3✔
148
        if val is None:
3✔
149
            continue
3✔
150
        ident = " ".join(str(x) for x in val) if isinstance(val, list) else str(val)
3✔
151
        if key == "remote":
3✔
152
            base, _, _ = ident.partition(":")
3✔
153
            return f"remote:{base}"
3✔
154
        return f"{key}:{ident}"
2✔
NEW
155
    return None
×
156

157

158
def parse_plugin_outs(content: bytes, suffixes: Mapping[str, str]) -> dict[str, str]:
    """Map each recognized suffix to the `out:` directory of its `buf.gen.yaml` plugin.

    `suffixes` is a `<kind>:<ident> -> suffix` mapping supplied by the calling
    language backend, where `<kind>` is `remote`, `protoc_builtin`, or `local`
    and `suffix` is the language's module/file-naming suffix. When several
    plugins map to the same suffix, the first one listed wins.

    Returns `{}` if the yaml can't be parsed, isn't a mapping, or has no
    `plugins:` list. Entries without a non-empty string `out:` are ignored.
    """
    try:
        document = yaml.safe_load(content)
    except yaml.YAMLError:
        return {}
    entries = document.get("plugins") if isinstance(document, dict) else None
    if not isinstance(entries, list):
        return {}

    outs_by_suffix: dict[str, str] = {}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        out_dir = entry.get("out")
        if not (isinstance(out_dir, str) and out_dir):
            continue
        registry_key = _plugin_identifier(entry)
        if registry_key is None:
            continue
        matched_suffix = suffixes.get(registry_key)
        if matched_suffix is not None:
            # `setdefault` keeps the earliest plugin's `out:` for a given suffix.
            outs_by_suffix.setdefault(matched_suffix, out_dir)
    return outs_by_suffix
190

191

192
def suffix_plugin_includes_imports(
    content: bytes, suffix: str, suffixes: Mapping[str, str]
) -> bool:
    """Report whether the `buf.gen.yaml` plugin emitting `suffix` sets
    `include_imports: true` — meaning buf will materialize generated artifacts
    for transitively-imported BSR-dep protos into the digest.

    `suffixes` is the same `<kind>:<ident> -> suffix` mapping passed to
    `parse_plugin_outs`. Only the first plugin entry (with a non-empty string
    `out:`) whose registry key maps to `suffix` is consulted. Returns `False`
    when no such entry exists or the yaml is unusable.
    """
    try:
        document = yaml.safe_load(content)
    except yaml.YAMLError:
        return False
    entries = document.get("plugins") if isinstance(document, dict) else None
    if not isinstance(entries, list):
        return False
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        out_dir = entry.get("out")
        if not (isinstance(out_dir, str) and out_dir):
            continue
        registry_key = _plugin_identifier(entry)
        if registry_key is not None and suffixes.get(registry_key) == suffix:
            # Only a literal yaml `true` counts; truthy strings/ints do not.
            return entry.get("include_imports") is True
    return False
225

226

227
# ---- buf.gen.yaml pin synthesis --------------------------------------------
228

229

230
def _split_remote_ident(ident: str) -> tuple[str, str | None]:
8✔
231
    """Split a `remote:` value into `(base_id, version_or_None)`.
232

233
    `buf.build/foo/bar:v1.2` → (`buf.build/foo/bar`, `v1.2`).
234
    `buf.build/foo/bar` → (`buf.build/foo/bar`, None).
235
    """
236
    base, sep, suffix = ident.partition(":")
4✔
237
    return (base, suffix) if sep else (base, None)
4✔
238

239

240
def _parse_pin_string(pin: str) -> tuple[str, int] | None:
8✔
241
    """Parse `"vX.Y:N"` (Pants-internal pin format) into `(version, revision)`.
242
    Returns `None` if the format is invalid."""
243
    parts = pin.split(":")
1✔
244
    if len(parts) != 2 or not parts[0]:
1✔
245
        return None
1✔
246
    try:
1✔
247
        return parts[0], int(parts[1])
1✔
NEW
248
    except ValueError:
×
NEW
249
        return None
×
250

251

252
def synthesize_pinned_buf_gen_yaml(
    content: bytes,
    source_path: str,
    *,
    extra_pins: Mapping[str, str] | None = None,
) -> bytes:
    """Return the user's `buf.gen.yaml` with `remote:` plugin pins resolved.

    For each `remote:` entry, the buf-recognized pinned form is:

        - remote: <id>:<version>
          revision: <int>
          out: ...

    Pants requires both fields. If a plugin entry is missing either, it must be
    in `DEFAULT_PLUGIN_PINS` (or the user's `extra_pins`) for Pants to fill in
    defaults; in that case BOTH the version and the revision are taken from the
    registry, replacing any partial pin the entry carried. `extra_pins` values
    are `"vX.Y:N"` strings, parsed here; malformed ones are skipped, and valid
    ones override the built-in pin for the same plugin id.
    `protoc_builtin:` and `local:` plugins are not subject to pin enforcement.

    `content` is returned unchanged when the yaml can't be parsed, isn't a
    mapping, has no `plugins:` list, or needed no rewriting. Otherwise the
    document is re-serialized with key order preserved.

    Raises `UnpinnedBufPluginError` (mentioning `source_path`) if any
    under-pinned `remote:` entry has no registry pin.
    """
    try:
        data = yaml.safe_load(content)
    except yaml.YAMLError:
        return content
    plugins = data.get("plugins") if isinstance(data, dict) else None
    if not isinstance(plugins, list):
        return content

    pins: dict[str, tuple[str, int]] = dict(DEFAULT_PLUGIN_PINS)
    for plugin_id, pin in (extra_pins or {}).items():
        parsed = _parse_pin_string(pin)
        if parsed is not None:
            pins[plugin_id] = parsed

    unresolvable: list[str] = []
    rewrote = False
    for plugin in plugins:
        if not isinstance(plugin, dict):
            continue
        val = plugin.get("remote")
        if val is None:
            continue
        ident = " ".join(str(x) for x in val) if isinstance(val, list) else str(val)
        base, version = _split_remote_ident(ident)
        revision = plugin.get("revision")
        # `bool` is a subclass of `int`, so a yaml `revision: true` must be
        # rejected explicitly. An empty version (`<id>:`) is not a pin either.
        has_revision = isinstance(revision, int) and not isinstance(revision, bool)
        if version and has_revision:
            continue  # already fully pinned
        default = pins.get(base)
        if default is None:
            unresolvable.append(ident)
            continue
        default_version, default_revision = default
        plugin["remote"] = f"{base}:{default_version}"
        plugin["revision"] = default_revision
        rewrote = True

    if unresolvable:
        bullets = "\n".join(f"  - remote: {ident}" for ident in unresolvable)
        known = ", ".join(sorted(DEFAULT_PLUGIN_PINS)) or "(none)"
        raise UnpinnedBufPluginError(
            f"`{source_path}` has `remote:` plugin entries that are missing a "
            f"version or `revision:`:\n{bullets}\n\n"
            "Pin both fields explicitly:\n\n"
            "    - remote: buf.build/owner/plugin:vX.Y\n"
            "      revision: N\n"
            "      out: ...\n\n"
            "Alternatively, for a plugin in Pants's built-in registry, leave both "
            "fields unset and Pants will fill in defaults. To extend the registry "
            "for your own plugins, use `[buf].extra_plugin_pins`.\n\n"
            f"Built-in registry: {known}."
        )

    if not rewrote:
        # Hand back the original bytes so untouched files keep their exact content.
        return content
    return yaml.safe_dump(data, sort_keys=False).encode("utf-8")
329

330

331
def check_pinned_remote_plugins(
    content: bytes,
    source_path: str,
    *,
    extra_pins: Mapping[str, str] | None = None,
) -> None:
    """Validate that every `remote:` plugin entry resolves to a full pin.

    Runs the same resolution as `synthesize_pinned_buf_gen_yaml` and lets any
    error it raises propagate, but discards the synthesized content.
    """
    synthesize_pinned_buf_gen_yaml(
        content=content,
        source_path=source_path,
        extra_pins=extra_pins,
    )
340

341

342
# ---- Per-target template request resolvers --------------------------------
343

344

345
def gen_template_request_from_fields(
    *,
    spec_path: str,
    address_str: str,
    override: str | None,
    buf: BufSubsystem,
) -> ConfigFilesRequest:
    """Build the `buf.gen.yaml` lookup request from already-extracted field values.

    Precedence: per-target `buf_gen_template` (`override`) → `[buf].gen_template`
    subsystem option → `[buf].gen_template_discovery`. Only the first step is
    decided here; without an override the subsystem's own request is returned.
    """
    if override is not None:
        # The override is relative to the target's directory; normalize it so
        # `..` segments and redundant separators collapse.
        template = os.path.normpath(os.path.join(spec_path, override))
        return ConfigFilesRequest(
            specified=template,
            specified_option_name=f"`{BufGenTemplateField.alias}` field on {address_str}",
            discovery=False,
            check_existence=(template,),
        )
    return buf.gen_template_request
366

367

368
def gen_template_request_for_target(tgt: Target, buf: BufSubsystem) -> ConfigFilesRequest:
    """Resolve the `buf.gen.yaml` request for `tgt` by pulling the needed values
    off the target and delegating to `gen_template_request_from_fields`."""
    directory = tgt.address.spec_path
    address_text = str(tgt.address)
    template_override = tgt.get(BufGenTemplateField).value
    return gen_template_request_from_fields(
        spec_path=directory,
        address_str=address_text,
        override=template_override,
        buf=buf,
    )
376

377

378
def resolved_template_path(tgt: Target, buf: BufSubsystem) -> str | None:
    """Path to hand to `buf generate --template`, or None to rely on discovery.

    A per-target `BufGenTemplateField` value wins and is resolved relative to the
    target's directory; otherwise the subsystem's `gen_template` is returned.
    """
    relative = tgt.get(BufGenTemplateField).value
    if relative is None:
        return buf.gen_template
    return os.path.normpath(os.path.join(tgt.address.spec_path, relative))
384

385

386
# ---- Async fetchers + result types ----------------------------------------
387

388

389
@dataclass(frozen=True)
class BufLayout:
    """Immutable view of the module layout described by a `buf.yaml`."""

    # Directory that holds `buf.yaml`.
    buf_yaml_dir: str
    # `modules[].path` entries, relative to `buf_yaml_dir`.
    module_paths: tuple[str, ...]
    # BSR module ids (e.g. `buf.build/bufbuild/protovalidate`).
    deps: tuple[str, ...]

    def root_for_proto(self, proto_path: str) -> str:
        """Return the buf module root for `proto_path` under this layout."""
        return resolve_buf_module_root(
            proto_path=proto_path,
            buf_yaml_dir=self.buf_yaml_dir,
            module_paths=self.module_paths,
        )
399

400

401
async def fetch_buf_layout(buf: BufSubsystem) -> BufLayout:
    """Locate `buf.yaml` and parse it into a `BufLayout`; empty layout if absent.

    `config_request` may also surface `buf.lock` (it's listed in `check_existence`
    so codegen invalidates on lock changes), so only files named `buf.yaml` are
    considered, and the first such file is used.
    """
    config_files = await find_config_file(buf.config_request)
    buf_yaml_path = next(
        (p for p in config_files.snapshot.files if os.path.basename(p) == "buf.yaml"),
        None,
    )
    if buf_yaml_path is None:
        return BufLayout("", (), ())
    digest_contents = await get_digest_contents(config_files.snapshot.digest)
    # Falls back to empty bytes if the digest somehow lacks the file.
    raw = b""
    for entry in digest_contents:
        if entry.path == buf_yaml_path:
            raw = entry.content
            break
    return BufLayout(
        buf_yaml_dir=os.path.dirname(buf_yaml_path),
        module_paths=parse_buf_yaml_module_paths(raw),
        deps=parse_buf_yaml_deps(raw),
    )
419

420

421
@dataclass(frozen=True)
class BufGenContent:
    """Per-target `buf.gen.yaml` resolution.

    `template_path` is `None` when no template was found (callers should fall back
    to source-root path arithmetic). When set, `content` is the raw yaml.
    """

    # The target this resolution was performed for.
    target: Target
    # Path of the resolved `buf.gen.yaml`, or None if none was found.
    template_path: str | None
    # Raw bytes of the template; `b""` when `template_path` is None.
    content: bytes
432

433

434
async def fetch_buf_gen_contents(
    targets: Sequence[Target], buf: BufSubsystem
) -> tuple[BufGenContent, ...]:
    """Look up and read the effective `buf.gen.yaml` for every target.

    The result has one `BufGenContent` per input target, in the same order. A
    target with no resolved template gets `template_path=None` and empty content.
    """
    if not targets:
        return ()
    template_files_per_target = await concurrently(
        find_config_file(gen_template_request_for_target(t, buf)) for t in targets
    )
    contents_per_target = await concurrently(
        get_digest_contents(tf.snapshot.digest) for tf in template_files_per_target
    )
    resolved: list[BufGenContent] = []
    for tgt, files, digest_contents in zip(
        targets, template_files_per_target, contents_per_target
    ):
        found = files.snapshot.files
        if found:
            template = found[0]
            raw = next((dc.content for dc in digest_contents if dc.path == template), b"")
            resolved.append(BufGenContent(tgt, template, raw))
        else:
            resolved.append(BufGenContent(tgt, None, b""))
    return tuple(resolved)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc