• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kyuupichan / aiorpcX / #439

17 Mar 2024 07:56AM UTC coverage: 95.73% (-2.8%) from 98.482%
#439

push

coveralls-python

kyuupichan
Prepare aiorpcX 0.23

1 of 1 new or added line in 1 file covered. (100.0%)

52 existing lines in 2 files now uncovered.

1771 of 1850 relevant lines covered (95.73%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.61
/aiorpcx/unixsocket.py
1
# Copyright (c) 2021, Adriano Marto Reis
2
#
3
# All rights reserved.
4
#
5
# The MIT License (MIT)
6
#
7
# Permission is hereby granted, free of charge, to any person obtaining
8
# a copy of this software and associated documentation files (the
9
# "Software"), to deal in the Software without restriction, including
10
# without limitation the rights to use, copy, modify, merge, publish,
11
# distribute, sublicense, and/or sell copies of the Software, and to
12
# permit persons to whom the Software is furnished to do so, subject to
13
# the following conditions:
14
#
15
# The above copyright notice and this permission notice shall be
16
# included in all copies or substantial portions of the Software.
17
#
18
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
22
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25

26
'''Asyncio protocol abstraction.'''
1✔
27

28
__all__ = ('connect_us', 'serve_us')
1✔
29

30

31
import asyncio
1✔
32
from functools import partial
1✔
33

34
from aiorpcx.curio import Event, timeout_after, TaskTimeout
1✔
35
from aiorpcx.session import RPCSession, SessionBase, SessionKind
1✔
36

37

38
class ConnectionLostError(Exception):
    '''Signals that the underlying connection has been lost.'''
1✔
40

41

42
class USTransport(asyncio.Protocol):
    '''asyncio protocol that bridges a unix-socket transport and a session.

    asyncio drives the callbacks (connection_made, data_received, ...);
    the session uses write(), close(), abort(), is_closing(), proxy() and
    remote_address().
    '''

    def __init__(self, session_factory, framer, kind):
        '''Store the configuration; the session is created in connection_made.

        session_factory: called with this transport to build the session.
        framer: message framer; if falsy the session's default_framer() is
                used once the connection is made.
        kind: stored as self.kind (callers in this module pass a SessionKind).
        '''
        self.session_factory = session_factory
        self.loop = asyncio.get_event_loop()
        self.session = None
        self.kind = kind
        self._asyncio_transport = None
        self._framer = framer
        # Cleared when the send socket is full
        self._can_send = Event()
        self._can_send.set()
        # Set once process_messages() has finished
        self._closed_event = Event()
        self._process_messages_task = None

    async def process_messages(self):
        '''Run the session's message loop until the connection is lost.

        Always sets the closed event on exit so that close() can return.
        '''
        try:
            await self.session.process_messages(self.receive_message)
        except ConnectionLostError:
            # Raised through the framer by connection_lost(); a normal shutdown.
            pass
        finally:
            self._closed_event.set()

    async def receive_message(self):
        '''Return the next message from the framer.'''
        return await self._framer.receive_message()

    def connection_made(self, transport):
        '''Called by asyncio when a connection is established.'''
        self._asyncio_transport = transport
        self.session = self.session_factory(self)
        self._framer = self._framer or self.session.default_framer()
        self._process_messages_task = self.loop.create_task(self.process_messages())

    def connection_lost(self, _exeption):
        '''Called by asyncio when the connection closes.
        Tear down things done in connection_made.'''
        # Release waiting tasks
        self._can_send.set()
        self._framer.fail(ConnectionLostError())

    def data_received(self, data):
        '''Called by asyncio when a message comes in.'''
        self.session.data_received(data)
        self._framer.received_bytes(data)

    def pause_writing(self):
        '''Called by asyncio when the send buffer is full.'''
        if not self.is_closing():
            self._can_send.clear()
            self._asyncio_transport.pause_reading()

    def resume_writing(self):
        '''Called by asyncio when the send buffer has room.'''
        if not self._can_send.is_set():
            self._can_send.set()
            self._asyncio_transport.resume_reading()

    # API exposed to session
    async def write(self, message):
        '''Frame and send message once sending is permitted.

        The message is silently dropped if the connection is closing.
        '''
        await self._can_send.wait()
        if not self.is_closing():
            framed_message = self._framer.frame(message)
            self._asyncio_transport.write(framed_message)

    async def close(self, force_after):
        '''Close the connection and return when closed.

        force_after is passed to timeout_after(); if the closed event is not
        set before it expires the transport is aborted instead.
        '''
        if self._asyncio_transport:
            self._asyncio_transport.close()
            try:
                async with timeout_after(force_after):
                    await self._closed_event.wait()
            except TaskTimeout:
                await self.abort()
                await self._closed_event.wait()

    async def abort(self):
        '''Abort the underlying transport, if there is one.'''
        if self._asyncio_transport:
            self._asyncio_transport.abort()

    def is_closing(self):
        '''Return True if the connection is closing.

        Returns False, rather than raising AttributeError, when no transport
        has been attached yet; this matches the guards in close() and abort().
        '''
        if self._closed_event.is_set():
            return True
        transport = self._asyncio_transport
        return transport is not None and transport.is_closing()

    def proxy(self):
        '''Not applicable to unix sockets.'''
        return None

    def remote_address(self):
        '''Not applicable to unix sockets'''
        return None
×
132

133

134
class USClient:
    '''Async context manager that opens a unix-socket connection.

    Entering the context connects and returns the session created by the
    protocol; leaving it closes that session.
    '''

    def __init__(self, path=None, *, framer=None, **kwargs):
        '''Remember how to connect; nothing is opened until the context is entered.

        path: passed to loop.create_unix_connection().
        framer: passed to USTransport; a falsy value selects the session default.
        kwargs: 'session_factory' (default RPCSession) and 'loop' are consumed
                here; everything else is forwarded to create_unix_connection().
        '''
        session_factory = kwargs.pop('session_factory', RPCSession)
        self.protocol_factory = partial(USTransport, session_factory, framer,
                                        SessionKind.CLIENT)
        self.path = path
        self.session = None
        # 'loop' must be removed from kwargs: create_unix_connection() does not
        # accept it, so forwarding it would raise TypeError.  The default loop
        # is only looked up when the caller did not supply one.
        loop = kwargs.pop('loop', None)
        self.loop = loop if loop is not None else asyncio.get_event_loop()
        self.kwargs = kwargs

    async def create_connection(self):
        '''Initiate a connection.

        Returns the (transport, protocol) pair from create_unix_connection().
        '''
        return await self.loop.create_unix_connection(
            self.protocol_factory, self.path, **self.kwargs)

    async def __aenter__(self):
        '''Connect and return the session.'''
        _transport, protocol = await self.create_connection()
        self.session = protocol.session
        assert isinstance(self.session, SessionBase)
        return self.session

    async def __aexit__(self, _type, _value, _traceback):
        '''Close the session opened by __aenter__.'''
        await self.session.close()
×
158

159

160
async def serve_us(session_factory, path=None, *, framer=None, loop=None, **kwargs):
    '''Start a unix-socket server and return the asyncio server object.

    Each connection gets a USTransport built from session_factory and framer
    with kind SessionKind.SERVER.  If loop is not given the current event loop
    is used.  Remaining keyword arguments go to loop.create_unix_server().
    '''
    if not loop:
        loop = asyncio.get_event_loop()
    make_protocol = partial(USTransport, session_factory, framer, SessionKind.SERVER)
    server = await loop.create_unix_server(make_protocol, path, **kwargs)
    return server
×
164

165

166
# Public name for USClient, exported through __all__; used as
# "async with connect_us(path) as session:".
connect_us = USClient
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc