• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basilisp-lang / basilisp / 12281612501

11 Dec 2024 05:24PM UTC coverage: 98.677% (-0.06%) from 98.738%
12281612501

Pull #1172

github

web-flow
Merge a45766c9a into 2dffcc004
Pull Request #1172: Derive module (test) names from `ns` declarations

1026 of 1035 branches covered (99.13%)

Branch coverage included in aggregate %.

15 of 15 new or added lines in 2 files covered. (100.0%)

5 existing lines in 1 file now uncovered.

8820 of 8943 relevant lines covered (98.62%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.15
/src/basilisp/contrib/pytest/testrunner.py
1
import importlib.util
1✔
2
import inspect
1✔
3
import os
1✔
4
import sys
1✔
5
import traceback
1✔
6
from collections.abc import Iterable, Iterator
1✔
7
from pathlib import Path
1✔
8
from types import GeneratorType
1✔
9
from typing import Callable, Optional
1✔
10

11
import pytest
1✔
12

13
from basilisp import main as basilisp
1✔
14
from basilisp.importer import read_namespace_name
1✔
15
from basilisp.lang import keyword as kw
1✔
16
from basilisp.lang import map as lmap
1✔
17
from basilisp.lang import runtime as runtime
1✔
18
from basilisp.lang import symbol as sym
1✔
19
from basilisp.lang import vector as vec
1✔
20
from basilisp.lang.obj import lrepr
1✔
21
from basilisp.lang.util import munge
1✔
22
from basilisp.util import Maybe
1✔
23

24
# Name of the Basilisp core namespace and the symbols identifying the
# standard output/error stream Vars interned there.
CORE_NS = "basilisp.core"
CORE_NS_SYM = sym.symbol(CORE_NS)
OUT_VAR_NAME = "*out*"
ERR_VAR_NAME = "*err*"
OUT_VAR_SYM = sym.symbol(OUT_VAR_NAME, ns=CORE_NS)
ERR_VAR_SYM = sym.symbol(ERR_VAR_NAME, ns=CORE_NS)

# Keywords in the `basilisp.test` namespace used as metadata keys: the two
# fixture keys are read from namespace metadata, the test key from Var
# metadata to recognize tests defined by `deftest`.
_TEST_META_KW = kw.keyword("test", "basilisp.test")
_ONCE_FIXTURES_NUM_META_KW = kw.keyword("once-fixtures", "basilisp.test")
_EACH_FIXTURES_META_KW = kw.keyword("each-fixtures", "basilisp.test")
1✔
34

35

36
def pytest_configure(config):
    """PyTest hook: rebind Basilisp's output streams and bootstrap `basilisp.test`.

    See https://github.com/pytest-dev/pytest/issues/12876

    Basilisp's standard output streams may have been initialized before PyTest
    captured `sys.stdout` and `sys.stderr` (e.g. when running `basilisp test`).
    Writing to the original handles during tests can cause invalid handle errors
    on Windows, so the stream Vars are rebound to the current (PyTest) streams
    for the duration of the run. `pytest_unconfigure` pops the bindings again.
    """
    stream_candidates = (
        (runtime.Var.find(OUT_VAR_SYM), sys.stdout),
        (runtime.Var.find(ERR_VAR_SYM), sys.stderr),
    )
    # Only bind the Vars which were actually found.
    bindings = {var: stream for var, stream in stream_candidates if var}
    if bindings:
        runtime.push_thread_bindings(lmap.map(bindings))
        # Marker attribute telling `pytest_unconfigure` there is a frame to pop.
        config.basilisp_bindings = bindings

    basilisp.bootstrap("basilisp.test")
1✔
55

56

57
def pytest_unconfigure(config):
    """PyTest hook: pop the stream bindings pushed by `pytest_configure`, if any."""
    if not hasattr(config, "basilisp_bindings"):
        # Nothing was pushed during configuration, so there is nothing to pop.
        return
    runtime.pop_thread_bindings()
1✔
60

61

62
def pytest_collect_file(file_path: Path, parent):
    """Primary PyTest hook to identify Basilisp test files.

    A file is collected when it has the `.lpy` suffix and its name either starts
    with `test_` or its stem ends with `_test`. Returns a `BasilispFile` node for
    such files and `None` for everything else.
    """
    if file_path.suffix != ".lpy":
        return None

    named_like_test = file_path.name.startswith("test_") or file_path.stem.endswith(
        "_test"
    )
    if not named_like_test:
        return None

    return BasilispFile.from_parent(parent, path=file_path)
1✔
68

69

70
class TestFailuresInfo(Exception):
    """Exception carrying a message together with a map of test result data.

    `BasilispTestItem.runtest` raises this when a test reports failures, and
    `BasilispTestItem.repr_failure` reads the failures back out of `data`.
    """

    __slots__ = ("_msg", "_data")

    def __init__(self, message: str, data: lmap.PersistentMap) -> None:
        super().__init__()
        self._data = data
        self._msg = message

    @property
    def message(self) -> str:
        """The human readable message supplied at construction."""
        return self._msg

    @property
    def data(self) -> lmap.PersistentMap:
        """The result map supplied at construction."""
        return self._data

    def __str__(self):
        rendered_data = lrepr(self._data)
        return f"{self._msg} {rendered_data}"

    def __repr__(self):
        qualified_name = "basilisp.contrib.pytest.testrunner.TestFailuresInfo"
        return f"{qualified_name}({self._msg}, {lrepr(self._data)})"
×
94

95

96
# A test function: called with no arguments, it returns a map of results.
TestFunction = Callable[[], lmap.PersistentMap]

# A suspended generator fixture; advancing it once more runs its teardown.
FixtureTeardown = Iterator[None]

# A fixture: a zero-argument callable which is either a plain function
# (returning nothing) or a generator function yielding once between its
# setup and teardown steps.
FixtureFunction = Callable[[], Optional[FixtureTeardown]]
1✔
99

100

101
class FixtureManager:
    """FixtureManager instances manage `basilisp.test` style fixtures on behalf of a
    BasilispFile or BasilispTestItem node.

    A fixture is a zero-argument callable. Plain functions are simply called
    during setup. Generator functions are advanced to their first `yield` during
    setup and advanced once more during teardown.

    Cleanup is exhaustive: every fixture which completed its setup step gets its
    teardown step run, even if another fixture fails during setup or teardown.
    """

    __slots__ = ("_fixtures", "_teardowns")

    def __init__(self, fixtures: Iterable[FixtureFunction]):
        self._fixtures: Iterable[FixtureFunction] = fixtures
        # Suspended generator fixtures awaiting teardown; empty until `setup`.
        self._teardowns: Iterable[FixtureTeardown] = ()

    @staticmethod
    def _run_fixture(fixture: FixtureFunction) -> Optional[Iterator[None]]:
        """Run a fixture function. If the fixture is a generator function, return the
        generator/coroutine. Otherwise, simply return the value from the function, if
        one."""
        if inspect.isgeneratorfunction(fixture):
            coro = fixture()
            assert isinstance(coro, GeneratorType)
            # Advance to the first `yield`: everything before it is setup.
            next(coro)
            return coro
        else:
            fixture()
            return None

    @classmethod
    def _setup_fixtures(
        cls, fixtures: Iterable[FixtureFunction]
    ) -> Iterable[FixtureTeardown]:
        """Set up fixtures by running them as by `_run_fixture`. Collect any fixtures
        teardown steps (e.g. suspended coroutines) and return those so they can be
        resumed later for any cleanup.

        If a fixture raises during setup, the fixtures which were already set up
        are torn down before the error is reported, since the caller never
        receives their teardown handles.

        Raises `runtime.RuntimeException` (chained to the original exception) if
        any fixture fails during setup."""
        teardown_fixtures: list[FixtureTeardown] = []
        try:
            for fixture in fixtures:
                teardown = cls._run_fixture(fixture)
                if teardown is not None:
                    teardown_fixtures.append(teardown)
        except Exception as e:
            try:
                cls._teardown_fixtures(teardown_fixtures)
            except Exception:
                # Deliberately ignored: the setup failure is the primary error
                # and is the one reported to the caller below.
                pass
            raise runtime.RuntimeException(
                "Exception occurred during fixture setup"
            ) from e
        else:
            return teardown_fixtures

    @staticmethod
    def _teardown_fixtures(teardowns: Iterable[FixtureTeardown]) -> None:
        """Perform teardown steps returned from a fixture function.

        Every teardown is attempted, even if an earlier one fails, so a single
        failing fixture cannot prevent the others from cleaning up.

        Raises `runtime.RuntimeException` chained to the first failure once all
        teardowns have been attempted."""
        first_error: Optional[Exception] = None
        for teardown in teardowns:
            try:
                next(teardown)
            except StopIteration:
                # The generator finished normally after its teardown code ran.
                pass
            except Exception as e:
                if first_error is None:
                    first_error = e

        if first_error is not None:
            raise runtime.RuntimeException(
                "Exception occurred during fixture teardown"
            ) from first_error

    def setup(self) -> None:
        """Setup fixtures and store any teardowns for cleanup later.

        Should have a corresponding call to `FixtureManager.teardown` to clean up
        fixtures."""
        self._teardowns = self._setup_fixtures(self._fixtures)

    def teardown(self) -> None:
        """Teardown fixtures from a previous call to `setup`.

        The stored teardowns are always cleared, even if a teardown step fails."""
        try:
            self._teardown_fixtures(self._teardowns)
        finally:
            self._teardowns = ()
1✔
169

170

171
def _is_package(path: Path) -> bool:
    """Return `True` if the given path refers to a Python or Basilisp package.

    A directory counts as a package if it directly contains an `__init__.py`
    file or any `.lpy` file (which includes `__init__.lpy`). Only the top level
    of `path` is inspected; subdirectories are not searched.

    Returns `False` if `path` does not exist or is not a directory."""
    try:
        _, _, files = next(os.walk(path))
    except StopIteration:
        # `os.walk` yields nothing for a missing path or a non-directory.
        return False
    return any(file == "__init__.py" or file.endswith(".lpy") for file in files)
×
178

179

180
class BasilispFile(pytest.File):
    """Files represent a test module in Python or a test namespace in Basilisp.

    Collecting the file imports the namespace it declares, creates one
    `BasilispTestItem` per test Var found there and installs the namespace's
    once-fixtures, which run in this node's `setup` and `teardown`."""

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # Assigned by `collect`; `setup`/`teardown` require it to be present.
        self._fixture_manager: Optional[FixtureManager] = None

    @staticmethod
    def _collected_fixtures(
        ns: runtime.Namespace,
    ) -> tuple[Iterable[FixtureFunction], Iterable[FixtureFunction]]:
        """Return the `(once, each)` fixtures declared in the namespace metadata.

        Either element is an empty tuple if the namespace declares none."""
        meta = ns.meta
        if meta is None:
            return (), ()

        once_fixtures = meta.val_at(_ONCE_FIXTURES_NUM_META_KW) or ()
        each_fixtures = meta.val_at(_EACH_FIXTURES_META_KW) or ()
        return once_fixtures, each_fixtures

    @staticmethod
    def _collected_tests(ns: runtime.Namespace) -> Iterable[runtime.Var]:
        """Return the sorted sequence of collected tests from the Namespace `ns`.

        Tests defined by `deftest` are annotated with `:basilisp.test/test` metadata.
        Tests are sorted by their line number, which matches the default behavior of
        PyTest."""

        def _line_number(var: runtime.Var) -> int:
            meta = var.meta
            assert meta is not None
            line = meta.val_at(_LINE_KW)
            assert isinstance(line, int)
            return line

        test_vars = [
            var
            for _, var in ns.interns.items()
            if var.meta is not None and var.meta.val_at(_TEST_META_KW)
        ]
        test_vars.sort(key=_line_number)
        return test_vars

    def setup(self) -> None:
        manager = self._fixture_manager
        assert manager is not None
        manager.setup()

    def teardown(self) -> None:
        manager = self._fixture_manager
        assert manager is not None
        manager.teardown()

    def _import_module(self) -> runtime.BasilispModule:
        """Import the Basilisp module at `self.path` and return it.

        The module name is derived by munging the namespace name read from the
        file itself.

        Raises ImportError if the Basilisp module does not declare a
        namespace name.

        """
        ns_name = read_namespace_name(self.path)
        if ns_name is None:
            raise ImportError(f"Can't find Basilisp namespace name in {self.path}")

        module = importlib.import_module(munge(ns_name))
        assert isinstance(module, runtime.BasilispModule)
        return module

    def collect(self):
        """Collect all tests from the namespace (module) given.

        Basilisp's test runner imports the namespace which will (as a side effect)
        collect the test functions in a namespace (represented by `deftest` forms in
        Basilisp). BasilispFile.collect fetches those test functions and generates
        BasilispTestItems for PyTest to run the tests."""
        filename = self.path.name
        ns = self._import_module().__basilisp_namespace__

        once_fixtures, each_fixtures = self._collected_fixtures(ns)
        self._fixture_manager = FixtureManager(once_fixtures)

        for test_var in self._collected_tests(ns):
            run_test: TestFunction = test_var.value
            # Each item gets its own manager over the each-fixtures.
            yield BasilispTestItem.from_parent(
                self,
                name=test_var.name.name,
                run_test=run_test,
                namespace=ns,
                filename=filename,
                fixture_manager=FixtureManager(each_fixtures),
            )
269

270

271
# Key of the result map returned by a test function holding its failures.
_FAILURES_KW = kw.keyword("failures")

# Keys read from an individual failure's detail map.
_TYPE_KW = kw.keyword("type")
_MESSAGE_KW = kw.keyword("message")
_EXPECTED_KW = kw.keyword("expected")
_ACTUAL_KW = kw.keyword("actual")
_EXPR_KW = kw.keyword("expr")
_TEST_SECTION_KW = kw.keyword("test-section")
# Also read from test Var metadata to order tests by line number.
_LINE_KW = kw.keyword("line")

# The two values a failure's `:type` may take.
_FAILURE_KW = kw.keyword("failure")
_ERROR_KW = kw.keyword("error")
1✔
281

282

283
class BasilispTestItem(pytest.Item):
    """Test items correspond to a single `deftest` form in a Basilisp test.

    `deftest` forms run each `is` assertion and collect all failures in an atom,
    reporting their results as a vector of failures when each test concludes.

    The BasilispTestItem collects all the failures and returns a report to PyTest to
    show to the end-user."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        name: str,
        parent: BasilispFile,
        run_test: TestFunction,
        namespace: runtime.Namespace,
        filename: str,
        fixture_manager: FixtureManager,
    ) -> None:
        super().__init__(name, parent)
        self._fixture_manager = fixture_manager
        self._filename = filename
        self._namespace = namespace
        self._run_test = run_test

    @classmethod
    def from_parent(  # pylint: disable=arguments-differ,too-many-arguments
        cls,
        parent: "BasilispFile",
        name: str,
        run_test: TestFunction,
        namespace: runtime.Namespace,
        filename: str,
        fixture_manager: FixtureManager,
    ):
        """Create a new BasilispTestItem from the parent Node."""
        # https://github.com/pytest-dev/pytest/pull/6680
        item_kwargs = {
            "name": name,
            "run_test": run_test,
            "namespace": namespace,
            "filename": filename,
            "fixture_manager": fixture_manager,
        }
        return super().from_parent(parent, **item_kwargs)

    def setup(self) -> None:
        self._fixture_manager.setup()

    def teardown(self) -> None:
        self._fixture_manager.teardown()

    def runtest(self):
        """Run the tests associated with this test item.

        If any tests fail, raise a TestFailuresInfo exception with the test results.
        PyTest will invoke self.repr_failure to display the failures to the user."""
        results: lmap.PersistentMap = self._run_test()
        failures: Optional[vec.PersistentVector] = results.val_at(_FAILURES_KW)
        if not runtime.to_seq(failures):
            return
        raise TestFailuresInfo("Test failures", lmap.map(results))

    def repr_failure(self, excinfo, style=None):
        """Representation function called when self.runtest() raises an exception.

        Failures reported through a TestFailuresInfo are rendered one by one and
        separated by blank lines. Any other Exception is rendered as an error
        with its traceback. Returns None for anything else."""
        raised = excinfo.value
        if not isinstance(raised, TestFailuresInfo):
            if isinstance(raised, Exception):
                return self._error_msg(raised)
            return None

        rendered = []
        for details in raised.data.val_at(_FAILURES_KW):
            kind = details.val_at(_TYPE_KW)
            if kind == _FAILURE_KW:
                rendered.append(self._failure_msg(details))
            elif kind == _ERROR_KW:
                rendered.append(
                    self._error_msg(
                        details.val_at(_ACTUAL_KW), line=details.val_at(_LINE_KW)
                    )
                )
            else:  # pragma: no cover
                assert False, "Test failure type must be in #{:error :failure}"

        return "\n\n".join(rendered)

    def reportinfo(self):
        return self.fspath, 0, self.name

    @staticmethod
    def _line_suffix(line: Optional[int]) -> str:
        """Return `:<line>` for a known line number and the empty string otherwise."""
        return Maybe(line).map(lambda n: f":{n}").or_else_get("")

    def _error_msg(self, exc: Exception, line: Optional[int] = None) -> str:
        """Render an error header followed by the formatted traceback of `exc`."""
        location = f"{self._filename}{self._line_suffix(line)}"
        header = f"ERROR in ({self.name}) ({location})"
        tb_lines = traceback.format_exception(Exception, exc, exc.__traceback__)
        return "".join([header, "\n\n", *tb_lines])

    def _failure_msg(self, details: lmap.PersistentMap) -> str:
        """Render a single `:failure` detail map as a multi-line report showing
        where the assertion failed, its message and the expected and actual
        values."""
        assert details.val_at(_TYPE_KW) == _FAILURE_KW
        msg: str = details.val_at(_MESSAGE_KW)
        actual = details.val_at(_ACTUAL_KW)
        expected = details.val_at(_EXPECTED_KW)
        test_section = details.val_at(_TEST_SECTION_KW)

        line_msg = self._line_suffix(details.val_at(_LINE_KW))
        section_msg = Maybe(test_section).map(lambda s: f" {s} :: ").or_else_get("")

        report_lines = [
            f"FAIL in ({self.name}) ({self._filename}{line_msg})",
            f"    {section_msg}{msg}",
            "",
            f"    expected: {lrepr(expected)}",
            f"      actual: {lrepr(actual)}",
        ]
        return "\n".join(report_lines)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc