• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 13346469615

15 Feb 2025 03:44PM UTC coverage: 92.125% (+0.1%) from 92.021%
13346469615

Pull #291

github

web-flow
Merge 2fcc39f87 into 41706b46a
Pull Request #291: Updated example use cases. Fixed a reactivex request_response error

835 of 977 branches covered (85.47%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

3985 of 4255 relevant lines covered (93.65%)

4.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/rsocket/reactivex/reactivex_handler_adapter.py
1
import asyncio
5✔
2
from datetime import timedelta
5✔
3
from typing import Tuple, Optional, Callable
5✔
4

5
from reactivex import operators
5✔
6

7
from reactivestreams.publisher import Publisher
5✔
8
from reactivestreams.subscriber import Subscriber
5✔
9
from rsocket.error_codes import ErrorCode
5✔
10
from rsocket.payload import Payload
5✔
11
from rsocket.reactivex.back_pressure_publisher import observable_to_publisher
5✔
12
from rsocket.reactivex.from_rsocket_publisher import RxSubscriberFromObserver
5✔
13
from rsocket.reactivex.reactivex_handler import ReactivexHandler
5✔
14
from rsocket.request_handler import RequestHandler
5✔
15

16

17
def reactivex_handler_factory(handler_factory: Callable[[], ReactivexHandler]):
5✔
18
    def create_handler():
5✔
19
        return ReactivexHandlerAdapter(handler_factory())
5✔
20

21
    return create_handler
5✔
22

23

24
class ReactivexHandlerAdapter(RequestHandler):
5✔
25

26
    def __init__(self, delegate: ReactivexHandler):
5✔
27
        self.delegate = delegate
5✔
28

29
    async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
5✔
30
        await self.delegate.on_setup(data_encoding, metadata_encoding, payload)
5✔
31

32
    async def on_metadata_push(self, metadata: Payload):
5✔
33
        await self.on_metadata_push(metadata)
×
34

35
    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
5✔
36
        reactivex_channel = await self.delegate.request_channel(payload)
5✔
37

38
        subscriber = None
5✔
39
        publisher = observable_to_publisher(reactivex_channel.observable)
5✔
40

41
        if reactivex_channel.observer is not None:
5!
42
            subscriber = RxSubscriberFromObserver(reactivex_channel.observer,
5✔
43
                                                  reactivex_channel.limit_rate)
44

45
        return publisher, subscriber
5✔
46

47
    async def request_fire_and_forget(self, payload: Payload):
5✔
48
        await self.delegate.request_fire_and_forget(payload)
×
49

50
    async def request_response(self, payload: Payload) -> asyncio.Future:
5✔
51
        response = await self.delegate.request_response(payload)
5✔
52

53
        if isinstance(response, asyncio.Future):
5!
NEW
54
            observable = await response
×
55
        else:
56
            observable = response
5✔
57

58
        return observable.pipe(
5✔
59
            operators.default_if_empty(Payload()),
60
            operators.to_future()
61
        )
62

63
    async def request_stream(self, payload: Payload) -> Publisher:
5✔
64
        return observable_to_publisher(await self.delegate.request_stream(payload))
5✔
65

66
    async def on_error(self, error_code: ErrorCode, payload: Payload):
5✔
67
        await self.delegate.on_error(error_code, payload)
×
68

69
    async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket):
5✔
70
        await self.delegate.on_keepalive_timeout(time_since_last_keepalive, rsocket)
×
71

72
    async def on_connection_error(self, rsocket, exception: Exception):
5✔
73
        await self.delegate.on_connection_error(rsocket, exception)
×
74

75
    async def on_close(self, rsocket, exception: Optional[Exception] = None):
5✔
76
        await self.delegate.on_close(rsocket, exception)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc