• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 5619093514

pending completion
5619093514

push

github-actions

jell-o-fishi
Merge remote-tracking branch 'origin/master'

823 of 963 branches covered (85.46%)

Branch coverage included in aggregate %.

3793 of 4074 relevant lines covered (93.1%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.96
/rsocket/rsocket_base.py
1
import abc
1✔
2
import asyncio
1✔
3
from asyncio import Task
1✔
4
from contextlib import asynccontextmanager
1✔
5
from datetime import timedelta
1✔
6
from typing import Union, Optional, Dict, Any, Coroutine, Callable, Type, cast, TypeVar
1✔
7

8
from reactivestreams.publisher import Publisher
1✔
9
from reactivestreams.subscriber import DefaultSubscriber
1✔
10
from rsocket.error_codes import ErrorCode
1✔
11
from rsocket.exceptions import RSocketProtocolError, RSocketTransportError, RSocketError
1✔
12
from rsocket.extensions.mimetypes import WellKnownMimeTypes, ensure_encoding_name
1✔
13
from rsocket.frame import (KeepAliveFrame,
1✔
14
                           MetadataPushFrame, RequestFireAndForgetFrame,
15
                           RequestResponseFrame, RequestStreamFrame, Frame,
16
                           exception_to_error_frame,
17
                           LeaseFrame, ErrorFrame, RequestFrame,
18
                           initiate_request_frame_types, InvalidFrame,
19
                           FragmentableFrame, FrameFragmentMixin, MINIMUM_FRAGMENT_SIZE_BYTES)
20
from rsocket.frame import (RequestChannelFrame, ResumeFrame,
1✔
21
                           is_fragmentable_frame, CONNECTION_STREAM_ID)
22
from rsocket.frame import SetupFrame
1✔
23
from rsocket.frame_builders import to_payload_frame, to_fire_and_forget_frame, to_setup_frame, to_metadata_push_frame, \
1✔
24
    to_keepalive_frame
25
from rsocket.frame_fragment_cache import FrameFragmentCache
1✔
26
from rsocket.frame_logger import log_frame
1✔
27
from rsocket.handlers.request_cahnnel_responder import RequestChannelResponder
1✔
28
from rsocket.handlers.request_channel_requester import RequestChannelRequester
1✔
29
from rsocket.handlers.request_response_requester import RequestResponseRequester
1✔
30
from rsocket.handlers.request_response_responder import RequestResponseResponder
1✔
31
from rsocket.handlers.request_stream_requester import RequestStreamRequester
1✔
32
from rsocket.handlers.request_stream_responder import RequestStreamResponder
1✔
33
from rsocket.helpers import payload_from_frame, async_noop, cancel_if_task_exists
1✔
34
from rsocket.lease import DefinedLease, NullLease, Lease
1✔
35
from rsocket.local_typing import Awaitable
1✔
36
from rsocket.logger import logger
1✔
37
from rsocket.payload import Payload
1✔
38
from rsocket.queue_peekable import QueuePeekable
1✔
39
from rsocket.request_handler import BaseRequestHandler, RequestHandler
1✔
40
from rsocket.rsocket import RSocket
1✔
41
from rsocket.rsocket_internal import RSocketInternal
1✔
42
from rsocket.stream_control import StreamControl
1✔
43
from rsocket.streams.backpressureapi import BackpressureApi
1✔
44
from rsocket.streams.stream_handler import StreamHandler
1✔
45
from rsocket.transports.transport import Transport
1✔
46

47
T = TypeVar('T')
1✔
48

49

50
class RSocketBase(RSocket, RSocketInternal):
1✔
51
    class LeaseSubscriber(DefaultSubscriber):
1✔
52
        def __init__(self, socket: 'RSocketBase'):
1✔
53
            super().__init__()
1✔
54
            self._socket = socket
1✔
55

56
        def on_next(self, value, is_complete=False):
1✔
57
            self._socket.send_lease(value)
1✔
58

59
    def __init__(self,
1✔
60
                 handler_factory: Callable[[], RequestHandler] = BaseRequestHandler,
61
                 honor_lease=False,
62
                 lease_publisher: Optional[Publisher] = None,
63
                 request_queue_size: int = 0,
64
                 data_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
65
                 metadata_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
66
                 keep_alive_period: timedelta = timedelta(milliseconds=500),
67
                 max_lifetime_period: timedelta = timedelta(minutes=10),
68
                 setup_payload: Optional[Payload] = None,
69
                 fragment_size_bytes: Optional[int] = None
70
                 ):
71

72
        self._assert_valid_fragment_size(fragment_size_bytes)
1✔
73

74
        self._handler_factory = handler_factory
1✔
75
        self._request_queue_size = request_queue_size
1✔
76
        self._honor_lease = honor_lease
1✔
77
        self._max_lifetime_period = max_lifetime_period
1✔
78
        self._keep_alive_period = keep_alive_period
1✔
79
        self._setup_payload = setup_payload
1✔
80
        self._data_encoding = ensure_encoding_name(data_encoding)
1✔
81
        self._metadata_encoding = ensure_encoding_name(metadata_encoding)
1✔
82
        self._lease_publisher = lease_publisher
1✔
83
        self._sender_task = None
1✔
84
        self._receiver_task = None
1✔
85
        self._handler = self._handler_factory()
1✔
86
        self._responder_lease = None
1✔
87
        self._requester_lease = None
1✔
88
        self._is_closing = False
1✔
89
        self._connecting = True
1✔
90
        self._fragment_size_bytes = fragment_size_bytes
1✔
91

92
        self._setup_internals()
1✔
93

94
    def get_fragment_size_bytes(self) -> Optional[int]:
1✔
95
        return self._fragment_size_bytes
1✔
96

97
    def _setup_internals(self):
1✔
98
        pass
99

100
    @abc.abstractmethod
101
    def _current_transport(self) -> Awaitable[Transport]:
102
        ...
103

104
    def _reset_internals(self):
1✔
105
        self._frame_fragment_cache = FrameFragmentCache()
1✔
106
        self._send_queue = QueuePeekable()
1✔
107
        self._request_queue = asyncio.Queue(self._request_queue_size)
1✔
108

109
        if self._honor_lease:
1✔
110
            self._requester_lease = DefinedLease(maximum_request_count=0)
1✔
111
        else:
112
            self._requester_lease = NullLease()
1✔
113

114
        self._responder_lease = NullLease()
1✔
115
        self._stream_control = StreamControl(self._get_first_stream_id())
1✔
116
        self._is_closing = False
1✔
117

118
    def stop_all_streams(self, error_code=ErrorCode.CONNECTION_ERROR, data=b''):
1✔
119
        self._stream_control.stop_all_streams(error_code, data)
1✔
120

121
    def _start_tasks(self):
1✔
122
        self._receiver_task = self._start_task_if_not_closing(self._receiver)
1✔
123
        self._sender_task = self._start_task_if_not_closing(self._sender)
1✔
124

125
    async def connect(self):
1✔
126
        self.send_priority_frame(self._create_setup_frame(self._data_encoding,
1✔
127
                                                          self._metadata_encoding,
128
                                                          self._setup_payload))
129

130
        if self._honor_lease:
1✔
131
            self._subscribe_to_lease_publisher()
1✔
132

133
        return self
1✔
134

135
    def _start_task_if_not_closing(self, task_factory: Callable[[], Coroutine]) -> Optional[Task]:
1✔
136
        if not self._is_closing:
1!
137
            return asyncio.create_task(task_factory())
1✔
138

139
    def set_handler_using_factory(self, handler_factory) -> RequestHandler:
1✔
140
        self._handler = handler_factory()
1✔
141
        return self._handler
1✔
142

143
    def _allocate_stream(self) -> int:
1✔
144
        return self._stream_control.allocate_stream()
1✔
145

146
    @abc.abstractmethod
147
    def _get_first_stream_id(self) -> int:
148
        ...
149

150
    def finish_stream(self, stream_id: int):
1✔
151
        self._stream_control.finish_stream(stream_id)
1✔
152

153
    def send_request(self, frame: RequestFrame):
1✔
154
        if self._honor_lease and not self._is_frame_allowed_to_send(frame):
1✔
155
            self._queue_request_frame(frame)
1✔
156
        else:
157
            self.send_frame(frame)
1✔
158

159
    def _queue_request_frame(self, frame: RequestFrame):
1✔
160
        logger().debug('%s: lease not allowing to send request. queueing', self._log_identifier())
1✔
161

162
        self._request_queue.put_nowait(frame)
1✔
163

164
    def send_priority_frame(self, frame: Frame):
1✔
165
        items = []
1✔
166
        while not self._send_queue.empty():
1✔
167
            items.append(self._send_queue.get_nowait())
1✔
168

169
        self._send_queue.put_nowait(frame)
1✔
170
        for item in items:
1✔
171
            self._send_queue.put_nowait(item)
1✔
172

173
    def send_frame(self, frame: Frame):
1✔
174
        self._send_queue.put_nowait(frame)
1✔
175

176
    def send_complete(self, stream_id: int):
1✔
177
        self.send_payload(stream_id, Payload(), complete=True, is_next=False)
1✔
178

179
    def send_error(self, stream_id: int, exception: Exception):
1✔
180
        self.send_frame(exception_to_error_frame(stream_id, exception))
1✔
181

182
    def send_payload(self, stream_id: int, payload: Payload, complete=False, is_next=True):
1✔
183
        self.send_frame(to_payload_frame(stream_id, payload, complete, is_next=is_next,
1✔
184
                                         fragment_size_bytes=self.get_fragment_size_bytes()))
185

186
    def _update_last_keepalive(self):
1✔
187
        pass
188

189
    def register_new_stream(self, handler: T) -> T:
1✔
190
        stream_id = self._allocate_stream()
1✔
191
        self._register_stream(stream_id, handler)
1✔
192
        return handler
1✔
193

194
    def _register_stream(self, stream_id: int, handler: StreamHandler):
1✔
195
        handler.stream_id = stream_id
1✔
196
        self._stream_control.register_stream(stream_id, handler)
1✔
197
        return handler
1✔
198

199
    async def handle_error(self, frame: ErrorFrame):
1✔
200
        await self._handler.on_error(frame.error_code, payload_from_frame(frame))
1✔
201

202
    async def handle_keep_alive(self, frame: KeepAliveFrame):
1✔
203
        self._update_last_keepalive()
1✔
204

205
        if frame.flags_respond:
1✔
206
            frame.flags_respond = False
1✔
207
            self.send_frame(frame)
1✔
208

209
    async def handle_request_response(self, frame: RequestResponseFrame):
1✔
210
        stream_id = frame.stream_id
1✔
211
        self._stream_control.assert_stream_id_available(stream_id)
1✔
212
        handler = self._handler
1✔
213

214
        response_future = await handler.request_response(payload_from_frame(frame))
1✔
215

216
        self._register_stream(stream_id, RequestResponseResponder(self, response_future)).setup()
1✔
217

218
    async def handle_request_stream(self, frame: RequestStreamFrame):
1✔
219
        stream_id = frame.stream_id
1✔
220
        self._stream_control.assert_stream_id_available(stream_id)
1✔
221
        handler = self._handler
1✔
222

223
        publisher = await handler.request_stream(payload_from_frame(frame))
1✔
224

225
        request_responder = RequestStreamResponder(self, publisher)
1✔
226
        self._register_stream(stream_id, request_responder)
1✔
227
        request_responder.frame_received(frame)
1✔
228

229
    async def handle_setup(self, frame: SetupFrame):
1✔
230
        if frame.flags_resume:
1✔
231
            raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Resume not supported')
1✔
232

233
        if frame.flags_lease:
1✔
234
            if self._lease_publisher is None:
1✔
235
                raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Lease not available')
1✔
236
            else:
237
                self._subscribe_to_lease_publisher()
1✔
238

239
        handler = self._handler
1✔
240

241
        try:
1✔
242
            await handler.on_setup(frame.data_encoding,
1✔
243
                                   frame.metadata_encoding,
244
                                   payload_from_frame(frame))
245
        except Exception as exception:
1✔
246
            logger().error('%s: Setup error', self._log_identifier(), exc_info=True)
1✔
247
            raise RSocketProtocolError(ErrorCode.REJECTED_SETUP, data=str(exception)) from exception
1✔
248

249
    def _subscribe_to_lease_publisher(self):
1✔
250
        if self._lease_publisher is not None:
1✔
251
            self._lease_publisher.subscribe(self.LeaseSubscriber(self))
1✔
252

253
    def send_lease(self, lease: Lease):
1✔
254
        try:
1✔
255
            self._responder_lease = lease
1✔
256

257
            self.send_frame(self._responder_lease.to_frame())
1✔
258
        except Exception as exception:
×
259
            self.send_error(CONNECTION_STREAM_ID, exception)
×
260

261
    async def handle_fire_and_forget(self, frame: RequestFireAndForgetFrame):
1✔
262
        self._stream_control.assert_stream_id_available(frame.stream_id)
1✔
263

264
        await self._handler.request_fire_and_forget(payload_from_frame(frame))
1✔
265

266
    async def handle_metadata_push(self, frame: MetadataPushFrame):
1✔
267
        await self._handler.on_metadata_push(Payload(None, frame.metadata))
1✔
268

269
    async def handle_request_channel(self, frame: RequestChannelFrame):
1✔
270
        stream_id = frame.stream_id
1✔
271
        self._stream_control.assert_stream_id_available(stream_id)
1✔
272
        handler = self._handler
1✔
273

274
        publisher, subscriber = await handler.request_channel(payload_from_frame(frame))
1✔
275

276
        channel_responder = RequestChannelResponder(self, publisher)
1✔
277
        self._register_stream(stream_id, channel_responder)
1✔
278
        channel_responder.subscribe(subscriber)
1✔
279
        channel_responder.frame_received(frame)
1✔
280

281
    async def handle_resume(self, frame: ResumeFrame):
1✔
282
        raise RSocketProtocolError(ErrorCode.REJECTED_RESUME, data='Resume not supported')
1✔
283

284
    async def handle_lease(self, frame: LeaseFrame):
1✔
285
        self._requester_lease = DefinedLease(
1✔
286
            frame.number_of_requests,
287
            timedelta(milliseconds=frame.time_to_live)
288
        )
289

290
        while not self._request_queue.empty() and self._requester_lease.is_request_allowed():
1✔
291
            self.send_frame(self._request_queue.get_nowait())
1✔
292
            self._request_queue.task_done()
1✔
293

294
    async def _receiver(self):
1✔
295
        try:
1✔
296
            await self._receiver_listen()
1✔
297
        except asyncio.CancelledError:
1✔
298
            logger().debug('%s: Asyncio task canceled: receiver', self._log_identifier())
1✔
299
        except RSocketTransportError:
1✔
300
            pass
301
        except Exception:
×
302
            logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
×
303
            raise
×
304

305
        await self._on_connection_closed()
1✔
306

307
    async def _on_connection_error(self, exception: Exception):
1✔
308
        logger().warning(str(exception))
1✔
309
        logger().debug(str(exception), exc_info=exception)
1✔
310
        await self._handler.on_connection_error(self, exception)
1✔
311

312
    async def _on_connection_closed(self):
1✔
313
        self.stop_all_streams()
1✔
314
        await self._handler.on_close(self)
1✔
315
        await self._stop_tasks()
1✔
316

317
    @abc.abstractmethod
318
    def is_server_alive(self) -> bool:
319
        ...
320

321
    async def _receiver_listen(self):
1✔
322
        async_frame_handler_by_type: Dict[Type[Frame], Any] = {
1✔
323
            RequestResponseFrame: self.handle_request_response,
324
            RequestStreamFrame: self.handle_request_stream,
325
            RequestChannelFrame: self.handle_request_channel,
326
            SetupFrame: self.handle_setup,
327
            RequestFireAndForgetFrame: self.handle_fire_and_forget,
328
            MetadataPushFrame: self.handle_metadata_push,
329
            ResumeFrame: self.handle_resume,
330
            LeaseFrame: self.handle_lease,
331
            KeepAliveFrame: self.handle_keep_alive,
332
            ErrorFrame: self.handle_error
333
        }
334

335
        transport = await self._current_transport()
1✔
336
        while self.is_server_alive():
1✔
337
            next_frame_generator = await transport.next_frame_generator()
1✔
338
            if next_frame_generator is None:
1✔
339
                break
1✔
340
            async for frame in next_frame_generator:
1✔
341
                try:
1✔
342
                    await self._handle_next_frame(frame, async_frame_handler_by_type)
1✔
343
                except RSocketProtocolError as exception:
1✔
344
                    logger().error('%s: Protocol error %s', self._log_identifier(), str(exception))
1✔
345
                    self.send_error(frame.stream_id, exception)
1✔
346
                except RSocketTransportError:
1!
347
                    raise
×
348
                except Exception as exception:
1✔
349
                    logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
1✔
350
                    self.send_error(frame.stream_id, exception)
1✔
351

352
    async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type):
1✔
353

354
        log_frame(frame, self._log_identifier())
1✔
355

356
        if isinstance(frame, InvalidFrame):
1✔
357
            return
1✔
358

359
        if is_fragmentable_frame(frame):
1✔
360
            complete_frame = self._frame_fragment_cache.append(cast(FragmentableFrame, frame))
1✔
361
            if complete_frame is None:
1✔
362
                return
1✔
363
        else:
364
            complete_frame = frame
1✔
365

366
        if (complete_frame.stream_id == CONNECTION_STREAM_ID or
1✔
367
                isinstance(complete_frame, initiate_request_frame_types)):
368
            await self._handle_frame_by_type(complete_frame, async_frame_handler_by_type)
1✔
369
        elif self._stream_control.handle_stream(complete_frame):
1✔
370
            return
1✔
371
        else:
372
            logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(),
1✔
373
                             complete_frame.stream_id)
374

375
    async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type):
1✔
376
        frame_handler = async_frame_handler_by_type.get(type(frame), async_noop)
1✔
377
        await frame_handler(frame)
1✔
378

379
    def _send_new_keepalive(self, data: bytes = b''):
1✔
380
        self.send_frame(to_keepalive_frame(data))
1✔
381

382
    def _before_sender(self):
1✔
383
        pass
384

385
    async def _finally_sender(self):
1✔
386
        pass
387

388
    @asynccontextmanager
1✔
389
    async def _get_next_frame_to_send(self, transport: Transport) -> Frame:
1✔
390
        next_frame_source = await self._send_queue.peek()
1✔
391

392
        if isinstance(next_frame_source, FrameFragmentMixin):
1✔
393
            next_fragment = next_frame_source.get_next_fragment(transport.requires_length_header())
1✔
394

395
            if next_fragment.flags_follows:
1✔
396
                self._send_queue.put_nowait(self._send_queue.get_nowait())  # cycle to next frame source in queue
1✔
397
            else:
398
                next_frame_source.get_next_fragment(
1✔
399
                    transport.requires_length_header())  # workaround to clean-up generator.
400
                self._send_queue.get_nowait()
1✔
401

402
            yield next_fragment
1✔
403

404
        else:
405
            self._send_queue.get_nowait()
1✔
406
            yield next_frame_source
1✔
407

408
    async def _sender(self):
1✔
409
        try:
1✔
410
            try:
1✔
411
                transport = await self._current_transport()
1✔
412

413
                self._before_sender()
1✔
414
                while self.is_server_alive():
1✔
415
                    async with self._get_next_frame_to_send(transport) as frame:
1✔
416
                        await transport.send_frame(frame)
1✔
417
                        log_frame(frame, self._log_identifier(), 'Sent')
1✔
418

419
                        if frame.sent_future is not None:
1✔
420
                            frame.sent_future.set_result(None)
1✔
421

422
                    if self._send_queue.empty():
1✔
423
                        await transport.on_send_queue_empty()
1✔
424
            except RSocketTransportError:
1✔
425
                logger().error('Error', exc_info=True)
1✔
426
                pass
427

428
        except asyncio.CancelledError:
1!
429
            logger().debug('%s: Asyncio task canceled: sender', self._log_identifier())
1✔
430
        except Exception:
×
431
            logger().error('%s: RSocket error', self._log_identifier(), exc_info=True)
×
432
            raise
×
433
        finally:
434
            await self._finally_sender()
1✔
435

436
    async def close(self):
1✔
437
        logger().debug('%s: Closing', self._log_identifier())
1✔
438

439
        await self._stop_tasks()
1✔
440

441
        await self._close_transport()
1✔
442

443
    async def _stop_tasks(self):
1✔
444
        logger().debug('%s: Cleanup', self._log_identifier())
1✔
445

446
        self._is_closing = True
1✔
447
        await cancel_if_task_exists(self._sender_task)
1✔
448
        self._sender_task = None
1✔
449
        await cancel_if_task_exists(self._receiver_task)
1✔
450
        self._receiver_task = None
1✔
451

452
    async def _close_transport(self):
1✔
453
        if self._current_transport().done():
1!
454
            logger().debug('%s: Closing transport', self._log_identifier())
1✔
455
            try:
1✔
456
                transport = await self._current_transport()
1✔
457
            except asyncio.CancelledError:
1✔
458
                raise RSocketTransportError()
1✔
459

460
            if transport is not None:
1!
461
                try:
1✔
462
                    await transport.close()
1✔
463
                except Exception:
×
464
                    logger().debug('Transport already closed or failed to close', exc_info=True)
×
465

466
    async def __aenter__(self) -> 'RSocketBase':
1✔
467
        return self
×
468

469
    async def __aexit__(self, exc_type, exc_val, exc_tb):
1✔
470
        await self.close()
1✔
471

472
    def request_response(self, payload: Payload) -> Awaitable[Payload]:
1✔
473
        """
474
        Initiate a request-response interaction.
475
        """
476

477
        logger().debug('%s: request-response: %s', self._log_identifier(), payload)
1✔
478

479
        requester = RequestResponseRequester(self, payload)
1✔
480
        self.register_new_stream(requester).setup()
1✔
481
        return requester.run()
1✔
482

483
    def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
1✔
484
        """
485
        Initiate a fire-and-forget interaction.
486
        """
487

488
        logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)
1✔
489

490
        stream_id = self._allocate_stream()
1✔
491
        frame = to_fire_and_forget_frame(stream_id, payload, self._fragment_size_bytes)
1✔
492
        self.send_request(frame)
1✔
493
        frame.sent_future.add_done_callback(lambda _: self.finish_stream(stream_id))
1✔
494
        return frame.sent_future
1✔
495

496
    def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
1✔
497
        """
498
        Initiate a request-stream interaction.
499
        """
500

501
        logger().debug('%s: request-stream: %s', self._log_identifier(), payload)
1✔
502

503
        requester = RequestStreamRequester(self, payload)
1✔
504
        return self.register_new_stream(requester)
1✔
505

506
    def request_channel(
1✔
507
            self,
508
            payload: Payload,
509
            publisher: Optional[Publisher] = None,
510
            sending_done: Optional[asyncio.Event] = None) -> Union[BackpressureApi, Publisher]:
511
        """
512
        Initiate a request-channel interaction.
513
        """
514

515
        logger().debug('%s: request-channel: %s', self._log_identifier(), payload)
1✔
516

517
        requester = RequestChannelRequester(self, payload, publisher, sending_done)
1✔
518
        return self.register_new_stream(requester)
1✔
519

520
    def metadata_push(self, metadata: bytes) -> Awaitable[None]:
1✔
521
        """
522
        Initiate a metadata-push interaction.
523
        """
524

525
        logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata)
1✔
526

527
        frame = to_metadata_push_frame(metadata)
1✔
528
        self.send_frame(frame)
1✔
529
        return frame.sent_future
1✔
530

531
    def _is_frame_allowed_to_send(self, frame: Frame) -> bool:
1✔
532
        if isinstance(frame, initiate_request_frame_types):
1!
533
            return self._requester_lease.is_request_allowed(frame.stream_id)
1✔
534

535
        return True
×
536

537
    def _create_setup_frame(self,
1✔
538
                            data_encoding: bytes,
539
                            metadata_encoding: bytes,
540
                            payload: Optional[Payload] = None) -> SetupFrame:
541
        return to_setup_frame(payload,
1✔
542
                              data_encoding,
543
                              metadata_encoding,
544
                              self._keep_alive_period,
545
                              self._max_lifetime_period,
546
                              self._honor_lease)
547

548
    @abc.abstractmethod
549
    def _log_identifier(self) -> str:
550
        ...
551

552
    def _assert_valid_fragment_size(self, fragment_size_bytes: Optional[int]):
1✔
553
        if fragment_size_bytes is not None and fragment_size_bytes < MINIMUM_FRAGMENT_SIZE_BYTES:
1!
554
            raise RSocketError("Invalid fragment size specified. bytes: %s" % fragment_size_bytes)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc