• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basilisp-lang / basilisp / 12469122141

23 Dec 2024 03:26PM UTC coverage: 98.68%. Remained the same
12469122141

Pull #1191

github

web-flow
Merge 2becbd62e into 54b340a99
Pull Request #1191: Fix a bug with inherited stdout in `basilisp.process/exec`

1031 of 1040 branches covered (99.13%)

Branch coverage included in aggregate %.

8837 of 8960 relevant lines covered (98.63%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.06
/src/basilisp/contrib/pytest/testrunner.py
1
import importlib.util
1✔
2
import inspect
1✔
3
import os
1✔
4
import sys
1✔
5
import traceback
1✔
6
from collections.abc import Iterable, Iterator
1✔
7
from pathlib import Path
1✔
8
from types import GeneratorType
1✔
9
from typing import Callable, Optional
1✔
10

11
import pytest
1✔
12

13
from basilisp import main as basilisp
1✔
14
from basilisp.lang import keyword as kw
1✔
15
from basilisp.lang import map as lmap
1✔
16
from basilisp.lang import runtime as runtime
1✔
17
from basilisp.lang import symbol as sym
1✔
18
from basilisp.lang import vector as vec
1✔
19
from basilisp.lang.obj import lrepr
1✔
20
from basilisp.util import Maybe
1✔
21

22
# Namespace metadata key `:basilisp.test/each-fixtures`. The value is read in
# `BasilispFile._collected_fixtures` and handed to the FixtureManager created
# for each individual test item.
_EACH_FIXTURES_META_KW = kw.keyword("each-fixtures", "basilisp.test")
1✔
23
# Namespace metadata key `:basilisp.test/once-fixtures`. The value is read in
# `BasilispFile._collected_fixtures` and handed to the FixtureManager owned by
# the BasilispFile node.
_ONCE_FIXTURES_NUM_META_KW = kw.keyword("once-fixtures", "basilisp.test")
1✔
24
# Var metadata key `:basilisp.test/test`. Interned Vars whose metadata has a
# truthy value for this key are collected as tests.
_TEST_META_KW = kw.keyword("test", "basilisp.test")
1✔
25

26
# Names and symbols for the `basilisp.core/*out*` and `basilisp.core/*err*`
# Vars. `pytest_configure` looks the two Vars up by OUT_VAR_SYM / ERR_VAR_SYM
# and rebinds them to the current `sys.stdout` / `sys.stderr`.
CORE_NS = "basilisp.core"
1✔
27
CORE_NS_SYM = sym.symbol(CORE_NS)
1✔
28
OUT_VAR_NAME = "*out*"
1✔
29
OUT_VAR_SYM = sym.symbol(OUT_VAR_NAME, ns=CORE_NS)
1✔
30
ERR_VAR_NAME = "*err*"
1✔
31
ERR_VAR_SYM = sym.symbol(ERR_VAR_NAME, ns=CORE_NS)
1✔
32

33

34
def pytest_configure(config):
    """Rebind Basilisp's output Vars to the current `sys` streams and bootstrap
    the `basilisp.test` namespace.

    When at least one of the Vars is found, the pushed bindings are recorded on
    `config.basilisp_bindings` so `pytest_unconfigure` knows to pop them."""
    # https://github.com/pytest-dev/pytest/issues/12876
    #
    # Basilisp may have initialized its standard output streams before pytest
    # captured sys.stdout and sys.stderr (e.g. when run via `basilisp test`).
    # On Windows, writing to those original handles during tests can raise
    # invalid handle errors, so the Vars are pointed at pytest's streams for
    # the duration of the session and restored afterward.
    stream_vars = (
        (runtime.Var.find(OUT_VAR_SYM), sys.stdout),
        (runtime.Var.find(ERR_VAR_SYM), sys.stderr),
    )
    # Only Vars that were actually found can be rebound.
    bindings = {var: stream for var, stream in stream_vars if var}
    if bindings:
        runtime.push_thread_bindings(lmap.map(bindings))
        config.basilisp_bindings = bindings

    basilisp.bootstrap("basilisp.test")
53

54

55
def pytest_unconfigure(config):
    """Pop the thread bindings pushed by `pytest_configure`.

    Nothing is popped unless `config` carries the `basilisp_bindings` attribute
    that `pytest_configure` sets when it pushes bindings."""
    if not hasattr(config, "basilisp_bindings"):
        return
    runtime.pop_thread_bindings()
58

59

60
def pytest_collect_file(file_path: Path, parent):
    """PyTest collection hook which claims Basilisp test files.

    A file is claimed when it has the `.lpy` suffix and either its name starts
    with `test_` or its stem ends with `_test`. Any other file yields `None`."""
    if file_path.suffix != ".lpy":
        return None

    named_like_test = file_path.name.startswith("test_") or file_path.stem.endswith(
        "_test"
    )
    if not named_like_test:
        return None

    return BasilispFile.from_parent(parent, path=file_path)
66

67

68
class TestFailuresInfo(Exception):
    """Exception carrying a message and a map of test result data.

    `BasilispTestItem.runtest` raises this when a test reports failures, and
    `BasilispTestItem.repr_failure` reads `data` back to render them."""

    __slots__ = ("_msg", "_data")

    # The class name starts with "Test"; mark it explicitly so pytest does not
    # attempt to collect it as a test class when it is imported into a test
    # module.
    __test__ = False

    def __init__(self, message: str, data: lmap.PersistentMap) -> None:
        # Forward the arguments so `args` is populated. BaseException rebuilds
        # instances from `args` when copying or pickling, which fails for a
        # two-argument constructor if `args` is left empty.
        super().__init__(message, data)
        self._msg = message
        self._data = data

    def __repr__(self):
        return (
            "basilisp.contrib.pytest.testrunner.TestFailuresInfo"
            f"({self._msg!r}, {lrepr(self._data)})"
        )

    def __str__(self):
        return f"{self._msg} {lrepr(self._data)}"

    @property
    def data(self) -> lmap.PersistentMap:
        """The data map supplied at construction."""
        return self._data

    @property
    def message(self) -> str:
        """The message supplied at construction."""
        return self._msg
92

93

94
# A compiled test: called with no arguments, it returns a map of results.
# `BasilispTestItem.runtest` reads the `:failures` entry of that map.
TestFunction = Callable[[], lmap.PersistentMap]
1✔
95
# A suspended fixture generator; `FixtureManager` resumes it with `next()` to
# run the fixture's teardown step.
FixtureTeardown = Iterator[None]
1✔
96
# A fixture: called with no arguments. `FixtureManager._run_fixture` treats
# generator functions as setup/teardown pairs and plain functions as
# setup-only.
FixtureFunction = Callable[[], Optional[FixtureTeardown]]
1✔
97

98

99
class FixtureManager:
    """FixtureManager instances manage `basilisp.test` style fixtures on behalf of a
    BasilispFile or BasilispTestItem node.

    A fixture is a zero-argument callable. Generator-function fixtures are advanced
    to their first `yield` during setup and resumed once during teardown; plain
    functions are simply called during setup and have no teardown step."""

    __slots__ = ("_fixtures", "_teardowns")

    def __init__(self, fixtures: Iterable[FixtureFunction]):
        self._fixtures: Iterable[FixtureFunction] = fixtures
        # Suspended generators from the most recent `setup` which still need to
        # be resumed by `teardown`.
        self._teardowns: Iterable[FixtureTeardown] = ()

    @staticmethod
    def _run_fixture(fixture: FixtureFunction) -> Optional[Iterator[None]]:
        """Run a fixture function. If the fixture is a generator function, advance
        it to its first `yield` and return the suspended generator. Otherwise, call
        the function and return `None`."""
        if inspect.isgeneratorfunction(fixture):
            coro = fixture()
            assert isinstance(coro, GeneratorType)
            next(coro)
            return coro
        else:
            fixture()
            return None

    @classmethod
    def _setup_fixtures(
        cls, fixtures: Iterable[FixtureFunction]
    ) -> Iterable[FixtureTeardown]:
        """Set up fixtures by running them as by `_run_fixture`. Collect any fixtures
        teardown steps (e.g. suspended coroutines) and return those so they can be
        resumed later for any cleanup.

        If a fixture raises during setup, the fixtures which were already set up
        are torn down before a `RuntimeException` (chained from the original
        error) is raised; otherwise their teardown steps would never run, since
        the caller never receives them."""
        teardown_fixtures: list[FixtureTeardown] = []
        try:
            for fixture in fixtures:
                teardown = cls._run_fixture(fixture)
                if teardown is not None:
                    teardown_fixtures.append(teardown)
        except Exception as e:
            try:
                cls._teardown_fixtures(teardown_fixtures)
            except Exception:
                # Best-effort unwind: the setup failure is the error worth
                # reporting, so a secondary teardown failure must not mask it.
                pass
            raise runtime.RuntimeException(
                "Exception occurred during fixture setup"
            ) from e
        else:
            return teardown_fixtures

    @staticmethod
    def _teardown_fixtures(teardowns: Iterable[FixtureTeardown]) -> None:
        """Perform teardown steps returned from a fixture function.

        Every teardown is attempted even if an earlier one raises. If any raised,
        a `RuntimeException` chained from the first error is raised once all of
        them have been attempted."""
        first_error: Optional[Exception] = None
        for teardown in teardowns:
            try:
                next(teardown)
            except StopIteration:
                # The generator finished after its single `yield`: normal exit.
                pass
            except Exception as e:
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise runtime.RuntimeException(
                "Exception occurred during fixture teardown"
            ) from first_error

    def setup(self) -> None:
        """Setup fixtures and store any teardowns for cleanup later.

        Should have a corresponding call to `FixtureManager.teardown` to clean up
        fixtures."""
        self._teardowns = self._setup_fixtures(self._fixtures)

    def teardown(self) -> None:
        """Teardown fixtures from a previous call to `setup`.

        The stored teardowns are cleared before they are run, so they are dropped
        even if one of them raises."""
        teardowns, self._teardowns = self._teardowns, ()
        self._teardown_fixtures(teardowns)
167

168

169
def _is_package(path: Path) -> bool:
1✔
170
    """Return `True` if the given path refers to a Python or Basilisp package."""
171
    _, _, files = next(os.walk(path))
×
172
    for file in files:
×
173
        if file in {"__init__.lpy", "__init__.py"} or file.endswith(".lpy"):
×
174
            return True
×
175
    return False
×
176

177

178
def _get_fully_qualified_module_names(file: Path) -> list[str]:
1✔
179
    """Return the fully qualified module name (from the import root) for a module given
180
    its location.
181

182
    This works by traversing up the filesystem looking for the top-most package. From
183
    there, we derive a Python module name referring to the given module path."""
184
    paths = []
1✔
185
    for pth in sys.path:
1✔
186
        root = Path(pth)
1✔
187
        if file.is_relative_to(root):
1✔
188
            elems = list(file.with_suffix("").relative_to(pth).parts)
1✔
189
            if elems[-1] == "__init__":
1✔
190
                elems.pop()
×
191
            paths.append(".".join(elems))
1✔
192
    return paths
1✔
193

194

195
class BasilispFile(pytest.File):
    """A collected `.lpy` test file, i.e. one Basilisp test namespace.

    The node imports the namespace during collection, produces one
    BasilispTestItem per test Var, and runs the namespace's once-fixtures around
    the whole file."""

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # Assigned in `collect`, once the namespace's fixtures are known.
        self._fixture_manager: Optional[FixtureManager] = None

    @staticmethod
    def _collected_fixtures(
        ns: runtime.Namespace,
    ) -> tuple[Iterable[FixtureFunction], Iterable[FixtureFunction]]:
        """Return the `(once_fixtures, each_fixtures)` declared in the namespace
        metadata, substituting an empty tuple for whichever is absent."""
        meta = ns.meta
        if meta is None:
            return (), ()

        once_fixtures = meta.val_at(_ONCE_FIXTURES_NUM_META_KW) or ()
        each_fixtures = meta.val_at(_EACH_FIXTURES_META_KW) or ()
        return once_fixtures, each_fixtures

    @staticmethod
    def _collected_tests(ns: runtime.Namespace) -> Iterable[runtime.Var]:
        """Return the test Vars interned in the Namespace `ns`, ordered by line.

        A Var is a test when its metadata has a truthy `:basilisp.test/test`
        entry, which is how `deftest` annotates tests. Ordering by line number
        matches the default behavior of PyTest."""

        def _line_of(var: runtime.Var) -> int:
            meta = var.meta
            assert meta is not None
            line = meta.val_at(_LINE_KW)
            assert isinstance(line, int)
            return line

        tests = [
            var
            for _, var in ns.interns.items()
            if var.meta is not None and var.meta.val_at(_TEST_META_KW)
        ]
        tests.sort(key=_line_of)
        return tests

    def setup(self) -> None:
        manager = self._fixture_manager
        assert manager is not None
        manager.setup()

    def teardown(self) -> None:
        manager = self._fixture_manager
        assert manager is not None
        manager.teardown()

    def _import_module(self) -> runtime.BasilispModule:
        """Import and return the module for this file.

        Each candidate module name derived from `sys.path` is tried in order. The
        first one which imports is returned; if none do, the last
        `ModuleNotFoundError` is re-raised."""
        candidates = _get_fully_qualified_module_names(self.path)
        assert candidates, "Must have at least one module name"

        last_error: Optional[ModuleNotFoundError] = None
        for candidate in candidates:
            try:
                imported = importlib.import_module(candidate)
            except ModuleNotFoundError as e:
                last_error = e
                continue
            assert isinstance(imported, runtime.BasilispModule)
            return imported

        assert last_error is not None, "Must have an exception or module"
        raise last_error

    def collect(self):
        """Yield a BasilispTestItem for every test in this file's namespace.

        Importing the namespace is what (as a side effect) registers the test
        functions defined by `deftest` forms. The namespace's once-fixtures are
        attached to this file node, while every yielded item receives its own
        FixtureManager over the each-fixtures."""
        filename = self.path.name
        ns = self._import_module().__basilisp_namespace__
        once_fixtures, each_fixtures = self._collected_fixtures(ns)
        self._fixture_manager = FixtureManager(once_fixtures)
        for test_var in self._collected_tests(ns):
            run_test: TestFunction = test_var.value
            yield BasilispTestItem.from_parent(
                self,
                name=test_var.name.name,
                run_test=run_test,
                namespace=ns,
                filename=filename,
                fixture_manager=FixtureManager(each_fixtures),
            )
284

285

286
# Unqualified keywords used by BasilispTestItem to read the result map returned
# by a test function and the individual failure detail maps inside it.
_ACTUAL_KW = kw.keyword("actual")
1✔
287
_ERROR_KW = kw.keyword("error")
1✔
288
_EXPECTED_KW = kw.keyword("expected")
1✔
289
_FAILURE_KW = kw.keyword("failure")
1✔
290
_FAILURES_KW = kw.keyword("failures")
1✔
291
_MESSAGE_KW = kw.keyword("message")
1✔
292
# Also read from Var metadata by `BasilispFile._collected_tests` to order tests.
_LINE_KW = kw.keyword("line")
1✔
293
# NOTE(review): not referenced anywhere else in the visible source.
_EXPR_KW = kw.keyword("expr")
1✔
294
_TEST_SECTION_KW = kw.keyword("test-section")
1✔
295
_TYPE_KW = kw.keyword("type")
1✔
296

297

298
class BasilispTestItem(pytest.Item):
    """Test items correspond to a single `deftest` form in a Basilisp test.

    `deftest` forms run each `is` assertion and collect all failures in an atom,
    reporting their results as a vector of failures when each test concludes.

    The BasilispTestItem collects all the failures and returns a report to PyTest to
    show to the end-user."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        name: str,
        parent: BasilispFile,
        run_test: TestFunction,
        namespace: runtime.Namespace,
        filename: str,
        fixture_manager: FixtureManager,
    ) -> None:
        super().__init__(name, parent)
        self._run_test = run_test
        self._namespace = namespace
        self._filename = filename
        self._fixture_manager = fixture_manager

    @classmethod
    def from_parent(  # pylint: disable=arguments-differ,too-many-arguments
        cls,
        parent: "BasilispFile",
        name: str,
        run_test: TestFunction,
        namespace: runtime.Namespace,
        filename: str,
        fixture_manager: FixtureManager,
    ):
        """Create a new BasilispTestItem from the parent Node."""
        # https://github.com/pytest-dev/pytest/pull/6680
        return super().from_parent(
            parent,
            name=name,
            run_test=run_test,
            namespace=namespace,
            filename=filename,
            fixture_manager=fixture_manager,
        )

    def setup(self) -> None:
        """Run this item's fixtures before the test."""
        self._fixture_manager.setup()

    def teardown(self) -> None:
        """Run the teardown steps of this item's fixtures."""
        self._fixture_manager.teardown()

    def runtest(self):
        """Run the tests associated with this test item.

        If the result map reports any failures, raise a TestFailuresInfo exception
        carrying the results. PyTest will invoke self.repr_failure to display the
        failures to the user."""
        results: lmap.PersistentMap = self._run_test()
        failures: Optional[vec.PersistentVector] = results.val_at(_FAILURES_KW)
        if runtime.to_seq(failures):
            raise TestFailuresInfo("Test failures", lmap.map(results))

    def repr_failure(self, excinfo, style=None):
        """Representation function called when self.runtest() raises an exception.

        For a TestFailuresInfo, every recorded failure or error is rendered and the
        messages are joined with blank lines. Any other Exception is rendered as a
        single error message. Anything else yields `None`."""
        raised = excinfo.value
        if isinstance(raised, TestFailuresInfo):
            failures = raised.data.val_at(_FAILURES_KW)
            messages = []

            for details in failures:
                type_ = details.val_at(_TYPE_KW)
                if type_ == _FAILURE_KW:
                    messages.append(self._failure_msg(details))
                elif type_ == _ERROR_KW:
                    # For errors, `:actual` holds the exception that was raised.
                    actual_exc = details.val_at(_ACTUAL_KW)
                    line = details.val_at(_LINE_KW)
                    messages.append(self._error_msg(actual_exc, line=line))
                else:  # pragma: no cover
                    assert False, "Test failure type must be in #{:error :failure}"

            return "\n\n".join(messages)
        elif isinstance(raised, Exception):
            return self._error_msg(raised)
        else:
            return None

    def reportinfo(self):
        """Return the `(path, line, name)` triple PyTest uses to locate this item."""
        # Use `path` rather than the legacy `fspath` attribute, consistent with
        # BasilispFile which already relies on `self.path`.
        return self.path, 0, self.name

    def _error_msg(self, exc: Exception, line: Optional[int] = None) -> str:
        """Format an "ERROR in ..." header followed by the traceback of `exc`."""
        line_msg = Maybe(line).map(lambda l: f":{l}").or_else_get("")
        messages = [f"ERROR in ({self.name}) ({self._filename}{line_msg})", "\n\n"]
        messages.extend(traceback.format_exception(type(exc), exc, exc.__traceback__))
        return "".join(messages)

    def _failure_msg(self, details: lmap.PersistentMap) -> str:
        """Format a "FAIL in ..." message from a failure detail map, showing the
        optional test section, the message, and the expected and actual values."""
        assert details.val_at(_TYPE_KW) == _FAILURE_KW
        msg: str = details.val_at(_MESSAGE_KW)

        actual = details.val_at(_ACTUAL_KW)
        expected = details.val_at(_EXPECTED_KW)

        test_section = details.val_at(_TEST_SECTION_KW)
        line = details.val_at(_LINE_KW)
        line_msg = Maybe(line).map(lambda l: f":{l}").or_else_get("")
        section_msg = Maybe(test_section).map(lambda s: f" {s} :: ").or_else_get("")

        return "\n".join(
            [
                f"FAIL in ({self.name}) ({self._filename}{line_msg})",
                f"    {section_msg}{msg}",
                "",
                f"    expected: {lrepr(expected)}",
                f"      actual: {lrepr(actual)}",
            ]
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc