• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basilisp-lang / basilisp / 13100583921

02 Feb 2025 04:35PM CUT coverage: 98.651% (-0.03%) from 98.68%
13100583921

Pull #1205

github

web-flow
Merge 96813dcc3 into 2e37ca36f
Pull Request #1205: fix testrunner to handle relative entries in sys.path

1032 of 1041 branches covered (99.14%)

Branch coverage included in aggregate %.

5 of 5 new or added lines in 1 file covered. (100.0%)

3 existing lines in 1 file now uncovered.

8837 of 8963 relevant lines covered (98.59%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.93
/src/basilisp/contrib/pytest/testrunner.py
1
import importlib.util
1✔
2
import inspect
1✔
3
import os
1✔
4
import sys
1✔
5
import traceback
1✔
6
from collections.abc import Iterable, Iterator
1✔
7
from pathlib import Path
1✔
8
from types import GeneratorType
1✔
9
from typing import Callable, Optional
1✔
10

11
import pytest
1✔
12

13
from basilisp import main as basilisp
1✔
14
from basilisp.lang import keyword as kw
1✔
15
from basilisp.lang import map as lmap
1✔
16
from basilisp.lang import runtime as runtime
1✔
17
from basilisp.lang import symbol as sym
1✔
18
from basilisp.lang import vector as vec
1✔
19
from basilisp.lang.obj import lrepr
1✔
20
from basilisp.lang.util import munge
1✔
21
from basilisp.util import Maybe
1✔
22

23
_EACH_FIXTURES_META_KW = kw.keyword("each-fixtures", "basilisp.test")
1✔
24
_ONCE_FIXTURES_NUM_META_KW = kw.keyword("once-fixtures", "basilisp.test")
1✔
25
_TEST_META_KW = kw.keyword("test", "basilisp.test")
1✔
26

27
CORE_NS = "basilisp.core"
1✔
28
CORE_NS_SYM = sym.symbol(CORE_NS)
1✔
29
OUT_VAR_NAME = "*out*"
1✔
30
OUT_VAR_SYM = sym.symbol(OUT_VAR_NAME, ns=CORE_NS)
1✔
31
ERR_VAR_NAME = "*err*"
1✔
32
ERR_VAR_SYM = sym.symbol(ERR_VAR_NAME, ns=CORE_NS)
1✔
33

34

35
def pytest_configure(config):
1✔
36

37
    # https://github.com/pytest-dev/pytest/issues/12876
38
    #
39
    # Basilisp's standard output streams may be initialized before
40
    # pytest captures sys streams (sys.stdout and sys.stderr) for
41
    # testing (e.g., with `basilisp test`). Writing to the original
42
    # handles during tests on Windows can cause invalid handle
43
    # errors. To prevent this, we rebind them to pytest's streams
44
    # during tests and restore them afterward.
45
    out_var = runtime.Var.find(OUT_VAR_SYM)
1✔
46
    err_var = runtime.Var.find(ERR_VAR_SYM)
1✔
47
    if bindings := {
1✔
48
        k: v for k, v in {out_var: sys.stdout, err_var: sys.stderr}.items() if k
49
    }:
50
        runtime.push_thread_bindings(lmap.map(bindings))
1✔
51
        config.basilisp_bindings = bindings
1✔
52

53
    basilisp.bootstrap("basilisp.test")
1✔
54

55

56
def pytest_unconfigure(config):
1✔
57
    if hasattr(config, "basilisp_bindings"):
1✔
58
        runtime.pop_thread_bindings()
1✔
59

60

61
def pytest_collect_file(file_path: Path, parent):
1✔
62
    """Primary PyTest hook to identify Basilisp test files."""
63
    if file_path.suffix == ".lpy":
1✔
64
        if file_path.name.startswith("test_") or file_path.stem.endswith("_test"):
1✔
65
            return BasilispFile.from_parent(parent, path=file_path)
1✔
66
    return None
1✔
67

68

69
class TestFailuresInfo(Exception):
1✔
70
    __slots__ = ("_msg", "_data")
1✔
71

72
    def __init__(self, message: str, data: lmap.PersistentMap) -> None:
1✔
73
        super().__init__()
1✔
74
        self._msg = message
1✔
75
        self._data = data
1✔
76

77
    def __repr__(self):
78
        return (
79
            "basilisp.contrib.pytest.testrunner.TestFailuresInfo"
80
            f"({self._msg}, {lrepr(self._data)})"
81
        )
82

83
    def __str__(self):
1✔
84
        return f"{self._msg} {lrepr(self._data)}"
×
85

86
    @property
1✔
87
    def data(self) -> lmap.PersistentMap:
1✔
88
        return self._data
1✔
89

90
    @property
1✔
91
    def message(self) -> str:
1✔
92
        return self._msg
×
93

94

95
TestFunction = Callable[[], lmap.PersistentMap]
1✔
96
FixtureTeardown = Iterator[None]
1✔
97
FixtureFunction = Callable[[], Optional[FixtureTeardown]]
1✔
98

99

100
class FixtureManager:
1✔
101
    """FixtureManager instances manage `basilisp.test` style fixtures on behalf of a
102
    BasilispFile or BasilispTestItem node."""
103

104
    __slots__ = ("_fixtures", "_teardowns")
1✔
105

106
    def __init__(self, fixtures: Iterable[FixtureFunction]):
1✔
107
        self._fixtures: Iterable[FixtureFunction] = fixtures
1✔
108
        self._teardowns: Iterable[FixtureTeardown] = ()
1✔
109

110
    @staticmethod
1✔
111
    def _run_fixture(fixture: FixtureFunction) -> Optional[Iterator[None]]:
1✔
112
        """Run a fixture function. If the fixture is a generator function, return the
113
        generator/coroutine. Otherwise, simply return the value from the function, if
114
        one."""
115
        if inspect.isgeneratorfunction(fixture):
1✔
116
            coro = fixture()
1✔
117
            assert isinstance(coro, GeneratorType)
1✔
118
            next(coro)
1✔
119
            return coro
1✔
120
        else:
121
            fixture()
1✔
122
            return None
1✔
123

124
    @classmethod
1✔
125
    def _setup_fixtures(
1✔
126
        cls, fixtures: Iterable[FixtureFunction]
127
    ) -> Iterable[FixtureTeardown]:
128
        """Set up fixtures by running them as by `_run_fixture`. Collect any fixtures
129
        teardown steps (e.g. suspended coroutines) and return those so they can be
130
        resumed later for any cleanup."""
131
        teardown_fixtures = []
1✔
132
        try:
1✔
133
            for fixture in fixtures:
1✔
134
                teardown = cls._run_fixture(fixture)
1✔
135
                if teardown is not None:
1✔
136
                    teardown_fixtures.append(teardown)
1✔
137
        except Exception as e:
1✔
138
            raise runtime.RuntimeException(
1✔
139
                "Exception occurred during fixture setup"
140
            ) from e
141
        else:
142
            return teardown_fixtures
1✔
143

144
    @staticmethod
1✔
145
    def _teardown_fixtures(teardowns: Iterable[FixtureTeardown]) -> None:
1✔
146
        """Perform teardown steps returned from a fixture function."""
147
        for teardown in teardowns:
1✔
148
            try:
1✔
149
                next(teardown)
1✔
150
            except StopIteration:
1✔
151
                pass
1✔
152
            except Exception as e:
1✔
153
                raise runtime.RuntimeException(
1✔
154
                    "Exception occurred during fixture teardown"
155
                ) from e
156

157
    def setup(self) -> None:
1✔
158
        """Setup fixtures and store any teardowns for cleanup later.
159

160
        Should have a corresponding call to `FixtureManager.teardown` to clean up
161
        fixtures."""
162
        self._teardowns = self._setup_fixtures(self._fixtures)
1✔
163

164
    def teardown(self) -> None:
1✔
165
        """Teardown fixtures from a previous call to `setup`."""
166
        self._teardown_fixtures(self._teardowns)
1✔
167
        self._teardowns = ()
1✔
168

169

170
def _is_package(path: Path) -> bool:
1✔
171
    """Return `True` if the given path refers to a Python or Basilisp package."""
172
    _, _, files = next(os.walk(path))
×
173
    for file in files:
×
174
        if file in {"__init__.lpy", "__init__.py"} or file.endswith(".lpy"):
×
175
            return True
×
176
    return False
×
177

178

179
def _get_fully_qualified_module_names(file: Path) -> list[str]:
1✔
180
    """Return the fully qualified module name (from the import root) for a module given
181
    its location.
182

183
    This works by traversing up the filesystem looking for the top-most package. From
184
    there, we derive a Python module name referring to the given module path."""
185
    paths = []
1✔
186
    for pth in sys.path:
1✔
187
        root = Path(pth).resolve()
1✔
188
        if file.is_relative_to(root):
1✔
189
            elems = list(file.with_suffix("").relative_to(root).parts)
1✔
190

191
            if elems[-1] == "__init__":
1✔
192
                elems.pop()
×
193
            paths.append(".".join(elems))
1✔
194
    return paths
1✔
195

196

197
class BasilispFile(pytest.File):
1✔
198
    """Files represent a test module in Python or a test namespace in Basilisp."""
199

200
    def __init__(self, **kwargs) -> None:
1✔
201
        super().__init__(**kwargs)
1✔
202
        self._fixture_manager: Optional[FixtureManager] = None
1✔
203

204
    @staticmethod
1✔
205
    def _collected_fixtures(
1✔
206
        ns: runtime.Namespace,
207
    ) -> tuple[Iterable[FixtureFunction], Iterable[FixtureFunction]]:
208
        """Collect all of the declared fixtures of the namespace."""
209
        if ns.meta is not None:
1✔
210
            return (
1✔
211
                ns.meta.val_at(_ONCE_FIXTURES_NUM_META_KW) or (),
212
                ns.meta.val_at(_EACH_FIXTURES_META_KW) or (),
213
            )
214
        return (), ()
×
215

216
    @staticmethod
1✔
217
    def _collected_tests(ns: runtime.Namespace) -> Iterable[runtime.Var]:
1✔
218
        """Return the sorted sequence of collected tests from the Namespace `ns`.
219

220
        Tests defined by `deftest` are annotated with `:basilisp.test/test` metadata.
221
        Tests are sorted by their line number, which matches the default behavior of
222
        PyTest."""
223

224
        def _test_num(var: runtime.Var) -> int:
1✔
225
            assert var.meta is not None
1✔
226
            order = var.meta.val_at(_LINE_KW)
1✔
227
            assert isinstance(order, int)
1✔
228
            return order
1✔
229

230
        return sorted(
1✔
231
            (
232
                var
233
                for _, var in ns.interns.items()
234
                if var.meta is not None and var.meta.val_at(_TEST_META_KW)
235
            ),
236
            key=_test_num,
237
        )
238

239
    def setup(self) -> None:
1✔
240
        assert self._fixture_manager is not None
1✔
241
        self._fixture_manager.setup()
1✔
242

243
    def teardown(self) -> None:
1✔
244
        assert self._fixture_manager is not None
1✔
245
        self._fixture_manager.teardown()
1✔
246

247
    def _import_module(self) -> runtime.BasilispModule:
1✔
248
        modnames = _get_fully_qualified_module_names(self.path)
1✔
249
        assert modnames, "Must have at least one module name"
1✔
250

251
        exc: Optional[ModuleNotFoundError] = None
1✔
252
        for modname in modnames:
1✔
253
            try:
1✔
254
                module = importlib.import_module(modname)
1✔
255
            except ModuleNotFoundError as e:
1✔
UNCOV
256
                exc = e
×
257
            else:
258
                assert isinstance(module, runtime.BasilispModule)
1✔
259
                return module
1✔
260

UNCOV
261
        assert exc is not None, "Must have an exception or module"
×
UNCOV
262
        raise exc
×
263

264
    def collect(self):
1✔
265
        """Collect all tests from the namespace (module) given.
266

267
        Basilisp's test runner imports the namespace which will (as a side effect)
268
        collect the test functions in a namespace (represented by `deftest` forms in
269
        Basilisp). BasilispFile.collect fetches those test functions and generates
270
        BasilispTestItems for PyTest to run the tests."""
271
        filename = self.path.name
1✔
272
        module = self._import_module()
1✔
273
        ns = module.__basilisp_namespace__
1✔
274

275
        # Ensure the test module was loaded because it was directly
276
        # relative to an entry in `sys.path`.
277
        if module.__name__ != munge(str(ns)):
1✔
278
            raise ModuleNotFoundError(f"Module named '{ns}' is not in sys.path")
1✔
279

280
        once_fixtures, each_fixtures = self._collected_fixtures(ns)
1✔
281
        self._fixture_manager = FixtureManager(once_fixtures)
1✔
282
        for test in self._collected_tests(ns):
1✔
283
            f: TestFunction = test.value
1✔
284
            yield BasilispTestItem.from_parent(
1✔
285
                self,
286
                name=test.name.name,
287
                run_test=f,
288
                namespace=ns,
289
                filename=filename,
290
                fixture_manager=FixtureManager(each_fixtures),
291
            )
292

293

294
_ACTUAL_KW = kw.keyword("actual")
1✔
295
_ERROR_KW = kw.keyword("error")
1✔
296
_EXPECTED_KW = kw.keyword("expected")
1✔
297
_FAILURE_KW = kw.keyword("failure")
1✔
298
_FAILURES_KW = kw.keyword("failures")
1✔
299
_MESSAGE_KW = kw.keyword("message")
1✔
300
_LINE_KW = kw.keyword("line")
1✔
301
_EXPR_KW = kw.keyword("expr")
1✔
302
_TEST_SECTION_KW = kw.keyword("test-section")
1✔
303
_TYPE_KW = kw.keyword("type")
1✔
304

305

306
class BasilispTestItem(pytest.Item):
1✔
307
    """Test items correspond to a single `deftest` form in a Basilisp test.
308

309
    `deftest` forms run each `is` assertion and collect all failures in an atom,
310
    reporting their results as a vector of failures when each test concludes.
311

312
    The BasilispTestItem collects all the failures and returns a report to PyTest to
313
    show to the end-user."""
314

315
    def __init__(  # pylint: disable=too-many-arguments
1✔
316
        self,
317
        name: str,
318
        parent: BasilispFile,
319
        run_test: TestFunction,
320
        namespace: runtime.Namespace,
321
        filename: str,
322
        fixture_manager: FixtureManager,
323
    ) -> None:
324
        super().__init__(name, parent)
1✔
325
        self._run_test = run_test
1✔
326
        self._namespace = namespace
1✔
327
        self._filename = filename
1✔
328
        self._fixture_manager = fixture_manager
1✔
329

330
    @classmethod
1✔
331
    def from_parent(  # pylint: disable=arguments-differ,too-many-arguments
1✔
332
        cls,
333
        parent: "BasilispFile",
334
        name: str,
335
        run_test: TestFunction,
336
        namespace: runtime.Namespace,
337
        filename: str,
338
        fixture_manager: FixtureManager,
339
    ):
340
        """Create a new BasilispTestItem from the parent Node."""
341
        # https://github.com/pytest-dev/pytest/pull/6680
342
        return super().from_parent(
1✔
343
            parent,
344
            name=name,
345
            run_test=run_test,
346
            namespace=namespace,
347
            filename=filename,
348
            fixture_manager=fixture_manager,
349
        )
350

351
    def setup(self) -> None:
1✔
352
        self._fixture_manager.setup()
1✔
353

354
    def teardown(self) -> None:
1✔
355
        self._fixture_manager.teardown()
1✔
356

357
    def runtest(self):
1✔
358
        """Run the tests associated with this test item.
359

360
        If any tests fail, raise an ExceptionInfo exception with the test failures.
361
        PyTest will invoke self.repr_failure to display the failures to the user."""
362
        results: lmap.PersistentMap = self._run_test()
1✔
363
        failures: Optional[vec.PersistentVector] = results.val_at(_FAILURES_KW)
1✔
364
        if runtime.to_seq(failures):
1✔
365
            raise TestFailuresInfo("Test failures", lmap.map(results))
1✔
366

367
    def repr_failure(self, excinfo, style=None):
1✔
368
        """Representation function called when self.runtest() raises an exception."""
369
        if isinstance(excinfo.value, TestFailuresInfo):
1✔
370
            exc = excinfo.value
1✔
371
            failures = exc.data.val_at(_FAILURES_KW)
1✔
372
            messages = []
1✔
373

374
            for details in failures:
1✔
375
                type_ = details.val_at(_TYPE_KW)
1✔
376
                if type_ == _FAILURE_KW:
1✔
377
                    messages.append(self._failure_msg(details))
1✔
378
                elif type_ == _ERROR_KW:
1✔
379
                    exc = details.val_at(_ACTUAL_KW)
1✔
380
                    line = details.val_at(_LINE_KW)
1✔
381
                    messages.append(self._error_msg(exc, line=line))
1✔
382
                else:  # pragma: no cover
383
                    assert False, "Test failure type must be in #{:error :failure}"
384

385
            return "\n\n".join(messages)
1✔
386
        elif isinstance(excinfo.value, Exception):
1✔
387
            return self._error_msg(excinfo.value)
1✔
388
        else:
389
            return None
×
390

391
    def reportinfo(self):
1✔
392
        return self.fspath, 0, self.name
1✔
393

394
    def _error_msg(self, exc: Exception, line: Optional[int] = None) -> str:
1✔
395
        line_msg = Maybe(line).map(lambda l: f":{l}").or_else_get("")
1✔
396
        messages = [f"ERROR in ({self.name}) ({self._filename}{line_msg})", "\n\n"]
1✔
397
        messages.extend(traceback.format_exception(Exception, exc, exc.__traceback__))
1✔
398
        return "".join(messages)
1✔
399

400
    def _failure_msg(self, details: lmap.PersistentMap) -> str:
1✔
401
        assert details.val_at(_TYPE_KW) == _FAILURE_KW
1✔
402
        msg: str = details.val_at(_MESSAGE_KW)
1✔
403

404
        actual = details.val_at(_ACTUAL_KW)
1✔
405
        expected = details.val_at(_EXPECTED_KW)
1✔
406

407
        test_section = details.val_at(_TEST_SECTION_KW)
1✔
408
        line = details.val_at(_LINE_KW)
1✔
409
        line_msg = Maybe(line).map(lambda l: f":{l}").or_else_get("")
1✔
410
        section_msg = Maybe(test_section).map(lambda s: f" {s} :: ").or_else_get("")
1✔
411

412
        return "\n".join(
1✔
413
            [
414
                f"FAIL in ({self.name}) ({self._filename}{line_msg})",
415
                f"    {section_msg}{msg}",
416
                "",
417
                f"    expected: {lrepr(expected)}",
418
                f"      actual: {lrepr(actual)}",
419
            ]
420
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc