• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hivesolutions / netius / #620889952

23 Apr 2024 08:23PM UTC coverage: 45.051%. First build
#620889952

Pull #26

travis-ci

Pull Request #26: Server support for protocols

110 of 405 new or added lines in 21 files covered. (27.16%)

7733 of 17165 relevant lines covered (45.05%)

0.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.09
/src/netius/base/protocol.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3

4
# Hive Netius System
5
# Copyright (c) 2008-2024 Hive Solutions Lda.
6
#
7
# This file is part of Hive Netius System.
8
#
9
# Hive Netius System is free software: you can redistribute it and/or modify
10
# it under the terms of the Apache License as published by the Apache
11
# Foundation, either version 2.0 of the License, or (at your option) any
12
# later version.
13
#
14
# Hive Netius System is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# Apache License for more details.
18
#
19
# You should have received a copy of the Apache License along with
20
# Hive Netius System. If not, see <http://www.apache.org/licenses/>.
21

22
__author__ = "João Magalhães <joamag@hive.pt>"
1✔
23
""" The author(s) of the module """
24

25
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
1✔
26
""" The copyright for the module """
27

28
__license__ = "Apache License, Version 2.0"
1✔
29
""" The license for the module """
30

31
from . import legacy
1✔
32
from . import request
33
from . import observer
34

1✔
35

36
class Protocol(observer.Observable):
    """
    Abstract class from which concrete implementation of
    protocol logic should be inherited.

    The logic of a protocol should implement both a reaction
    to the arrival of information (receive) and the sending
    of processed data (send).

    The instance keeps track of its own life-cycle through the
    open, closing and closed flags, triggering the "open", "close"
    and "finish" events as it moves between those states.
    """

    def __init__(self, owner=None):
        """
        Builds a new (pending) protocol, not yet associated with
        any transport or event loop.

        :type owner: Object
        :param owner: Optional object stored as the owner of the
        protocol, it's not used by the logic of this class.
        """

        observer.Observable.__init__(self)
        self.owner = owner
        self._transport = None
        self._loop = None
        self._writing = True
        self._open = False
        self._closed = False
        self._closing = False
        self._delayed = []
        self._callbacks = []

    def open(self):
        """
        Opens the protocol (if not already open) and triggers
        the "open" event once the operation is done.
        """

        # in case the protocol is already open, ignores the current
        # call as it's considered a double opening
        if self.is_open():
            return

        # calls the concrete implementation of the open operation
        # allowing an extra level of indirection
        self.open_c()

        self.trigger("open", self)

    def close(self):
        """
        Starts the closing of the protocol (if not already closed
        or closing) and triggers the "close" event afterwards.
        """

        # in case the protocol is already closed, ignores the current
        # call considering it a double closing operation
        if self.is_closed() or self.is_closing():
            return

        # calls the concrete implementation of the close operation
        # allowing an extra level of indirection
        self.close_c()

        self.trigger("close", self)

    def finish(self):
        """
        Concludes a close operation that is in progress, releasing
        the internal structures, triggering the "finish" event and
        then destroying the object.

        Nothing is done unless the protocol is in the closing state.
        """

        # in case the current protocol is already (completely) closed
        # or is not in the state of closing, nothing should be done
        if self.is_closed():
            return
        if not self.is_closing():
            return

        # calls the concrete implementation of the finish operation
        # allowing an extra level of indirection
        self.finish_c()

        self.trigger("finish", self)

        # runs the "final" destroy operation that is going to run
        # the most structural elements of this object, the method is
        # not defined in this class (presumably inherited from the
        # observable base class)
        self.destroy()

    def open_c(self):
        """
        Concrete open implementation, updates the state flags so
        that the protocol is considered open.
        """

        # unmarks the current protocol from closed (and closing)
        # meaning that it will be opened one more time and
        # so it must not be considered as closed
        self._open = True
        self._closed = False
        self._closing = False

    def close_c(self):
        """
        Concrete close implementation, marks the protocol as closing,
        closes the transport and schedules the finish operation.
        """

        # marks the current protocol as closing, meaning that although
        # the close operation is not yet finished it's starting
        self._closing = True

        # runs the close transport call that triggers the process
        # of closing the underlying transport method, notice that
        # this operation is only considered to be safely completed
        # on the next tick of the event loop
        self._close_transport()

        # delays the execution of the finish (cleanup) operation
        # so that all the pending operations from the close transport
        # call can be executed in the meantime, ensuring a proper,
        # secure and clean execution of the finish method, notice that
        # if no loop is set the finish is executed immediately
        self.delay(self.finish)

    def finish_c(self):
        """
        Concrete finish implementation, empties the pending send
        and callback queues (in place) and resets the state of the
        protocol, leaving it marked as closed.
        """

        del self._delayed[:]
        del self._callbacks[:]

        self._transport = None
        self._loop = None
        self._writing = True
        self._open = False
        self._closed = True
        self._closing = False

    def info_dict(self, full=False):
        """
        Retrieves the information dictionary of the associated
        transport.

        :type full: bool
        :param full: Value forwarded to the transport's info
        dictionary retrieval method.
        :rtype: Dictionary
        :return: The dictionary provided by the transport, or an
        empty one in case there's no transport set.
        """

        if not self._transport:
            return dict()
        info = self._transport.info_dict(full=full)
        return info

    def connection_made(self, transport):
        """
        Associates the provided transport with the protocol and
        makes sure the protocol is open.

        :type transport: Transport
        :param transport: The transport to be used by the protocol.
        """

        self._transport = transport

        # ensure that the protocol is open, please notice
        # that most of the time the protocol is already open
        self.open()

    def connection_lost(self, exception):
        """
        Reacts to the loss of the connection by closing the protocol.

        :type exception: Exception
        :param exception: The cause of the loss, currently not used.
        """

        self.close()

    def transport(self):
        """
        Retrieves the transport currently set (may be unset).
        """

        return self._transport

    def loop(self):
        """
        Retrieves the event loop currently set (may be unset).
        """

        return self._loop

    def loop_set(self, loop):
        """
        Sets the event loop of the protocol, triggering the
        "loop_set" event afterwards.
        """

        self._loop = loop

        self.trigger("loop_set", self)

    def loop_unset(self):
        """
        Unsets the event loop of the protocol, triggering the
        "loop_unset" event afterwards.
        """

        self._loop = None

        self.trigger("loop_unset", self)

    def pause_writing(self):
        """
        Marks the protocol as not writable, from this moment on
        send operations are queued instead of written.
        """

        self._writing = False

    def resume_writing(self):
        """
        Marks the protocol as writable again, scheduling the pending
        callbacks and then flushing the queued send operations.
        """

        self._writing = True
        self._flush_callbacks()
        self._flush_send()

    def delay(self, callable, timeout=None):
        """
        Schedules the execution of the provided callable in the
        event loop, or runs it right away if there's no loop set.

        :type callable: Function
        :param callable: The (no arguments) callable to be executed.
        :type timeout: float
        :param timeout: Optional delay for the execution, if not
        provided the callable is scheduled as soon as possible.
        :rtype: Object
        :return: The result of the scheduling call of the loop, or
        the result of the callable itself when run immediately.
        """

        # in case there's no event loop defined for the protocol
        # it's not possible to delay this execution so the
        # callable is called immediately
        if not self._loop:
            return callable()

        # verifies if the assigned loop contains the non-standard
        # delay method and if that's the case calls it instead of
        # the base asyncio API ones (compatibility), the identity
        # comparison is the proper way of testing against none
        if hasattr(self._loop, "delay"):
            immediately = timeout is None
            return self._loop.delay(callable, timeout=timeout, immediately=immediately)

        # calls the proper call method taking into account if a timeout
        # value exists or not (soon against later)
        if timeout:
            return self._loop.call_later(timeout, callable)
        else:
            return self._loop.call_soon(callable)

    def debug(self, object):
        """
        Logs the object with the debug level using the loop, nothing
        is done if there's no loop or it does not support the level.
        """

        if not self._loop:
            return
        if not hasattr(self._loop, "debug"):
            return
        self._loop.debug(object)

    def info(self, object):
        """
        Logs the object with the info level using the loop, nothing
        is done if there's no loop or it does not support the level.
        """

        if not self._loop:
            return
        if not hasattr(self._loop, "info"):
            return
        self._loop.info(object)

    def warning(self, object):
        """
        Logs the object with the warning level using the loop, nothing
        is done if there's no loop or it does not support the level.
        """

        if not self._loop:
            return
        if not hasattr(self._loop, "warning"):
            return
        self._loop.warning(object)

    def error(self, object):
        """
        Logs the object with the error level using the loop, nothing
        is done if there's no loop or it does not support the level.
        """

        if not self._loop:
            return
        if not hasattr(self._loop, "error"):
            return
        self._loop.error(object)

    def critical(self, object):
        """
        Logs the object with the critical level using the loop, nothing
        is done if there's no loop or it does not support the level.
        """

        if not self._loop:
            return
        if not hasattr(self._loop, "critical"):
            return
        self._loop.critical(object)

    def is_pending(self):
        """
        Verifies if the protocol is neither open, closed nor closing
        (initial state).
        """

        return not self._open and not self._closed and not self._closing

    def is_open(self):
        """
        Verifies if the protocol is currently open.
        """

        return self._open

    def is_closed(self):
        """
        Verifies if the protocol is (completely) closed.
        """

        return self._closed

    def is_closing(self):
        """
        Verifies if a close operation is in progress.
        """

        return self._closing

    def is_closed_or_closing(self):
        """
        Verifies if the protocol is closed or in the process
        of being closed.
        """

        return self._closed or self._closing

    def is_devel(self):
        """
        Delegates the development mode verification to the loop,
        returning false if there's no loop or it does not provide
        such verification.
        """

        if not self._loop:
            return False
        if not hasattr(self._loop, "is_devel"):
            return False
        return self._loop.is_devel()

    def _close_transport(self, force=False):
        """
        Aborts the associated transport (if any).

        :type force: bool
        :param force: Currently not used, the transport is always
        aborted.
        """

        if not self._transport:
            return
        self._transport.abort()

    def _delay_send(self, data, address=None, callback=None):
        """
        Queues a send operation to be performed once writing is
        resumed.

        :rtype: int
        :return: The size of the data that has been queued.
        """

        item = (data, address, callback)
        self._delayed.append(item)
        return len(data)

    def _flush_callbacks(self):
        """
        Schedules (through delay) the call of every pending send
        callback, removing them from the pending queue in order.
        """

        # creates the deferred call using a factory function so that
        # each scheduled call is bound to its own callback, creating
        # the lambda directly inside the loop would make every deferred
        # call use the last callback (late binding of closures)
        def bind(callback):
            return lambda: callback(self._transport)

        while self._callbacks:
            self.delay(bind(self._callbacks.pop(0)))

    def _flush_send(self):
        """
        Re-sends the queued send operations in order, stopping as
        soon as writing is paused again or the queue is empty.
        """

        # the send method is not defined in this class, it's expected
        # to be provided by the concrete protocol, a send operation may
        # pause the writing again, hence the verification per iteration
        while self._delayed and self._writing:
            data, address, callback = self._delayed.pop(0)
            if address:
                self.send(data, address, callback=callback)  # pylint: disable=E1101
            else:
                self.send(data, callback=callback)  # pylint: disable=E1101
×
280

281

1✔
282
class DatagramProtocol(Protocol):
    """
    Protocol for datagram based transports, where every piece of
    data is sent to (and received from) an explicit address.

    Also keeps a registry of the requests that are pending a
    response, indexed by their identifier.
    """

    def __init__(self, owner=None):
        """
        Builds a new datagram protocol with an empty registry
        of pending requests.

        :type owner: Object
        :param owner: Optional owner of the protocol, forwarded to
        the base protocol constructor (consistency with the base
        signature, the default keeps the previous behaviour).
        """

        Protocol.__init__(self, owner=owner)
        self.requests = []
        self.requests_m = {}

    def datagram_received(self, data, address):
        """
        Forwards the received datagram to the data handler.
        """

        self.on_data(address, data)

    def error_received(self, exception):
        """
        Called upon an error, the base implementation ignores it.
        """

        pass

    def on_data(self, address, data):
        """
        Triggers the "data" event with the received data, notice
        that the source address is not sent to the listeners.
        """

        self.trigger("data", self, data)

    def send(self, data, address, delay=True, force=False, callback=None):
        """
        Alias to the send to operation, check it for details.
        """

        return self.send_to(data, address, delay=delay, force=force, callback=callback)

    def send_to(self, data, address, delay=True, force=False, callback=None):
        """
        Sends the data to the provided address, queueing it
        if the writing is currently paused.

        :type data: String
        :param data: The data to be sent, converted to bytes.
        :type address: Tuple
        :param address: The target address, handed over as is
        to the transport.
        :type delay: bool
        :param delay: Currently not used by this implementation.
        :type force: bool
        :param force: Currently not used by this implementation.
        :type callback: Function
        :param callback: Optional callable to be called with the
        transport once the data is considered flushed.
        :rtype: int
        :return: The size (in bytes) of the data sent or queued.
        """

        # ensures that the provided data value is a bytes sequence
        # so that its format is compliant with what's expected by
        # the underlying transport send to operation
        data = legacy.bytes(data)

        # in case the current transport buffers do not allow writing
        # (paused mode) the writing of the data is delayed until the
        # writing is again enabled (resume writing)
        if not self._writing:
            return self._delay_send(data, address=address, callback=callback)

        # pushes the write data down to the transport layer immediately
        # as writing is still allowed for the current protocol
        self._transport.sendto(data, address)

        # in case there's a callback associated with the send
        # tries to see if the data has been completely flushed
        # (writing still enabled) and if so schedules the callback
        # to be called on the next tick, otherwise adds it to the
        # callbacks to be called upon the next write resume operation
        if callback:
            if self._writing:
                self.delay(lambda: callback(self._transport))
            else:
                self._callbacks.append(callback)

        # returns the size (in bytes) of the data that has just been
        # explicitly sent through the associated transport
        return len(data)

    def add_request(self, request):
        """
        Registers the request as pending, both in the sequence
        and in the map indexed by the request's identifier.
        """

        # adds the current request object to the list of requests
        # that are pending a valid response, a garbage collector
        # system should be able to erase this request from the
        # pending list in case a timeout value has passed
        self.requests.append(request)
        self.requests_m[request.id] = request

    def remove_request(self, request):
        """
        Removes the request from the registry, an exception is
        raised by the underlying structures if it's not registered.
        """

        self.requests.remove(request)
        del self.requests_m[request.id]

    def get_request(self, id):
        """
        Retrieves the pending request for the provided identifier.

        :type id: Object
        :param id: The identifier of the request, or a response
        object from which the identifier is extracted.
        :rtype: Request
        :return: The registered request, or none if not found.
        """

        is_response = isinstance(id, request.Response)
        if is_response:
            id = id.get_id()
        return self.requests_m.get(id, None)
1✔
349

1✔
350

1✔
351
class StreamProtocol(Protocol):
    """
    Protocol for stream based transports, where data is written
    to (and received from) the transport without any address.
    """

    def data_received(self, data):
        """
        Forwards the received chunk of data to the data handler.
        """

        self.on_data(data)

    def eof_received(self):
        """
        Called upon end of file, the base implementation does
        nothing about it.
        """

        return None

    def on_data(self, data):
        """
        Notifies the listeners about the data, using the "data" event.
        """

        self.trigger("data", self, data)

    def send(self, data, delay=True, force=False, callback=None):
        """
        Writes the data to the transport, or queues it when the
        writing is paused, returning the size of the data (in bytes).

        The delay and force arguments are not used by this
        implementation, the optional callback is called with the
        transport once the data is considered flushed.
        """

        # normalizes the data as a bytes sequence (the format expected
        # by the transport) and computes the size to be returned
        payload = legacy.bytes(data)
        size = len(payload)

        # writing is paused, the operation must be queued so that
        # it's performed when the writing is resumed
        if not self._writing:
            return self._delay_send(payload, callback=callback)

        # writing is allowed, the data goes directly to the transport
        self._transport.write(payload)

        # no callback means that there's nothing else to be done
        if not callback:
            return size

        # the write may have paused the writing, in such case the
        # callback waits for the resume, otherwise (data flushed)
        # it's scheduled to be called on the next tick
        if not self._writing:
            self._callbacks.append(callback)
        else:
            self.delay(lambda: callback(self._transport))

        return size
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc