• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hivesolutions / netius / #620821434

22 Apr 2024 05:03PM UTC coverage: 45.312%. First build
#620821434

Pull #27

travis-ci

Pull Request #27: SMTP client support

2 of 4 new or added lines in 1 file covered. (50.0%)

7559 of 16682 relevant lines covered (45.31%)

0.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.53
/src/netius/base/protocol.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3

4
# Hive Netius System
5
# Copyright (c) 2008-2024 Hive Solutions Lda.
6
#
7
# This file is part of Hive Netius System.
8
#
9
# Hive Netius System is free software: you can redistribute it and/or modify
10
# it under the terms of the Apache License as published by the Apache
11
# Foundation, either version 2.0 of the License, or (at your option) any
12
# later version.
13
#
14
# Hive Netius System is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# Apache License for more details.
18
#
19
# You should have received a copy of the Apache License along with
20
# Hive Netius System. If not, see <http://www.apache.org/licenses/>.
21

22
__author__ = "João Magalhães <joamag@hive.pt>"
1✔
23
""" The author(s) of the module """
24

25
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
1✔
26
""" The copyright for the module """
27

28
__license__ = "Apache License, Version 2.0"
1✔
29
""" The license for the module """
30

31
from . import legacy
1✔
32
from . import request
33
from . import observer
34

1✔
35

36
class Protocol(observer.Observable):
37
    """
1✔
38
    Abstract class from which concrete implementation of
39
    protocol logic should be inherited.
40

1✔
41
    The logic of a protocol should implement both a reaction
1✔
42
    to the arrival of information (receive) and the sending
1✔
43
    of processed data (send).
1✔
44
    """
45

1✔
46
    def __init__(self, owner=None):
47
        observer.Observable.__init__(self)
48
        self.owner = owner
49
        self._transport = None
50
        self._loop = None
51
        self._writing = True
52
        self._open = False
53
        self._closed = False
54
        self._closing = False
55
        self._delayed = []
1✔
56
        self._callbacks = []
1✔
57

1✔
58
    def open(self):
1✔
59
        # in case the protocol is already open, ignores the current
1✔
60
        # call as it's considered a double opening
1✔
61
        if self.is_open():
1✔
62
            return
1✔
63

1✔
64
        # calls the concrete implementation of the open operation
1✔
65
        # allowing an extra level of indirection
1✔
66
        self.open_c()
67

1✔
68
        self.trigger("open", self)
69

70
    def close(self):
1✔
71
        # in case the protocol is already closed, ignores the current
72
        # call considering it a double closing operation
73
        if self.is_closed() or self.is_closing():
74
            return
1✔
75

76
        # calls the concrete implementation of the close operation
1✔
77
        # allowing an extra level of indirection
78
        self.close_c()
1✔
79

80
        self.trigger("close", self)
81

1✔
82
    def finish(self):
83
        # in case the current protocol is already (completely) closed
84
        # or is not in the state of closing, nothing should be done
85
        if self.is_closed():
1✔
86
            return
87
        if not self.is_closing():
1✔
88
            return
89

1✔
90
        # calls the concrete implementation of the finish operation
91
        # allowing an extra level of indirection
92
        self.finish_c()
1✔
93

1✔
94
        self.trigger("finish", self)
95

96
        # runs the "final" destroy operation that is going to run
97
        # the most structural elements of this object
1✔
98
        self.destroy()
99

1✔
100
    def open_c(self):
101
        # unmarks the current protocol from closed (and closing)
102
        # meaning that it will be opened one more time and
103
        # so it must not be considered as closed
1✔
104
        self._open = True
105
        self._closed = False
1✔
106
        self._closing = False
107

108
    def close_c(self):
109
        # marks the current protocol as closing, meaning that although
1✔
110
        # the close operation is not yet finished it's starting
1✔
111
        self._closing = True
1✔
112

113
        # runs the close transport call that triggers the process
1✔
114
        # of closing the underlying transport method, notice that
115
        # this operation is only considered to be safely completed
116
        # on the next tick of the event loop
1✔
117
        self._close_transport()
118

119
        # delays the execution of the finish (cleanup) operation
120
        # so that all the pending operations from the close transport
121
        # call can be executed in the meantime, ensuring a proper,
122
        # secure and clean execution of the finish method
1✔
123
        self.delay(self.finish)
124

125
    def finish_c(self):
126
        del self._delayed[:]
127
        del self._callbacks[:]
128

1✔
129
        self._transport = None
130
        self._loop = None
1✔
131
        self._writing = True
1✔
132
        self._open = False
1✔
133
        self._closed = True
134
        self._closing = False
1✔
135

1✔
136
    def info_dict(self, full=False):
1✔
137
        if not self._transport:
1✔
138
            return dict()
1✔
139
        info = self._transport.info_dict(full=full)
1✔
140
        return info
141

1✔
142
    def connection_made(self, transport):
×
143
        self._transport = transport
×
144

×
145
        # ensure that the protocol is open, please notice
146
        # that most of the time the protocol is already open
1✔
147
        self.open()
1✔
148

149
    def connection_lost(self, exception):
150
        self.close()
151

1✔
152
    def transport(self):
153
        return self._transport
1✔
154

1✔
155
    def loop(self):
156
        return self._loop
1✔
157

1✔
158
    def loop_set(self, loop):
159
        self._loop = loop
1✔
160

×
161
        self.trigger("loop_set", self)
162

1✔
163
    def loop_unset(self):
1✔
164
        self._loop = None
165

1✔
166
        self.trigger("loop_unset", self)
167

1✔
168
    def pause_writing(self):
×
169
        self._writing = False
170

×
171
    def resume_writing(self):
172
        self._writing = True
1✔
173
        self._flush_callbacks()
×
174
        self._flush_send()
175

1✔
176
    def delay(self, callable, timeout=None):
×
177
        # in case there's no event loop defined for the protocol
×
178
        # it's not possible to delay this execution so the
×
179
        # callable is called immediately
180
        if not self._loop:
1✔
181
            return callable()
182

183
        # verifies if the assigned loop contains the non-standard
184
        # delay method and if that's the case calls it instead of
1✔
185
        # the base asyncio API ones (compatibility)
186
        if hasattr(self._loop, "delay"):
187
            immediately = timeout == None
188
            return self._loop.delay(callable, timeout=timeout, immediately=immediately)
189

1✔
190
        # calls the proper call method taking into account if a timeout
1✔
191
        # value exists or not (soon against later)
1✔
192
        if timeout:
193
            return self._loop.call_later(timeout, callable)
194
        else:
195
            return self._loop.call_soon(callable)
196

197
    def debug(self, object):
198
        if not self._loop:
199
            return
×
200
        if not hasattr(self._loop, "debug"):
×
201
            return
202
        self._loop.debug(object)
1✔
203

1✔
204
    def info(self, object):
1✔
205
        if not self._loop:
1✔
206
            return
207
        if not hasattr(self._loop, "info"):
1✔
208
            return
×
209
        self._loop.info(object)
×
210

×
211
    def warning(self, object):
212
        if not self._loop:
1✔
213
            return
×
214
        if not hasattr(self._loop, "warning"):
×
215
            return
×
216
        self._loop.warning(object)
217

1✔
218
    def error(self, object):
×
219
        if not self._loop:
×
220
            return
×
221
        if not hasattr(self._loop, "error"):
222
            return
1✔
223
        self._loop.error(object)
×
224

×
225
    def critical(self, object):
×
226
        if not self._loop:
227
            return
1✔
228
        if not hasattr(self._loop, "critical"):
×
229
            return
230
        self._loop.critical(object)
1✔
231

1✔
232
    def is_pending(self):
233
        return not self._open and not self._closed and not self._closing
1✔
234

1✔
235
    def is_open(self):
236
        return self._open
1✔
237

1✔
238
    def is_closed(self):
239
        return self._closed
1✔
240

×
241
    def is_closing(self):
242
        return self._closing
1✔
243

1✔
244
    def is_closed_or_closing(self):
1✔
245
        return self._closed or self._closing
1✔
246

247
    def is_devel(self):
1✔
248
        if not self._loop:
1✔
249
            return False
1✔
250
        if not hasattr(self._loop, "is_devel"):
251
            return False
1✔
252
        return self._loop.is_devel()
×
253

×
254
    def _close_transport(self, force=False):
×
255
        if not self._transport:
256
            return
1✔
257
        self._transport.abort()
×
258

×
259
    def _delay_send(self, data, address=None, callback=None):
×
260
        item = (data, address, callback)
261
        self._delayed.append(item)
1✔
262
        return len(data)
×
263

×
264
    def _flush_callbacks(self):
×
265
        while self._callbacks:
×
266
            callback = self._callbacks.pop(0)
×
267
            self.delay(lambda: callback(self._transport))
×
268

269
    def _flush_send(self):
1✔
270
        while True:
271
            if not self._delayed:
1✔
272
                break
1✔
273
            if not self._writing:
1✔
274
                break
1✔
275
            data, address, callback = self._delayed.pop(0)
276
            if address:
1✔
277
                self.send(data, address, callback=callback)  # pylint: disable=E1101
1✔
278
            else:
279
                self.send(data, callback=callback)  # pylint: disable=E1101
1✔
280

×
281

282
class DatagramProtocol(Protocol):
1✔
283

1✔
284
    def __init__(self):
285
        Protocol.__init__(self)
1✔
286
        self.requests = []
287
        self.requests_m = {}
288

289
    def datagram_received(self, data, address):
290
        self.on_data(address, data)
291

292
    def error_received(self, exception):
293
        pass
1✔
294

295
    def on_data(self, address, data):
296
        self.trigger("data", self, data)
297

298
    def send(self, data, address, delay=True, force=False, callback=None):
299
        return self.send_to(data, address, delay=delay, force=force, callback=callback)
300

301
    def send_to(self, data, address, delay=True, force=False, callback=None):
1✔
302
        # ensures that the provided data value is a bytes sequence
303
        # so that its format is compliant with what's expected by
304
        # the underlying transport send to operation
305
        data = legacy.bytes(data)
306

307
        # in case the current transport buffers do not allow writing
308
        # (paused mode) the writing of the data is delayed until the
309
        # writing is again enabled (resume writing)
310
        if not self._writing:
311
            return self._delay_send(data, address=address, callback=callback)
312

1✔
313
        # pushes the write data down to the transport layer immediately
314
        # as writing is still allowed for the current protocol
315
        self._transport.sendto(data, address)
316

317
        # in case there's a callback associated with the send
1✔
318
        # tries to see if the data has been completely flushed
×
319
        # (writing still enabled) and if so schedules the callback
320
        # to be called on the next tick, otherwise adds it to the
321
        # callbacks to be called upon the next write resume operation
322
        if callback:
323
            if self._writing:
324
                self.delay(lambda: callback(self._transport))
325
            else:
326
                self._callbacks.append(callback)
1✔
327

328
        # returns the size (in bytes) of the data that has just been
329
        # explicitly sent through the associated transport
330
        return len(data)
331

332
    def add_request(self, request):
333
        # adds the current request object to the list of requests
1✔
334
        # that are pending a valid response, a garbage collector
×
335
        # system should be able to erase this request from the
×
336
        # pending list in case a timeout value has passed
337
        self.requests.append(request)
338
        self.requests_m[request.id] = request
339

1✔
340
    def remove_request(self, request):
341
        self.requests.remove(request)
1✔
342
        del self.requests_m[request.id]
343

344
    def get_request(self, id):
345
        is_response = isinstance(id, request.Response)
346
        if is_response:
1✔
347
            id = id.get_id()
1✔
348
        return self.requests_m.get(id, None)
349

1✔
350

1✔
351
class StreamProtocol(Protocol):
1✔
352

353
    def data_received(self, data):
1✔
354
        self.on_data(data)
1✔
355

1✔
356
    def eof_received(self):
1✔
357
        pass
358

1✔
359
    def on_data(self, data):
360
        self.trigger("data", self, data)
1✔
361

1✔
362
    def send(self, data, delay=True, force=False, callback=None):
363
        # ensures that the provided data value is a bytes sequence
1✔
364
        # so that its format is compliant with what's expected by
1✔
365
        # the underlying transport write operation
366
        data = legacy.bytes(data)
1✔
367

1✔
368
        # in case the current transport buffers do not allow writing
369
        # (paused mode) the writing of the data is delayed until the
1✔
370
        # writing is again enabled (resume writing)
371
        if not self._writing:
372
            return self._delay_send(data, callback=callback)
373

1✔
374
        # pushes the write data down to the transport layer immediately
375
        # as writing is still allowed for the current protocol
376
        self._transport.write(data)
377

378
        # in case there's a callback associated with the send
1✔
379
        # tries to see if the data has been completely flushed
×
380
        # (writing still enabled), and if so, schedules the callback
381
        # to be called on the next tick otherwise adds it to the
382
        # callbacks to be called upon the next write resume operation
383
        if callback:
1✔
384
            if self._writing:
385
                self.delay(lambda: callback(self._transport))
386
            else:
387
                self._callbacks.append(callback)
388

389
        # returns the size (in bytes) of the data that has just been
390
        # explicitly sent through the associated transport
1✔
391
        return len(data)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc