• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 13353792253

16 Feb 2025 09:36AM UTC coverage: 91.992% (+0.2%) from 91.818%
13353792253

Pull #292

github

web-flow
Merge 59cc42ce7 into 1200045e3
Pull Request #292: added python 3.13. to test flow

822 of 965 branches covered (85.18%)

Branch coverage included in aggregate %.

3980 of 4255 relevant lines covered (93.54%)

2.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/rsocket/reactivex/reactivex_handler_adapter.py
1
import asyncio
3✔
2
from datetime import timedelta
3✔
3
from typing import Tuple, Optional, Callable
3✔
4

5
from reactivex import operators
3✔
6

7
from reactivestreams.publisher import Publisher
3✔
8
from reactivestreams.subscriber import Subscriber
3✔
9
from rsocket.error_codes import ErrorCode
3✔
10
from rsocket.payload import Payload
3✔
11
from rsocket.reactivex.back_pressure_publisher import observable_to_publisher
3✔
12
from rsocket.reactivex.from_rsocket_publisher import RxSubscriberFromObserver
3✔
13
from rsocket.reactivex.reactivex_handler import ReactivexHandler
3✔
14
from rsocket.request_handler import RequestHandler
3✔
15

16

17
def reactivex_handler_factory(handler_factory: Callable[[], ReactivexHandler]):
    """Wrap a ``ReactivexHandler`` factory so it yields adapted handlers.

    :param handler_factory: zero-argument callable returning a
        ``ReactivexHandler``.
    :return: a zero-argument callable; each call invokes ``handler_factory``
        and returns the result wrapped in a ``ReactivexHandlerAdapter``.
    """

    def create_handler():
        # A fresh delegate is requested on every call, so nothing is shared
        # between the adapters this factory hands out.
        delegate = handler_factory()
        adapter = ReactivexHandlerAdapter(delegate)
        return adapter

    return create_handler
3✔
22

23

24
class ReactivexHandlerAdapter(RequestHandler):
    """``RequestHandler`` that forwards every callback to a ``ReactivexHandler``.

    Observables returned by the delegate are converted with
    ``observable_to_publisher`` / ``operators.to_future`` before being
    returned, and an observer supplied by the delegate is wrapped in an
    ``RxSubscriberFromObserver``.
    """

    def __init__(self, delegate: ReactivexHandler):
        """Store the handler that all callbacks are forwarded to.

        :param delegate: the ``ReactivexHandler`` to wrap.
        """
        self.delegate = delegate

    async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
        """Forward the setup callback to the delegate."""
        await self.delegate.on_setup(data_encoding, metadata_encoding, payload)

    async def on_metadata_push(self, metadata: Payload):
        """Forward the metadata-push callback to the delegate."""
        # Must go through ``self.delegate``: calling ``self.on_metadata_push``
        # here would recurse into this same method without ever reaching the
        # wrapped handler.
        await self.delegate.on_metadata_push(metadata)

    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
        """Forward a channel request and adapt the delegate's result.

        :param payload: the payload passed to the delegate's
            ``request_channel``.
        :return: ``(publisher, subscriber)``. The publisher is built from the
            result's ``observable``; the subscriber wraps the result's
            ``observer`` (with its ``limit_rate``) and is ``None`` when the
            delegate supplied no observer.
        """
        reactivex_channel = await self.delegate.request_channel(payload)

        subscriber = None
        publisher = observable_to_publisher(reactivex_channel.observable)

        if reactivex_channel.observer is not None:
            subscriber = RxSubscriberFromObserver(reactivex_channel.observer,
                                                  reactivex_channel.limit_rate)

        return publisher, subscriber

    async def request_fire_and_forget(self, payload: Payload):
        """Forward a fire-and-forget request to the delegate."""
        await self.delegate.request_fire_and_forget(payload)

    async def request_response(self, payload: Payload) -> asyncio.Future:
        """Forward a request/response call and return the reply as a future.

        The delegate's result is used directly as an observable, unless it is
        an ``asyncio.Future``, in which case it is awaited first to obtain the
        observable.

        :param payload: the payload passed to the delegate's
            ``request_response``.
        :return: the observable piped through
            ``operators.default_if_empty(Payload())`` and
            ``operators.to_future()``.
        """
        response = await self.delegate.request_response(payload)

        if isinstance(response, asyncio.Future):
            observable = await response
        else:
            observable = response

        return observable.pipe(
            # An observable that emits nothing still yields an (empty) Payload.
            operators.default_if_empty(Payload()),
            operators.to_future()
        )

    async def request_stream(self, payload: Payload) -> Publisher:
        """Forward a stream request and convert the delegate's result.

        :return: the delegate's result passed through
            ``observable_to_publisher``.
        """
        return observable_to_publisher(await self.delegate.request_stream(payload))

    async def on_error(self, error_code: ErrorCode, payload: Payload):
        """Forward the error callback to the delegate."""
        await self.delegate.on_error(error_code, payload)

    async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket):
        """Forward the keepalive-timeout callback to the delegate."""
        await self.delegate.on_keepalive_timeout(time_since_last_keepalive, rsocket)

    async def on_connection_error(self, rsocket, exception: Exception):
        """Forward the connection-error callback to the delegate."""
        await self.delegate.on_connection_error(rsocket, exception)

    async def on_close(self, rsocket, exception: Optional[Exception] = None):
        """Forward the close callback to the delegate."""
        await self.delegate.on_close(rsocket, exception)
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc