• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basilisp-lang / basilisp / 13100583921

02 Feb 2025 04:35PM CUT coverage: 98.651% (-0.03%) from 98.68%
13100583921

Pull #1205

github

web-flow
Merge 96813dcc3 into 2e37ca36f
Pull Request #1205: fix testrunner to handle relative entries in sys.path

1032 of 1041 branches covered (99.14%)

Branch coverage included in aggregate %.

5 of 5 new or added lines in 1 file covered. (100.0%)

3 existing lines in 1 file now uncovered.

8837 of 8963 relevant lines covered (98.59%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/src/basilisp/lang/map.py
1
from builtins import map as pymap
1✔
2
from collections.abc import Iterable, Mapping
1✔
3
from itertools import islice
1✔
4
from typing import Any, Callable, Optional, TypeVar, Union, cast
1✔
5

6
from immutables import Map as _Map
1✔
7
from immutables import MapMutation
1✔
8
from typing_extensions import Unpack
1✔
9

10
from basilisp.lang.interfaces import (
1✔
11
    IEvolveableCollection,
12
    ILispObject,
13
    IMapEntry,
14
    INamed,
15
    IPersistentMap,
16
    IPersistentVector,
17
    IReduceKV,
18
    ISeq,
19
    ITransientMap,
20
    IWithMeta,
21
    ReduceKVFunction,
22
)
23
from basilisp.lang.obj import (
1✔
24
    PRINT_SEPARATOR,
25
    SURPASSED_PRINT_LENGTH,
26
    SURPASSED_PRINT_LEVEL,
27
    PrintSettings,
28
    lrepr,
29
    process_lrepr_kwargs,
30
)
31
from basilisp.lang.reduced import Reduced
1✔
32
from basilisp.lang.seq import sequence
1✔
33
from basilisp.lang.vector import MapEntry
1✔
34
from basilisp.util import partition
1✔
35

36
T = TypeVar("T")
1✔
37
K = TypeVar("K")
1✔
38
V = TypeVar("V")
1✔
39
T_reduce = TypeVar("T_reduce")
1✔
40

41
_ENTRY_SENTINEL = object()
1✔
42

43

44
class TransientMap(ITransientMap[K, V]):
1✔
45
    __slots__ = ("_inner",)
1✔
46

47
    def __init__(self, evolver: "MapMutation[K, V]") -> None:
1✔
48
        self._inner = evolver
1✔
49

50
    def __bool__(self):
1✔
51
        return True
×
52

53
    def __call__(self, key, default=None):
1✔
54
        return self._inner.get(key, default)
1✔
55

56
    def __contains__(self, item):
1✔
57
        return item in self._inner
×
58

59
    def __eq__(self, other):
1✔
60
        return self is other
1✔
61

62
    def __len__(self):
1✔
63
        return len(self._inner)
1✔
64

65
    def assoc_transient(self, *kvs) -> "TransientMap":
1✔
66
        for k, v in partition(kvs, 2):
1✔
67
            self._inner[k] = v
1✔
68
        return self
1✔
69

70
    def contains_transient(self, k: K) -> bool:
1✔
71
        return k in self._inner
1✔
72

73
    def dissoc_transient(self, *ks: K) -> "TransientMap[K, V]":
1✔
74
        for k in ks:
1✔
75
            try:
1✔
76
                del self._inner[k]
1✔
77
            except KeyError:
1✔
78
                pass
1✔
79
        return self
1✔
80

81
    def entry_transient(self, k: K) -> Optional[IMapEntry[K, V]]:
1✔
82
        v = self._inner.get(k, cast("V", _ENTRY_SENTINEL))
×
83
        if v is _ENTRY_SENTINEL:
×
84
            return None
×
85
        return MapEntry.of(k, v)
×
86

87
    def val_at(self, k, default=None):
1✔
88
        return self._inner.get(k, default)
1✔
89

90
    def cons_transient(  # type: ignore[override]
1✔
91
        self,
92
        *elems: Union[
93
            IPersistentMap[K, V],
94
            IMapEntry[K, V],
95
            IPersistentVector[Union[K, V]],
96
            Mapping[K, V],
97
        ],
98
    ) -> "TransientMap[K, V]":
99
        try:
1✔
100
            for elem in elems:
1✔
101
                if isinstance(elem, (IPersistentMap, Mapping)):
1✔
102
                    for k, v in elem.items():
1✔
103
                        self._inner[k] = v
1✔
104
                elif isinstance(elem, IMapEntry):
1✔
105
                    self._inner[elem.key] = elem.value
1✔
106
                elif elem is None:
1✔
107
                    continue
1✔
108
                else:
109
                    entry: IMapEntry[K, V] = MapEntry.from_vec(elem)
1✔
110
                    self._inner[entry.key] = entry.value
1✔
111
        except (TypeError, ValueError) as e:
1✔
112
            raise ValueError(
1✔
113
                "Argument to map conj must be another Map or castable to MapEntry"
114
            ) from e
115
        else:
116
            return self
1✔
117

118
    def to_persistent(self) -> "PersistentMap[K, V]":
1✔
119
        return PersistentMap(self._inner.finish())
1✔
120

121

122
def map_lrepr(  # pylint: disable=too-many-locals
1✔
123
    entries: Callable[[], Iterable[tuple[Any, Any]]],
124
    start: str,
125
    end: str,
126
    meta: Optional[IPersistentMap] = None,
127
    **kwargs: Unpack[PrintSettings],
128
) -> str:
129
    """Produce a Lisp representation of an associative collection, bookended
130
    with the start and end string supplied. The entries argument must be a
131
    callable which will produce tuples of key-value pairs.
132

133
    If the keyword argument print_namespace_maps is True and all keys
134
    share the same namespace, then print the namespace of the keys at
135
    the beginning of the map instead of beside the keys.
136

137
    The keyword arguments will be passed along to lrepr for the sequence
138
    elements.
139

140
    """
141
    print_level = kwargs["print_level"]
1✔
142
    if isinstance(print_level, int) and print_level < 1:
1✔
143
        return SURPASSED_PRINT_LEVEL
1✔
144

145
    kwargs = process_lrepr_kwargs(**kwargs)
1✔
146

147
    def check_same_ns():
1✔
148
        """Check whether all keys in entries belong to the same
149
        namespace. If they do, return the namespace name; otherwise,
150
        return None.
151
        """
152
        nses = set()
1✔
153
        for k, _ in entries():
1✔
154
            if isinstance(k, INamed):
1✔
155
                nses.add(k.ns)
1✔
156
            else:
157
                nses.add(None)
1✔
158
            if len(nses) > 1:
1✔
159
                break
1✔
160
        return next(iter(nses)) if len(nses) == 1 else None
1✔
161

162
    ns_name_shared = check_same_ns() if kwargs["print_namespace_maps"] else None
1✔
163

164
    entries_updated = entries
1✔
165
    if ns_name_shared:
1✔
166

167
        def entries_ns_remove():
1✔
168
            for k, v in entries():
1✔
169
                yield (k.with_name(k.name), v)
1✔
170

171
        entries_updated = entries_ns_remove
1✔
172

173
    kw_items = kwargs.copy()
1✔
174
    kw_items["human_readable"] = False
1✔
175

176
    def entry_reprs():
1✔
177
        for k, v in entries_updated():
1✔
178
            yield f"{lrepr(k, **kw_items)} {lrepr(v, **kw_items)}"
1✔
179

180
    trailer = []
1✔
181
    print_dup = kwargs["print_dup"]
1✔
182
    print_length = kwargs["print_length"]
1✔
183
    if not print_dup and isinstance(print_length, int):
1✔
184
        items = list(islice(entry_reprs(), print_length + 1))
1✔
185
        if len(items) > print_length:
1✔
186
            items.pop()
1✔
187
            trailer.append(SURPASSED_PRINT_LENGTH)
1✔
188
    else:
189
        items = list(entry_reprs())
1✔
190

191
    seq_lrepr = PRINT_SEPARATOR.join(items + trailer)
1✔
192

193
    ns_prefix = ("#:" + ns_name_shared) if ns_name_shared else ""
1✔
194
    if kwargs["print_meta"] and meta:
1✔
195
        kwargs_meta = kwargs
1✔
196
        kwargs_meta["print_level"] = None
1✔
197
        return f"^{lrepr(meta,**kwargs_meta)} {ns_prefix}{start}{seq_lrepr}{end}"
1✔
198

199
    return f"{ns_prefix}{start}{seq_lrepr}{end}"
1✔
200

201

202
@lrepr.register(dict)
1✔
203
def _lrepr_py_dict(o: dict, **kwargs: Unpack[PrintSettings]) -> str:
1✔
204
    return f"#py {map_lrepr(o.items, '{', '}', **kwargs)}"
1✔
205

206

207
class PersistentMap(
1✔
208
    IPersistentMap[K, V],
209
    IEvolveableCollection[TransientMap],
210
    IReduceKV,
211
    ILispObject,
212
    IWithMeta,
213
):
214
    """Basilisp Map. Delegates internally to a immutables.Map object.
215
    Do not instantiate directly. Instead use the m() and map() factory
216
    methods below."""
217

218
    __slots__ = ("_inner", "_meta")
1✔
219

220
    def __init__(
1✔
221
        self,
222
        m: "_Map[K, V]",
223
        meta: Optional[IPersistentMap] = None,
224
    ) -> None:
225
        self._inner = m
1✔
226
        self._meta = meta
1✔
227

228
    @classmethod
1✔
229
    def from_coll(
1✔
230
        cls,
231
        members: Union[Mapping[K, V], Iterable[tuple[K, V]]],
232
        meta: Optional[IPersistentMap] = None,
233
    ) -> "PersistentMap[K, V]":
234
        return PersistentMap(_Map(members), meta=meta)
1✔
235

236
    def __bool__(self):
1✔
237
        return True
1✔
238

239
    def __call__(self, key, default=None):
1✔
240
        return self._inner.get(key, default)
1✔
241

242
    def __contains__(self, item):
1✔
243
        return item in self._inner
1✔
244

245
    def __eq__(self, other):
1✔
246
        if self is other:
1✔
247
            return True
1✔
248
        if not isinstance(other, Mapping):
1✔
249
            return NotImplemented
1✔
250
        if len(self._inner) != len(other):
1✔
251
            return False
1✔
252
        return self._inner == other
1✔
253

254
    def __getitem__(self, item):
1✔
255
        return self._inner[item]
1✔
256

257
    def __hash__(self):
1✔
258
        return hash(self._inner)
1✔
259

260
    def __iter__(self):
1✔
261
        return iter(self._inner)
1✔
262

263
    def __len__(self):
1✔
264
        return len(self._inner)
1✔
265

266
    def _lrepr(self, **kwargs: Unpack[PrintSettings]):
1✔
267
        return map_lrepr(
1✔
268
            self._inner.items,
269
            start="{",
270
            end="}",
271
            meta=self._meta,
272
            **kwargs,
273
        )
274

275
    @property
1✔
276
    def meta(self) -> Optional[IPersistentMap]:
1✔
277
        return self._meta
1✔
278

279
    def with_meta(self, meta: Optional[IPersistentMap]) -> "PersistentMap":
1✔
280
        return PersistentMap(self._inner, meta=meta)
1✔
281

282
    def assoc(self, *kvs):
1✔
283
        with self._inner.mutate() as m:
1✔
284
            for k, v in partition(kvs, 2):
1✔
285
                m[k] = v
1✔
286
            return PersistentMap(m.finish(), meta=self._meta)
1✔
287

288
    def contains(self, k):
1✔
289
        return k in self._inner
1✔
290

291
    def dissoc(self, *ks):
1✔
292
        with self._inner.mutate() as m:
1✔
293
            for k in ks:
1✔
294
                try:
1✔
295
                    del m[k]
1✔
296
                except KeyError:
1✔
297
                    pass
1✔
298
            return PersistentMap(m.finish(), meta=self._meta)
1✔
299

300
    def entry(self, k):
1✔
301
        v = self._inner.get(k, cast("V", _ENTRY_SENTINEL))
1✔
302
        if v is _ENTRY_SENTINEL:
1✔
303
            return None
1✔
304
        return MapEntry.of(k, v)
1✔
305

306
    def val_at(self, k, default=None):
1✔
307
        return self._inner.get(k, default)
1✔
308

309
    def update(self, *maps: Mapping[K, V]) -> "PersistentMap":
1✔
310
        m: _Map = self._inner.update(*(m.items() for m in maps))
1✔
311
        return PersistentMap(m, meta=self._meta)
1✔
312

313
    def update_with(  # type: ignore[return]
1✔
314
        self, merge_fn: Callable[[V, V], V], *maps: Mapping[K, V]
315
    ) -> "PersistentMap[K, V]":
316
        with self._inner.mutate() as m:
×
317
            for map in maps:
×
318
                for k, v in map.items():
×
319
                    m.set(k, merge_fn(m[k], v) if k in m else v)
×
320
            return PersistentMap(m.finish(), meta=self._meta)
×
321

322
    def cons(  # type: ignore[override, return]
1✔
323
        self,
324
        *elems: Union[
325
            IPersistentMap[K, V],
326
            IMapEntry[K, V],
327
            IPersistentVector[Union[K, V]],
328
            Mapping[K, V],
329
        ],
330
    ) -> "PersistentMap[K, V]":
331
        with self._inner.mutate() as m:
1✔
332
            try:
1✔
333
                for elem in elems:
1✔
334
                    if isinstance(elem, (IPersistentMap, Mapping)):
1✔
335
                        for k, v in elem.items():
1✔
336
                            m.set(k, v)
1✔
337
                    elif isinstance(elem, IMapEntry):
1✔
338
                        m.set(elem.key, elem.value)
1✔
339
                    elif elem is None:
1✔
340
                        continue
1✔
341
                    else:
342
                        entry: IMapEntry[K, V] = MapEntry.from_vec(elem)
1✔
343
                        m.set(entry.key, entry.value)
1✔
344
            except (TypeError, ValueError) as e:
1✔
345
                raise ValueError(
1✔
346
                    "Argument to map conj must be another Map or castable to MapEntry"
347
                ) from e
348
            else:
349
                return PersistentMap(m.finish(), meta=self.meta)
1✔
350

351
    def empty(self) -> "PersistentMap":
1✔
352
        return EMPTY.with_meta(self._meta)
1✔
353

354
    def seq(self) -> Optional[ISeq[IMapEntry[K, V]]]:
1✔
355
        if len(self._inner) == 0:
1✔
356
            return None
1✔
357
        return sequence(MapEntry.of(k, v) for k, v in self._inner.items())
1✔
358

359
    def to_transient(self) -> TransientMap[K, V]:
1✔
360
        return TransientMap(self._inner.mutate())
1✔
361

362
    def reduce_kv(self, f: ReduceKVFunction, init: T_reduce) -> T_reduce:
1✔
363
        for k, v in self._inner.items():
1✔
364
            init = f(init, k, v)
1✔
365
            if isinstance(init, Reduced):
1✔
366
                return init.deref()
1✔
367
        return init
1✔
368

369

370
EMPTY: PersistentMap = PersistentMap.from_coll(())
1✔
371

372

373
def map(  # pylint:disable=redefined-builtin
1✔
374
    kvs: Mapping[K, V], meta: Optional[IPersistentMap] = None
375
) -> PersistentMap[K, V]:
376
    """Creates a new map."""
377
    # For some reason, creating a new `immutables.Map` instance from an existing
378
    # `basilisp.lang.map.PersistentMap` instance causes issues because the `__iter__`
379
    # returns only the keys rather than tuple of key/value pairs, even though it
380
    # adheres to the `Mapping` protocol. Passing the `.items()` directly bypasses
381
    # this problem.
382
    return PersistentMap.from_coll(kvs.items(), meta=meta)
1✔
383

384

385
def m(**kvs) -> PersistentMap[str, V]:
1✔
386
    """Creates a new map from keyword arguments."""
387
    return PersistentMap.from_coll(kvs)
1✔
388

389

390
def from_entries(entries: Iterable[MapEntry[K, V]]) -> PersistentMap[K, V]:  # type: ignore[return]
1✔
391
    with _Map().mutate() as m:  # type: ignore[var-annotated]
1✔
392
        for entry in entries:
1✔
393
            m.set(entry.key, entry.value)
1✔
394
        return PersistentMap(m.finish())
1✔
395

396

397
def hash_map(*pairs) -> PersistentMap:
1✔
398
    entries = pymap(lambda v: MapEntry.of(v[0], v[1]), partition(pairs, 2))
1✔
399
    return from_entries(entries)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc