• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 13346469615

15 Feb 2025 03:44PM UTC coverage: 92.125% (+0.1%) from 92.021%
13346469615

Pull #291

github

web-flow
Merge 2fcc39f87 into 41706b46a
Pull Request #291: Updated example use cases. Fixed a reactivex request_response error

835 of 977 branches covered (85.47%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

3985 of 4255 relevant lines covered (93.65%)

4.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/rsocket/rx_support/rx_handler.py
1
from abc import abstractmethod
5✔
2
from datetime import timedelta
5✔
3
from typing import Optional, Union, Callable
5✔
4

5
from rx import Observable
5✔
6
from rx.core.typing import Subject
5✔
7

8
from rsocket.error_codes import ErrorCode
5✔
9
from rsocket.extensions.composite_metadata import CompositeMetadata
5✔
10
from rsocket.logger import logger
5✔
11
from rsocket.payload import Payload
5✔
12
from rsocket.rx_support.rx_channel import RxChannel
5✔
13

14

15
class RxHandler:
    """Callback interface for an Rx-flavoured RSocket handler.

    Every hook is a coroutine. The interaction hooks (``request_*``) are
    annotated to return Rx objects, while the lifecycle hooks declare no
    return value.

    NOTE(review): this class is declared without ``ABC``/``ABCMeta``, so the
    ``@abstractmethod`` markers below are declarative only. Python will not
    refuse to instantiate this class or a subclass that leaves hooks out.
    """

    @abstractmethod
    async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
        """Setup hook; receives the data/metadata encodings and a payload."""

    @abstractmethod
    async def on_metadata_push(self, metadata: Payload):
        """Metadata-push hook; receives the pushed metadata as a payload."""

    @abstractmethod
    async def request_channel(self, payload: Payload) -> RxChannel:
        """Channel-request hook; expected to return an ``RxChannel``."""

    @abstractmethod
    async def request_fire_and_forget(self, payload: Payload):
        """Fire-and-forget hook; no return value is declared."""

    @abstractmethod
    async def request_response(self, payload: Payload) -> Observable:
        """Request/response hook; expected to return an ``Observable``."""

    @abstractmethod
    async def request_stream(self, payload: Payload) -> Union[Observable, Callable[[Subject], Observable]]:
        """Stream-request hook.

        Expected to return either an ``Observable`` or a callable that takes
        a ``Subject`` and returns an ``Observable``.
        """

    @abstractmethod
    async def on_error(self, error_code: ErrorCode, payload: Payload):
        """Error hook; receives the error code and the accompanying payload."""

    @abstractmethod
    async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket):
        """Keepalive-timeout hook.

        Receives the time elapsed since the last keepalive and the
        ``rsocket`` object concerned.
        """

    @abstractmethod
    async def on_connection_error(self, rsocket, exception: Exception):
        """Connection-error hook; receives the ``rsocket`` and the exception."""

    @abstractmethod
    async def on_close(self, rsocket, exception: Optional[Exception] = None):
        """Close hook; ``exception`` is optional and defaults to ``None``."""

    # noinspection PyMethodMayBeStatic
    def _parse_composite_metadata(self, metadata: bytes) -> CompositeMetadata:
        """Parse ``metadata`` into a fresh ``CompositeMetadata`` and return it."""
        parsed = CompositeMetadata()
        parsed.parse(metadata)
        return parsed
67

68

69
class BaseRxHandler(RxHandler):
    """Default ``RxHandler`` implementation meant to be subclassed.

    Lifecycle hooks are no-ops, ``on_error`` logs, and the interaction hooks
    that must produce a result (``request_channel``, ``request_response``,
    ``request_stream``) raise ``RuntimeError('Not implemented')`` until they
    are overridden.
    """

    async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
        """Nothing to do on setup by default."""

    async def on_metadata_push(self, metadata: Payload):
        """Nothing to do on metadata push by default."""

    async def request_channel(self, payload: Payload) -> RxChannel:
        """Not supported by default.

        Raises:
            RuntimeError: always, with the message ``'Not implemented'``.
        """
        raise RuntimeError('Not implemented')

    async def request_fire_and_forget(self, payload: Payload):
        """The requester isn't listening for errors, so there is nothing to do."""

    async def request_response(self, payload: Payload) -> Observable:
        """Not supported by default.

        Raises:
            RuntimeError: always, with the message ``'Not implemented'``.
        """
        raise RuntimeError('Not implemented')

    # The return annotation mirrors the one declared on RxHandler.request_stream
    # so overrides may return either form without contradicting this class.
    async def request_stream(self, payload: Payload) -> Union[Observable, Callable[[Subject], Observable]]:
        """Not supported by default.

        Raises:
            RuntimeError: always, with the message ``'Not implemented'``.
        """
        raise RuntimeError('Not implemented')

    async def on_error(self, error_code: ErrorCode, payload: Payload):
        """Log the error code's name and the payload at error level."""
        logger().error('Error handler: %s, %s', error_code.name, payload)

    async def on_keepalive_timeout(self, time_since_last_keepalive: timedelta, rsocket):
        """Nothing to do on keepalive timeout by default."""

    async def on_close(self, rsocket, exception: Optional[Exception] = None):
        """Nothing to do on close by default."""

    async def on_connection_error(self, rsocket, exception: Exception):
        """Nothing to do on connection error by default."""
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc