• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 3446760580

pending completion
3446760580

Pull #78

github-actions

GitHub
Merge c07026e24 into 0eed515c0
Pull Request #78: Tutorial application steps

620 of 747 branches covered (83.0%)

Branch coverage included in aggregate %.

70 of 70 new or added lines in 15 files covered. (100.0%)

3507 of 3680 relevant lines covered (95.3%)

3.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.57
/rsocket/rsocket_client.py
1
import asyncio
4✔
2
from asyncio import CancelledError
4✔
3
from datetime import timedelta, datetime
4✔
4
from typing import Optional, Callable, AsyncGenerator, Any
4✔
5
from typing import Union
4✔
6

7
from reactivestreams.publisher import Publisher
4✔
8
from rsocket.exceptions import RSocketNoAvailableTransport
4✔
9
from rsocket.extensions.mimetypes import WellKnownMimeTypes
4✔
10
from rsocket.helpers import create_future, cancel_if_task_exists
4✔
11
from rsocket.local_typing import Awaitable
4✔
12
from rsocket.logger import logger
4✔
13
from rsocket.payload import Payload
4✔
14
from rsocket.request_handler import BaseRequestHandler
4✔
15
from rsocket.request_handler import RequestHandler
4✔
16
from rsocket.rsocket_base import RSocketBase
4✔
17
from rsocket.transports.transport import Transport
4✔
18

19

20
class RSocketClient(RSocketBase):
    """Client-side RSocket endpoint that obtains its transports from an async provider.

    Transports are pulled one at a time from ``transport_provider``. The
    transport currently in use is exposed to the rest of the class through the
    ``_next_transport`` future, which is replaced by a fresh future on every
    reconnect. A background task (``_reconnect_listener``) waits for
    :meth:`reconnect` requests and performs the close / re-connect cycle.

    NOTE: instances must be created while an event loop is running, because
    ``__init__`` schedules the reconnect listener with ``asyncio.create_task``.
    """

    def __init__(self,
                 transport_provider: AsyncGenerator[Transport, Any],
                 handler_factory: Callable[[RSocketBase], RequestHandler] = BaseRequestHandler,
                 honor_lease=False,
                 lease_publisher: Optional[Publisher] = None,
                 request_queue_size: int = 0,
                 data_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
                 metadata_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
                 keep_alive_period: timedelta = timedelta(milliseconds=500),
                 max_lifetime_period: timedelta = timedelta(minutes=10),
                 setup_payload: Optional[Payload] = None,
                 fragment_size_bytes: Optional[int] = None
                 ):
        """Prepare client state and start the reconnect listener task.

        :param transport_provider: async iterable yielding the transport to use
            for the initial connection and for each subsequent reconnect.
        :param keep_alive_period: delay between keepalives sent by
            ``_keepalive_send_task``.
        :param max_lifetime_period: polling interval and threshold used by
            ``_keepalive_timeout_task`` to decide the server stopped responding.

        All remaining arguments are forwarded unchanged to
        ``RSocketBase.__init__``.
        """
        self._transport_provider = transport_provider.__aiter__()
        self._is_server_alive = True
        self._update_last_keepalive()
        self._connect_request_event = asyncio.Event()
        self._transport: Optional[Transport] = None
        # Same helper the reconnect path uses, so the initial future and the
        # per-reconnect futures are created the same way.
        self._next_transport = create_future()
        self._reconnect_task = asyncio.create_task(self._reconnect_listener())
        self._keepalive_task = None

        super().__init__(handler_factory=handler_factory,
                         honor_lease=honor_lease,
                         lease_publisher=lease_publisher,
                         request_queue_size=request_queue_size,
                         data_encoding=data_encoding,
                         metadata_encoding=metadata_encoding,
                         keep_alive_period=keep_alive_period,
                         max_lifetime_period=max_lifetime_period,
                         setup_payload=setup_payload,
                         fragment_size_bytes=fragment_size_bytes)

    def _current_transport(self) -> Awaitable[Transport]:
        """Return the future that resolves to the transport currently in use."""
        return self._next_transport

    def _log_identifier(self) -> str:
        """Return the prefix used in this endpoint's log messages."""
        return 'client'

    async def connect(self):
        """Acquire a transport, connect it, then run the base-class connect.

        Failures while obtaining or connecting the transport are logged and
        swallowed (the method returns ``None``); any error other than
        ``RSocketNoAvailableTransport`` is additionally reported through
        ``_on_connection_error``.

        :return: whatever ``RSocketBase.connect`` returns on success,
            otherwise ``None``.
        """
        logger().debug('%s: connecting', self._log_identifier())
        self._is_closing = False
        self._reset_internals()
        self._start_tasks()

        try:
            await self._connect_new_transport()
        except RSocketNoAvailableTransport:
            logger().error('%s: No available transport', self._log_identifier(), exc_info=True)
            return
        except Exception as exception:
            logger().error('%s: Connection error', self._log_identifier(), exc_info=True)
            await self._on_connection_error(exception)
            return

        return await super().connect()

    async def _connect_new_transport(self):
        """Pull the next transport from the provider, publish it and connect it.

        :raises RSocketNoAvailableTransport: when the provider is exhausted.
        """
        try:
            new_transport = await self._get_new_transport()

            if new_transport is None:
                raise RSocketNoAvailableTransport()

            # Resolve the pending future so every waiter on
            # ``_current_transport()`` observes the new transport.
            self._next_transport.set_result(new_transport)
            transport = await self._current_transport()
            await transport.connect()
        finally:
            # Whether or not the attempt succeeded, allow the reconnect
            # listener to accept the next reconnect request.
            self._connecting = False

    async def _get_new_transport(self) -> Optional[Transport]:
        """Return the provider's next transport, or ``None`` if it is exhausted."""
        try:
            return await self._transport_provider.__anext__()
        except StopAsyncIteration:
            return None

    async def close(self):
        """Close the client for good, including the reconnect listener."""
        await self._close()

    async def _close(self, reconnect=False):
        """Close the connection via the base class.

        :param reconnect: when true the reconnect listener task is left running
            (the close is part of a reconnect cycle driven by that very task);
            otherwise the listener is cancelled first.
        """
        if not reconnect:
            await cancel_if_task_exists(self._reconnect_task)
        else:
            logger().debug('%s: Closing before reconnect', self._log_identifier())

        await super().close()

    async def __aenter__(self) -> 'RSocketClient':
        """Connect on entering an ``async with`` block and return the client."""
        await self.connect()
        return self

    def _get_first_stream_id(self) -> int:
        """Return the first stream id used by this endpoint (1 for the client)."""
        return 1

    async def reconnect(self):
        """Request a reconnect; the work is done by the reconnect listener task."""
        logger().info('%s: Reconnecting', self._log_identifier())

        self._connect_request_event.set()

    async def _reconnect_listener(self):
        """Background loop serving reconnect requests until cancelled.

        For every request it closes the current connection (keeping this task
        alive), installs a fresh ``_next_transport`` future and connects again.
        A request that arrives while a connect attempt is still in progress
        (``_connecting`` is true) is dropped. Any non-cancellation error ends
        the loop after being logged. All streams are stopped when the loop
        exits, for whatever reason.
        """
        try:
            while True:
                try:
                    await self._connect_request_event.wait()

                    logger().debug('%s: Got reconnect request', self._log_identifier())

                    if self._connecting:
                        # The ``finally`` below clears the event, so this
                        # request is discarded rather than retried.
                        continue

                    self._connecting = True
                    self._connect_request_event.clear()
                    await self._close(reconnect=True)
                    self._next_transport = create_future()
                    await self.connect()
                finally:
                    # Drop requests raised while the cycle above was running.
                    self._connect_request_event.clear()
        except CancelledError:
            logger().debug('%s: Asyncio task canceled: reconnect_listener', self._log_identifier())
        except Exception:
            logger().error('%s: Reconnect listener', self._log_identifier(), exc_info=True)
        finally:
            self.stop_all_streams()

    async def _keepalive_send_task(self):
        """Send a keepalive every ``_keep_alive_period`` until cancelled."""
        try:
            while True:
                await asyncio.sleep(self._keep_alive_period.total_seconds())
                self._send_new_keepalive()
        except CancelledError:
            logger().debug('%s: Asyncio task canceled: keepalive_send', self._log_identifier())

    def _before_sender(self):
        """Start the keepalive sender task (skipped when the client is closing).

        NOTE(review): presumably invoked by the base class before its sender
        loop starts; confirm against ``RSocketBase``.
        """
        self._keepalive_task = self._start_task_if_not_closing(self._keepalive_send_task)

    async def _finally_sender(self):
        """Cancel the keepalive sender task, if one was started.

        NOTE(review): presumably invoked by the base class when its sender
        loop ends; confirm against ``RSocketBase``.
        """
        await cancel_if_task_exists(self._keepalive_task)

    def _update_last_keepalive(self):
        """Record the current time as the moment of the last server keepalive."""
        self._last_server_keepalive = datetime.now()

    def is_server_alive(self) -> bool:
        """Return ``False`` once a keepalive timeout has been detected."""
        return self._is_server_alive

    async def _keepalive_timeout_task(self):
        """Periodically check that the server is still sending keepalives.

        Every ``_max_lifetime_period`` the time elapsed since the last recorded
        server keepalive is compared with that same period. When it is
        exceeded the server is flagged as not alive and the request handler's
        ``on_keepalive_timeout`` is awaited. The loop keeps running afterwards;
        nothing in this class resets the flag to ``True``.
        """
        try:
            while True:
                await asyncio.sleep(self._max_lifetime_period.total_seconds())
                now = datetime.now()
                time_since_last_keepalive = now - self._last_server_keepalive

                if time_since_last_keepalive > self._max_lifetime_period:
                    self._is_server_alive = False
                    await self._handler.on_keepalive_timeout(
                        time_since_last_keepalive,
                        self
                    )
        except CancelledError:
            logger().debug('%s: Asyncio task canceled: keepalive_timeout', self._log_identifier())

    async def _receiver_listen(self):
        """Run the base receiver loop with the keepalive-timeout watchdog active."""
        keepalive_timeout_task = self._start_task_if_not_closing(self._keepalive_timeout_task)

        try:
            await super()._receiver_listen()
        finally:
            # The watchdog must not outlive the receiver loop it guards.
            await cancel_if_task_exists(keepalive_timeout_task)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc