• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basilisp-lang / basilisp / 7379247703

01 Jan 2024 06:37PM UTC coverage: 99.008%. Remained the same
7379247703

push

github

web-flow
callable var (#768)

Hi,

could you please review compatibility patch with Clojure to make vars
callable. It addresses #767.

I am not sure this if the `class Var` is the right place to make vars
callable or the analyzer, which should expand them to a callable var
value last node.

Nevertheless, I will kindly request your help with the type hinting,
currently it will fail the linter with the following error

```
src/basilisp/lang/runtime.py:279:15: E1102: self.value is not callable (not-callable)
```

but not sure how to fix it, I've tried the class Var `value` property
method to return a maybe Callable but it didn't work.

Thanks

Co-authored-by: ikappaki <ikappaki@users.noreply.github.com>

1691 of 1693 branches covered (0.0%)

Branch coverage included in aggregate %.

7892 of 7986 relevant lines covered (98.82%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.28
/src/basilisp/contrib/pytest/testrunner.py
1
import importlib.util
1✔
2
import inspect
1✔
3
import os
1✔
4
import traceback
1✔
5
from pathlib import Path
1✔
6
from types import GeneratorType
1✔
7
from typing import Callable, Iterable, Iterator, Optional, Tuple
1✔
8

9
import pytest
1✔
10

11
from basilisp import main as basilisp
1✔
12
from basilisp.lang import keyword as kw
1✔
13
from basilisp.lang import map as lmap
1✔
14
from basilisp.lang import runtime as runtime
1✔
15
from basilisp.lang import vector as vec
1✔
16
from basilisp.lang.obj import lrepr
1✔
17
from basilisp.util import Maybe
1✔
18

19
_EACH_FIXTURES_META_KW = kw.keyword("each-fixtures", "basilisp.test")
1✔
20
_ONCE_FIXTURES_NUM_META_KW = kw.keyword("once-fixtures", "basilisp.test")
1✔
21
_TEST_META_KW = kw.keyword("test", "basilisp.test")
1✔
22

23

24
# pylint: disable=unused-argument
25
def pytest_configure(config):
1✔
26
    basilisp.bootstrap("basilisp.test")
1✔
27

28

29
def pytest_collect_file(file_path: Path, path, parent):
1✔
30
    """Primary PyTest hook to identify Basilisp test files."""
31
    if file_path.suffix == ".lpy":
1✔
32
        if file_path.name.startswith("test_") or file_path.stem.endswith("_test"):
1✔
33
            return BasilispFile.from_parent(parent, fspath=path, path=file_path)
1✔
34
    return None
1✔
35

36

37
class TestFailuresInfo(Exception):
1✔
38
    __slots__ = ("_msg", "_data")
1✔
39

40
    def __init__(self, message: str, data: lmap.PersistentMap) -> None:
1✔
41
        super().__init__()
1✔
42
        self._msg = message
1✔
43
        self._data = data
1✔
44

45
    def __repr__(self):
46
        return (
47
            "basilisp.contrib.pytest.testrunner.TestFailuresInfo"
48
            f"({self._msg}, {lrepr(self._data)})"
49
        )
50

51
    def __str__(self):
1✔
52
        return f"{self._msg} {lrepr(self._data)}"
×
53

54
    @property
1✔
55
    def data(self) -> lmap.PersistentMap:
1✔
56
        return self._data
1✔
57

58
    @property
1✔
59
    def message(self) -> str:
1✔
60
        return self._msg
×
61

62

63
TestFunction = Callable[[], lmap.PersistentMap]
1✔
64
FixtureTeardown = Iterator[None]
1✔
65
FixtureFunction = Callable[[], Optional[FixtureTeardown]]
1✔
66

67

68
class FixtureManager:
1✔
69
    """FixtureManager instances manage `basilisp.test` style fixtures on behalf of a
70
    BasilispFile or BasilispTestItem node."""
71

72
    __slots__ = ("_fixtures", "_teardowns")
1✔
73

74
    def __init__(self, fixtures: Iterable[FixtureFunction]):
1✔
75
        self._fixtures: Iterable[FixtureFunction] = fixtures
1✔
76
        self._teardowns: Iterable[FixtureTeardown] = ()
1✔
77

78
    @staticmethod
1✔
79
    def _run_fixture(fixture: FixtureFunction) -> Optional[Iterator[None]]:
1✔
80
        """Run a fixture function. If the fixture is a generator function, return the
81
        generator/coroutine. Otherwise, simply return the value from the function, if
82
        one."""
83
        if inspect.isgeneratorfunction(fixture):
1✔
84
            coro = fixture()
1✔
85
            assert isinstance(coro, GeneratorType)
1✔
86
            next(coro)
1✔
87
            return coro
1✔
88
        else:
89
            fixture()
1✔
90
            return None
1✔
91

92
    @classmethod
1✔
93
    def _setup_fixtures(
1✔
94
        cls, fixtures: Iterable[FixtureFunction]
95
    ) -> Iterable[FixtureTeardown]:
96
        """Set up fixtures by running them as by `_run_fixture`. Collect any fixtures
97
        teardown steps (e.g. suspended coroutines) and return those so they can be
98
        resumed later for any cleanup."""
99
        teardown_fixtures = []
1✔
100
        try:
1✔
101
            for fixture in fixtures:
1✔
102
                teardown = cls._run_fixture(fixture)
1✔
103
                if teardown is not None:
1✔
104
                    teardown_fixtures.append(teardown)
1✔
105
        except Exception as e:
1✔
106
            raise runtime.RuntimeException(
1✔
107
                "Exception occurred during fixture setup"
108
            ) from e
109
        else:
110
            return teardown_fixtures
1✔
111

112
    @staticmethod
1✔
113
    def _teardown_fixtures(teardowns: Iterable[FixtureTeardown]) -> None:
1✔
114
        """Perform teardown steps returned from a fixture function."""
115
        for teardown in teardowns:
1✔
116
            try:
1✔
117
                next(teardown)
1✔
118
            except StopIteration:
1✔
119
                pass
1✔
120
            except Exception as e:
1✔
121
                raise runtime.RuntimeException(
1✔
122
                    "Exception occurred during fixture teardown"
123
                ) from e
124

125
    def setup(self) -> None:
1✔
126
        """Setup fixtures and store any teardowns for cleanup later.
127

128
        Should have a corresponding call to `FixtureManager.teardown` to clean up
129
        fixtures."""
130
        self._teardowns = self._setup_fixtures(self._fixtures)
1✔
131

132
    def teardown(self) -> None:
1✔
133
        """Teardown fixtures from a previous call to `setup`."""
134
        self._teardown_fixtures(self._teardowns)
1✔
135
        self._teardowns = ()
1✔
136

137

138
def _is_package(path: Path) -> bool:
1✔
139
    """Return `True` if the given path refers to a Python or Basilisp package."""
140
    _, _, files = next(os.walk(path))
1✔
141
    for file in files:
1✔
142
        if file in {"__init__.lpy", "__init__.py"} or file.endswith(".lpy"):
1✔
143
            return True
1✔
144
    return False
1✔
145

146

147
def _get_fully_qualified_module_name(file: Path) -> str:
1✔
148
    """Return the fully qualified module name (from the import root) for a module given
149
    its location.
150

151
    This works by traversing up the filesystem looking for the top-most package. From
152
    there, we derive a Python module name referring to the given module path."""
153
    top = None
1✔
154
    for p in file.parents:
1✔
155
        if _is_package(p):
1✔
156
            top = p
1✔
157
        else:
158
            break
1✔
159

160
    if top is None or top == file.parent:
1✔
161
        return file.stem
1✔
162

163
    root = top.parent
1✔
164
    elems = list(file.with_suffix("").relative_to(root).parts)
1✔
165
    if elems[-1] == "__init__":
1✔
166
        elems.pop()
×
167
    return ".".join(elems)
1✔
168

169

170
class BasilispFile(pytest.File):
1✔
171
    """Files represent a test module in Python or a test namespace in Basilisp."""
172

173
    def __init__(self, **kwargs) -> None:
1✔
174
        super().__init__(**kwargs)
1✔
175
        self._fixture_manager: Optional[FixtureManager] = None
1✔
176

177
    @staticmethod
1✔
178
    def _collected_fixtures(
1✔
179
        ns: runtime.Namespace,
180
    ) -> Tuple[Iterable[FixtureFunction], Iterable[FixtureFunction]]:
181
        """Collect all of the declared fixtures of the namespace."""
182
        if ns.meta is not None:
1✔
183
            return (
1✔
184
                ns.meta.val_at(_ONCE_FIXTURES_NUM_META_KW) or (),
185
                ns.meta.val_at(_EACH_FIXTURES_META_KW) or (),
186
            )
187
        return (), ()
1✔
188

189
    @staticmethod
1✔
190
    def _collected_tests(ns: runtime.Namespace) -> Iterable[runtime.Var]:
1✔
191
        """Return the sorted sequence of collected tests from the Namespace `ns`.
192

193
        Tests defined by `deftest` are annotated with `:basilisp.test/test` metadata.
194
        Tests are sorted by their line number, which matches the default behavior of
195
        PyTest."""
196

197
        def _test_num(var: runtime.Var) -> int:
1✔
198
            assert var.meta is not None
1✔
199
            order = var.meta.val_at(_LINE_KW)
1✔
200
            assert isinstance(order, int)
1✔
201
            return order
1✔
202

203
        return sorted(
1✔
204
            (
205
                var
206
                for _, var in ns.interns.items()
207
                if var.meta is not None and var.meta.val_at(_TEST_META_KW)
208
            ),
209
            key=_test_num,
210
        )
211

212
    def setup(self) -> None:
1✔
213
        assert self._fixture_manager is not None
1✔
214
        self._fixture_manager.setup()
1✔
215

216
    def teardown(self) -> None:
1✔
217
        assert self._fixture_manager is not None
1✔
218
        self._fixture_manager.teardown()
1✔
219

220
    def _import_module(self) -> runtime.BasilispModule:
1✔
221
        modname = _get_fully_qualified_module_name(self.path)
1✔
222
        module = importlib.import_module(modname)
1✔
223
        assert isinstance(module, runtime.BasilispModule)
1✔
224
        return module
1✔
225

226
    def collect(self):
1✔
227
        """Collect all tests from the namespace (module) given.
228

229
        Basilisp's test runner imports the namespace which will (as a side effect)
230
        collect the test functions in a namespace (represented by `deftest` forms in
231
        Basilisp). BasilispFile.collect fetches those test functions and generates
232
        BasilispTestItems for PyTest to run the tests."""
233
        filename = self.path.name
1✔
234
        module = self._import_module()
1✔
235
        ns = module.__basilisp_namespace__
1✔
236
        once_fixtures, each_fixtures = self._collected_fixtures(ns)
1✔
237
        self._fixture_manager = FixtureManager(once_fixtures)
1✔
238
        for test in self._collected_tests(ns):
1✔
239
            f: TestFunction = test.value
1✔
240
            yield BasilispTestItem.from_parent(
1✔
241
                self,
242
                name=test.name.name,
243
                run_test=f,
244
                namespace=ns,
245
                filename=filename,
246
                fixture_manager=FixtureManager(each_fixtures),
247
            )
248

249

250
_ACTUAL_KW = kw.keyword("actual")
1✔
251
_ERROR_KW = kw.keyword("error")
1✔
252
_EXPECTED_KW = kw.keyword("expected")
1✔
253
_FAILURE_KW = kw.keyword("failure")
1✔
254
_FAILURES_KW = kw.keyword("failures")
1✔
255
_MESSAGE_KW = kw.keyword("message")
1✔
256
_LINE_KW = kw.keyword("line")
1✔
257
_EXPR_KW = kw.keyword("expr")
1✔
258
_TEST_SECTION_KW = kw.keyword("test-section")
1✔
259
_TYPE_KW = kw.keyword("type")
1✔
260

261

262
class BasilispTestItem(pytest.Item):
1✔
263
    """Test items correspond to a single `deftest` form in a Basilisp test.
264

265
    `deftest` forms run each `is` assertion and collect all failures in an atom,
266
    reporting their results as a vector of failures when each test concludes.
267

268
    The BasilispTestItem collects all the failures and returns a report to PyTest to
269
    show to the end-user."""
270

271
    def __init__(  # pylint: disable=too-many-arguments
1✔
272
        self,
273
        name: str,
274
        parent: BasilispFile,
275
        run_test: TestFunction,
276
        namespace: runtime.Namespace,
277
        filename: str,
278
        fixture_manager: FixtureManager,
279
    ) -> None:
280
        super().__init__(name, parent)
1✔
281
        self._run_test = run_test
1✔
282
        self._namespace = namespace
1✔
283
        self._filename = filename
1✔
284
        self._fixture_manager = fixture_manager
1✔
285

286
    @classmethod
1✔
287
    def from_parent(  # pylint: disable=arguments-differ,too-many-arguments
1✔
288
        cls,
289
        parent: "BasilispFile",
290
        name: str,
291
        run_test: TestFunction,
292
        namespace: runtime.Namespace,
293
        filename: str,
294
        fixture_manager: FixtureManager,
295
    ):
296
        """Create a new BasilispTestItem from the parent Node."""
297
        # https://github.com/pytest-dev/pytest/pull/6680
298
        return super().from_parent(
1✔
299
            parent,
300
            name=name,
301
            run_test=run_test,
302
            namespace=namespace,
303
            filename=filename,
304
            fixture_manager=fixture_manager,
305
        )
306

307
    def setup(self) -> None:
1✔
308
        self._fixture_manager.setup()
1✔
309

310
    def teardown(self) -> None:
1✔
311
        self._fixture_manager.teardown()
1✔
312

313
    def runtest(self):
1✔
314
        """Run the tests associated with this test item.
315

316
        If any tests fail, raise an ExceptionInfo exception with the test failures.
317
        PyTest will invoke self.repr_failure to display the failures to the user."""
318
        results: lmap.PersistentMap = self._run_test()
1✔
319
        failures: Optional[vec.PersistentVector] = results.val_at(_FAILURES_KW)
1✔
320
        if runtime.to_seq(failures):
1✔
321
            raise TestFailuresInfo("Test failures", lmap.map(results))
1✔
322

323
    def repr_failure(self, excinfo, style=None):
1✔
324
        """Representation function called when self.runtest() raises an exception."""
325
        if isinstance(excinfo.value, TestFailuresInfo):
1✔
326
            exc = excinfo.value
1✔
327
            failures = exc.data.val_at(_FAILURES_KW)
1✔
328
            messages = []
1✔
329

330
            for details in failures:
1✔
331
                type_ = details.val_at(_TYPE_KW)
1✔
332
                if type_ == _FAILURE_KW:
1✔
333
                    messages.append(self._failure_msg(details))
1✔
334
                elif type_ == _ERROR_KW:
1✔
335
                    exc = details.val_at(_ACTUAL_KW)
1✔
336
                    line = details.val_at(_LINE_KW)
1✔
337
                    messages.append(self._error_msg(exc, line=line))
1✔
338
                else:  # pragma: no cover
339
                    assert False, "Test failure type must be in #{:error :failure}"
340

341
            return "\n\n".join(messages)
1✔
342
        elif isinstance(excinfo.value, Exception):
1✔
343
            return self._error_msg(excinfo.value)
1✔
344
        else:
345
            return None
×
346

347
    def reportinfo(self):
1✔
348
        return self.fspath, 0, self.name
1✔
349

350
    def _error_msg(self, exc: Exception, line: Optional[int] = None) -> str:
1✔
351
        line_msg = Maybe(line).map(lambda l: f":{l}").or_else_get("")
1✔
352
        messages = [f"ERROR in ({self.name}) ({self._filename}{line_msg})", "\n\n"]
1✔
353
        messages.extend(traceback.format_exception(Exception, exc, exc.__traceback__))
1✔
354
        return "".join(messages)
1✔
355

356
    def _failure_msg(self, details: lmap.PersistentMap) -> str:
1✔
357
        assert details.val_at(_TYPE_KW) == _FAILURE_KW
1✔
358
        msg: str = details.val_at(_MESSAGE_KW)
1✔
359

360
        actual = details.val_at(_ACTUAL_KW)
1✔
361
        expected = details.val_at(_EXPECTED_KW)
1✔
362

363
        test_section = details.val_at(_TEST_SECTION_KW)
1✔
364
        line = details.val_at(_LINE_KW)
1✔
365
        line_msg = Maybe(line).map(lambda l: f":{l}").or_else_get("")
1✔
366
        section_msg = Maybe(test_section).map(lambda s: f" {s} :: ").or_else_get("")
1✔
367

368
        return "\n".join(
1✔
369
            [
370
                f"FAIL in ({self.name}) ({self._filename}{line_msg})",
371
                f"    {section_msg}{msg}",
372
                "",
373
                f"    expected: {lrepr(expected)}",
374
                f"      actual: {lrepr(actual)}",
375
            ]
376
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc