• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 17489116321

05 Sep 2025 09:18AM UTC coverage: 86.517% (-0.2%) from 86.697%
17489116321

Pull #329

github

web-flow
Merge a170a7eae into 1ce9cdb0d
Pull Request #329: Bump flake8 from 7.2.0 to 7.3.0

850 of 1049 branches covered (81.03%)

Branch coverage included in aggregate %.

3828 of 4358 relevant lines covered (87.84%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.93
/rsocket/rsocket_base.py
1
import abc
import asyncio
from asyncio import Task
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Union, Optional, Dict, Any, Coroutine, Callable, Type, cast, TypeVar, AsyncIterator

from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import DefaultSubscriber
from rsocket.error_codes import ErrorCode
from rsocket.exceptions import RSocketProtocolError, RSocketTransportError, RSocketError
from rsocket.extensions.mimetypes import WellKnownMimeTypes, ensure_encoding_name
from rsocket.frame import (KeepAliveFrame,
                           MetadataPushFrame, RequestFireAndForgetFrame,
                           RequestResponseFrame, RequestStreamFrame, Frame,
                           exception_to_error_frame,
                           LeaseFrame, ErrorFrame, RequestFrame,
                           initiate_request_frame_types, InvalidFrame,
                           FragmentableFrame, FrameFragmentMixin, MINIMUM_FRAGMENT_SIZE_BYTES)
from rsocket.frame import (RequestChannelFrame, ResumeFrame,
                           is_fragmentable_frame, CONNECTION_STREAM_ID)
from rsocket.frame import SetupFrame
from rsocket.frame_builders import to_payload_frame, to_fire_and_forget_frame, to_setup_frame, to_metadata_push_frame, \
    to_keepalive_frame
from rsocket.frame_fragment_cache import FrameFragmentCache
from rsocket.frame_logger import log_frame
from rsocket.handlers.request_cahnnel_responder import RequestChannelResponder
from rsocket.handlers.request_channel_requester import RequestChannelRequester
from rsocket.handlers.request_response_requester import RequestResponseRequester
from rsocket.handlers.request_response_responder import RequestResponseResponder
from rsocket.handlers.request_stream_requester import RequestStreamRequester
from rsocket.handlers.request_stream_responder import RequestStreamResponder
from rsocket.helpers import payload_from_frame, async_noop, cancel_if_task_exists
from rsocket.lease import DefinedLease, NullLease, Lease
from rsocket.local_typing import Awaitable
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.queue_peekable import QueuePeekable
from rsocket.request_handler import BaseRequestHandler, RequestHandler
from rsocket.rsocket import RSocket
from rsocket.rsocket_internal import RSocketInternal
from rsocket.stream_control import StreamControl
from rsocket.streams.backpressureapi import BackpressureApi
from rsocket.streams.stream_handler import StreamHandler
from rsocket.transports.transport import Transport
1✔
46

47
# Type variable used by RSocketBase.register_new_stream so that the handler it
# returns keeps the exact type of the handler it was given.
T = TypeVar('T')
1✔
48

49

50
class RSocketBase(RSocket, RSocketInternal):
    """Connection logic shared by concrete RSocket endpoints.

    The class owns the outgoing frame queue, the queue of requests waiting for
    a lease, the stream bookkeeping (``StreamControl``) and the sender/receiver
    tasks. Subclasses must implement ``_current_transport``,
    ``_get_first_stream_id``, ``is_server_alive`` and ``_log_identifier``.

    NOTE(review): ``_send_queue``, ``_request_queue``, ``_stream_control`` and
    ``_frame_fragment_cache`` are only created by ``_reset_internals``, which
    ``__init__`` does not call directly; presumably subclasses invoke it (for
    example from ``_setup_internals``) before any frame is sent - confirm.
    """

    class LeaseSubscriber(DefaultSubscriber):
        """Subscriber that forwards every published value to ``send_lease``."""

        def __init__(self, socket: 'RSocketBase'):
            super().__init__()
            self._socket = socket

        def on_next(self, value, is_complete=False):
            """Send ``value`` to the peer as a lease; ``is_complete`` is ignored."""
            self._socket.send_lease(value)

    def __init__(self,
                 handler_factory: Callable[[], RequestHandler] = BaseRequestHandler,
                 honor_lease=False,
                 lease_publisher: Optional[Publisher] = None,
                 request_queue_size: int = 0,
                 data_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
                 metadata_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
                 keep_alive_period: timedelta = timedelta(milliseconds=500),
                 max_lifetime_period: timedelta = timedelta(minutes=10),
                 setup_payload: Optional[Payload] = None,
                 fragment_size_bytes: Optional[int] = None
                 ):
        """Store the configuration and create the request handler.

        :param handler_factory: zero-argument callable producing the request handler.
        :param honor_lease: when true, outgoing requests are gated by the requester
            lease and the flag is passed on to the setup frame.
        :param lease_publisher: publisher whose values are sent to the peer as leases.
        :param request_queue_size: ``maxsize`` of the queue holding requests that
            wait for a lease (``asyncio.Queue`` treats ``0`` as unbounded).
        :param data_encoding: data mime type; normalized with ``ensure_encoding_name``.
        :param metadata_encoding: metadata mime type; normalized the same way.
        :param keep_alive_period: value passed to the setup frame.
        :param max_lifetime_period: value passed to the setup frame.
        :param setup_payload: optional payload sent with the setup frame.
        :param fragment_size_bytes: fragment size used when building payload and
            fire-and-forget frames; ``None`` is passed through unchanged.
        :raises RSocketError: if ``fragment_size_bytes`` is below
            ``MINIMUM_FRAGMENT_SIZE_BYTES``.
        """

        self._assert_valid_fragment_size(fragment_size_bytes)

        self._handler_factory = handler_factory
        self._request_queue_size = request_queue_size
        self._honor_lease = honor_lease
        self._max_lifetime_period = max_lifetime_period
        self._keep_alive_period = keep_alive_period
        self._setup_payload = setup_payload
        self._data_encoding = ensure_encoding_name(data_encoding)
        self._metadata_encoding = ensure_encoding_name(metadata_encoding)
        self._lease_publisher = lease_publisher
        self._sender_task = None
        self._receiver_task = None
        self._handler = self._handler_factory()
        self._responder_lease = None
        self._requester_lease = None
        self._is_closing = False
        self._connecting = True
        self._fragment_size_bytes = fragment_size_bytes

        self._setup_internals()

    def get_fragment_size_bytes(self) -> Optional[int]:
        """Return the configured fragment size, or ``None`` if none was given."""
        return self._fragment_size_bytes

    def _setup_internals(self):
        """Hook called at the end of ``__init__``; does nothing in this class."""
        pass

    @abc.abstractmethod
    def _current_transport(self) -> Awaitable[Transport]:
        """Return an awaitable resolving to the transport in use."""
        ...

    def _reset_internals(self):
        """Create fresh queues, leases and stream bookkeeping and clear the closing flag."""
        self._frame_fragment_cache = FrameFragmentCache()
        self._send_queue = QueuePeekable()
        self._request_queue = asyncio.Queue(self._request_queue_size)

        if self._honor_lease:
            # Start with a lease allowing zero requests, so requests are queued
            # until handle_lease installs a lease received from the peer.
            self._requester_lease = DefinedLease(maximum_request_count=0)
        else:
            self._requester_lease = NullLease()

        self._responder_lease = NullLease()
        self._stream_control = StreamControl(self._get_first_stream_id())
        self._is_closing = False

    def stop_all_streams(self, error_code=ErrorCode.CONNECTION_ERROR, data=b''):
        """Delegate to ``StreamControl.stop_all_streams`` with the given error code and data."""
        self._stream_control.stop_all_streams(error_code, data)

    def _start_tasks(self):
        """Start the receiver task and then the sender task (unless closing)."""
        self._receiver_task = self._start_task_if_not_closing(self._receiver)
        self._sender_task = self._start_task_if_not_closing(self._sender)

    async def connect(self):
        """Queue the setup frame ahead of all pending frames and return ``self``.

        When ``honor_lease`` is set, the lease publisher (if any) is subscribed too.
        """
        self.send_priority_frame(self._create_setup_frame(self._data_encoding,
                                                          self._metadata_encoding,
                                                          self._setup_payload))

        if self._honor_lease:
            self._subscribe_to_lease_publisher()

        return self

    def _start_task_if_not_closing(self, task_factory: Callable[[], Coroutine]) -> Optional[Task]:
        """Schedule ``task_factory()`` as a task, or return ``None`` while closing."""
        if not self._is_closing:
            return asyncio.create_task(task_factory())

        return None

    def set_handler_using_factory(self, handler_factory) -> RequestHandler:
        """Replace the request handler with ``handler_factory()`` and return it."""
        self._handler = handler_factory()
        return self._handler

    def _allocate_stream(self) -> int:
        """Return a new stream id obtained from the stream control."""
        return self._stream_control.allocate_stream()

    @abc.abstractmethod
    def _get_first_stream_id(self) -> int:
        """Return the stream id handed to ``StreamControl`` on reset."""
        ...

    def finish_stream(self, stream_id: int):
        """Mark the stream finished and drop its entry from the fragment cache."""
        self._stream_control.finish_stream(stream_id)
        self._frame_fragment_cache.remove(stream_id)

    def send_request(self, frame: RequestFrame):
        """Send a request frame, or queue it when the lease does not allow sending."""
        if self._honor_lease and not self._is_frame_allowed_to_send(frame):
            self._queue_request_frame(frame)
        else:
            self.send_frame(frame)

    def _queue_request_frame(self, frame: RequestFrame):
        """Put the frame on the lease wait queue.

        ``put_nowait`` raises ``asyncio.QueueFull`` if a bounded queue is full.
        """
        logger().debug('%s: lease not allowing to send request. queueing', self._log_identifier())

        self._request_queue.put_nowait(frame)

    def send_priority_frame(self, frame: Frame):
        """Place ``frame`` at the head of the send queue, keeping the existing order behind it."""
        # Drain the queue, enqueue the priority frame first, then restore the rest.
        items = []
        while not self._send_queue.empty():
            items.append(self._send_queue.get_nowait())

        self._send_queue.put_nowait(frame)
        for item in items:
            self._send_queue.put_nowait(item)

    def send_frame(self, frame: Frame):
        """Append ``frame`` to the send queue."""
        self._send_queue.put_nowait(frame)

    def send_complete(self, stream_id: int):
        """Queue an empty payload frame with ``complete=True`` and ``is_next=False``."""
        self.send_payload(stream_id, Payload(), complete=True, is_next=False)

    def send_error(self, stream_id: int, exception: Exception):
        """Queue the error frame built from ``exception`` for ``stream_id``."""
        self.send_frame(exception_to_error_frame(stream_id, exception))

    def send_payload(self, stream_id: int, payload: Payload, complete=False, is_next=True):
        """Queue a payload frame built with the configured fragment size."""
        self.send_frame(to_payload_frame(stream_id, payload, complete, is_next=is_next,
                                         fragment_size_bytes=self.get_fragment_size_bytes()))

    def _update_last_keepalive(self):
        """Hook called for every received keep-alive frame; does nothing in this class."""
        pass

    def register_new_stream(self, handler: T) -> T:
        """Allocate a new stream id, register ``handler`` under it and return ``handler``."""
        stream_id = self._allocate_stream()
        self._register_stream(stream_id, handler)
        return handler

    def _register_stream(self, stream_id: int, handler: StreamHandler) -> StreamHandler:
        """Assign ``stream_id`` to ``handler``, register it and return it."""
        handler.stream_id = stream_id
        self._stream_control.register_stream(stream_id, handler)
        return handler

    async def handle_error(self, frame: ErrorFrame):
        """Pass the frame's error code and payload to the handler's ``on_error``."""
        await self._handler.on_error(frame.error_code, payload_from_frame(frame))

    async def handle_keep_alive(self, frame: KeepAliveFrame):
        """Record the keep-alive and echo the frame back if a response was requested."""
        self._update_last_keepalive()

        if frame.flags_respond:
            # The received frame object is reused for the reply, with the
            # respond flag cleared.
            frame.flags_respond = False
            self.send_frame(frame)

    async def handle_request_response(self, frame: RequestResponseFrame):
        """Serve an incoming request-response using the handler's response future."""
        stream_id = frame.stream_id
        self._stream_control.assert_stream_id_available(stream_id)
        handler = self._handler

        response_future = await handler.request_response(payload_from_frame(frame))

        self._register_stream(stream_id, RequestResponseResponder(self, response_future)).setup()

    async def handle_request_stream(self, frame: RequestStreamFrame):
        """Serve an incoming request-stream using the handler's publisher."""
        stream_id = frame.stream_id
        self._stream_control.assert_stream_id_available(stream_id)
        handler = self._handler

        publisher = await handler.request_stream(payload_from_frame(frame))

        request_responder = RequestStreamResponder(self, publisher)
        self._register_stream(stream_id, request_responder)
        request_responder.frame_received(frame)

    async def handle_setup(self, frame: SetupFrame):
        """Validate an incoming setup frame and notify the handler.

        :raises RSocketProtocolError: ``UNSUPPORTED_SETUP`` when resume is
            requested, or when lease is requested without a lease publisher;
            ``REJECTED_SETUP`` when the handler's ``on_setup`` raises.
        """
        if frame.flags_resume:
            raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Resume not supported')

        if frame.flags_lease:
            if self._lease_publisher is None:
                raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Lease not available')

            self._subscribe_to_lease_publisher()

        handler = self._handler

        try:
            await handler.on_setup(frame.data_encoding,
                                   frame.metadata_encoding,
                                   payload_from_frame(frame))
        except Exception as exception:
            logger().error('%s: Setup error', self._log_identifier(), exc_info=True)
            raise RSocketProtocolError(ErrorCode.REJECTED_SETUP, data=str(exception)) from exception

    def _subscribe_to_lease_publisher(self):
        """Subscribe a ``LeaseSubscriber`` to the lease publisher, if one is configured."""
        if self._lease_publisher is not None:
            self._lease_publisher.subscribe(self.LeaseSubscriber(self))

    def send_lease(self, lease: Lease):
        """Adopt ``lease`` as the responder lease and queue its frame.

        Any exception is reported to the peer as an error frame on the
        connection stream instead of being raised.
        """
        try:
            self._responder_lease = lease

            self.send_frame(self._responder_lease.to_frame())
        except Exception as exception:
            self.send_error(CONNECTION_STREAM_ID, exception)

    async def handle_fire_and_forget(self, frame: RequestFireAndForgetFrame):
        """Pass an incoming fire-and-forget payload to the handler."""
        self._stream_control.assert_stream_id_available(frame.stream_id)

        await self._handler.request_fire_and_forget(payload_from_frame(frame))

    async def handle_metadata_push(self, frame: MetadataPushFrame):
        """Pass the pushed metadata to the handler as a payload without data."""
        await self._handler.on_metadata_push(Payload(None, frame.metadata))

    async def handle_request_channel(self, frame: RequestChannelFrame):
        """Serve an incoming request-channel using the handler's publisher and subscriber."""
        stream_id = frame.stream_id
        self._stream_control.assert_stream_id_available(stream_id)
        handler = self._handler

        publisher, subscriber = await handler.request_channel(payload_from_frame(frame))

        channel_responder = RequestChannelResponder(self, publisher)
        self._register_stream(stream_id, channel_responder)
        channel_responder.subscribe(subscriber)
        channel_responder.frame_received(frame)

    async def handle_resume(self, frame: ResumeFrame):
        """Reject resume frames; always raises ``RSocketProtocolError`` (``REJECTED_RESUME``)."""
        raise RSocketProtocolError(ErrorCode.REJECTED_RESUME, data='Resume not supported')

    async def handle_lease(self, frame: LeaseFrame):
        """Install the lease described by ``frame`` and flush queued requests it allows."""
        self._requester_lease = DefinedLease(
            frame.number_of_requests,
            timedelta(milliseconds=frame.time_to_live)
        )

        while not self._request_queue.empty() and self._requester_lease.is_request_allowed():
            self.send_frame(self._request_queue.get_nowait())
            self._request_queue.task_done()

    async def _receiver(self):
        """Run the receive loop, then perform connection-closed handling.

        Cancellation and transport errors end the loop and still reach
        ``_on_connection_closed``; any other exception is logged and re-raised,
        so the closed handling is skipped in that case.
        """
        try:
            await self._receiver_listen()
        except asyncio.CancelledError:
            logger().debug('%s: Asyncio task canceled: receiver', self._log_identifier())
        except RSocketTransportError:
            # Deliberately not re-raised; falls through to the closed handling below.
            pass
        except Exception:
            logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
            raise

        await self._on_connection_closed()

    async def _on_connection_error(self, exception: Exception):
        """Log ``exception`` and notify the handler's ``on_connection_error``."""
        logger().warning(str(exception))
        logger().debug(str(exception), exc_info=exception)
        await self._handler.on_connection_error(self, exception)

    async def _on_connection_closed(self):
        """Stop all streams, notify the handler's ``on_close`` and stop the tasks."""
        self.stop_all_streams()
        await self._handler.on_close(self)
        await self._stop_tasks()

    @abc.abstractmethod
    def is_server_alive(self) -> bool:
        """Return whether the sender and receiver loops should keep running."""
        ...

    async def _receiver_listen(self):
        """Read frames from the transport and dispatch them until the loop ends.

        Protocol errors and unexpected exceptions raised while handling a frame
        are logged and answered with an error frame on that frame's stream;
        transport errors propagate to the caller.
        """
        async_frame_handler_by_type: Dict[Type[Frame], Any] = {
            RequestResponseFrame: self.handle_request_response,
            RequestStreamFrame: self.handle_request_stream,
            RequestChannelFrame: self.handle_request_channel,
            SetupFrame: self.handle_setup,
            RequestFireAndForgetFrame: self.handle_fire_and_forget,
            MetadataPushFrame: self.handle_metadata_push,
            ResumeFrame: self.handle_resume,
            LeaseFrame: self.handle_lease,
            KeepAliveFrame: self.handle_keep_alive,
            ErrorFrame: self.handle_error
        }

        transport = await self._current_transport()
        while self.is_server_alive():
            next_frame_generator = await transport.next_frame_generator()
            if next_frame_generator is None:
                break
            async for frame in next_frame_generator:
                try:
                    await self._handle_next_frame(frame, async_frame_handler_by_type)
                except RSocketProtocolError as exception:
                    logger().error('%s: Protocol error %s', self._log_identifier(), str(exception))
                    self.send_error(frame.stream_id, exception)
                except RSocketTransportError:
                    raise
                except Exception as exception:
                    logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
                    self.send_error(frame.stream_id, exception)

    async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type):
        """Log one received frame, reassemble fragments and route the result.

        Invalid frames and incomplete fragments are ignored. Connection-level
        frames and request-initiating frames go to the per-type handler; all
        other frames go to their registered stream, or are dropped with a
        warning when no stream accepts them.
        """

        log_frame(frame, self._log_identifier())

        if isinstance(frame, InvalidFrame):
            return

        if is_fragmentable_frame(frame):
            complete_frame = self._frame_fragment_cache.append(cast(FragmentableFrame, frame))
            if complete_frame is None:
                return
        else:
            complete_frame = frame

        if (complete_frame.stream_id == CONNECTION_STREAM_ID or
                isinstance(complete_frame, initiate_request_frame_types)):
            await self._handle_frame_by_type(complete_frame, async_frame_handler_by_type)
        elif self._stream_control.handle_stream(complete_frame):
            return
        else:
            logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(),
                             complete_frame.stream_id)

    async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type):
        """Await the handler registered for the frame's exact type (``async_noop`` if none)."""
        frame_handler = async_frame_handler_by_type.get(type(frame), async_noop)
        await frame_handler(frame)

    def _send_new_keepalive(self, data: bytes = b''):
        """Queue a keep-alive frame carrying ``data``."""
        self.send_frame(to_keepalive_frame(data))

    def _before_sender(self):
        """Hook called once before the send loop starts; does nothing in this class."""
        pass

    async def _finally_sender(self):
        """Hook awaited when the sender task ends; does nothing in this class."""
        pass

    @asynccontextmanager
    async def _get_next_frame_to_send(self, transport: Transport) -> AsyncIterator[Frame]:
        """Wait for the next queued frame source and yield the frame to send.

        For a ``FrameFragmentMixin`` source the next fragment is yielded; the
        source is moved to the back of the queue while more fragments follow
        and removed once the last fragment was produced. Any other frame is
        removed from the queue and yielded as is.
        """
        next_frame_source = await self._send_queue.peek()

        if isinstance(next_frame_source, FrameFragmentMixin):
            next_fragment = next_frame_source.get_next_fragment(transport.requires_length_header())

            if next_fragment.flags_follows:
                self._send_queue.put_nowait(self._send_queue.get_nowait())  # cycle to next frame source in queue
            else:
                next_frame_source.get_next_fragment(
                    transport.requires_length_header())  # workaround to clean-up generator.
                self._send_queue.get_nowait()

            yield next_fragment

        else:
            self._send_queue.get_nowait()
            yield next_frame_source

    async def _sender(self):
        """Send queued frames over the transport while ``is_server_alive()`` is true.

        After each send, the frame's ``sent_future`` (if any) is resolved, and
        the transport is told when the queue has drained. Transport errors are
        logged and end the loop; cancellation is logged at debug level; other
        exceptions are logged and re-raised. ``_finally_sender`` always runs.
        """
        try:
            try:
                transport = await self._current_transport()

                self._before_sender()
                while self.is_server_alive():
                    async with self._get_next_frame_to_send(transport) as frame:
                        await transport.send_frame(frame)
                        log_frame(frame, self._log_identifier(), 'Sent')

                        if frame.sent_future is not None:
                            frame.sent_future.set_result(None)

                    if self._send_queue.empty():
                        await transport.on_send_queue_empty()
            except RSocketTransportError:
                logger().error('Error', exc_info=True)

        except asyncio.CancelledError:
            logger().debug('%s: Asyncio task canceled: sender', self._log_identifier())
        except Exception:
            logger().error('%s: RSocket error', self._log_identifier(), exc_info=True)
            raise
        finally:
            await self._finally_sender()

    async def close(self):
        """Stop the sender and receiver tasks, then close the transport."""
        logger().debug('%s: Closing', self._log_identifier())

        await self._stop_tasks()

        await self._close_transport()

    async def _stop_tasks(self):
        """Set the closing flag and cancel the sender task, then the receiver task."""
        logger().debug('%s: Cleanup', self._log_identifier())

        self._is_closing = True
        await cancel_if_task_exists(self._sender_task)
        self._sender_task = None
        await cancel_if_task_exists(self._receiver_task)
        self._receiver_task = None

    async def _close_transport(self):
        """Close the transport if its awaitable has already completed.

        Failures while closing are logged at debug level and not raised.

        :raises RSocketTransportError: if awaiting the transport raises
            ``asyncio.CancelledError``.
        """
        if self._current_transport().done():
            logger().debug('%s: Closing transport', self._log_identifier())
            try:
                transport = await self._current_transport()
            except asyncio.CancelledError:
                raise RSocketTransportError()

            if transport is not None:
                try:
                    await transport.close()
                except Exception:
                    logger().debug('Transport already closed or failed to close', exc_info=True)

    async def __aenter__(self) -> 'RSocketBase':
        """Return ``self``; entering the context does not connect."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the socket when the ``async with`` block exits."""
        await self.close()

    def request_response(self, payload: Payload) -> Awaitable[Payload]:
        """
        Initiate a request-response interaction.

        Registers a ``RequestResponseRequester`` on a new stream, sets it up and
        returns the awaitable produced by its ``run()``.
        """

        logger().debug('%s: request-response: %s', self._log_identifier(), payload)

        requester = RequestResponseRequester(self, payload)
        self.register_new_stream(requester).setup()
        return requester.run()

    def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
        """
        Initiate a fire-and-forget interaction.

        Returns the frame's ``sent_future``; the stream is finished once that
        future is done.
        """

        logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)

        stream_id = self._allocate_stream()
        frame = to_fire_and_forget_frame(stream_id, payload, self._fragment_size_bytes)
        self.send_request(frame)
        frame.sent_future.add_done_callback(lambda _: self.finish_stream(stream_id))
        return frame.sent_future

    def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
        """
        Initiate a request-stream interaction.

        Returns the ``RequestStreamRequester`` registered on a new stream.
        """

        logger().debug('%s: request-stream: %s', self._log_identifier(), payload)

        requester = RequestStreamRequester(self, payload)
        return self.register_new_stream(requester)

    def request_channel(
            self,
            payload: Payload,
            publisher: Optional[Publisher] = None,
            sending_done: Optional[asyncio.Event] = None) -> Union[BackpressureApi, Publisher]:
        """
        Initiate a request-channel interaction.

        ``publisher`` and ``sending_done`` are handed to the
        ``RequestChannelRequester``, which is registered on a new stream and returned.
        """

        logger().debug('%s: request-channel: %s', self._log_identifier(), payload)

        requester = RequestChannelRequester(self, payload, publisher, sending_done)
        return self.register_new_stream(requester)

    def metadata_push(self, metadata: bytes) -> Awaitable[None]:
        """
        Initiate a metadata-push interaction.

        Returns the frame's ``sent_future``.
        """

        logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata)

        frame = to_metadata_push_frame(metadata)
        self.send_frame(frame)
        return frame.sent_future

    def _is_frame_allowed_to_send(self, frame: Frame) -> bool:
        """Ask the requester lease about request-initiating frames; allow all others."""
        if isinstance(frame, initiate_request_frame_types):
            return self._requester_lease.is_request_allowed(frame.stream_id)

        return True

    def _create_setup_frame(self,
                            data_encoding: bytes,
                            metadata_encoding: bytes,
                            payload: Optional[Payload] = None) -> SetupFrame:
        """Build the setup frame from the encodings, payload and stored connection settings."""
        return to_setup_frame(payload,
                              data_encoding,
                              metadata_encoding,
                              self._keep_alive_period,
                              self._max_lifetime_period,
                              self._honor_lease)

    @abc.abstractmethod
    def _log_identifier(self) -> str:
        """Return the prefix used to identify this socket in log messages."""
        ...

    def _assert_valid_fragment_size(self, fragment_size_bytes: Optional[int]):
        """Raise ``RSocketError`` if a fragment size is given and is below the minimum."""
        if fragment_size_bytes is not None and fragment_size_bytes < MINIMUM_FRAGMENT_SIZE_BYTES:
            raise RSocketError("Invalid fragment size specified. bytes: %s" % fragment_size_bytes)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc