• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 13347180433

15 Feb 2025 05:27PM UTC coverage: 91.681% (-0.3%) from 92.021%
13347180433

push

github

web-flow
Merge pull request #291 from rsocket/send_channel_request_to_subscriber

Updated example use cases. Fixed a reactivex request_response error

715 of 866 branches covered (82.56%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

3980 of 4255 relevant lines covered (93.54%)

1.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/rsocket/reactivex/reactivex_handler_adapter.py
1
import asyncio
2✔
2
from datetime import timedelta
2✔
3
from typing import Tuple, Optional, Callable
2✔
4

5
from reactivex import operators
2✔
6

7
from reactivestreams.publisher import Publisher
2✔
8
from reactivestreams.subscriber import Subscriber
2✔
9
from rsocket.error_codes import ErrorCode
2✔
10
from rsocket.payload import Payload
2✔
11
from rsocket.reactivex.back_pressure_publisher import observable_to_publisher
2✔
12
from rsocket.reactivex.from_rsocket_publisher import RxSubscriberFromObserver
2✔
13
from rsocket.reactivex.reactivex_handler import ReactivexHandler
2✔
14
from rsocket.request_handler import RequestHandler
2✔
15

16

17
def reactivex_handler_factory(handler_factory: Callable[[], ReactivexHandler]):
    """Build a zero-argument factory producing adapted ReactiveX handlers.

    :param handler_factory: callable invoked with no arguments each time a
        handler is needed; its result is used as the adapter's delegate.
    :return: a callable that, on every call, invokes ``handler_factory`` and
        wraps the result in a new :class:`ReactivexHandlerAdapter`.
    """

    def create_handler():
        # A fresh delegate is requested on every call, so each adapter
        # gets its own handler instance from the supplied factory.
        delegate = handler_factory()
        return ReactivexHandlerAdapter(delegate)

    return create_handler
2✔
22

23

24
class ReactivexHandlerAdapter(RequestHandler):
    """Expose a :class:`ReactivexHandler` through the :class:`RequestHandler` interface.

    Every callback is forwarded to ``delegate``. Where the delegate answers
    with ReactiveX objects, they are converted before being returned:
    observables are passed through ``observable_to_publisher``, observers are
    wrapped in ``RxSubscriberFromObserver``, and the request/response
    observable is turned into a future.
    """

    def __init__(self, delegate: ReactivexHandler):
        """Store the handler that all callbacks are forwarded to.

        :param delegate: the ReactiveX-style handler to adapt.
        """
        self.delegate = delegate

    async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
        """Forward the setup callback to the delegate unchanged."""
        await self.delegate.on_setup(data_encoding, metadata_encoding, payload)

    async def on_metadata_push(self, metadata: Payload):
        """Forward a metadata push to the delegate.

        The call must go to ``self.delegate``: invoking
        ``self.on_metadata_push`` here would recurse without bound and never
        reach the wrapped handler.
        """
        await self.delegate.on_metadata_push(metadata)

    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
        """Handle a channel request via the delegate.

        :param payload: the initial payload of the channel request.
        :return: a ``(publisher, subscriber)`` pair. The publisher is built
            from the delegate's ``observable``; the subscriber is ``None``
            when the delegate supplied no ``observer``.
        """
        reactivex_channel = await self.delegate.request_channel(payload)

        subscriber = None
        publisher = observable_to_publisher(reactivex_channel.observable)

        # Only wrap the observer when the delegate actually provided one.
        if reactivex_channel.observer is not None:
            subscriber = RxSubscriberFromObserver(reactivex_channel.observer,
                                                  reactivex_channel.limit_rate)

        return publisher, subscriber

    async def request_fire_and_forget(self, payload: Payload):
        """Forward a fire-and-forget request to the delegate."""
        await self.delegate.request_fire_and_forget(payload)

    async def request_response(self, payload: Payload) -> asyncio.Future:
        """Handle a request/response interaction via the delegate.

        The delegate may return either an observable or an
        ``asyncio.Future`` that resolves to one.

        :param payload: the request payload.
        :return: the result of piping the observable through
            ``default_if_empty(Payload())`` and ``to_future()``, so an empty
            observable yields an empty ``Payload``.
        """
        response = await self.delegate.request_response(payload)

        # Deliberately checks for asyncio.Future only, not "any awaitable".
        # NOTE(review): reactivex observables are themselves awaitable, so a
        # broader check would await the observable instead of passing it on.
        if isinstance(response, asyncio.Future):
            observable = await response
        else:
            observable = response

        return observable.pipe(
            operators.default_if_empty(Payload()),
            operators.to_future()
        )

    async def request_stream(self, payload: Payload) -> Publisher:
        """Return the delegate's stream observable converted to a publisher."""
        return observable_to_publisher(await self.delegate.request_stream(payload))

    async def on_error(self, error_code: ErrorCode, payload: Payload):
        """Forward an error notification to the delegate."""
        await self.delegate.on_error(error_code, payload)

    async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket):
        """Forward a keepalive-timeout notification to the delegate."""
        await self.delegate.on_keepalive_timeout(time_since_last_keepalive, rsocket)

    async def on_connection_error(self, rsocket, exception: Exception):
        """Forward a connection-error notification to the delegate."""
        await self.delegate.on_connection_error(rsocket, exception)

    async def on_close(self, rsocket, exception: Optional[Exception] = None):
        """Forward the close notification (and optional cause) to the delegate."""
        await self.delegate.on_close(rsocket, exception)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc