• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 13409972107

19 Feb 2025 10:03AM UTC coverage: 92.028% (-0.3%) from 92.314%
13409972107

push

github

jell-o-fishi
version bump

833 of 976 branches covered (85.35%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 2 files now uncovered.

3981 of 4255 relevant lines covered (93.56%)

4.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.18
/rsocket/rsocket_base.py
1
import abc
5✔
2
import asyncio
5✔
3
from asyncio import Task
5✔
4
from contextlib import asynccontextmanager
5✔
5
from datetime import timedelta
5✔
6
from typing import Union, Optional, Dict, Any, Coroutine, Callable, Type, cast, TypeVar
5✔
7

8
from reactivestreams.publisher import Publisher
5✔
9
from reactivestreams.subscriber import DefaultSubscriber
5✔
10
from rsocket.error_codes import ErrorCode
5✔
11
from rsocket.exceptions import RSocketProtocolError, RSocketTransportError, RSocketError
5✔
12
from rsocket.extensions.mimetypes import WellKnownMimeTypes, ensure_encoding_name
5✔
13
from rsocket.frame import (KeepAliveFrame,
5✔
14
                           MetadataPushFrame, RequestFireAndForgetFrame,
15
                           RequestResponseFrame, RequestStreamFrame, Frame,
16
                           exception_to_error_frame,
17
                           LeaseFrame, ErrorFrame, RequestFrame,
18
                           initiate_request_frame_types, InvalidFrame,
19
                           FragmentableFrame, FrameFragmentMixin, MINIMUM_FRAGMENT_SIZE_BYTES)
20
from rsocket.frame import (RequestChannelFrame, ResumeFrame,
5✔
21
                           is_fragmentable_frame, CONNECTION_STREAM_ID)
22
from rsocket.frame import SetupFrame
5✔
23
from rsocket.frame_builders import to_payload_frame, to_fire_and_forget_frame, to_setup_frame, to_metadata_push_frame, \
5✔
24
    to_keepalive_frame
25
from rsocket.frame_fragment_cache import FrameFragmentCache
5✔
26
from rsocket.frame_logger import log_frame
5✔
27
from rsocket.handlers.request_cahnnel_responder import RequestChannelResponder
5✔
28
from rsocket.handlers.request_channel_requester import RequestChannelRequester
5✔
29
from rsocket.handlers.request_response_requester import RequestResponseRequester
5✔
30
from rsocket.handlers.request_response_responder import RequestResponseResponder
5✔
31
from rsocket.handlers.request_stream_requester import RequestStreamRequester
5✔
32
from rsocket.handlers.request_stream_responder import RequestStreamResponder
5✔
33
from rsocket.helpers import payload_from_frame, async_noop, cancel_if_task_exists
5✔
34
from rsocket.lease import DefinedLease, NullLease, Lease
5✔
35
from rsocket.local_typing import Awaitable
5✔
36
from rsocket.logger import logger
5✔
37
from rsocket.payload import Payload
5✔
38
from rsocket.queue_peekable import QueuePeekable
5✔
39
from rsocket.request_handler import BaseRequestHandler, RequestHandler
5✔
40
from rsocket.rsocket import RSocket
5✔
41
from rsocket.rsocket_internal import RSocketInternal
5✔
42
from rsocket.stream_control import StreamControl
5✔
43
from rsocket.streams.backpressureapi import BackpressureApi
5✔
44
from rsocket.streams.stream_handler import StreamHandler
5✔
45
from rsocket.transports.transport import Transport
5✔
46

47
T = TypeVar('T')
5✔
48

49

50
class RSocketBase(RSocket, RSocketInternal):
5✔
51
    class LeaseSubscriber(DefaultSubscriber):
5✔
52
        def __init__(self, socket: 'RSocketBase'):
5✔
53
            super().__init__()
5✔
54
            self._socket = socket
5✔
55

56
        def on_next(self, value, is_complete=False):
5✔
57
            self._socket.send_lease(value)
5✔
58

59
    def __init__(self,
5✔
60
                 handler_factory: Callable[[], RequestHandler] = BaseRequestHandler,
61
                 honor_lease=False,
62
                 lease_publisher: Optional[Publisher] = None,
63
                 request_queue_size: int = 0,
64
                 data_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
65
                 metadata_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
66
                 keep_alive_period: timedelta = timedelta(milliseconds=500),
67
                 max_lifetime_period: timedelta = timedelta(minutes=10),
68
                 setup_payload: Optional[Payload] = None,
69
                 fragment_size_bytes: Optional[int] = None
70
                 ):
71

72
        self._assert_valid_fragment_size(fragment_size_bytes)
5✔
73

74
        self._handler_factory = handler_factory
5✔
75
        self._request_queue_size = request_queue_size
5✔
76
        self._honor_lease = honor_lease
5✔
77
        self._max_lifetime_period = max_lifetime_period
5✔
78
        self._keep_alive_period = keep_alive_period
5✔
79
        self._setup_payload = setup_payload
5✔
80
        self._data_encoding = ensure_encoding_name(data_encoding)
5✔
81
        self._metadata_encoding = ensure_encoding_name(metadata_encoding)
5✔
82
        self._lease_publisher = lease_publisher
5✔
83
        self._sender_task = None
5✔
84
        self._receiver_task = None
5✔
85
        self._handler = self._handler_factory()
5✔
86
        self._responder_lease = None
5✔
87
        self._requester_lease = None
5✔
88
        self._is_closing = False
5✔
89
        self._connecting = True
5✔
90
        self._fragment_size_bytes = fragment_size_bytes
5✔
91

92
        self._setup_internals()
5✔
93

94
    def get_fragment_size_bytes(self) -> Optional[int]:
5✔
95
        return self._fragment_size_bytes
5✔
96

97
    def _setup_internals(self):
5✔
98
        pass
99

100
    @abc.abstractmethod
101
    def _current_transport(self) -> Awaitable[Transport]:
102
        ...
103

104
    def _reset_internals(self):
5✔
105
        self._frame_fragment_cache = FrameFragmentCache()
5✔
106
        self._send_queue = QueuePeekable()
5✔
107
        self._request_queue = asyncio.Queue(self._request_queue_size)
5✔
108

109
        if self._honor_lease:
5✔
110
            self._requester_lease = DefinedLease(maximum_request_count=0)
5✔
111
        else:
112
            self._requester_lease = NullLease()
5✔
113

114
        self._responder_lease = NullLease()
5✔
115
        self._stream_control = StreamControl(self._get_first_stream_id())
5✔
116
        self._is_closing = False
5✔
117

118
    def stop_all_streams(self, error_code=ErrorCode.CONNECTION_ERROR, data=b''):
5✔
119
        self._stream_control.stop_all_streams(error_code, data)
5✔
120

121
    def _start_tasks(self):
5✔
122
        self._receiver_task = self._start_task_if_not_closing(self._receiver)
5✔
123
        self._sender_task = self._start_task_if_not_closing(self._sender)
5✔
124

125
    async def connect(self):
5✔
126
        self.send_priority_frame(self._create_setup_frame(self._data_encoding,
5✔
127
                                                          self._metadata_encoding,
128
                                                          self._setup_payload))
129

130
        if self._honor_lease:
5✔
131
            self._subscribe_to_lease_publisher()
5✔
132

133
        return self
5✔
134

135
    def _start_task_if_not_closing(self, task_factory: Callable[[], Coroutine]) -> Optional[Task]:
5✔
136
        if not self._is_closing:
5!
137
            return asyncio.create_task(task_factory())
5✔
138

139
    def set_handler_using_factory(self, handler_factory) -> RequestHandler:
5✔
140
        self._handler = handler_factory()
5✔
141
        return self._handler
5✔
142

143
    def _allocate_stream(self) -> int:
5✔
144
        return self._stream_control.allocate_stream()
5✔
145

146
    @abc.abstractmethod
147
    def _get_first_stream_id(self) -> int:
148
        ...
149

150
    def finish_stream(self, stream_id: int):
5✔
151
        self._stream_control.finish_stream(stream_id)
5✔
152

153
    def send_request(self, frame: RequestFrame):
5✔
154
        if self._honor_lease and not self._is_frame_allowed_to_send(frame):
5✔
155
            self._queue_request_frame(frame)
5✔
156
        else:
157
            self.send_frame(frame)
5✔
158

159
    def _queue_request_frame(self, frame: RequestFrame):
5✔
160
        logger().debug('%s: lease not allowing to send request. queueing', self._log_identifier())
5✔
161

162
        self._request_queue.put_nowait(frame)
5✔
163

164
    def send_priority_frame(self, frame: Frame):
5✔
165
        items = []
5✔
166
        while not self._send_queue.empty():
5✔
167
            items.append(self._send_queue.get_nowait())
5✔
168

169
        self._send_queue.put_nowait(frame)
5✔
170
        for item in items:
5✔
171
            self._send_queue.put_nowait(item)
5✔
172

173
    def send_frame(self, frame: Frame):
5✔
174
        self._send_queue.put_nowait(frame)
5✔
175

176
    def send_complete(self, stream_id: int):
5✔
177
        self.send_payload(stream_id, Payload(), complete=True, is_next=False)
5✔
178

179
    def send_error(self, stream_id: int, exception: Exception):
5✔
180
        self.send_frame(exception_to_error_frame(stream_id, exception))
5✔
181

182
    def send_payload(self, stream_id: int, payload: Payload, complete=False, is_next=True):
5✔
183
        self.send_frame(to_payload_frame(stream_id, payload, complete, is_next=is_next,
5✔
184
                                         fragment_size_bytes=self.get_fragment_size_bytes()))
185

186
    def _update_last_keepalive(self):
5✔
187
        pass
188

189
    def register_new_stream(self, handler: T) -> T:
5✔
190
        stream_id = self._allocate_stream()
5✔
191
        self._register_stream(stream_id, handler)
5✔
192
        return handler
5✔
193

194
    def _register_stream(self, stream_id: int, handler: StreamHandler):
5✔
195
        handler.stream_id = stream_id
5✔
196
        self._stream_control.register_stream(stream_id, handler)
5✔
197
        return handler
5✔
198

199
    async def handle_error(self, frame: ErrorFrame):
5✔
200
        await self._handler.on_error(frame.error_code, payload_from_frame(frame))
5✔
201

202
    async def handle_keep_alive(self, frame: KeepAliveFrame):
5✔
203
        self._update_last_keepalive()
5✔
204

205
        if frame.flags_respond:
5✔
206
            frame.flags_respond = False
5✔
207
            self.send_frame(frame)
5✔
208

209
    async def handle_request_response(self, frame: RequestResponseFrame):
5✔
210
        stream_id = frame.stream_id
5✔
211
        self._stream_control.assert_stream_id_available(stream_id)
5✔
212
        handler = self._handler
5✔
213

214
        response_future = await handler.request_response(payload_from_frame(frame))
5✔
215

216
        self._register_stream(stream_id, RequestResponseResponder(self, response_future)).setup()
5✔
217

218
    async def handle_request_stream(self, frame: RequestStreamFrame):
5✔
219
        stream_id = frame.stream_id
5✔
220
        self._stream_control.assert_stream_id_available(stream_id)
5✔
221
        handler = self._handler
5✔
222

223
        publisher = await handler.request_stream(payload_from_frame(frame))
5✔
224

225
        request_responder = RequestStreamResponder(self, publisher)
5✔
226
        self._register_stream(stream_id, request_responder)
5✔
227
        request_responder.frame_received(frame)
5✔
228

229
    async def handle_setup(self, frame: SetupFrame):
5✔
230
        if frame.flags_resume:
5✔
231
            raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Resume not supported')
5✔
232

233
        if frame.flags_lease:
5✔
234
            if self._lease_publisher is None:
5✔
235
                raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Lease not available')
5✔
236
            else:
237
                self._subscribe_to_lease_publisher()
5✔
238

239
        handler = self._handler
5✔
240

241
        try:
5✔
242
            await handler.on_setup(frame.data_encoding,
5✔
243
                                   frame.metadata_encoding,
244
                                   payload_from_frame(frame))
245
        except Exception as exception:
5✔
246
            logger().error('%s: Setup error', self._log_identifier(), exc_info=True)
5✔
247
            raise RSocketProtocolError(ErrorCode.REJECTED_SETUP, data=str(exception)) from exception
5✔
248

249
    def _subscribe_to_lease_publisher(self):
5✔
250
        if self._lease_publisher is not None:
5✔
251
            self._lease_publisher.subscribe(self.LeaseSubscriber(self))
5✔
252

253
    def send_lease(self, lease: Lease):
5✔
254
        try:
5✔
255
            self._responder_lease = lease
5✔
256

257
            self.send_frame(self._responder_lease.to_frame())
5✔
258
        except Exception as exception:
×
259
            self.send_error(CONNECTION_STREAM_ID, exception)
×
260

261
    async def handle_fire_and_forget(self, frame: RequestFireAndForgetFrame):
5✔
262
        self._stream_control.assert_stream_id_available(frame.stream_id)
5✔
263

264
        await self._handler.request_fire_and_forget(payload_from_frame(frame))
5✔
265

266
    async def handle_metadata_push(self, frame: MetadataPushFrame):
5✔
267
        await self._handler.on_metadata_push(Payload(None, frame.metadata))
5✔
268

269
    async def handle_request_channel(self, frame: RequestChannelFrame):
5✔
270
        stream_id = frame.stream_id
5✔
271
        self._stream_control.assert_stream_id_available(stream_id)
5✔
272
        handler = self._handler
5✔
273

274
        publisher, subscriber = await handler.request_channel(payload_from_frame(frame))
5✔
275

276
        channel_responder = RequestChannelResponder(self, publisher)
5✔
277
        self._register_stream(stream_id, channel_responder)
5✔
278
        channel_responder.subscribe(subscriber)
5✔
279
        channel_responder.frame_received(frame)
5✔
280

281
    async def handle_resume(self, frame: ResumeFrame):
5✔
282
        raise RSocketProtocolError(ErrorCode.REJECTED_RESUME, data='Resume not supported')
5✔
283

284
    async def handle_lease(self, frame: LeaseFrame):
5✔
285
        self._requester_lease = DefinedLease(
5✔
286
            frame.number_of_requests,
287
            timedelta(milliseconds=frame.time_to_live)
288
        )
289

290
        while not self._request_queue.empty() and self._requester_lease.is_request_allowed():
5✔
291
            self.send_frame(self._request_queue.get_nowait())
5✔
292
            self._request_queue.task_done()
5✔
293

294
    async def _receiver(self):
5✔
295
        try:
5✔
296
            await self._receiver_listen()
5✔
297
        except asyncio.CancelledError:
5✔
298
            logger().debug('%s: Asyncio task canceled: receiver', self._log_identifier())
5✔
299
        except RSocketTransportError:
5✔
300
            pass
301
        except Exception:
5✔
302
            logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
5✔
303
            raise
5✔
304

305
        await self._on_connection_closed()
5✔
306

307
    async def _on_connection_error(self, exception: Exception):
5✔
308
        logger().warning(str(exception))
5✔
309
        logger().debug(str(exception), exc_info=exception)
5✔
310
        await self._handler.on_connection_error(self, exception)
5✔
311

312
    async def _on_connection_closed(self):
5✔
313
        self.stop_all_streams()
5✔
314
        await self._handler.on_close(self)
5✔
315
        await self._stop_tasks()
5✔
316

317
    @abc.abstractmethod
318
    def is_server_alive(self) -> bool:
319
        ...
320

321
    async def _receiver_listen(self):
5✔
322
        async_frame_handler_by_type: Dict[Type[Frame], Any] = {
5✔
323
            RequestResponseFrame: self.handle_request_response,
324
            RequestStreamFrame: self.handle_request_stream,
325
            RequestChannelFrame: self.handle_request_channel,
326
            SetupFrame: self.handle_setup,
327
            RequestFireAndForgetFrame: self.handle_fire_and_forget,
328
            MetadataPushFrame: self.handle_metadata_push,
329
            ResumeFrame: self.handle_resume,
330
            LeaseFrame: self.handle_lease,
331
            KeepAliveFrame: self.handle_keep_alive,
332
            ErrorFrame: self.handle_error
333
        }
334

335
        transport = await self._current_transport()
5✔
336
        while self.is_server_alive():
5✔
337
            next_frame_generator = await transport.next_frame_generator()
5✔
338
            if next_frame_generator is None:
5✔
339
                break
5✔
340
            async for frame in next_frame_generator:
5✔
341
                try:
5✔
342
                    await self._handle_next_frame(frame, async_frame_handler_by_type)
5✔
343
                except RSocketProtocolError as exception:
5✔
344
                    logger().error('%s: Protocol error %s', self._log_identifier(), str(exception))
5✔
345
                    self.send_error(frame.stream_id, exception)
5✔
346
                except RSocketTransportError:
5!
347
                    raise
×
348
                except Exception as exception:
5✔
349
                    logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
5✔
350
                    self.send_error(frame.stream_id, exception)
5✔
351

352
    async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type):
5✔
353

354
        log_frame(frame, self._log_identifier())
5✔
355

356
        if isinstance(frame, InvalidFrame):
5✔
357
            return
5✔
358

359
        if is_fragmentable_frame(frame):
5✔
360
            complete_frame = self._frame_fragment_cache.append(cast(FragmentableFrame, frame))
5✔
361
            if complete_frame is None:
5✔
362
                return
5✔
363
        else:
364
            complete_frame = frame
5✔
365

366
        if (complete_frame.stream_id == CONNECTION_STREAM_ID or
5✔
367
                isinstance(complete_frame, initiate_request_frame_types)):
368
            await self._handle_frame_by_type(complete_frame, async_frame_handler_by_type)
5✔
369
        elif self._stream_control.handle_stream(complete_frame):
5✔
370
            return
5✔
371
        else:
372
            logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(),
5✔
373
                             complete_frame.stream_id)
374

375
    async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type):
5✔
376
        frame_handler = async_frame_handler_by_type.get(type(frame), async_noop)
5✔
377
        await frame_handler(frame)
5✔
378

379
    def _send_new_keepalive(self, data: bytes = b''):
5✔
380
        self.send_frame(to_keepalive_frame(data))
5✔
381

382
    def _before_sender(self):
5✔
383
        pass
384

385
    async def _finally_sender(self):
5✔
386
        pass
387

388
    @asynccontextmanager
5✔
389
    async def _get_next_frame_to_send(self, transport: Transport) -> Frame:
5✔
390
        next_frame_source = await self._send_queue.peek()
5✔
391

392
        if isinstance(next_frame_source, FrameFragmentMixin):
5✔
393
            next_fragment = next_frame_source.get_next_fragment(transport.requires_length_header())
5✔
394

395
            if next_fragment.flags_follows:
5✔
396
                self._send_queue.put_nowait(self._send_queue.get_nowait())  # cycle to next frame source in queue
5✔
397
            else:
398
                next_frame_source.get_next_fragment(
5✔
399
                    transport.requires_length_header())  # workaround to clean-up generator.
400
                self._send_queue.get_nowait()
5✔
401

402
            yield next_fragment
5✔
403

404
        else:
405
            self._send_queue.get_nowait()
5✔
406
            yield next_frame_source
5✔
407

408
    async def _sender(self):
5✔
409
        try:
5✔
410
            try:
5✔
411
                transport = await self._current_transport()
5✔
412

413
                self._before_sender()
5✔
414
                while self.is_server_alive():
5✔
415
                    async with self._get_next_frame_to_send(transport) as frame:
5✔
416
                        await transport.send_frame(frame)
5✔
417
                        log_frame(frame, self._log_identifier(), 'Sent')
5✔
418

419
                        if frame.sent_future is not None:
5✔
420
                            frame.sent_future.set_result(None)
5✔
421

422
                    if self._send_queue.empty():
5✔
423
                        await transport.on_send_queue_empty()
5✔
424
            except RSocketTransportError:
5✔
UNCOV
425
                logger().error('Error', exc_info=True)
2✔
426
                pass
427

428
        except asyncio.CancelledError:
5✔
429
            logger().debug('%s: Asyncio task canceled: sender', self._log_identifier())
5✔
430
        except Exception:
5✔
431
            logger().error('%s: RSocket error', self._log_identifier(), exc_info=True)
×
432
            raise
×
433
        finally:
434
            await self._finally_sender()
5✔
435

436
    async def close(self):
5✔
437
        logger().debug('%s: Closing', self._log_identifier())
5✔
438

439
        await self._stop_tasks()
5✔
440

441
        await self._close_transport()
5✔
442

443
    async def _stop_tasks(self):
5✔
444
        logger().debug('%s: Cleanup', self._log_identifier())
5✔
445

446
        self._is_closing = True
5✔
447
        await cancel_if_task_exists(self._sender_task)
5✔
448
        self._sender_task = None
5✔
449
        await cancel_if_task_exists(self._receiver_task)
5✔
450
        self._receiver_task = None
5✔
451

452
    async def _close_transport(self):
5✔
453
        if self._current_transport().done():
5!
454
            logger().debug('%s: Closing transport', self._log_identifier())
5✔
455
            try:
5✔
456
                transport = await self._current_transport()
5✔
457
            except asyncio.CancelledError:
5✔
458
                raise RSocketTransportError()
5✔
459

460
            if transport is not None:
5!
461
                try:
5✔
462
                    await transport.close()
5✔
463
                except Exception:
×
464
                    logger().debug('Transport already closed or failed to close', exc_info=True)
×
465

466
    async def __aenter__(self) -> 'RSocketBase':
5✔
467
        return self
×
468

469
    async def __aexit__(self, exc_type, exc_val, exc_tb):
5✔
470
        await self.close()
5✔
471

472
    def request_response(self, payload: Payload) -> Awaitable[Payload]:
5✔
473
        """
474
        Initiate a request-response interaction.
475
        """
476

477
        logger().debug('%s: request-response: %s', self._log_identifier(), payload)
5✔
478

479
        requester = RequestResponseRequester(self, payload)
5✔
480
        self.register_new_stream(requester).setup()
5✔
481
        return requester.run()
5✔
482

483
    def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
5✔
484
        """
485
        Initiate a fire-and-forget interaction.
486
        """
487

488
        logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)
5✔
489

490
        stream_id = self._allocate_stream()
5✔
491
        frame = to_fire_and_forget_frame(stream_id, payload, self._fragment_size_bytes)
5✔
492
        self.send_request(frame)
5✔
493
        frame.sent_future.add_done_callback(lambda _: self.finish_stream(stream_id))
5✔
494
        return frame.sent_future
5✔
495

496
    def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
5✔
497
        """
498
        Initiate a request-stream interaction.
499
        """
500

501
        logger().debug('%s: request-stream: %s', self._log_identifier(), payload)
5✔
502

503
        requester = RequestStreamRequester(self, payload)
5✔
504
        return self.register_new_stream(requester)
5✔
505

506
    def request_channel(
5✔
507
            self,
508
            payload: Payload,
509
            publisher: Optional[Publisher] = None,
510
            sending_done: Optional[asyncio.Event] = None) -> Union[BackpressureApi, Publisher]:
511
        """
512
        Initiate a request-channel interaction.
513
        """
514

515
        logger().debug('%s: request-channel: %s', self._log_identifier(), payload)
5✔
516

517
        requester = RequestChannelRequester(self, payload, publisher, sending_done)
5✔
518
        return self.register_new_stream(requester)
5✔
519

520
    def metadata_push(self, metadata: bytes) -> Awaitable[None]:
5✔
521
        """
522
        Initiate a metadata-push interaction.
523
        """
524

525
        logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata)
5✔
526

527
        frame = to_metadata_push_frame(metadata)
5✔
528
        self.send_frame(frame)
5✔
529
        return frame.sent_future
5✔
530

531
    def _is_frame_allowed_to_send(self, frame: Frame) -> bool:
5✔
532
        if isinstance(frame, initiate_request_frame_types):
5!
533
            return self._requester_lease.is_request_allowed(frame.stream_id)
5✔
534

535
        return True
×
536

537
    def _create_setup_frame(self,
5✔
538
                            data_encoding: bytes,
539
                            metadata_encoding: bytes,
540
                            payload: Optional[Payload] = None) -> SetupFrame:
541
        return to_setup_frame(payload,
5✔
542
                              data_encoding,
543
                              metadata_encoding,
544
                              self._keep_alive_period,
545
                              self._max_lifetime_period,
546
                              self._honor_lease)
547

548
    @abc.abstractmethod
549
    def _log_identifier(self) -> str:
550
        ...
551

552
    def _assert_valid_fragment_size(self, fragment_size_bytes: Optional[int]):
5✔
553
        if fragment_size_bytes is not None and fragment_size_bytes < MINIMUM_FRAGMENT_SIZE_BYTES:
5!
554
            raise RSocketError("Invalid fragment size specified. bytes: %s" % fragment_size_bytes)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc