• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20333307239

18 Dec 2025 10:07AM UTC coverage: 75.452% (-4.8%) from 80.295%
20333307239

Pull #22949

github

web-flow
Merge b07232683 into 407284c67
Pull Request #22949: Add experimental uv resolver for Python lockfiles

51 of 96 new or added lines in 5 files covered. (53.13%)

2857 existing lines in 120 files now uncovered.

66315 of 87890 relevant lines covered (75.45%)

2.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.55
/src/python/pants/backend/helm/util_rules/tool.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
6✔
5

6
import dataclasses
6✔
7
import logging
6✔
8
import os
6✔
9
from abc import ABCMeta
6✔
10
from collections.abc import Iterable, Mapping
6✔
11
from dataclasses import dataclass
6✔
12
from typing import Any, ClassVar, Generic, TypeVar, final
6✔
13

14
import yaml
6✔
15

16
from pants.backend.helm.subsystems.helm import HelmSubsystem
6✔
17
from pants.backend.helm.utils.yaml import snake_case_attr_dict
6✔
18
from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
6✔
19
from pants.core.util_rules import external_tool
6✔
20
from pants.core.util_rules.env_vars import environment_vars_subset
6✔
21
from pants.core.util_rules.external_tool import (
6✔
22
    ExternalToolRequest,
23
    TemplatedExternalTool,
24
    download_external_tool,
25
)
26
from pants.engine import process
6✔
27
from pants.engine.collection import Collection
6✔
28
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
6✔
29
from pants.engine.env_vars import EnvironmentVarsRequest
6✔
30
from pants.engine.environment import EnvironmentName
6✔
31
from pants.engine.fs import (
6✔
32
    EMPTY_DIGEST,
33
    AddPrefix,
34
    CreateDigest,
35
    Digest,
36
    DigestSubset,
37
    Directory,
38
    FileDigest,
39
    MergeDigests,
40
    PathGlobs,
41
    RemovePrefix,
42
    Snapshot,
43
)
44
from pants.engine.intrinsics import (
6✔
45
    add_prefix,
46
    create_digest,
47
    digest_subset_to_digest,
48
    digest_to_snapshot,
49
    get_digest_contents,
50
    merge_digests,
51
    remove_prefix,
52
)
53
from pants.engine.platform import Platform
6✔
54
from pants.engine.process import Process, ProcessCacheScope
6✔
55
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
6✔
56
from pants.engine.unions import UnionMembership, union
6✔
57
from pants.option.subsystem import Subsystem
6✔
58
from pants.util.frozendict import FrozenDict
6✔
59
from pants.util.logging import LogLevel
6✔
60
from pants.util.strutil import bullet_list, pluralize
6✔
61

62
logger = logging.getLogger(__name__)

# Name of the append-only cache that `HelmBinary.append_only_caches` maps to `_HELM_CACHE_DIR`.
_HELM_CACHE_NAME = "helm"
# Relative directory names exported to Helm through the `HELM_CACHE_HOME`, `HELM_CONFIG_HOME`
# and `HELM_DATA_HOME` environment variables (see `setup_helm`).
_HELM_CACHE_DIR = "__cache"
_HELM_CONFIG_DIR = "__config"
_HELM_DATA_DIR = "__data"
6✔
68

69
# ---------------------------------------------
70
# Helm Plugins Support
71
# ---------------------------------------------
72

73

74
class HelmPluginMetadataFileNotFound(Exception):
    """Error raised when a downloaded Helm plugin carries no metadata file.

    :param plugin_name: Name of the plugin, interpolated into the error message.
    """

    def __init__(self, plugin_name: str) -> None:
        super().__init__(f"Helm plugin `{plugin_name}` is missing the `plugin.yaml` metadata file.")
×
77

78

79
class HelmPluginMissingCommand(ValueError):
    """Error raised when a plugin's metadata declares neither `command` nor `platformCommand`.

    :param plugin_name: Name of the plugin, interpolated into the error message.
    """

    def __init__(self, plugin_name: str) -> None:
        super().__init__(
            f"Helm plugin `{plugin_name}` is missing either `platformCommand` entries or a single `command` entry."
        )
84

85

86
class HelmPluginSubsystem(Subsystem, metaclass=ABCMeta):
    """Base class for any kind of Helm plugin."""

    # Name identifying the plugin. Read by `ExternalHelmPluginBinding.create` and by
    # `ExternalHelmPluginRequest.from_subsystem`.
    plugin_name: ClassVar[str]
6✔
90

91

92
class ExternalHelmPlugin(HelmPluginSubsystem, TemplatedExternalTool, metaclass=ABCMeta):
    """Represents the subsystem for a Helm plugin that needs to be downloaded from an external
    source.

    For declaring an External Helm plugin, extend this class providing a value of the
    `plugin_name` class attribute and implement the rest of it like you would do for
    any other `TemplatedExternalTool`.

    This class is meant to be used in combination with `ExternalHelmPluginBinding`, as
    in the following example:

    class MyHelmPluginSubsystem(ExternalHelmPlugin):
        plugin_name = "myplugin"
        options_scope = "my_plugin"
        help = "..."

        ...


    class MyPluginBinding(ExternalHelmPluginBinding[MyHelmPluginSubsystem]):
        plugin_subsystem_cls = MyHelmPluginSubsystem

    With that class structure, then define a `UnionRule` so Pants can find this plugin and
    use it in the Helm setup:

    @rule
    async def download_myplugin_plugin_request(
        _: MyPluginBinding, subsystem: MyHelmPluginSubsystem, platform: Platform
    ) -> ExternalHelmPluginRequest:
        return ExternalHelmPluginRequest.from_subsystem(subsystem, platform)


    def rules():
        return [
            *collect_rules(),
            UnionRule(ExternalHelmPluginBinding, MyPluginBinding),
        ]
    """
130

131

132
@dataclass(frozen=True)
class HelmPluginPlatformCommand:
    """One entry of the `platformCommand` list found in a plugin's metadata."""

    os: str
    arch: str
    command: str

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> HelmPluginPlatformCommand:
        """Build an instance from a raw metadata mapping.

        The mapping's keys are normalised with `snake_case_attr_dict` before being passed as
        keyword arguments to the constructor.
        """
        return cls(**snake_case_attr_dict(d))
×
141

142

143
@dataclass(frozen=True)
class HelmPluginInfo:
    """Typed view of the contents of a Helm plugin metadata file."""

    name: str
    version: str
    usage: str | None = None
    description: str | None = None
    ignore_flags: bool | None = None
    command: str | None = None
    platform_command: tuple[HelmPluginPlatformCommand, ...] = dataclasses.field(
        default_factory=tuple
    )
    hooks: FrozenDict[str, str] = dataclasses.field(default_factory=FrozenDict)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> HelmPluginInfo:
        """Build an instance from the parsed metadata mapping.

        `platformCommand` and `hooks` are handled explicitly; every remaining key is normalised
        with `snake_case_attr_dict` and passed to the constructor as a keyword argument.
        The input mapping is not modified.
        """
        # Work on a shallow copy so that popping keys never alters the caller's mapping.
        remaining = dict(d)
        # `or ()` / `or {}` tolerate keys that are present but set to null in the YAML.
        platform_command = tuple(
            HelmPluginPlatformCommand.from_dict(entry)
            for entry in remaining.pop("platformCommand", None) or ()
        )
        hooks = FrozenDict(remaining.pop("hooks", None) or {})

        attrs = snake_case_attr_dict(remaining)
        return cls(platform_command=platform_command, hooks=hooks, **attrs)

    @classmethod
    def from_bytes(cls, content: bytes) -> HelmPluginInfo:
        """Parse raw YAML bytes (using the safe loader) into an instance."""
        return cls.from_dict(yaml.safe_load(content))
×
169

170

171
# Type parameter of `ExternalHelmPluginBinding`: the plugin subsystem a binding refers to.
_ExternalHelmPlugin = TypeVar("_ExternalHelmPlugin", bound=ExternalHelmPlugin)
# Lets `ExternalHelmPluginBinding.create` be typed as returning the concrete binding subclass.
_EHPB = TypeVar("_EHPB", bound="ExternalHelmPluginBinding")
6✔
173

174

175
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class ExternalHelmPluginBinding(Generic[_ExternalHelmPlugin], metaclass=ABCMeta):
    """Union type allowing Pants to discover global external Helm plugins."""

    # Subsystem class of the plugin this binding stands for; set by each concrete binding.
    plugin_subsystem_cls: ClassVar[type[ExternalHelmPlugin]]

    name: str

    @final
    @classmethod
    def create(cls: type[_EHPB]) -> _EHPB:
        """Instantiate the binding, taking `name` from the bound subsystem's `plugin_name`."""
        return cls(name=cls.plugin_subsystem_cls.plugin_name)
×
188

189

190
@dataclass(frozen=True)
class ExternalHelmPluginRequest(EngineAwareParameter):
    """Helper class to create a download request for an external Helm plugin."""

    plugin_name: str
    platform: Platform

    # Underlying download request, consumed by `download_external_helm_plugin`.
    _tool_request: ExternalToolRequest

    @classmethod
    def from_subsystem(
        cls, subsystem: ExternalHelmPlugin, platform: Platform
    ) -> ExternalHelmPluginRequest:
        """Create a request from a plugin subsystem for the given platform.

        The tool request is obtained from `subsystem.get_request(platform)`.
        """
        return cls(
            plugin_name=subsystem.plugin_name,
            platform=platform,
            _tool_request=subsystem.get_request(platform),
        )

    def debug_hint(self) -> str | None:
        """Return the plugin name as the debug hint."""
        return self.plugin_name

    def metadata(self) -> dict[str, Any] | None:
        """Return the target platform and the download URL of the tool request."""
        return {"platform": self.platform, "url": self._tool_request.download_file_request.url}
×
214

215

216
@rule(polymorphic=True)
async def get_external_plugin_request(
    binding: ExternalHelmPluginBinding, env_name: EnvironmentName
) -> ExternalHelmPluginRequest:
    """Polymorphic rule stub turning a plugin binding into a download request.

    This base body always raises `NotImplementedError`; the class docstring of
    `ExternalHelmPlugin` shows how a concrete binding supplies its own rule.
    """
    raise NotImplementedError()
×
221

222

223
@dataclass(frozen=True)
class HelmPlugin(EngineAwareReturnType):
    """A downloaded Helm plugin: its parsed metadata, target platform and file snapshot."""

    info: HelmPluginInfo
    platform: Platform
    snapshot: Snapshot

    @property
    def name(self) -> str:
        """Plugin name as declared in its metadata."""
        return self.info.name

    @property
    def version(self) -> str:
        """Plugin version as declared in its metadata."""
        return self.info.version

    def level(self) -> LogLevel | None:
        """Log level for the message below: always DEBUG."""
        return LogLevel.DEBUG

    def message(self) -> str | None:
        """Return the human-readable summary of the materialized plugin."""
        return f"Materialized Helm plugin {self.name} with version {self.version} for {self.platform} platform."

    def metadata(self) -> dict[str, Any] | None:
        """Return name, version and platform as a metadata mapping."""
        return {"name": self.name, "version": self.version, "platform": self.platform}

    def artifacts(self) -> dict[str, FileDigest | Snapshot] | None:
        """Return the plugin snapshot under the `content` key."""
        return {"content": self.snapshot}

    def cacheable(self) -> bool:
        """Always report this result as cacheable."""
        return True
×
251

252

253
class HelmPlugins(Collection[HelmPlugin]):
    """Collection of the `HelmPlugin`s produced by `all_helm_plugins`."""
6✔
255

256

257
@rule
async def all_helm_plugins(union_membership: UnionMembership) -> HelmPlugins:
    """Resolve and download every registered external Helm plugin.

    Each member of the `ExternalHelmPluginBinding` union is instantiated through `create()`,
    converted into a download request, and then downloaded; both steps run concurrently.
    """
    bindings = union_membership.get(ExternalHelmPluginBinding)
    external_plugin_requests = await concurrently(
        get_external_plugin_request(**implicitly({binding.create(): ExternalHelmPluginBinding}))
        for binding in bindings
    )
    external_plugins = await concurrently(
        download_external_helm_plugin(req) for req in external_plugin_requests
    )

    # Only build the summary when it will actually be emitted.
    if logger.isEnabledFor(LogLevel.DEBUG.level):
        plugins_desc = [f"{p.name}, version: {p.version}" for p in external_plugins]
        logger.debug(
            f"Downloaded {pluralize(len(external_plugins), 'external Helm plugin')}:\n{bullet_list(plugins_desc)}"
        )
    return HelmPlugins(external_plugins)
×
274

275

276
@rule(desc="Download external Helm plugin", level=LogLevel.DEBUG)
async def download_external_helm_plugin(request: ExternalHelmPluginRequest) -> HelmPlugin:
    """Download a plugin, parse its metadata file and return it as a `HelmPlugin`.

    :raises HelmPluginMetadataFileNotFound: if no metadata file content was found.
    :raises HelmPluginMissingCommand: if the metadata has neither `command` nor
        `platformCommand` entries.
    """
    downloaded_tool = await download_external_tool(request._tool_request)

    # The metadata file is looked up under either of its two accepted spellings.
    plugin_info_file = await digest_subset_to_digest(
        DigestSubset(
            downloaded_tool.digest,
            PathGlobs(
                ["plugin.yaml", "plugin.yml"],
                glob_match_error_behavior=GlobMatchErrorBehavior.error,
                description_of_origin=request.plugin_name,
            ),
        )
    )
    plugin_info_contents = await get_digest_contents(plugin_info_file)
    if not plugin_info_contents:
        raise HelmPluginMetadataFileNotFound(request.plugin_name)

    # If several files matched, only the first one is parsed.
    plugin_info = HelmPluginInfo.from_bytes(plugin_info_contents[0].content)
    if not plugin_info.command and not plugin_info.platform_command:
        raise HelmPluginMissingCommand(request.plugin_name)

    plugin_snapshot = await digest_to_snapshot(downloaded_tool.digest)
    return HelmPlugin(info=plugin_info, platform=request.platform, snapshot=plugin_snapshot)
×
300

301

302
# ---------------------------------------------
303
# Helm Binary setup
304
# ---------------------------------------------
305

306

307
@dataclass(frozen=True)
class HelmBinary:
    """The Helm executable plus the environment and digests needed to run it."""

    path: str

    env: FrozenDict[str, str]
    immutable_input_digests: FrozenDict[str, Digest]

    def __init__(
        self,
        path: str,
        *,
        helm_env: Mapping[str, str],
        local_env: Mapping[str, str],
        immutable_input_digests: Mapping[str, Digest],
    ) -> None:
        """Store the path and freeze the given mappings.

        :param path: Path of the Helm executable.
        :param helm_env: Helm-specific environment variables.
        :param local_env: Further environment variables; on a key clash these override
            `helm_env`.
        :param immutable_input_digests: Digests keyed by the directory they are exposed at.
        """
        # The dataclass is frozen, so fields have to be assigned via `object.__setattr__`.
        object.__setattr__(self, "path", path)
        object.__setattr__(self, "immutable_input_digests", FrozenDict(immutable_input_digests))
        object.__setattr__(self, "env", FrozenDict({**helm_env, **local_env}))

    @property
    def config_digest(self) -> Digest:
        """Digest registered under the Helm config directory key."""
        return self.immutable_input_digests[_HELM_CONFIG_DIR]

    @property
    def data_digest(self) -> Digest:
        """Digest registered under the Helm data directory key."""
        return self.immutable_input_digests[_HELM_DATA_DIR]

    @property
    def append_only_caches(self) -> dict[str, str]:
        """Mapping of the Helm cache name to the Helm cache directory."""
        return {_HELM_CACHE_NAME: _HELM_CACHE_DIR}
×
337

338

339
@dataclass(frozen=True)
class HelmProcess:
    """Description of a Helm invocation, later turned into a `Process` by `helm_process`."""

    argv: tuple[str, ...]
    input_digest: Digest
    # Excluded from equality comparisons.
    description: str = dataclasses.field(compare=False)
    level: LogLevel
    extra_env: FrozenDict[str, str]
    extra_immutable_input_digests: FrozenDict[str, Digest]
    extra_append_only_caches: FrozenDict[str, str]
    cache_scope: ProcessCacheScope | None
    timeout_seconds: int | None
    output_directories: tuple[str, ...]
    output_files: tuple[str, ...]

    def __init__(
        self,
        argv: Iterable[str],
        *,
        description: str,
        input_digest: Digest = EMPTY_DIGEST,
        level: LogLevel = LogLevel.INFO,
        output_directories: Iterable[str] | None = None,
        output_files: Iterable[str] | None = None,
        extra_env: Mapping[str, str] | None = None,
        extra_immutable_input_digests: Mapping[str, Digest] | None = None,
        extra_append_only_caches: Mapping[str, str] | None = None,
        cache_scope: ProcessCacheScope | None = None,
        timeout_seconds: int | None = None,
    ) -> None:
        """Normalise the arguments into immutable containers and store them.

        Iterables become tuples and mappings become `FrozenDict`s; any optional collection
        left as `None` is stored as an empty container.
        """
        # The dataclass is frozen, so fields have to be assigned via `object.__setattr__`.
        object.__setattr__(self, "argv", tuple(argv))
        object.__setattr__(self, "input_digest", input_digest)
        object.__setattr__(self, "description", description)
        object.__setattr__(self, "level", level)
        object.__setattr__(self, "output_directories", tuple(output_directories or ()))
        object.__setattr__(self, "output_files", tuple(output_files or ()))
        object.__setattr__(self, "extra_env", FrozenDict(extra_env or {}))
        object.__setattr__(
            self, "extra_immutable_input_digests", FrozenDict(extra_immutable_input_digests or {})
        )
        object.__setattr__(
            self, "extra_append_only_caches", FrozenDict(extra_append_only_caches or {})
        )
        object.__setattr__(self, "cache_scope", cache_scope)
        object.__setattr__(self, "timeout_seconds", timeout_seconds)
×
383

384

385
@rule(desc="Download and configure Helm", level=LogLevel.DEBUG)
async def setup_helm(
    helm_subsytem: HelmSubsystem, global_plugins: HelmPlugins, platform: Platform
) -> HelmBinary:
    """Download Helm and assemble the digests and environment it runs with.

    The result exposes three immutable input digests: the downloaded tool under `__helm`,
    and the config and data directories under their respective directory names. Global
    plugins, when present, are placed under `<data dir>/plugins/<plugin name>`.
    """
    # NOTE: the parameter name `helm_subsytem` is kept as-is to leave the signature unchanged.
    downloaded_binary, empty_dirs_digest = await concurrently(
        download_external_tool(helm_subsytem.get_request(platform)),
        create_digest(
            CreateDigest(
                [
                    Directory(_HELM_CONFIG_DIR),
                    Directory(_HELM_DATA_DIR),
                ]
            )
        ),
    )

    tool_relpath = "__helm"
    immutable_input_digests = {tool_relpath: downloaded_binary.digest}

    helm_path = os.path.join(tool_relpath, downloaded_binary.exe)
    helm_env = {
        "HELM_CACHE_HOME": _HELM_CACHE_DIR,
        "HELM_CONFIG_HOME": _HELM_CONFIG_DIR,
        "HELM_DATA_HOME": _HELM_DATA_DIR,
    }

    # Starts as the two empty directories; rebound below when plugins get merged in.
    mutable_input_digest = empty_dirs_digest

    # Install all global Helm plugins
    if global_plugins:
        logger.debug(f"Installing {pluralize(len(global_plugins), 'global Helm plugin')}.")
        prefixed_plugins_digests = await concurrently(
            add_prefix(
                AddPrefix(
                    plugin.snapshot.digest, os.path.join(_HELM_DATA_DIR, "plugins", plugin.name)
                )
            )
            for plugin in global_plugins
        )
        mutable_input_digest = await merge_digests(
            MergeDigests([mutable_input_digest, *prefixed_plugins_digests])
        )

    # Split the merged digest into its config and data parts...
    updated_config_digest, updated_data_digest = await concurrently(
        digest_subset_to_digest(
            DigestSubset(mutable_input_digest, PathGlobs([os.path.join(_HELM_CONFIG_DIR, "**")]))
        ),
        digest_subset_to_digest(
            DigestSubset(mutable_input_digest, PathGlobs([os.path.join(_HELM_DATA_DIR, "**")]))
        ),
    )
    # ...and strip the directory prefix from each, since the immutable input digests below
    # are keyed by that same directory name.
    config_subset_digest, data_subset_digest = await concurrently(
        remove_prefix(RemovePrefix(updated_config_digest, _HELM_CONFIG_DIR)),
        remove_prefix(RemovePrefix(updated_data_digest, _HELM_DATA_DIR)),
    )

    setup_immutable_digests = {
        **immutable_input_digests,
        _HELM_CONFIG_DIR: config_subset_digest,
        _HELM_DATA_DIR: data_subset_digest,
    }

    local_env = await environment_vars_subset(
        EnvironmentVarsRequest(["HOME", "PATH"]), **implicitly()
    )
    return HelmBinary(
        path=helm_path,
        helm_env=helm_env,
        local_env=local_env,
        immutable_input_digests=setup_immutable_digests,
    )
457

458

459
@rule
async def helm_process(
    request: HelmProcess,
    helm_binary: HelmBinary,
    helm_subsystem: HelmSubsystem,
) -> Process:
    """Translate a `HelmProcess` into an engine `Process` that runs the Helm binary.

    Environment, immutable input digests and append-only caches are merged from the
    subsystem, the request and the binary, with the binary's values taking precedence.
    When no cache scope is given on the request, `ProcessCacheScope.SUCCESSFUL` is used.
    """
    global_extra_env = await environment_vars_subset(
        EnvironmentVarsRequest(helm_subsystem.extra_env_vars), **implicitly()
    )

    # Helm binary's setup parameters go last to prevent end users overriding any of its values.

    env = {**global_extra_env, **request.extra_env, **helm_binary.env}
    immutable_input_digests = {
        **request.extra_immutable_input_digests,
        **helm_binary.immutable_input_digests,
    }
    append_only_caches = {**request.extra_append_only_caches, **helm_binary.append_only_caches}

    argv = [helm_binary.path, *request.argv]

    # A special case for "--debug".
    # This ensures that it is applied to all operations in the chain,
    # not just the final one.
    # For example, we want this applied to the call to `template`, not just the call to `install`
    # Also, we can be helpful and automatically forward a request to debug Pants to also debug Helm
    debug_requested = "--debug" in helm_subsystem.valid_args() or (
        0 < logger.getEffectiveLevel() <= LogLevel.DEBUG.level
    )
    # Avoid passing the flag twice when the request already carries it.
    if debug_requested and "--debug" not in request.argv:
        argv.append("--debug")

    return Process(
        argv,
        input_digest=request.input_digest,
        immutable_input_digests=immutable_input_digests,
        env=env,
        description=request.description,
        level=request.level,
        append_only_caches=append_only_caches,
        output_directories=request.output_directories,
        output_files=request.output_files,
        cache_scope=request.cache_scope or ProcessCacheScope.SUCCESSFUL,
        timeout_seconds=request.timeout_seconds,
    )
504

505

506
def rules():
    """Return this module's rules together with the `external_tool` and `process` rules."""
    return [*collect_rules(), *external_tool.rules(), *process.rules()]
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc