• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19529437518

20 Nov 2025 07:44AM UTC coverage: 78.884% (-1.4%) from 80.302%
19529437518

push

github

web-flow
nfpm.native_libs: Add RPM package depends from packaged pex_binaries (#22899)

## PR Series Overview

This is the second in a series of PRs that introduces a new backend:
`pants.backend.npm.native_libs`
Initially, the backend will be available as:
`pants.backend.experimental.nfpm.native_libs`

I proposed this new backend (originally named `bindeps`) in discussion
#22396.

This backend will inspect ELF bin/lib files (like `lib*.so`) in packaged
contents (for this PR series, only in `pex_binary` targets) to identify
package dependency metadata and inject that metadata on the relevant
`nfpm_deb_package` or `nfpm_rpm_package` targets. Effectively, it will
provide an approximation of these native packager features:
- `rpm`: `rpmdeps` + `elfdeps`
- `deb`: `dh_shlibdeps` + `dpkg-shlibdeps` (These substitute
`${shlibs:Depends}` in debian control files have)

### Goal: Host-agnostic package builds

This pants backend is designed to be host-agnostic, like
[nFPM](https://nfpm.goreleaser.com/).

Native packaging tools are often restricted to a single release of a
single distro. Unlike native package builders, this new pants backend
does not use any of those distro-specific or distro-release-specific
utilities or local package databases. This new backend should be able to
run (help with building deb and rpm packages) anywhere that pants can
run (MacOS, rpm linux distros, deb linux distros, other linux distros,
docker, ...).

### Previous PRs in series

- #22873

## PR Overview

This PR adds rules in `nfpm.native_libs` to add package dependency
metadata to `nfpm_rpm_package`. The 2 new rules are:

- `inject_native_libs_dependencies_in_package_fields`:

    - An implementation of the polymorphic rule `inject_nfpm_package_fields`.
      This rule is low priority (`priority = 2`) so that in-repo plugins can
      override/augment what it injects. (See #22864)

    - Rule logic overview:
        - find any pex_binaries that will be packaged in an `nfpm_rpm_package`
   ... (continued)

96 of 118 new or added lines in 3 files covered. (81.36%)

910 existing lines in 53 files now uncovered.

73897 of 93678 relevant lines covered (78.88%)

3.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.46
/src/python/pants/backend/helm/utils/yaml.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
9✔
5

6
from abc import ABCMeta
9✔
7
from collections import defaultdict
9✔
8
from collections.abc import Callable, Iterable, Iterator, Mapping
9✔
9
from dataclasses import dataclass
9✔
10
from pathlib import PurePath
9✔
11
from typing import Any, Generic, TypeVar
9✔
12

13
from pants.engine.collection import Collection
9✔
14
from pants.util.frozendict import FrozenDict
9✔
15

16

17
@dataclass(unsafe_hash=True)
9✔
18
class YamlPath:
9✔
19
    """Simple implementation of YAML paths using `/` syntax and being the single slash the path to
20
    the root."""
21

22
    _elements: tuple[str, ...]
9✔
23
    _absolute: bool
9✔
24

25
    def __init__(self, elements: Iterable[str], *, absolute: bool) -> None:
9✔
26
        object.__setattr__(self, "_elements", tuple(elements))
2✔
27
        object.__setattr__(self, "_absolute", absolute)
2✔
28

29
        if len(self._elements) == 0 and not self._absolute:
2✔
30
            raise ValueError("Relative YAML paths with no elements are not allowed.")
×
31

32
    @classmethod
9✔
33
    def parse(cls, path: str) -> YamlPath:
9✔
34
        """Parses a YAML path."""
35

36
        is_absolute = path.startswith("/")
2✔
37
        return cls([elem for elem in path.split("/") if elem], absolute=is_absolute)
2✔
38

39
    @classmethod
9✔
40
    def root(cls) -> YamlPath:
9✔
41
        """Returns a YamlPath that represents the root element."""
42

UNCOV
43
        return cls([], absolute=True)
×
44

45
    @classmethod
9✔
46
    def index(cls, idx: int) -> YamlPath:
9✔
47
        """Returns a relative YamlPath for the index value provided."""
48

UNCOV
49
        return cls([str(idx)], absolute=False)
×
50

51
    @property
9✔
52
    def parent(self) -> YamlPath | None:
9✔
53
        """Returns the path to the parent element unless this path is already the root."""
54

UNCOV
55
        if not self.is_root:
×
UNCOV
56
            return YamlPath(self._elements[:-1], absolute=self._absolute)
×
UNCOV
57
        return None
×
58

59
    @property
9✔
60
    def current(self) -> str:
9✔
61
        """Returns the name of the current element referenced by this path.
62

63
        The root element will return the empty string.
64
        """
65

UNCOV
66
        if self.is_root:
×
UNCOV
67
            return ""
×
UNCOV
68
        return self._elements[len(self._elements) - 1]
×
69

70
    @property
9✔
71
    def is_absolute(self) -> bool:
9✔
72
        """Returns `True` if this is an absolute path."""
73

UNCOV
74
        return self._absolute
×
75

76
    @property
9✔
77
    def is_root(self) -> bool:
9✔
78
        """Returns `True` if this path represents the root element."""
79

UNCOV
80
        return len(self._elements) == 0 and self._absolute
×
81

82
    @property
9✔
83
    def is_index(self) -> bool:
9✔
84
        """Returns `True` if this path is referencing an indexed item inside an array."""
85

UNCOV
86
        try:
×
UNCOV
87
            int(self.current)
×
UNCOV
88
            return True
×
UNCOV
89
        except ValueError:
×
UNCOV
90
            return False
×
91

92
    def to_relative(self) -> YamlPath:
9✔
93
        """Transforms this YamlPath instance into a relative path."""
94

95
        if not self._absolute:
×
96
            return self
×
97
        return YamlPath(self._elements, absolute=False)
×
98

99
    def __truediv__(self, other: str | int | YamlPath) -> YamlPath:
9✔
UNCOV
100
        if isinstance(other, str):
×
UNCOV
101
            other_path = YamlPath.parse(other)
×
UNCOV
102
        elif isinstance(other, int):
×
103
            other_path = YamlPath.index(other)
×
104
        else:
UNCOV
105
            other_path = other
×
106

UNCOV
107
        if other_path._absolute:
×
108
            raise ValueError("Can not append an absolute path to another path.")
×
109

UNCOV
110
        return YamlPath(self._elements + other_path._elements, absolute=self._absolute)
×
111

112
    def __iter__(self):
9✔
113
        return iter(self._elements)
×
114

115
    def __str__(self) -> str:
9✔
116
        path = "/".join(self._elements)
×
117
        if self._absolute:
×
118
            path = f"/{path}"
×
119
        return path
×
120

121

122
@dataclass(frozen=True)
9✔
123
class YamlElement(metaclass=ABCMeta):
9✔
124
    """Abstract base class for elements read from YAML files.
125

126
    `element_path` represents the location inside the YAML file where this element is.
127
    """
128

129
    element_path: YamlPath
9✔
130

131

132
T = TypeVar("T")
9✔
133
R = TypeVar("R")
9✔
134

135

136
class MutableYamlIndex(Generic[T]):
9✔
137
    """Represents a mutable collection of items that is indexed by the following keys:
138

139
    - the relative path of the YAML file
140
    - the document index inside the YAML file
141
    - the YAML path of the item
142
    """
143

144
    _data: dict[PurePath, dict[int, dict[YamlPath, T]]]
9✔
145

146
    def __init__(self) -> None:
9✔
147
        self._data = defaultdict(dict)
1✔
148

149
    def insert(
9✔
150
        self, *, file_path: PurePath, yaml_path: YamlPath, item: T, document_index: int = 0
151
    ) -> None:
152
        """Inserts an item at the given position in the index."""
153

154
        doc_index = self._data[file_path].get(document_index, {})
1✔
155
        if not doc_index:
1✔
156
            self._data[file_path][document_index] = doc_index
1✔
157

158
        doc_index[yaml_path] = item
1✔
159

160
    def frozen(self) -> FrozenYamlIndex[T]:
9✔
161
        """Transforms this collection into a frozen (immutable) one."""
162

163
        return FrozenYamlIndex.create(self)
1✔
164

165

166
@dataclass(frozen=True)
9✔
167
class _YamlDocumentIndexNode(Generic[T]):
9✔
168
    """Helper node item for the `FrozenYamlIndex` type."""
169

170
    paths: FrozenDict[YamlPath, T]
9✔
171

172
    @classmethod
9✔
173
    def empty(cls: type[_YamlDocumentIndexNode[T]]) -> _YamlDocumentIndexNode[T]:
9✔
174
        return cls(paths=FrozenDict())
1✔
175

176
    def to_json_dict(self) -> dict[str, dict[str, str]]:
9✔
177
        items_dict: dict[str, str] = {}
×
178
        for path, item in self.paths.items():
×
179
            items_dict[str(path)] = str(item)
×
180
        return {"paths": items_dict}
×
181

182

183
@dataclass(frozen=True)
9✔
184
class FrozenYamlIndex(Generic[T]):
9✔
185
    """Represents a frozen collection of items that is indexed by the following keys:
186

187
    - the relative path of the YAML file
188
    - the document index inside the YAML file
189
    - the YAML path of the item
190
    """
191

192
    _data: FrozenDict[PurePath, Collection[_YamlDocumentIndexNode[T]]]
9✔
193

194
    @classmethod
9✔
195
    def create(cls, other: MutableYamlIndex[T]) -> FrozenYamlIndex[T]:
9✔
196
        data: dict[PurePath, Collection[_YamlDocumentIndexNode[T]]] = {}
1✔
197
        for file_path, doc_index in other._data.items():
1✔
198
            max_index = max(doc_index.keys())
1✔
199
            doc_list: list[_YamlDocumentIndexNode[T]] = [_YamlDocumentIndexNode.empty()] * (
1✔
200
                max_index + 1
201
            )
202

203
            for idx, item_map in doc_index.items():
1✔
204
                doc_list[idx] = _YamlDocumentIndexNode(paths=FrozenDict(item_map))
1✔
205

206
            data[file_path] = Collection(doc_list)
1✔
207
        return FrozenYamlIndex(_data=FrozenDict(data))
1✔
208

209
    @classmethod
9✔
210
    def empty(cls: type[FrozenYamlIndex[T]]) -> FrozenYamlIndex[T]:
9✔
211
        return FrozenYamlIndex[T](_data=FrozenDict())
×
212

213
    def transform_values(self, func: Callable[[T], R | None]) -> FrozenYamlIndex[R]:
9✔
214
        """Transforms the values of the given indexed collection into those that are returned from
215
        the received function.
216

217
        The items that map to `None` in the given function are not included in the result.
218

219
        This is a combination of the `map` and `filter` higher-order functions into one so
220
        both operations are performed in a single pass.
221
        """
222

223
        mutable_index: MutableYamlIndex[R] = MutableYamlIndex()
×
224
        for file_path, doc_index, yaml_path, item in self:
×
225
            new_item = func(item)
×
226
            if new_item is not None:
×
227
                mutable_index.insert(
×
228
                    file_path=file_path,
229
                    document_index=doc_index,
230
                    yaml_path=yaml_path,
231
                    item=new_item,
232
                )
233
        return mutable_index.frozen()
×
234

235
    def values(self) -> Iterator[T]:
9✔
236
        """Returns an iterator over the values of this index."""
237
        for _, _, _, item in self:
1✔
238
            yield item
1✔
239

240
    def to_json_dict(self) -> dict[str, Any]:
9✔
241
        """Transforms this collection into a JSON-like dictionary that can be dumped later."""
242

243
        result = {}
×
244
        for file_path, documents in self._data.items():
×
245
            result[str(file_path)] = [doc_idx.to_json_dict() for doc_idx in documents]
×
246
        return result
×
247

248
    def __iter__(self):
9✔
249
        for file_path, doc_indexes in self._data.items():
1✔
250
            for idx, doc_index in enumerate(doc_indexes):
1✔
251
                for yaml_path, item in doc_index.paths.items():
1✔
252
                    yield file_path, idx, yaml_path, item
1✔
253

254

255
def _to_snake_case(str: str) -> str:
9✔
256
    """Translates a camel-case or kebab-case identifier into a snake-case one."""
257

258
    base_string = str.replace("-", "_")
1✔
259

260
    result = ""
1✔
261
    idx = 0
1✔
262
    for c in base_string:
1✔
263
        char_to_add = c
1✔
264
        if char_to_add.isupper():
1✔
265
            char_to_add = c.lower()
1✔
266
            if idx > 0:
1✔
267
                result += "_"
1✔
268
        result += char_to_add
1✔
269
        idx += 1
1✔
270

271
    return result
1✔
272

273

274
def snake_case_attr_dict(d: Mapping[str, Any]) -> dict[str, Any]:
9✔
275
    """Transforms all keys in the given mapping to be snake-case."""
276
    return {_to_snake_case(name): value for name, value in d.items()}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc