• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18252174847

05 Oct 2025 01:36AM UTC coverage: 43.382% (-36.9%) from 80.261%
18252174847

push

github

web-flow
run tests on mac arm (#22717)

Just doing the minimal to pull forward the x86_64 pattern.

ref #20993

25776 of 59416 relevant lines covered (43.38%)

1.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.26
/src/python/pants/backend/helm/util_rules/renderer.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
3✔
5

6
import dataclasses
3✔
7
import logging
3✔
8
import os
3✔
9
import re
3✔
10
from collections import defaultdict
3✔
11
from collections.abc import Iterable
3✔
12
from dataclasses import dataclass
3✔
13
from enum import Enum
3✔
14
from itertools import chain
3✔
15
from typing import Any
3✔
16

17
from pants.backend.helm.subsystems import post_renderer
3✔
18
from pants.backend.helm.subsystems.post_renderer import HelmPostRenderer
3✔
19
from pants.backend.helm.target_types import (
3✔
20
    HelmChartFieldSet,
21
    HelmDeploymentFieldSet,
22
    HelmDeploymentSourcesField,
23
)
24
from pants.backend.helm.util_rules import chart, tool
3✔
25
from pants.backend.helm.util_rules.chart import (
3✔
26
    FindHelmDeploymentChart,
27
    HelmChart,
28
    HelmChartRequest,
29
    find_chart_for_deployment,
30
    get_helm_chart,
31
)
32
from pants.backend.helm.util_rules.tool import HelmProcess, helm_process
3✔
33
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
3✔
34
from pants.engine.addresses import Address
3✔
35
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
3✔
36
from pants.engine.fs import (
3✔
37
    EMPTY_DIGEST,
38
    EMPTY_SNAPSHOT,
39
    CreateDigest,
40
    Digest,
41
    DigestSubset,
42
    Directory,
43
    FileContent,
44
    MergeDigests,
45
    PathGlobs,
46
    RemovePrefix,
47
    Snapshot,
48
)
49
from pants.engine.internals.native_engine import FileDigest
3✔
50
from pants.engine.intrinsics import create_digest, digest_to_snapshot, merge_digests
3✔
51
from pants.engine.process import (
3✔
52
    InteractiveProcess,
53
    ProcessCacheScope,
54
    ProcessResult,
55
    execute_process_or_raise,
56
)
57
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
3✔
58
from pants.util.logging import LogLevel
3✔
59
from pants.util.strutil import pluralize, softwrap
3✔
60

61
logger = logging.getLogger(__name__)
3✔
62

63

64
class HelmDeploymentCmd(Enum):
3✔
65
    """Supported Helm rendering commands, for use when creating a `HelmDeploymentRenderer`."""
66

67
    UPGRADE = "upgrade"
3✔
68
    RENDER = "template"
3✔
69

70

71
@dataclass(frozen=True)
3✔
72
class HelmDeploymentRequest(EngineAwareParameter):
3✔
73
    field_set: HelmDeploymentFieldSet
3✔
74

75
    cmd: HelmDeploymentCmd
3✔
76
    description: str = dataclasses.field(compare=False)
3✔
77
    extra_argv: tuple[str, ...]
3✔
78
    post_renderer: HelmPostRenderer | None
3✔
79

80
    def __init__(
3✔
81
        self,
82
        field_set: HelmDeploymentFieldSet,
83
        *,
84
        cmd: HelmDeploymentCmd,
85
        description: str,
86
        extra_argv: Iterable[str] | None = None,
87
        post_renderer: HelmPostRenderer | None = None,
88
    ) -> None:
89
        object.__setattr__(self, "field_set", field_set)
×
90
        object.__setattr__(self, "cmd", cmd)
×
91
        object.__setattr__(self, "description", description)
×
92
        object.__setattr__(self, "extra_argv", tuple(extra_argv or ()))
×
93
        object.__setattr__(self, "post_renderer", post_renderer)
×
94

95
    def debug_hint(self) -> str | None:
3✔
96
        return self.field_set.address.spec
×
97

98
    def metadata(self) -> dict[str, Any] | None:
3✔
99
        return {
×
100
            "cmd": self.cmd.value,
101
            "address": self.field_set.address,
102
            "description": self.description,
103
            "extra_argv": self.extra_argv,
104
            "post_renderer": self.post_renderer,
105
        }
106

107

108
@dataclass(frozen=True)
3✔
109
class _HelmDeploymentProcessWrapper(EngineAwareParameter, EngineAwareReturnType):
3✔
110
    """Intermediate representation of a `HelmProcess` that will produce a fully rendered set of
111
    manifests from a given chart.
112

113
    The encapsulated `process` will be side-effecting depending on the `cmd` that was originally requested.
114

115
    This is meant to only be used internally by this module.
116
    """
117

118
    chart: HelmChart
3✔
119
    cmd: HelmDeploymentCmd
3✔
120
    process: HelmProcess
3✔
121
    address: Address
3✔
122
    output_directory: str | None
3✔
123

124
    @property
3✔
125
    def is_side_effect(self) -> bool:
3✔
126
        return self.cmd != HelmDeploymentCmd.RENDER
×
127

128
    @property
3✔
129
    def uses_post_renderer(self) -> bool:
3✔
130
        if self.output_directory:
×
131
            return False
×
132
        return True
×
133

134
    def debug_hint(self) -> str | None:
3✔
135
        return self.address.spec
×
136

137
    def level(self) -> LogLevel | None:
3✔
138
        return LogLevel.DEBUG
×
139

140
    def message(self) -> str | None:
3✔
141
        msg = softwrap(
×
142
            f"""
143
            Built deployment process for {self.address} using chart {self.chart.address}
144
            with{"out" if not self.output_directory else ""} a post-renderer stage
145
            """
146
        )
147
        if self.output_directory:
×
148
            msg += f" and output directory: {self.output_directory}."
×
149
        else:
150
            msg += " and output to stdout."
×
151
        return msg
×
152

153
    def metadata(self) -> dict[str, Any] | None:
3✔
154
        meta = {
×
155
            "address": self.address.spec,
156
            "chart": self.chart,
157
            "process": self.process,
158
        }
159

160
        if self.output_directory:
×
161
            meta["output_directory"] = self.output_directory
×
162

163
        return meta
×
164

165

166
@dataclass(frozen=True)
3✔
167
class RenderHelmChartRequest(EngineAwareParameter):
3✔
168
    field_set: HelmChartFieldSet
3✔
169
    release_name: str | None = None
3✔
170

171
    def debug_hint(self) -> str:
3✔
172
        return self.field_set.address.spec
×
173

174

175
@dataclass(frozen=True)
3✔
176
class RenderedHelmFiles(EngineAwareReturnType):
3✔
177
    address: Address
3✔
178
    chart: HelmChart
3✔
179
    snapshot: Snapshot
3✔
180
    post_processed: bool
3✔
181

182
    def level(self) -> LogLevel | None:
3✔
183
        return LogLevel.DEBUG
×
184

185
    def message(self) -> str | None:
3✔
186
        return softwrap(
×
187
            f"""
188
            Generated {pluralize(len(self.snapshot.files), "file")} from deployment {self.address}
189
            using chart {self.chart.address}.
190
            """
191
        )
192

193
    def artifacts(self) -> dict[str, FileDigest | Snapshot] | None:
3✔
194
        return {"content": self.snapshot}
×
195

196
    def metadata(self) -> dict[str, Any] | None:
3✔
197
        return {
×
198
            "address": self.address.spec,
199
            "chart": self.chart,
200
            "post_processed": self.post_processed,
201
        }
202

203
    def cacheable(self) -> bool:
3✔
204
        # When using post-renderers it may not be safe to cache the generated files as the final result
205
        # may contain secrets or other kind of sensitive information.
206
        return not self.post_processed
×
207

208

209
async def _sort_value_file_names_for_evaluation(
3✔
210
    address: Address,
211
    *,
212
    sources_field: HelmDeploymentSourcesField,
213
    value_files_snapshot: Snapshot,
214
    prefix: str,
215
) -> list[str]:
216
    """Sorts the list of files in `value_files_snapshot` alphabetically but grouping them in the
217
    order in which they have been given in the `sources_field` field glob patterns."""
218

219
    base_path = address.spec_path
×
220
    result: list[str] = []
×
221

222
    if not sources_field.value:
×
223
        result = list(value_files_snapshot.files)
×
224
        result.sort()
×
225
    else:
226
        # Break the list of filenames in subsets that follow the order given in the `sources` field
227
        subset_snapshots = await concurrently(
×
228
            digest_to_snapshot(
229
                **implicitly(
230
                    DigestSubset(
231
                        value_files_snapshot.digest,
232
                        PathGlobs([os.path.join(base_path, glob_pattern)]),
233
                    )
234
                )
235
            )
236
            for glob_pattern in sources_field.globs
237
        )
238
        sources_subsets = [set(snapshot.files) for snapshot in subset_snapshots]
×
239

240
        def minimise_and_sort_subset(input_subset: set[str]) -> list[str]:
×
241
            result: set[str] = input_subset
×
242
            for subset in sources_subsets:
×
243
                if subset == input_subset:
×
244
                    continue
×
245

246
                if result.issuperset(subset):
×
247
                    result = result.difference(subset)
×
248

249
            result_as_list = list(result)
×
250
            result_as_list.sort()
×
251
            return result_as_list
×
252

253
        result = list(
×
254
            chain.from_iterable([minimise_and_sort_subset(subset) for subset in sources_subsets])
255
        )
256

257
    logger.debug(
×
258
        softwrap(
259
            f"""Value files for {address} would be evaluated using the following order:
260

261
            {", ".join(result)}
262
            """
263
        )
264
    )
265

266
    return [os.path.join(prefix, filename) for filename in result]
×
267

268

269
@rule(desc="Prepare Helm deployment renderer")
3✔
270
async def setup_render_helm_deployment_process(
3✔
271
    request: HelmDeploymentRequest,
272
) -> _HelmDeploymentProcessWrapper:
273
    value_files_prefix = "__values"
×
274
    chart, value_files = await concurrently(
×
275
        find_chart_for_deployment(FindHelmDeploymentChart(request.field_set)),
276
        determine_source_files(
277
            SourceFilesRequest(
278
                sources_fields=[request.field_set.sources],
279
                for_sources_types=[HelmDeploymentSourcesField],
280
                enable_codegen=True,
281
            )
282
        ),
283
    )
284

285
    logger.debug(f"Using Helm chart {chart.address} in deployment {request.field_set.address}.")
×
286

287
    output_dir = None
×
288
    output_digest = EMPTY_DIGEST
×
289
    output_directories = None
×
290
    if not request.post_renderer:
×
291
        output_dir = "__out"
×
292
        output_digest = await create_digest(CreateDigest([Directory(output_dir)]))
×
293
        output_directories = [output_dir]
×
294

295
    # Sort the list of file names following a consistent ordering
296
    sorted_value_files = await _sort_value_file_names_for_evaluation(
×
297
        request.field_set.address,
298
        sources_field=request.field_set.sources,
299
        value_files_snapshot=value_files.snapshot,
300
        prefix=value_files_prefix,
301
    )
302

303
    # Digests to be used as an input into the renderer process.
304
    input_digests = [output_digest]
×
305

306
    # Additional process values in case a post_renderer has been requested.
307
    env: dict[str, str] = {}
×
308
    immutable_input_digests: dict[str, Digest] = {
×
309
        **chart.immutable_input_digests,
310
        value_files_prefix: value_files.snapshot.digest,
311
    }
312
    append_only_caches: dict[str, str] = {}
×
313
    if request.post_renderer:
×
314
        logger.debug(f"Using post-renderer stage in deployment {request.field_set.address}")
×
315
        input_digests.append(request.post_renderer.digest)
×
316
        env.update(request.post_renderer.env)
×
317
        immutable_input_digests.update(request.post_renderer.immutable_input_digests)
×
318
        append_only_caches.update(request.post_renderer.append_only_caches)
×
319

320
    merged_digests = await merge_digests(MergeDigests(input_digests))
×
321

322
    inline_values = request.field_set.values.value
×
323
    release_name = (
×
324
        request.field_set.release_name.value
325
        or request.field_set.address.target_name.replace("_", "-")
326
    )
327

328
    def maybe_escape_string_value(value: str) -> str:
×
329
        if re.findall("\\s+", value):
×
330
            return f'"{value}"'
×
331
        return value
×
332

333
    # If using a post-renderer we are only going to keep the process result cached in
334
    # memory to prevent storing in disk, either locally or remotely, secrets or other
335
    # sensitive values that may have been added in by the post-renderer.
336
    process_cache = (
×
337
        ProcessCacheScope.PER_RESTART_SUCCESSFUL
338
        if request.post_renderer
339
        else ProcessCacheScope.SUCCESSFUL
340
    )
341

342
    process = HelmProcess(
×
343
        argv=[
344
            request.cmd.value,
345
            release_name,
346
            chart.name,
347
            *(
348
                ("--description", f'"{request.field_set.description.value}"')
349
                if request.field_set.description.value
350
                else ()
351
            ),
352
            *(
353
                ("--namespace", request.field_set.namespace.value)
354
                if request.field_set.namespace.value
355
                else ()
356
            ),
357
            *(("--skip-crds",) if request.field_set.skip_crds.value else ()),
358
            *(("--no-hooks",) if request.field_set.no_hooks.value else ()),
359
            *(("--output-dir", output_dir) if output_dir else ()),
360
            *(("--enable-dns",) if request.field_set.enable_dns.value else ()),
361
            *(
362
                ("--post-renderer", os.path.join(".", request.post_renderer.exe))
363
                if request.post_renderer
364
                else ()
365
            ),
366
            *(("--values", ",".join(sorted_value_files)) if sorted_value_files else ()),
367
            *chain.from_iterable(
368
                (
369
                    ("--set", f"{key}={maybe_escape_string_value(value)}")
370
                    for key, value in inline_values.items()
371
                )
372
                if inline_values
373
                else ()
374
            ),
375
            *request.extra_argv,
376
        ],
377
        extra_env=env,
378
        extra_immutable_input_digests=immutable_input_digests,
379
        extra_append_only_caches=append_only_caches,
380
        description=request.description,
381
        level=LogLevel.DEBUG if request.cmd == HelmDeploymentCmd.RENDER else LogLevel.INFO,
382
        input_digest=merged_digests,
383
        output_directories=output_directories,
384
        cache_scope=process_cache,
385
    )
386

387
    return _HelmDeploymentProcessWrapper(
×
388
        cmd=request.cmd,
389
        chart=chart,
390
        process=process,
391
        address=request.field_set.address,
392
        output_directory=output_dir,
393
    )
394

395

396
_YAML_FILE_SEPARATOR = "---"
3✔
397
_HELM_OUTPUT_FILE_MARKER = "# Source: "
3✔
398

399

400
@rule(desc="Render Helm deployment", level=LogLevel.DEBUG)
3✔
401
async def run_renderer(process_wrapper: _HelmDeploymentProcessWrapper) -> RenderedHelmFiles:
3✔
402
    assert not process_wrapper.is_side_effect
×
403

404
    def file_content(file_name: str, lines: Iterable[str]) -> FileContent:
×
405
        sanitised_lines = list(lines)
×
406
        if len(sanitised_lines) == 0:
×
407
            return FileContent(file_name, b"")
×
408
        if sanitised_lines[len(sanitised_lines) - 1] == _YAML_FILE_SEPARATOR:
×
409
            sanitised_lines = sanitised_lines[:-1]
×
410
        if sanitised_lines[0] != _YAML_FILE_SEPARATOR:
×
411
            sanitised_lines = [_YAML_FILE_SEPARATOR, *sanitised_lines]
×
412

413
        content = "\n".join(sanitised_lines) + "\n"
×
414
        return FileContent(file_name, content.encode("utf-8"))
×
415

416
    def parse_renderer_output(result: ProcessResult) -> list[FileContent]:
×
417
        rendered_files_contents = result.stdout.decode("utf-8")
×
418
        rendered_files: dict[str, list[str]] = defaultdict(list)
×
419

420
        curr_file_name = None
×
421
        for line in rendered_files_contents.splitlines():
×
422
            if not line:
×
423
                continue
×
424

425
            if line.startswith(_HELM_OUTPUT_FILE_MARKER):
×
426
                curr_file_name = line[len(_HELM_OUTPUT_FILE_MARKER) :]
×
427

428
            if not curr_file_name:
×
429
                continue
×
430

431
            rendered_files[curr_file_name].append(line)
×
432

433
        return [file_content(file_name, lines) for file_name, lines in rendered_files.items()]
×
434

435
    logger.debug(f"Rendering Helm files for {process_wrapper.address}")
×
436
    result = await execute_process_or_raise(**implicitly({process_wrapper.process: HelmProcess}))
×
437

438
    output_snapshot = EMPTY_SNAPSHOT
×
439
    if not process_wrapper.output_directory:
×
440
        logger.debug(
×
441
            f"Parsing Helm rendered files from the process' output of {process_wrapper.address}."
442
        )
443
        output_snapshot = await digest_to_snapshot(
×
444
            **implicitly(CreateDigest(parse_renderer_output(result)))
445
        )
446
    else:
447
        logger.debug(
×
448
            f"Obtaining Helm rendered files from the process' output directory of {process_wrapper.address}."
449
        )
450
        output_snapshot = await digest_to_snapshot(
×
451
            **implicitly(RemovePrefix(result.output_digest, process_wrapper.output_directory))
452
        )
453

454
    return RenderedHelmFiles(
×
455
        address=process_wrapper.address,
456
        chart=process_wrapper.chart,
457
        snapshot=output_snapshot,
458
        post_processed=process_wrapper.uses_post_renderer,
459
    )
460

461

462
@rule
3✔
463
async def materialize_deployment_process_wrapper_into_interactive_process(
3✔
464
    process_wrapper: _HelmDeploymentProcessWrapper,
465
) -> InteractiveProcess:
466
    assert process_wrapper.is_side_effect
×
467

468
    process = await helm_process(process_wrapper.process, **implicitly())
×
469
    return InteractiveProcess.from_process(process)
×
470

471

472
@rule
3✔
473
async def render_helm_chart(request: RenderHelmChartRequest) -> RenderedHelmFiles:
3✔
474
    output_dir = "__out"
×
475
    chart, empty_output = await concurrently(
×
476
        get_helm_chart(HelmChartRequest(request.field_set), **implicitly()),
477
        create_digest(CreateDigest([Directory(output_dir)])),
478
    )
479

480
    release_name = request.release_name or request.field_set.address.target_name.replace("_", "-")
×
481

482
    result = await execute_process_or_raise(
×
483
        **implicitly(
484
            HelmProcess(
485
                argv=[
486
                    "template",
487
                    release_name,
488
                    chart.name,
489
                    *(
490
                        ("--description", f'"{request.field_set.description.value}"')
491
                        if request.field_set.description.value
492
                        else ()
493
                    ),
494
                    "--output-dir",
495
                    output_dir,
496
                ],
497
                description=f"Rendering chart {request.field_set.address}",
498
                input_digest=empty_output,
499
                extra_immutable_input_digests=chart.immutable_input_digests,
500
                output_directories=(output_dir,),
501
                level=LogLevel.DEBUG,
502
            )
503
        )
504
    )
505

506
    output_snapshot = await digest_to_snapshot(
×
507
        **implicitly(RemovePrefix(result.output_digest, output_dir))
508
    )
509
    return RenderedHelmFiles(
×
510
        address=request.field_set.address,
511
        chart=chart,
512
        snapshot=output_snapshot,
513
        post_processed=False,
514
    )
515

516

517
def rules():
3✔
518
    return [*collect_rules(), *chart.rules(), *tool.rules(), *post_renderer.rules()]
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc