• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 18050199123

26 Sep 2025 09:52PM UTC coverage: 86.697%. Remained the same
18050199123

push

github

web-flow
Bump starlette from 0.46.1 to 0.47.2 (#325)

Bumps [starlette](https://github.com/encode/starlette) from 0.46.1 to 0.47.2.
- [Release notes](https://github.com/encode/starlette/releases)
- [Changelog](https://github.com/encode/starlette/blob/master/docs/release-notes.md)
- [Commits](https://github.com/encode/starlette/compare/0.46.1...0.47.2)

---
updated-dependencies:
- dependency-name: starlette
  dependency-version: 0.47.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jell-o-fishi <111562631+jell-o-fishi@users.noreply.github.com>

780 of 978 branches covered (79.75%)

Branch coverage included in aggregate %.

3847 of 4359 relevant lines covered (88.25%)

2.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.19
/rsocket/rsocket_base.py
1
import abc
3✔
2
import asyncio
3✔
3
from asyncio import Task
3✔
4
from contextlib import asynccontextmanager
3✔
5
from datetime import timedelta
3✔
6
from typing import Union, Optional, Dict, Any, Coroutine, Callable, Type, cast, TypeVar
3✔
7

8
from reactivestreams.publisher import Publisher
3✔
9
from reactivestreams.subscriber import DefaultSubscriber
3✔
10
from rsocket.error_codes import ErrorCode
3✔
11
from rsocket.exceptions import RSocketProtocolError, RSocketTransportError, RSocketError
3✔
12
from rsocket.extensions.mimetypes import WellKnownMimeTypes, ensure_encoding_name
3✔
13
from rsocket.frame import (KeepAliveFrame,
3✔
14
                           MetadataPushFrame, RequestFireAndForgetFrame,
15
                           RequestResponseFrame, RequestStreamFrame, Frame,
16
                           exception_to_error_frame,
17
                           LeaseFrame, ErrorFrame, RequestFrame,
18
                           initiate_request_frame_types, InvalidFrame,
19
                           FragmentableFrame, FrameFragmentMixin, MINIMUM_FRAGMENT_SIZE_BYTES)
20
from rsocket.frame import (RequestChannelFrame, ResumeFrame,
3✔
21
                           is_fragmentable_frame, CONNECTION_STREAM_ID)
22
from rsocket.frame import SetupFrame
3✔
23
from rsocket.frame_builders import to_payload_frame, to_fire_and_forget_frame, to_setup_frame, to_metadata_push_frame, \
3✔
24
    to_keepalive_frame
25
from rsocket.frame_fragment_cache import FrameFragmentCache
3✔
26
from rsocket.frame_logger import log_frame
3✔
27
from rsocket.handlers.request_cahnnel_responder import RequestChannelResponder
3✔
28
from rsocket.handlers.request_channel_requester import RequestChannelRequester
3✔
29
from rsocket.handlers.request_response_requester import RequestResponseRequester
3✔
30
from rsocket.handlers.request_response_responder import RequestResponseResponder
3✔
31
from rsocket.handlers.request_stream_requester import RequestStreamRequester
3✔
32
from rsocket.handlers.request_stream_responder import RequestStreamResponder
3✔
33
from rsocket.helpers import payload_from_frame, async_noop, cancel_if_task_exists
3✔
34
from rsocket.lease import DefinedLease, NullLease, Lease
3✔
35
from rsocket.local_typing import Awaitable
3✔
36
from rsocket.logger import logger
3✔
37
from rsocket.payload import Payload
3✔
38
from rsocket.queue_peekable import QueuePeekable
3✔
39
from rsocket.request_handler import BaseRequestHandler, RequestHandler
3✔
40
from rsocket.rsocket import RSocket
3✔
41
from rsocket.rsocket_internal import RSocketInternal
3✔
42
from rsocket.stream_control import StreamControl
3✔
43
from rsocket.streams.backpressureapi import BackpressureApi
3✔
44
from rsocket.streams.stream_handler import StreamHandler
3✔
45
from rsocket.transports.transport import Transport
3✔
46

47
T = TypeVar('T')
3✔
48

49

50
class RSocketBase(RSocket, RSocketInternal):
3✔
51
    class LeaseSubscriber(DefaultSubscriber):
3✔
52
        def __init__(self, socket: 'RSocketBase'):
3✔
53
            super().__init__()
3✔
54
            self._socket = socket
3✔
55

56
        def on_next(self, value, is_complete=False):
3✔
57
            self._socket.send_lease(value)
3✔
58

59
    def __init__(self,
3✔
60
                 handler_factory: Callable[[], RequestHandler] = BaseRequestHandler,
61
                 honor_lease=False,
62
                 lease_publisher: Optional[Publisher] = None,
63
                 request_queue_size: int = 0,
64
                 data_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
65
                 metadata_encoding: Union[str, bytes, WellKnownMimeTypes] = WellKnownMimeTypes.APPLICATION_JSON,
66
                 keep_alive_period: timedelta = timedelta(milliseconds=500),
67
                 max_lifetime_period: timedelta = timedelta(minutes=10),
68
                 setup_payload: Optional[Payload] = None,
69
                 fragment_size_bytes: Optional[int] = None
70
                 ):
71

72
        self._assert_valid_fragment_size(fragment_size_bytes)
3✔
73

74
        self._handler_factory = handler_factory
3✔
75
        self._request_queue_size = request_queue_size
3✔
76
        self._honor_lease = honor_lease
3✔
77
        self._max_lifetime_period = max_lifetime_period
3✔
78
        self._keep_alive_period = keep_alive_period
3✔
79
        self._setup_payload = setup_payload
3✔
80
        self._data_encoding = ensure_encoding_name(data_encoding)
3✔
81
        self._metadata_encoding = ensure_encoding_name(metadata_encoding)
3✔
82
        self._lease_publisher = lease_publisher
3✔
83
        self._sender_task = None
3✔
84
        self._receiver_task = None
3✔
85
        self._handler = self._handler_factory()
3✔
86
        self._responder_lease = None
3✔
87
        self._requester_lease = None
3✔
88
        self._is_closing = False
3✔
89
        self._connecting = True
3✔
90
        self._fragment_size_bytes = fragment_size_bytes
3✔
91

92
        self._setup_internals()
3✔
93

94
    def get_fragment_size_bytes(self) -> Optional[int]:
3✔
95
        return self._fragment_size_bytes
3✔
96

97
    def _setup_internals(self):
3✔
98
        pass
99

100
    @abc.abstractmethod
101
    def _current_transport(self) -> Awaitable[Transport]:
102
        ...
103

104
    def _reset_internals(self):
3✔
105
        self._frame_fragment_cache = FrameFragmentCache()
3✔
106
        self._send_queue = QueuePeekable()
3✔
107
        self._request_queue = asyncio.Queue(self._request_queue_size)
3✔
108

109
        if self._honor_lease:
3✔
110
            self._requester_lease = DefinedLease(maximum_request_count=0)
3✔
111
        else:
112
            self._requester_lease = NullLease()
3✔
113

114
        self._responder_lease = NullLease()
3✔
115
        self._stream_control = StreamControl(self._get_first_stream_id())
3✔
116
        self._is_closing = False
3✔
117

118
    def stop_all_streams(self, error_code=ErrorCode.CONNECTION_ERROR, data=b''):
3✔
119
        self._stream_control.stop_all_streams(error_code, data)
3✔
120

121
    def _start_tasks(self):
3✔
122
        self._receiver_task = self._start_task_if_not_closing(self._receiver)
3✔
123
        self._sender_task = self._start_task_if_not_closing(self._sender)
3✔
124

125
    async def connect(self):
3✔
126
        self.send_priority_frame(self._create_setup_frame(self._data_encoding,
3✔
127
                                                          self._metadata_encoding,
128
                                                          self._setup_payload))
129

130
        if self._honor_lease:
3✔
131
            self._subscribe_to_lease_publisher()
3✔
132

133
        return self
3✔
134

135
    def _start_task_if_not_closing(self, task_factory: Callable[[], Coroutine]) -> Optional[Task]:
3✔
136
        if not self._is_closing:
3!
137
            return asyncio.create_task(task_factory())
3✔
138

139
    def set_handler_using_factory(self, handler_factory) -> RequestHandler:
3✔
140
        self._handler = handler_factory()
3✔
141
        return self._handler
3✔
142

143
    def _allocate_stream(self) -> int:
3✔
144
        return self._stream_control.allocate_stream()
3✔
145

146
    @abc.abstractmethod
147
    def _get_first_stream_id(self) -> int:
148
        ...
149

150
    def finish_stream(self, stream_id: int):
3✔
151
        self._stream_control.finish_stream(stream_id)
3✔
152
        self._frame_fragment_cache.remove(stream_id)
3✔
153

154
    def send_request(self, frame: RequestFrame):
3✔
155
        if self._honor_lease and not self._is_frame_allowed_to_send(frame):
3✔
156
            self._queue_request_frame(frame)
3✔
157
        else:
158
            self.send_frame(frame)
3✔
159

160
    def _queue_request_frame(self, frame: RequestFrame):
3✔
161
        logger().debug('%s: lease not allowing to send request. queueing', self._log_identifier())
3✔
162

163
        self._request_queue.put_nowait(frame)
3✔
164

165
    def send_priority_frame(self, frame: Frame):
3✔
166
        items = []
3✔
167
        while not self._send_queue.empty():
3✔
168
            items.append(self._send_queue.get_nowait())
3✔
169

170
        self._send_queue.put_nowait(frame)
3✔
171
        for item in items:
3✔
172
            self._send_queue.put_nowait(item)
3✔
173

174
    def send_frame(self, frame: Frame):
3✔
175
        self._send_queue.put_nowait(frame)
3✔
176

177
    def send_complete(self, stream_id: int):
3✔
178
        self.send_payload(stream_id, Payload(), complete=True, is_next=False)
3✔
179

180
    def send_error(self, stream_id: int, exception: Exception):
3✔
181
        self.send_frame(exception_to_error_frame(stream_id, exception))
3✔
182

183
    def send_payload(self, stream_id: int, payload: Payload, complete=False, is_next=True):
3✔
184
        self.send_frame(to_payload_frame(stream_id, payload, complete, is_next=is_next,
3✔
185
                                         fragment_size_bytes=self.get_fragment_size_bytes()))
186

187
    def _update_last_keepalive(self):
3✔
188
        pass
189

190
    def register_new_stream(self, handler: T) -> T:
3✔
191
        stream_id = self._allocate_stream()
3✔
192
        self._register_stream(stream_id, handler)
3✔
193
        return handler
3✔
194

195
    def _register_stream(self, stream_id: int, handler: StreamHandler):
3✔
196
        handler.stream_id = stream_id
3✔
197
        self._stream_control.register_stream(stream_id, handler)
3✔
198
        return handler
3✔
199

200
    async def handle_error(self, frame: ErrorFrame):
3✔
201
        await self._handler.on_error(frame.error_code, payload_from_frame(frame))
3✔
202

203
    async def handle_keep_alive(self, frame: KeepAliveFrame):
3✔
204
        self._update_last_keepalive()
3✔
205

206
        if frame.flags_respond:
3✔
207
            frame.flags_respond = False
3✔
208
            self.send_frame(frame)
3✔
209

210
    async def handle_request_response(self, frame: RequestResponseFrame):
3✔
211
        stream_id = frame.stream_id
3✔
212
        self._stream_control.assert_stream_id_available(stream_id)
3✔
213
        handler = self._handler
3✔
214

215
        response_future = await handler.request_response(payload_from_frame(frame))
3✔
216

217
        self._register_stream(stream_id, RequestResponseResponder(self, response_future)).setup()
3✔
218

219
    async def handle_request_stream(self, frame: RequestStreamFrame):
3✔
220
        stream_id = frame.stream_id
3✔
221
        self._stream_control.assert_stream_id_available(stream_id)
3✔
222
        handler = self._handler
3✔
223

224
        publisher = await handler.request_stream(payload_from_frame(frame))
3✔
225

226
        request_responder = RequestStreamResponder(self, publisher)
3✔
227
        self._register_stream(stream_id, request_responder)
3✔
228
        request_responder.frame_received(frame)
3✔
229

230
    async def handle_setup(self, frame: SetupFrame):
3✔
231
        if frame.flags_resume:
3✔
232
            raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Resume not supported')
3✔
233

234
        if frame.flags_lease:
3✔
235
            if self._lease_publisher is None:
3✔
236
                raise RSocketProtocolError(ErrorCode.UNSUPPORTED_SETUP, data='Lease not available')
3✔
237
            else:
238
                self._subscribe_to_lease_publisher()
3✔
239

240
        handler = self._handler
3✔
241

242
        try:
3✔
243
            await handler.on_setup(frame.data_encoding,
3✔
244
                                   frame.metadata_encoding,
245
                                   payload_from_frame(frame))
246
        except Exception as exception:
3✔
247
            logger().error('%s: Setup error', self._log_identifier(), exc_info=True)
3✔
248
            raise RSocketProtocolError(ErrorCode.REJECTED_SETUP, data=str(exception)) from exception
3✔
249

250
    def _subscribe_to_lease_publisher(self):
3✔
251
        if self._lease_publisher is not None:
3✔
252
            self._lease_publisher.subscribe(self.LeaseSubscriber(self))
3✔
253

254
    def send_lease(self, lease: Lease):
3✔
255
        try:
3✔
256
            self._responder_lease = lease
3✔
257

258
            self.send_frame(self._responder_lease.to_frame())
3✔
259
        except Exception as exception:
×
260
            self.send_error(CONNECTION_STREAM_ID, exception)
×
261

262
    async def handle_fire_and_forget(self, frame: RequestFireAndForgetFrame):
3✔
263
        self._stream_control.assert_stream_id_available(frame.stream_id)
3✔
264

265
        await self._handler.request_fire_and_forget(payload_from_frame(frame))
3✔
266

267
    async def handle_metadata_push(self, frame: MetadataPushFrame):
3✔
268
        await self._handler.on_metadata_push(Payload(None, frame.metadata))
3✔
269

270
    async def handle_request_channel(self, frame: RequestChannelFrame):
3✔
271
        stream_id = frame.stream_id
3✔
272
        self._stream_control.assert_stream_id_available(stream_id)
3✔
273
        handler = self._handler
3✔
274

275
        publisher, subscriber = await handler.request_channel(payload_from_frame(frame))
3✔
276

277
        channel_responder = RequestChannelResponder(self, publisher)
3✔
278
        self._register_stream(stream_id, channel_responder)
3✔
279
        channel_responder.subscribe(subscriber)
3✔
280
        channel_responder.frame_received(frame)
3✔
281

282
    async def handle_resume(self, frame: ResumeFrame):
3✔
283
        raise RSocketProtocolError(ErrorCode.REJECTED_RESUME, data='Resume not supported')
3✔
284

285
    async def handle_lease(self, frame: LeaseFrame):
3✔
286
        self._requester_lease = DefinedLease(
3✔
287
            frame.number_of_requests,
288
            timedelta(milliseconds=frame.time_to_live)
289
        )
290

291
        while not self._request_queue.empty() and self._requester_lease.is_request_allowed():
3✔
292
            self.send_frame(self._request_queue.get_nowait())
3✔
293
            self._request_queue.task_done()
3✔
294

295
    async def _receiver(self):
3✔
296
        try:
3✔
297
            await self._receiver_listen()
3✔
298
        except asyncio.CancelledError:
3✔
299
            logger().debug('%s: Asyncio task canceled: receiver', self._log_identifier())
3✔
300
        except RSocketTransportError:
2✔
301
            pass
302
        except Exception:
2✔
303
            logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
2✔
304
            raise
2✔
305

306
        await self._on_connection_closed()
3✔
307

308
    async def _on_connection_error(self, exception: Exception):
3✔
309
        logger().warning(str(exception))
3✔
310
        logger().debug(str(exception), exc_info=exception)
3✔
311
        await self._handler.on_connection_error(self, exception)
3✔
312

313
    async def _on_connection_closed(self):
3✔
314
        self.stop_all_streams()
3✔
315
        await self._handler.on_close(self)
3✔
316
        await self._stop_tasks()
3✔
317

318
    @abc.abstractmethod
319
    def is_server_alive(self) -> bool:
320
        ...
321

322
    async def _receiver_listen(self):
3✔
323
        async_frame_handler_by_type: Dict[Type[Frame], Any] = {
3✔
324
            RequestResponseFrame: self.handle_request_response,
325
            RequestStreamFrame: self.handle_request_stream,
326
            RequestChannelFrame: self.handle_request_channel,
327
            SetupFrame: self.handle_setup,
328
            RequestFireAndForgetFrame: self.handle_fire_and_forget,
329
            MetadataPushFrame: self.handle_metadata_push,
330
            ResumeFrame: self.handle_resume,
331
            LeaseFrame: self.handle_lease,
332
            KeepAliveFrame: self.handle_keep_alive,
333
            ErrorFrame: self.handle_error
334
        }
335

336
        transport = await self._current_transport()
3✔
337
        while self.is_server_alive():
3✔
338
            next_frame_generator = await transport.next_frame_generator()
3✔
339
            if next_frame_generator is None:
3✔
340
                break
3✔
341
            async for frame in next_frame_generator:
3✔
342
                try:
3✔
343
                    await self._handle_next_frame(frame, async_frame_handler_by_type)
3✔
344
                except RSocketProtocolError as exception:
3✔
345
                    logger().error('%s: Protocol error %s', self._log_identifier(), str(exception))
3✔
346
                    self.send_error(frame.stream_id, exception)
3✔
347
                except RSocketTransportError:
3!
348
                    raise
×
349
                except Exception as exception:
3✔
350
                    logger().error('%s: Unknown error', self._log_identifier(), exc_info=True)
3✔
351
                    self.send_error(frame.stream_id, exception)
3✔
352

353
    async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type):
3✔
354

355
        log_frame(frame, self._log_identifier())
3✔
356

357
        if isinstance(frame, InvalidFrame):
3✔
358
            return
3✔
359

360
        if is_fragmentable_frame(frame):
3✔
361
            complete_frame = self._frame_fragment_cache.append(cast(FragmentableFrame, frame))
3✔
362
            if complete_frame is None:
3✔
363
                return
3✔
364
        else:
365
            complete_frame = frame
3✔
366

367
        if (complete_frame.stream_id == CONNECTION_STREAM_ID or
3✔
368
                isinstance(complete_frame, initiate_request_frame_types)):
369
            await self._handle_frame_by_type(complete_frame, async_frame_handler_by_type)
3✔
370
        elif self._stream_control.handle_stream(complete_frame):
3✔
371
            return
3✔
372
        else:
373
            logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(),
3✔
374
                             complete_frame.stream_id)
375

376
    async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type):
3✔
377
        frame_handler = async_frame_handler_by_type.get(type(frame), async_noop)
3✔
378
        await frame_handler(frame)
3✔
379

380
    def _send_new_keepalive(self, data: bytes = b''):
3✔
381
        self.send_frame(to_keepalive_frame(data))
3✔
382

383
    def _before_sender(self):
3✔
384
        pass
385

386
    async def _finally_sender(self):
3✔
387
        pass
388

389
    @asynccontextmanager
3✔
390
    async def _get_next_frame_to_send(self, transport: Transport) -> Frame:
3✔
391
        next_frame_source = await self._send_queue.peek()
3✔
392

393
        if isinstance(next_frame_source, FrameFragmentMixin):
3✔
394
            next_fragment = next_frame_source.get_next_fragment(transport.requires_length_header())
3✔
395

396
            if next_fragment.flags_follows:
3✔
397
                self._send_queue.put_nowait(self._send_queue.get_nowait())  # cycle to next frame source in queue
3✔
398
            else:
399
                next_frame_source.get_next_fragment(
3✔
400
                    transport.requires_length_header())  # workaround to clean-up generator.
401
                self._send_queue.get_nowait()
3✔
402

403
            yield next_fragment
3✔
404

405
        else:
406
            self._send_queue.get_nowait()
3✔
407
            yield next_frame_source
3✔
408

409
    async def _sender(self):
3✔
410
        try:
3✔
411
            try:
3✔
412
                transport = await self._current_transport()
3✔
413

414
                self._before_sender()
3✔
415
                while self.is_server_alive():
3✔
416
                    async with self._get_next_frame_to_send(transport) as frame:
3✔
417
                        await transport.send_frame(frame)
3✔
418
                        log_frame(frame, self._log_identifier(), 'Sent')
3✔
419

420
                        if frame.sent_future is not None:
3✔
421
                            frame.sent_future.set_result(None)
3✔
422

423
                    if self._send_queue.empty():
3✔
424
                        await transport.on_send_queue_empty()
3✔
425
            except RSocketTransportError:
3✔
426
                logger().error('Error', exc_info=True)
1✔
427
                pass
428

429
        except asyncio.CancelledError:
3✔
430
            logger().debug('%s: Asyncio task canceled: sender', self._log_identifier())
3✔
431
        except Exception:
2✔
432
            logger().error('%s: RSocket error', self._log_identifier(), exc_info=True)
×
433
            raise
×
434
        finally:
435
            await self._finally_sender()
3✔
436

437
    async def close(self):
3✔
438
        logger().debug('%s: Closing', self._log_identifier())
3✔
439

440
        await self._stop_tasks()
3✔
441

442
        await self._close_transport()
3✔
443

444
    async def _stop_tasks(self):
3✔
445
        logger().debug('%s: Cleanup', self._log_identifier())
3✔
446

447
        self._is_closing = True
3✔
448
        await cancel_if_task_exists(self._sender_task)
3✔
449
        self._sender_task = None
3✔
450
        await cancel_if_task_exists(self._receiver_task)
3✔
451
        self._receiver_task = None
3✔
452

453
    async def _close_transport(self):
3✔
454
        if self._current_transport().done():
3!
455
            logger().debug('%s: Closing transport', self._log_identifier())
3✔
456
            try:
3✔
457
                transport = await self._current_transport()
3✔
458
            except asyncio.CancelledError:
3✔
459
                raise RSocketTransportError()
3✔
460

461
            if transport is not None:
3!
462
                try:
3✔
463
                    await transport.close()
3✔
464
                except Exception:
×
465
                    logger().debug('Transport already closed or failed to close', exc_info=True)
×
466

467
    async def __aenter__(self) -> 'RSocketBase':
3✔
468
        return self
×
469

470
    async def __aexit__(self, exc_type, exc_val, exc_tb):
3✔
471
        await self.close()
3✔
472

473
    def request_response(self, payload: Payload) -> Awaitable[Payload]:
3✔
474
        """
475
        Initiate a request-response interaction.
476
        """
477

478
        logger().debug('%s: request-response: %s', self._log_identifier(), payload)
3✔
479

480
        requester = RequestResponseRequester(self, payload)
3✔
481
        self.register_new_stream(requester).setup()
3✔
482
        return requester.run()
3✔
483

484
    def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
3✔
485
        """
486
        Initiate a fire-and-forget interaction.
487
        """
488

489
        logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)
3✔
490

491
        stream_id = self._allocate_stream()
3✔
492
        frame = to_fire_and_forget_frame(stream_id, payload, self._fragment_size_bytes)
3✔
493
        self.send_request(frame)
3✔
494
        frame.sent_future.add_done_callback(lambda _: self.finish_stream(stream_id))
3✔
495
        return frame.sent_future
3✔
496

497
    def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
3✔
498
        """
499
        Initiate a request-stream interaction.
500
        """
501

502
        logger().debug('%s: request-stream: %s', self._log_identifier(), payload)
3✔
503

504
        requester = RequestStreamRequester(self, payload)
3✔
505
        return self.register_new_stream(requester)
3✔
506

507
    def request_channel(
3✔
508
            self,
509
            payload: Payload,
510
            publisher: Optional[Publisher] = None,
511
            sending_done: Optional[asyncio.Event] = None) -> Union[BackpressureApi, Publisher]:
512
        """
513
        Initiate a request-channel interaction.
514
        """
515

516
        logger().debug('%s: request-channel: %s', self._log_identifier(), payload)
3✔
517

518
        requester = RequestChannelRequester(self, payload, publisher, sending_done)
3✔
519
        return self.register_new_stream(requester)
3✔
520

521
    def metadata_push(self, metadata: bytes) -> Awaitable[None]:
3✔
522
        """
523
        Initiate a metadata-push interaction.
524
        """
525

526
        logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata)
3✔
527

528
        frame = to_metadata_push_frame(metadata)
3✔
529
        self.send_frame(frame)
3✔
530
        return frame.sent_future
3✔
531

532
    def _is_frame_allowed_to_send(self, frame: Frame) -> bool:
3✔
533
        if isinstance(frame, initiate_request_frame_types):
3!
534
            return self._requester_lease.is_request_allowed(frame.stream_id)
3✔
535

536
        return True
×
537

538
    def _create_setup_frame(self,
3✔
539
                            data_encoding: bytes,
540
                            metadata_encoding: bytes,
541
                            payload: Optional[Payload] = None) -> SetupFrame:
542
        return to_setup_frame(payload,
3✔
543
                              data_encoding,
544
                              metadata_encoding,
545
                              self._keep_alive_period,
546
                              self._max_lifetime_period,
547
                              self._honor_lease)
548

549
    @abc.abstractmethod
550
    def _log_identifier(self) -> str:
551
        ...
552

553
    def _assert_valid_fragment_size(self, fragment_size_bytes: Optional[int]):
3✔
554
        if fragment_size_bytes is not None and fragment_size_bytes < MINIMUM_FRAGMENT_SIZE_BYTES:
3!
555
            raise RSocketError("Invalid fragment size specified. bytes: %s" % fragment_size_bytes)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc