• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haliphax / xthulu / 19526335899

20 Nov 2025 05:09AM UTC coverage: 56.979% (-0.04%) from 57.017%
19526335899

push

github

haliphax
🎨 remove unnecessary type hint

0 of 1 new or added line in 1 file covered. (0.0%)

645 of 1132 relevant lines covered (56.98%)

0.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.48
/xthulu/ssh/console/app.py
1
"""Textual application wrapper"""
2

3
# stdlib
4
from asyncio import sleep
1✔
5

6
# 3rd party
7
from rich.segment import Segments
1✔
8
from textual import events
1✔
9
from textual.app import App, ReturnType
1✔
10
from textual.geometry import Size
1✔
11

12
# local
13
from ...logger import log
1✔
14
from ..context import SSHContext
1✔
15

16

17
class _ErrorConsoleProxy:
    """Console stand-in whose `print` output is written to the error log"""

    def print(self, what, **kwargs):
        """
        Log `what` as an error.

        A `Segments` object is flattened to the concatenated text of its
        segments first; anything else is logged as-is. Additional keyword
        arguments are accepted but not used.
        """

        message = (
            "".join(seg.text for seg in what.segments)
            if isinstance(what, Segments)
            else what
        )
        log.error(message)
×
24

25

26
class XthuluApp(App[ReturnType]):
    """SSH wrapper for Textual apps"""

    ENABLE_COMMAND_PALETTE = False
    """Command palette is disabled by default"""

    context: SSHContext
    """The current SSH context"""

    def __init__(self, context: SSHContext, ansi_color=True, **kwargs):
        ""  # empty docstring
        # imported here rather than at module level to avoid a cyclic import
        from .internal.driver import SSHDriver

        super().__init__(
            driver_class=SSHDriver,
            ansi_color=ansi_color,
            **kwargs,
        )

        # keep the SSH context and use its console for output
        self.context = context
        self.console = context.console
        # error output goes through the proxy, which writes to the error log
        self.error_console = _ErrorConsoleProxy()  # type: ignore

        # start the worker that polls the context for resize events
        self.run_worker(self._watch_for_resize, exclusive=True)

    async def _watch_for_resize(self):
        """Poll the context for "resize" events and pass them to the driver"""

        # imported here rather than at module level to avoid a cyclic import
        from .internal.driver import SSHDriver

        while True:
            pending = self.context.events.get("resize")

            if pending:
                # only the last event in the batch is used
                latest = pending[-1]
                dimensions = Size(*latest.data)
                driver: SSHDriver = self._driver  # type: ignore
                driver.process_message(events.Resize(dimensions, dimensions))
            else:
                # nothing to handle; wait half a second before checking again
                await sleep(0.5)

    def exit(self, **kwargs) -> None:  # type: ignore
        ""  # empty docstring
        # imported here rather than at module level to avoid a cyclic import
        from .internal.driver import SSHDriver

        super().exit(**kwargs)

        # turn off bracketed paste and mouse support, then set the driver's
        # exit event
        driver: SSHDriver = self._driver  # type: ignore
        driver._disable_bracketed_paste()
        driver._disable_mouse_support()
        driver.exit_event.set()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc