• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basilisp-lang / basilisp / 10943311926

19 Sep 2024 02:41PM CUT coverage: 98.89%. Remained the same
10943311926

Pull #1062

github

web-flow
Merge 1e881487b into 86c59ead1
Pull Request #1062: Prepare for release v0.2.3

1903 of 1910 branches covered (99.63%)

Branch coverage included in aggregate %.

8695 of 8807 relevant lines covered (98.73%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.49
/src/basilisp/lang/map.py
1
from builtins import map as pymap
1✔
2
from itertools import islice
1✔
3
from typing import (
1✔
4
    Any,
5
    Callable,
6
    Iterable,
7
    Mapping,
8
    Optional,
9
    Tuple,
10
    TypeVar,
11
    Union,
12
    cast,
13
)
14

15
from immutables import Map as _Map
1✔
16
from immutables import MapMutation
1✔
17
from typing_extensions import Unpack
1✔
18

19
from basilisp.lang.interfaces import (
1✔
20
    IEvolveableCollection,
21
    ILispObject,
22
    IMapEntry,
23
    INamed,
24
    IPersistentMap,
25
    IPersistentVector,
26
    IReduceKV,
27
    ISeq,
28
    ITransientMap,
29
    IWithMeta,
30
    ReduceKVFunction,
31
)
32
from basilisp.lang.obj import (
1✔
33
    PRINT_SEPARATOR,
34
    SURPASSED_PRINT_LENGTH,
35
    SURPASSED_PRINT_LEVEL,
36
    PrintSettings,
37
    lrepr,
38
    process_lrepr_kwargs,
39
)
40
from basilisp.lang.reduced import Reduced
1✔
41
from basilisp.lang.seq import sequence
1✔
42
from basilisp.lang.vector import MapEntry
1✔
43
from basilisp.util import partition
1✔
44

45
# General-purpose type variable (NOTE(review): not referenced by the code
# visible in this module -- confirm whether it is still needed).
T = TypeVar("T")
1✔
46
# Type variable for the keys of the map types defined in this module.
K = TypeVar("K")
1✔
47
# Type variable for the values of the map types defined in this module.
V = TypeVar("V")
1✔
48
# Type variable for the accumulator passed to and returned from `reduce_kv`.
T_reduce = TypeVar("T_reduce")
1✔
49

50
# Unique marker used as the `get` default by the entry lookups so that a
# missing key can be told apart from a key whose stored value is None.
_ENTRY_SENTINEL = object()
1✔
51

52

53
class TransientMap(ITransientMap[K, V]):
    """Transient (mutable) counterpart of PersistentMap.

    Wraps an `immutables` map mutation object. Each `*_transient` operation
    mutates that object in place and returns this same instance; call
    `to_persistent` to obtain a PersistentMap from the accumulated state.
    """

    __slots__ = ("_inner",)

    def __init__(self, evolver: "MapMutation[K, V]") -> None:
        self._inner = evolver

    def __bool__(self):
        # Truthy regardless of the number of entries.
        return True

    def __call__(self, key, default=None):
        """Look up `key`, returning `default` if it is not present."""
        return self._inner.get(key, default)

    def __contains__(self, item):
        return item in self._inner

    def __eq__(self, other):
        # Transients only ever compare equal to themselves.
        return self is other

    def __len__(self):
        return len(self._inner)

    def assoc_transient(self, *kvs) -> "TransientMap":
        """Associate alternating key/value arguments in place; return self."""
        target = self._inner
        for key, value in partition(kvs, 2):
            target[key] = value
        return self

    def contains_transient(self, k: K) -> bool:
        """Return True if `k` is currently a key of this transient."""
        return k in self._inner

    def dissoc_transient(self, *ks: K) -> "TransientMap[K, V]":
        """Remove each of the given keys in place; return self.

        Keys which are not present are ignored."""
        target = self._inner
        for key in ks:
            try:
                del target[key]
            except KeyError:
                # Removing an absent key is a no-op.
                continue
        return self

    def entry_transient(self, k: K) -> Optional[IMapEntry[K, V]]:
        """Return the map entry for `k`, or None if `k` is not present."""
        found = self._inner.get(k, cast("V", _ENTRY_SENTINEL))
        if found is not _ENTRY_SENTINEL:
            return MapEntry.of(k, found)
        return None

    def val_at(self, k, default=None):
        """Return the value stored at `k`, or `default` if `k` is absent."""
        return self._inner.get(k, default)

    def cons_transient(  # type: ignore[override]
        self,
        *elems: Union[
            IPersistentMap[K, V],
            IMapEntry[K, V],
            IPersistentVector[Union[K, V]],
            Mapping[K, V],
        ],
    ) -> "TransientMap[K, V]":
        """Conjoin each element onto this transient in place; return self.

        Mappings contribute all of their items and map entries contribute
        their single key/value pair. None elements are skipped. Any other
        element is converted with `MapEntry.from_vec`.

        Raises ValueError if handling an element raises TypeError or
        ValueError."""
        target = self._inner
        try:
            for elem in elems:
                if elem is None:
                    continue
                if isinstance(elem, (IPersistentMap, Mapping)):
                    for key, value in elem.items():
                        target[key] = value
                elif isinstance(elem, IMapEntry):
                    target[elem.key] = elem.value
                else:
                    entry: IMapEntry[K, V] = MapEntry.from_vec(elem)
                    target[entry.key] = entry.value
        except (TypeError, ValueError) as e:
            raise ValueError(
                "Argument to map conj must be another Map or castable to MapEntry"
            ) from e
        return self

    def to_persistent(self) -> "PersistentMap[K, V]":
        """Finish the wrapped mutation and wrap the result in a PersistentMap."""
        finished = self._inner.finish()
        return PersistentMap(finished)
1✔
129

130

131
def map_lrepr(  # pylint: disable=too-many-locals
    entries: Callable[[], Iterable[Tuple[Any, Any]]],
    start: str,
    end: str,
    meta: Optional[IPersistentMap] = None,
    **kwargs: Unpack[PrintSettings],
) -> str:
    """Return the Lisp representation of an associative collection.

    `entries` is a zero-argument callable producing (key, value) tuples; it
    may be invoked more than once. The rendered entries are wrapped between
    the `start` and `end` strings.

    If the print_namespace_maps setting is true and every key shares one
    namespace, that namespace is printed once as a `#:ns` prefix and the
    keys are printed without it.

    If the print_meta setting is true and `meta` is truthy, the metadata is
    printed before the collection, prefixed with `^`.

    Remaining keyword arguments are print settings which are forwarded to
    lrepr for the keys and values.
    """
    level = kwargs["print_level"]
    if isinstance(level, int) and level < 1:
        return SURPASSED_PRINT_LEVEL

    kwargs = process_lrepr_kwargs(**kwargs)

    def shared_namespace():
        """Return the namespace common to every key, or None if the keys
        do not all share a single namespace."""
        seen = set()
        for key, _ in entries():
            seen.add(key.ns if isinstance(key, INamed) else None)
            if len(seen) > 1:
                return None
        return seen.pop() if seen else None

    shared_ns = shared_namespace() if kwargs["print_namespace_maps"] else None

    # Keys and values are always rendered in non-human-readable form.
    item_settings = kwargs.copy()
    item_settings["human_readable"] = False

    def rendered_pairs():
        for key, value in entries():
            if shared_ns:
                # The namespace is printed once as a prefix instead.
                key = key.with_name(key.name)
            yield f"{lrepr(key, **item_settings)} {lrepr(value, **item_settings)}"

    dup = kwargs["print_dup"]
    limit = kwargs["print_length"]
    overflow = []
    if dup or not isinstance(limit, int):
        rendered = list(rendered_pairs())
    else:
        # Take one entry more than the limit to detect truncation.
        rendered = list(islice(rendered_pairs(), limit + 1))
        if len(rendered) > limit:
            rendered.pop()
            overflow.append(SURPASSED_PRINT_LENGTH)

    body = PRINT_SEPARATOR.join(rendered + overflow)
    prefix = ("#:" + shared_ns) if shared_ns else ""
    collection = f"{prefix}{start}{body}{end}"

    if kwargs["print_meta"] and meta:
        meta_settings = kwargs
        meta_settings["print_level"] = None
        return f"^{lrepr(meta,**meta_settings)} {collection}"

    return collection
1✔
209

210

211
@lrepr.register(dict)
def _lrepr_py_dict(o: dict, **kwargs: Unpack[PrintSettings]) -> str:
    """Render a Python dict as a brace-delimited map tagged with `#py`."""
    rendered = map_lrepr(o.items, "{", "}", **kwargs)
    return "#py " + rendered
1✔
214

215

216
class PersistentMap(
    IPersistentMap[K, V],
    IEvolveableCollection[TransientMap],
    IReduceKV,
    ILispObject,
    IWithMeta,
):
    """Basilisp Map. Delegates internally to a immutables.Map object.

    Do not instantiate directly. Instead use the m() and map() factory
    methods below.

    Every operation which derives a new map from this one (`assoc`,
    `dissoc`, `update`, `update_with`, `cons`) carries this map's metadata
    over to the result."""

    __slots__ = ("_inner", "_meta")

    def __init__(
        self,
        m: "_Map[K, V]",
        meta: Optional[IPersistentMap] = None,
    ) -> None:
        self._inner = m
        self._meta = meta

    @classmethod
    def from_coll(
        cls,
        members: Union[Mapping[K, V], Iterable[Tuple[K, V]]],
        meta: Optional[IPersistentMap] = None,
    ) -> "PersistentMap[K, V]":
        """Create a map from a mapping or an iterable of key/value pairs."""
        return PersistentMap(_Map(members), meta=meta)

    def __bool__(self):
        # Maps are truthy even when they are empty.
        return True

    def __call__(self, key, default=None):
        """Look up `key`, returning `default` if it is not present."""
        return self._inner.get(key, default)

    def __contains__(self, item):
        return item in self._inner

    def __eq__(self, other):
        if self is other:
            return True
        if not isinstance(other, Mapping):
            return NotImplemented
        # Cheap length check before the full comparison.
        if len(self._inner) != len(other):
            return False
        return self._inner == other

    def __getitem__(self, item):
        return self._inner[item]

    def __hash__(self):
        return hash(self._inner)

    def __iter__(self):
        return iter(self._inner)

    def __len__(self):
        return len(self._inner)

    def _lrepr(self, **kwargs: Unpack[PrintSettings]):
        return map_lrepr(
            self._inner.items,
            start="{",
            end="}",
            meta=self._meta,
            **kwargs,
        )

    @property
    def meta(self) -> Optional[IPersistentMap]:
        return self._meta

    def with_meta(self, meta: Optional[IPersistentMap]) -> "PersistentMap":
        """Return a map with the same entries and `meta` as its metadata."""
        return PersistentMap(self._inner, meta=meta)

    def assoc(self, *kvs):
        """Return a new map with the alternating key/value arguments
        associated. Metadata is preserved."""
        with self._inner.mutate() as mutation:
            for k, v in partition(kvs, 2):
                mutation[k] = v
            return PersistentMap(mutation.finish(), meta=self._meta)

    def contains(self, k):
        """Return True if `k` is a key of this map."""
        return k in self._inner

    def dissoc(self, *ks):
        """Return a new map without the given keys. Keys which are not
        present are ignored. Metadata is preserved."""
        with self._inner.mutate() as mutation:
            for k in ks:
                try:
                    del mutation[k]
                except KeyError:
                    # Removing an absent key is a no-op.
                    pass
            return PersistentMap(mutation.finish(), meta=self._meta)

    def entry(self, k):
        """Return the map entry for `k`, or None if `k` is not present."""
        v = self._inner.get(k, cast("V", _ENTRY_SENTINEL))
        if v is _ENTRY_SENTINEL:
            return None
        return MapEntry.of(k, v)

    def val_at(self, k, default=None):
        """Return the value stored at `k`, or `default` if `k` is absent."""
        return self._inner.get(k, default)

    def update(self, *maps: Mapping[K, V]) -> "PersistentMap":
        """Return a new map with the items of each of `maps` merged in, in
        order, so that later maps win on key conflicts. Metadata is
        preserved."""
        # The items are applied through a single mutation rather than being
        # forwarded to `immutables.Map.update`, which accepts at most one
        # positional collection and therefore fails for two or more maps.
        with self._inner.mutate() as mutation:
            for other in maps:
                for k, v in other.items():
                    mutation.set(k, v)
            return PersistentMap(mutation.finish(), meta=self._meta)

    def update_with(  # type: ignore[return]
        self, merge_fn: Callable[[V, V], V], *maps: Mapping[K, V]
    ) -> "PersistentMap[K, V]":
        """Return a new map with the items of each of `maps` merged in.

        When a key is already present, the stored value becomes
        `merge_fn(current_value, new_value)`. Metadata is preserved."""
        with self._inner.mutate() as mutation:
            for other in maps:
                for k, v in other.items():
                    mutation.set(k, merge_fn(mutation[k], v) if k in mutation else v)
            return PersistentMap(mutation.finish(), meta=self._meta)

    def cons(  # type: ignore[override, return]
        self,
        *elems: Union[
            IPersistentMap[K, V],
            IMapEntry[K, V],
            IPersistentVector[Union[K, V]],
            Mapping[K, V],
        ],
    ) -> "PersistentMap[K, V]":
        """Return a new map with each element conjoined.

        Mappings contribute all of their items and map entries contribute
        their single key/value pair. None elements are skipped. Any other
        element is converted with `MapEntry.from_vec`. Metadata is preserved.

        Raises ValueError if handling an element raises TypeError or
        ValueError."""
        with self._inner.mutate() as mutation:
            try:
                for elem in elems:
                    if isinstance(elem, (IPersistentMap, Mapping)):
                        for k, v in elem.items():
                            mutation.set(k, v)
                    elif isinstance(elem, IMapEntry):
                        mutation.set(elem.key, elem.value)
                    elif elem is None:
                        continue
                    else:
                        entry: IMapEntry[K, V] = MapEntry.from_vec(elem)
                        mutation.set(entry.key, entry.value)
            except (TypeError, ValueError) as e:
                raise ValueError(
                    "Argument to map conj must be another Map or castable to MapEntry"
                ) from e
            else:
                return PersistentMap(mutation.finish(), meta=self.meta)

    @staticmethod
    def empty() -> "PersistentMap":
        """Return the shared empty map."""
        return EMPTY

    def seq(self) -> Optional[ISeq[IMapEntry[K, V]]]:
        """Return a seq of this map's entries, or None if the map is empty."""
        if len(self._inner) == 0:
            return None
        return sequence(MapEntry.of(k, v) for k, v in self._inner.items())

    def to_transient(self) -> TransientMap[K, V]:
        """Return a TransientMap wrapping a mutation of this map's contents."""
        return TransientMap(self._inner.mutate())

    def reduce_kv(self, f: ReduceKVFunction, init: T_reduce) -> T_reduce:
        """Reduce over the entries by calling `f(acc, key, value)` for each.

        Stops early and returns the wrapped value as soon as `f` returns a
        Reduced."""
        for k, v in self._inner.items():
            init = f(init, k, v)
            if isinstance(init, Reduced):
                return init.deref()
        return init
1✔
378

379

380
# Shared empty map instance; this is the value returned by PersistentMap.empty().
EMPTY: PersistentMap = PersistentMap.from_coll(())
1✔
381

382

383
def map(  # pylint:disable=redefined-builtin
    kvs: Mapping[K, V], meta: Optional[IPersistentMap] = None
) -> PersistentMap[K, V]:
    """Create a new persistent map from the items of the mapping `kvs`,
    optionally attaching `meta` as its metadata."""
    # The key/value pairs are handed over explicitly instead of the mapping
    # itself: building an `immutables.Map` directly from an existing
    # `basilisp.lang.map.PersistentMap` misbehaves because its `__iter__`
    # yields only keys rather than key/value tuples, even though it adheres
    # to the `Mapping` protocol.
    pairs = kvs.items()
    return PersistentMap.from_coll(pairs, meta=meta)
1✔
393

394

395
def m(**kvs) -> PersistentMap[str, V]:
    """Create a new map whose keys are the names of the keyword arguments
    and whose values are the corresponding argument values."""
    keyword_items = kvs
    return PersistentMap.from_coll(keyword_items)
1✔
398

399

400
def from_entries(entries: Iterable[MapEntry[K, V]]) -> PersistentMap[K, V]:  # type: ignore[return]
    """Create a new map from an iterable of map entries. When a key occurs
    more than once, the last entry for that key wins."""
    with _Map().mutate() as mutation:  # type: ignore[var-annotated]
        for item in entries:
            mutation.set(item.key, item.value)
        return PersistentMap(mutation.finish())
1✔
405

406

407
def hash_map(*pairs) -> PersistentMap:
    """Create a new map from alternating key and value arguments."""
    # Each partition is indexed rather than unpacked so that a trailing key
    # without a value fails in the same way it always has.
    lazy_entries = (
        MapEntry.of(chunk[0], chunk[1]) for chunk in partition(pairs, 2)
    )
    return from_entries(lazy_entries)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc