• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hivesolutions / netius / 4006

pending completion
4006

push

travis-ci-com

joamag
chore: added ignore settings

6844 of 15290 relevant lines covered (44.76%)

3.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.46
/src/netius/base/server.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3

4
# Hive Netius System
5
# Copyright (c) 2008-2018 Hive Solutions Lda.
6
#
7
# This file is part of Hive Netius System.
8
#
9
# Hive Netius System is free software: you can redistribute it and/or modify
10
# it under the terms of the Apache License as published by the Apache
11
# Foundation, either version 2.0 of the License, or (at your option) any
12
# later version.
13
#
14
# Hive Netius System is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# Apache License for more details.
18
#
19
# You should have received a copy of the Apache License along with
20
# Hive Netius System. If not, see <http://www.apache.org/licenses/>.
21

22
__author__ = "João Magalhães <joamag@hive.pt>"
8✔
23
""" The author(s) of the module """
24

25
__version__ = "1.0.0"
8✔
26
""" The version of the module """
27

28
__revision__ = "$LastChangedRevision$"
8✔
29
""" The revision number of the module """
30

31
__date__ = "$LastChangedDate$"
8✔
32
""" The last change date of the module """
33

34
__copyright__ = "Copyright (c) 2008-2018 Hive Solutions Lda."
8✔
35
""" The copyright for the module """
36

37
__license__ = "Apache License, Version 2.0"
8✔
38
""" The license for the module """
39

40
from .common import * #@UnusedWildImport
8✔
41

42
BUFFER_SIZE_S = None
8✔
43
""" The size of both the send and receive buffers for
44
the socket representing the server, this socket is
45
responsible for the handling of the new connections """
46

47
BUFFER_SIZE_C = None
8✔
48
""" The size of the buffers (send and receive) that
49
is going to be set on the sockets created by
50
the server (client sockets), this is critical for a
51
good performance of the server (large value) """
52

53
class Server(Base):
8✔
54

55
    def __init__(self, *args, **kwargs):
        """
        Constructor of the server, runs the base initialization and
        then resets the complete set of server related attributes.

        The (optional) `receive_buffer_s`, `send_buffer_s`,
        `receive_buffer_c` and `send_buffer_c` keyword arguments
        override the module level buffer size defaults.
        """

        Base.__init__(self, *args, **kwargs)

        # resolves the buffer sizes for the service socket (s suffix)
        # and for the client sockets (c suffix), falling back to the
        # module level default values when not explicitly provided
        option = kwargs.get
        self.receive_buffer_s = option("receive_buffer_s", BUFFER_SIZE_S)
        self.send_buffer_s = option("send_buffer_s", BUFFER_SIZE_S)
        self.receive_buffer_c = option("receive_buffer_c", BUFFER_SIZE_C)
        self.send_buffer_c = option("send_buffer_c", BUFFER_SIZE_C)

        # the service related values are only known once the serve
        # operation is performed, until then they remain unset
        self.socket = self.host = self.port = self.type = None
        self.key_file = self.cer_file = self.ca_file = None
        self.ssl = False
        self.env = False
        self.allowed = []
71

72
    def welcome(self):
        """
        Runs the base welcome operation and then logs (info level)
        the boot message containing the name, version and platform.
        """

        Base.welcome(self)

        # packs the values that identify the running system and uses
        # them to format the boot message that is going to be logged
        boot_values = (NAME, VERSION, PLATFORM)
        self.info("Booting %s %s (%s) ..." % boot_values)
76

77
    def cleanup(self):
        """
        Releases the resources of the server, running the base cleanup
        operation and then unsubscribing and closing the service socket.

        The closing of the socket is a best-effort operation, regular
        exceptions raised by it are ignored.
        """

        Base.cleanup(self)

        # unsubscribes the current socket from all the positions in
        # the current polling mechanism, required for coherence
        self.unsub_all(self.socket)

        # tries to close the service socket, as this is the one that
        # has no connection associated and is independent, only the
        # regular exceptions are ignored so that interpreter level ones
        # (eg: keyboard interrupt, system exit) are not swallowed, the
        # socket attribute is always unset as the socket should not be
        # used for any kind of communication from this point on
        try:
            if self.socket: self.socket.close()
        except Exception:
            pass
        finally:
            self.socket = None
92

93
    def info_dict(self, full = False):
        """
        Builds the information dictionary of the server, extending
        the one created by the base implementation with the host,
        port, type and ssl values of the service.

        :param full: Flag forwarded to the base implementation.
        :return: The (base) dictionary updated with the server values.
        """

        info = Base.info_dict(self, full = full)

        # gathers the server specific values in a dictionary and then
        # merges it into the base one (overriding duplicated keys)
        server_info = dict(
            host = self.host,
            port = self.port,
            type = self.type,
            ssl = self.ssl
        )
        info.update(server_info)
        return info
102

103
    def serve(
        self,
        host = None,
        port = 9090,
        type = TCP_TYPE,
        ipv6 = False,
        ssl = False,
        key_file = None,
        cer_file = None,
        ca_file = None,
        ca_root = True,
        ssl_verify = False,
        ssl_host = None,
        ssl_fingerprint = None,
        ssl_dump = False,
        setuid = None,
        backlog = socket.SOMAXCONN,
        load = True,
        start = True,
        env = False
    ):
        """
        Configures the server, creates and binds the service socket
        and (optionally) starts the event loop.

        :param host: Address to bind to, defaults to the loopback \
        address of the selected stack, the "unix" value selects a \
        unix socket (the port is then used as the file path).
        :param port: Port to bind to, if zero the port assigned by \
        the bind operation is stored in the instance.
        :param type: Type of service, either `TCP_TYPE` or `UDP_TYPE`.
        :param ipv6: If the ipv6 address family should be used.
        :param ssl: If the (tcp) service socket should be ssl wrapped.
        :param key_file: Path to the ssl key file, defaults to \
        `SSL_KEY_PATH`.
        :param cer_file: Path to the ssl certificate file, defaults \
        to `SSL_CER_PATH`.
        :param ca_file: Path to the certificate authority file, \
        defaults to `SSL_CA_PATH`.
        :param ca_root: Value forwarded to the tcp socket creation.
        :param ssl_verify: If client ssl verification is requested.
        :param ssl_host: Value stored in the instance for reference.
        :param ssl_fingerprint: Value stored in the instance for \
        reference.
        :param ssl_dump: Value stored in the instance for reference.
        :param setuid: If set the user id of the process is changed \
        to this value after the bind operation.
        :param backlog: Backlog used in the listen operation (tcp).
        :param load: If the base load operation should be performed.
        :param start: If the base start operation should be performed.
        :param env: If the environment variables should override the \
        provided values and some of the instance attributes.
        :raise NetiusError: If the provided type is not a valid one.
        """

        # processes the various default values taking into account if
        # the environment variables are meant to be processed for the
        # current context, the order is relevant as some of the values
        # are overridden by latter variables (eg: unix path, data ones)
        if env:
            host = self.get_env("HOST", host)
            port = self.get_env("PORT", port, cast = int)
            type = self.get_env("TYPE", type, cast = int)
            ipv6 = self.get_env("IPV6", ipv6, cast = bool)
            ssl = self.get_env("SSL", ssl, cast = bool)
            port = self.get_env("UNIX_PATH", port)
            key_file = self.get_env("KEY_FILE", key_file)
            cer_file = self.get_env("CER_FILE", cer_file)
            ca_file = self.get_env("CA_FILE", ca_file)
            ca_root = self.get_env("CA_ROOT", ca_root, cast = bool)
            ssl_verify = self.get_env("SSL_VERIFY", ssl_verify, cast = bool)
            ssl_host = self.get_env("SSL_HOST", ssl_host)
            ssl_fingerprint = self.get_env("SSL_FINGERPRINT", ssl_fingerprint)

            # the dump flag is cast as boolean like the other flags,
            # otherwise any non empty string would be taken as valid
            ssl_dump = self.get_env("SSL_DUMP", ssl_dump, cast = bool)

            # NOTE(review): the data variables are presumably expanded
            # into files by the expand flag, confirm against get_env
            key_file = self.get_env("KEY_DATA", key_file, expand = True)
            cer_file = self.get_env("CER_DATA", cer_file, expand = True)
            ca_file = self.get_env("CA_DATA", ca_file, expand = True)
            setuid = self.get_env("SETUID", setuid, cast = int)
            backlog = self.get_env("BACKLOG", backlog, cast = int)

        # runs the various extra variable initialization taking into
        # account if the environment variable is currently set or not
        # please note that some side effects may arise from this set
        if env:
            self.level = self.get_env("LEVEL", self.level)
            self.diag = self.get_env("DIAG", self.diag, cast = bool)
            self.middleware = self.get_env("MIDDLEWARE", self.middleware, cast = list)
            self.children = self.get_env("CHILD", self.children, cast = int)
            self.children = self.get_env("CHILDREN", self.children, cast = int)
            self.logging = self.get_env("LOGGING", self.logging)
            self.poll_name = self.get_env("POLL", self.poll_name)
            self.poll_timeout = self.get_env(
                "POLL_TIMEOUT",
                self.poll_timeout,
                cast = float
            )
            self.keepalive_timeout = self.get_env(
                "KEEPALIVE_TIMEOUT",
                self.keepalive_timeout,
                cast = int
            )
            self.keepalive_interval = self.get_env(
                "KEEPALIVE_INTERVAL",
                self.keepalive_interval,
                cast = int
            )
            self.keepalive_count = self.get_env(
                "KEEPALIVE_COUNT",
                self.keepalive_count,
                cast = int
            )
            self.allowed = self.get_env("ALLOWED", self.allowed, cast = list)

        # updates the current service status to the configuration
        # stage as the next step is to configure the service socket
        self.set_state(STATE_CONFIG)

        # starts the loading process of the base system so that the system should
        # be able to log some information that is going to be output
        if load: self.load()

        # ensures the proper default address value, taking into account
        # the type of connection that is currently being used, this avoids
        # problems with multiple stack based servers (ipv4 and ipv6)
        if host is None: host = "::1" if ipv6 else "127.0.0.1"

        # defaults the provided ssl key and certificate paths to the
        # ones statically defined (dummy certificates), please beware
        # that using these certificates may create validation problems
        key_file = key_file or SSL_KEY_PATH
        cer_file = cer_file or SSL_CER_PATH
        ca_file = ca_file or SSL_CA_PATH

        # populates the basic information on the currently running
        # server like the host the port and the (is) ssl flag to be
        # used later for reference operations
        self.host = host
        self.port = port
        self.type = type
        self.ssl = ssl
        self.ssl_host = ssl_host
        self.ssl_fingerprint = ssl_fingerprint
        self.ssl_dump = ssl_dump
        self.env = env

        # populates the key, certificate and certificate authority file
        # information with the values that have just been resolved, these
        # values are going to be used for runtime certificate loading
        self.key_file = key_file
        self.cer_file = cer_file
        self.ca_file = ca_file

        # determines if the client side certificate should be verified
        # according to the loaded certificate authority values or if
        # on the contrary no (client) validation should be performed
        ssl_verify = ssl_verify or False

        # verifies if the type of server that is going to be created is
        # unix or internet based, this allows the current infra-structure
        # to work under the much more latency free unix sockets
        is_unix = host == "unix"

        # checks the type of service that is meant to be created and
        # creates a service socket according to the defined service, the
        # resolved family is used for both types of socket so that the
        # address to be bound is compatible with the socket (eg: ipv6), the
        # string based format is used in the error so that a non numeric
        # type does not raise a type error instead of the proper error
        family = socket.AF_INET6 if ipv6 else socket.AF_INET
        family = socket.AF_UNIX if is_unix else family
        if type == TCP_TYPE: self.socket = self.socket_tcp(
            ssl,
            key_file = key_file,
            cer_file = cer_file,
            ca_file = ca_file,
            ca_root = ca_root,
            ssl_verify = ssl_verify,
            family = family
        )
        elif type == UDP_TYPE: self.socket = self.socket_udp(family = family)
        else: raise errors.NetiusError("Invalid server type provided '%s'" % type)

        # "calculates" the address "bind target", taking into account that this
        # server may be running under a unix based socket infra-structure and
        # if that's the case the target (file path) is also removed, avoiding
        # a duplicated usage of the socket (required for address re-usage)
        address = port if is_unix else (host, port)
        if is_unix and os.path.exists(address): os.remove(address)

        # binds the socket to the provided address value (per spec) and then
        # starts the listening in the socket with the provided backlog value
        # defaulting to the typical maximum backlog as possible if not provided
        self.socket.bind(address)
        if type == TCP_TYPE: self.socket.listen(backlog)

        # in case the set user id value is set the user of the current process
        # should be changed so that it represents the new (possibly unprivileged)
        # user, this is done after the bind operation has been performed
        if setuid: os.setuid(setuid)

        # in case the selected port is zero based, meaning that a randomly selected
        # port has been assigned by the bind operation the new port must be retrieved
        # and set for the current server instance as the new port (for future reference)
        if self.port == 0: self.port = self.socket.getsockname()[1]

        # creates the string that identifies if the current service connection
        # is using a secure channel (ssl) and then prints an info message about
        # the service that is going to be started, the port stored in the instance
        # is used so that a randomly assigned port is properly displayed
        ipv6_s = " on ipv6" if ipv6 else ""
        ssl_s = " using ssl" if ssl else ""
        self.info(
            "Serving '%s' service on %s:%s%s%s ..." %\
            (self.name, host, self.port, ipv6_s, ssl_s)
        )

        # runs the fork operation responsible for the forking of the
        # current process into the various child processes for multiple
        # process based parallelism, note that this must be done after
        # the master socket has been created (to be shared), note that
        # in case the result is not valid an immediate return is performed
        # as this represents a master based process (not meant to serve)
        result = self.fork()
        if not result: return

        # ensures that the current polling mechanism is correctly open as the
        # service socket is going to be added to it next, this overrides the
        # default behavior of the common infra-structure (on start)
        self.poll = self.build_poll()
        self.poll.open(timeout = self.poll_timeout)

        # adds the socket to all of the poll lists so that it's ready to read
        # write and handle error, this is the expected behavior of a service
        # socket so that it can handle all of the expected operations
        self.sub_all(self.socket)

        # calls the on serve callback handler so that underlying services may be
        # able to respond to the fact that the service is starting and some of
        # them may print some specific debugging information
        self.on_serve()

        # starts the base system so that the event loop gets started and the
        # server gets ready to accept new connections (starts service)
        if start: self.start()
301

302
    def socket_tcp(
        self,
        ssl = False,
        key_file = None,
        cer_file = None,
        ca_file = None,
        ca_root = True,
        ssl_verify = False,
        family = socket.AF_INET,
        type = socket.SOCK_STREAM
    ):
        """
        Creates the non blocking (tcp) service socket, optionally
        wrapping it as ssl and setting the performance related options.

        :param ssl: If the socket should be wrapped as an ssl one.
        :param key_file: Path to the key file used in the ssl wrap.
        :param cer_file: Path to the certificate file used in the \
        ssl wrap.
        :param ca_file: Path to the certificate authority file used \
        in the ssl wrap.
        :param ca_root: Value forwarded to the ssl wrap operation.
        :param ssl_verify: Value forwarded to the ssl wrap operation.
        :param family: Address family of the socket to be created.
        :param type: Type of the socket to be created.
        :return: The created (and configured) service socket.
        """

        # verifies if the provided family is of type internet and if that's
        # the case the associated flag is set to valid for usage
        is_inet = family in (socket.AF_INET, socket.AF_INET6)

        # retrieves the proper string based type for the current server socket
        # and then prints a series of log messages about the socket to be created
        type_s = "ssl" if ssl else ""
        self.debug("Creating server's tcp %s socket ..." % type_s)
        if ssl:
            self.debug("Loading '%s' as key file" % key_file)
            self.debug("Loading '%s' as certificate file" % cer_file)
            if ca_file: self.debug("Loading '%s' as certificate authority file" % ca_file)
            if ssl_verify: self.debug("Loading with client ssl verification")

        # creates the socket that it's going to be used for the listening
        # of new connections (server socket)
        _socket = socket.socket(family, type)

        try:
            # sets the socket as non blocking, as required by the
            # polling based infra-structure
            _socket.setblocking(0)

            # in case the server is meant to be used as ssl wraps the socket
            # in such fashion so that it becomes "secured"
            if ssl: _socket = self._ssl_wrap(
                _socket,
                key_file = key_file,
                cer_file = cer_file,
                ca_file = ca_file,
                ca_root = ca_root,
                ssl_verify = ssl_verify,
                server = True
            )

            # sets the various options in the service socket so that it becomes
            # ready for the operation with the highest possible performance, these
            # options include the reuse address to be able to re-bind to the port
            # and address and the keep alive that drops connections after some time
            # avoiding the leak of connections (operative system managed)
            _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            if is_inet: _socket.setsockopt(
                socket.IPPROTO_TCP,
                socket.TCP_NODELAY,
                1
            )
            if self.receive_buffer_s: _socket.setsockopt(
                socket.SOL_SOCKET,
                socket.SO_RCVBUF,
                self.receive_buffer_s
            )
            if self.send_buffer_s: _socket.setsockopt(
                socket.SOL_SOCKET,
                socket.SO_SNDBUF,
                self.send_buffer_s
            )
            self._socket_keepalive(_socket)
        except BaseException:
            # the socket is not going to be returned to the caller, so it
            # must be closed to avoid the leak of the file descriptor and
            # then the exception is re-raised to the upper layers
            _socket.close()
            raise

        # returns the created tcp socket to the calling method so that it
        # may be used from this point on
        return _socket
370

371
    def socket_udp(self, family = socket.AF_INET, type = socket.SOCK_DGRAM):
        """
        Creates the non blocking (udp) service socket with both the
        address reuse and the broadcast options enabled.

        :param family: Address family of the socket to be created.
        :param type: Type of the socket to be created.
        :return: The created (and configured) service socket.
        """

        # logs the creation of the socket, useful for debugging
        self.debug("Creating server's udp socket ...")

        # builds the service socket and marks it as non blocking, as
        # required by the polling based infra-structure
        service_socket = socket.socket(family, type)
        service_socket.setblocking(0)

        # enables the socket level options that are required for
        # the proper operation of a datagram based service
        for option in (socket.SO_REUSEADDR, socket.SO_BROADCAST):
            service_socket.setsockopt(socket.SOL_SOCKET, option, 1)

        # the socket is now ready to be bound by the caller
        return service_socket
389

390
    def on_serve(self):
        """
        Callback called by the serve operation once the service
        socket has been subscribed in the poll, this implementation
        performs no operation.
        """
392

393
class DatagramServer(Server):
8✔
394

395
    def __init__(self, *args, **kwargs):
        """
        Constructor of the datagram server, runs the server
        initialization and then creates the (write) pending
        structures and the read and write control flags.
        """

        Server.__init__(self, *args, **kwargs)

        # both the read enabled and the write ready flags start
        # as valid, allowing immediate read and write operations
        self.renable = self.wready = True

        # creates the queue of pending (data, address) items, the
        # lock that protects it and the counter of pending bytes
        self.pending_s = 0
        self.pending = collections.deque()
        self.pending_lock = threading.RLock()
402

403
    def reads(self, reads, state = True):
        """
        Handles the sockets that are ready for reading, running
        the parent handling and then the read handler for each.

        :param reads: Sequence of sockets ready for reading.
        :param state: Flag forwarded to the parent handling.
        """

        Server.reads(self, reads, state = state)
        for _socket in reads: self.on_read(_socket)
407

408
    def writes(self, writes, state = True):
        """
        Handles the sockets that are ready for writing, running
        the parent handling and then the write handler for each.

        :param writes: Sequence of sockets ready for writing.
        :param state: Flag forwarded to the parent handling.
        """

        Server.writes(self, writes, state = state)
        for _socket in writes: self.on_write(_socket)
412

413
    def errors(self, errors, state = True):
        """
        Handles the sockets that are in an error condition, running
        the parent handling and then the error handler for each.

        :param errors: Sequence of sockets in error condition.
        :param state: Flag forwarded to the parent handling.
        """

        Server.errors(self, errors, state = state)
        for _socket in errors: self.on_error(_socket)
417

418
    def serve(self, type = UDP_TYPE, *args, **kwargs):
        """
        Serve operation that defaults the type of service to the
        udp one, the remaining arguments are forwarded (unchanged)
        to the parent serve operation.
        """

        Server.serve(self, *args, type = type, **kwargs)
420

421
    def on_read(self, _socket):
        """
        Handler of the read event, notifies the callbacks registered
        for the socket and then reads datagrams from the service
        socket, calling the on data handler for each of them.

        :param _socket: The socket that is ready for reading.
        """

        # notifies every callback registered for the socket (if any)
        # about the read event that has just been received
        callbacks = self.callbacks_m.get(_socket, None)
        for callback in callbacks or (): callback("read", _socket)

        # returns immediately in case the read operations are not
        # currently allowed or in case the socket is not the service
        # one (avoids unwanted operations)
        if not self.renable == True or not _socket == self.socket: return

        try:
            # reads datagram after datagram until the socket raises an
            # exception (no more data) or until the read enabled flag
            # is unset by one of the data handlers
            reading = True
            while reading:
                data, address = _socket.recvfrom(CHUNK_SIZE)
                self.on_data(address, data)
                reading = self.renable == True
        except ssl.SSLError as error:
            # silent errors are reported as expected and the valid
            # ones (by value or reason) are ignored, the remaining
            # are handled as proper exceptions
            error_v = error.args[0] if error.args else None
            error_m = getattr(error, "reason", None)
            if error_v in SSL_SILENT_ERRORS:
                self.on_expected(error)
            elif not (error_v in SSL_VALID_ERRORS or error_m in SSL_VALID_REASONS):
                self.on_exception(error)
        except socket.error as error:
            # same strategy as the ssl one, using the socket sets
            error_v = error.args[0] if error.args else None
            if error_v in SILENT_ERRORS:
                self.on_expected(error)
            elif error_v not in VALID_ERRORS:
                self.on_exception(error)
        except BaseException as exception:
            self.on_exception(exception)
465

466
    def on_write(self, _socket):
        """
        Handler of the write event, notifies the callbacks registered
        for the socket and then flushes the pending data through the
        service socket.

        :param _socket: The socket that is ready for writing.
        """

        # notifies every callback registered for the socket (if any)
        # about the write event that has just been received
        callbacks = self.callbacks_m.get(_socket, None)
        for callback in callbacks or (): callback("write", _socket)

        # only the service socket is meant to be written, any other
        # socket is ignored (avoids unwanted operations)
        is_service = _socket == self.socket
        if not is_service: return

        try:
            self._send(_socket)
        except ssl.SSLError as error:
            # silent errors are reported as expected and the valid
            # ones (by value or reason) are ignored, the remaining
            # are handled as proper exceptions
            error_v = error.args[0] if error.args else None
            error_m = getattr(error, "reason", None)
            if error_v in SSL_SILENT_ERRORS:
                self.on_expected(error)
            elif not (error_v in SSL_VALID_ERRORS or error_m in SSL_VALID_REASONS):
                self.on_exception(error)
        except socket.error as error:
            # same strategy as the ssl one, using the socket sets
            error_v = error.args[0] if error.args else None
            if error_v in SILENT_ERRORS:
                self.on_expected(error)
            elif error_v not in VALID_ERRORS:
                self.on_exception(error)
        except BaseException as exception:
            self.on_exception(exception)
494

495
    def on_error(self, _socket):
        """
        Handler of the error event, notifies the callbacks registered
        for the socket, no extra operation is performed.

        :param _socket: The socket that is in an error condition.
        """

        # notifies every callback registered for the socket (if any)
        # about the error event that has just been received
        callbacks = self.callbacks_m.get(_socket, None)
        for callback in callbacks or (): callback("error", _socket)

        # only the service socket would be handled from this point
        # on, currently there's no extra logic for it
        is_service = _socket == self.socket
        if not is_service: return
504

505
    def on_exception(self, exception):
        """
        Handler of the unexpected exceptions, logs the exception
        as a warning and then logs the current stack.

        :param exception: The exception that has been raised.
        """

        self.warning(exception)
        self.log_stack()
508

509
    def on_expected(self, exception):
        """
        Handler of the expected (silent) exceptions, logs the
        exception using the debug level only.

        :param exception: The exception that has been raised.
        """

        self.debug(exception)
511

512
    def on_data(self, address, data):
        """
        Handler called by the read handler for each datagram read
        from the service socket, this implementation performs no
        operation.

        :param address: The address returned by the receive operation.
        :param data: The data returned by the receive operation.
        """
514

515
    def ensure_write(self):
        """
        Subscribes the service socket for write events, in case the
        calling thread is not the owner one the operation is delayed
        instead.

        :return: The result of the delay operation when the operation \
        is delayed, otherwise no value is returned.
        """

        # the operation is considered to be safe only if the thread
        # running it has the same identifier as the one defined in
        # the owner (a missing identifier is handled as zero)
        current = threading.current_thread()
        is_safe = (current.ident or 0) == self.tid

        # when safe adds the service socket to the write operations
        # so that it's notified as soon as possible by the poll
        if is_safe:
            self.sub_write(self.socket)
            return None

        # otherwise schedules this same operation through the delay
        # mechanism (safe mode), avoiding extra subscription operations
        return self.delay(self.ensure_write, safe = True)
535

536
    def remove_write(self):
        """
        Removes the service socket from the write subscriptions.
        """

        service_socket = self.socket
        self.unsub_write(service_socket)
538

539
    def enable_read(self):
        """
        Enables the read operations, setting the read enabled flag
        and subscribing the service socket for read events, nothing
        is done unless the read operations are currently disabled.
        """

        is_disabled = self.renable == False
        if not is_disabled: return

        self.renable = True
        self.sub_read(self.socket)
543

544
    def disable_read(self):
        """
        Disables the read operations, unsetting the read enabled flag
        and removing the service socket from the read subscriptions,
        nothing is done unless the read operations are currently enabled.
        """

        is_enabled = self.renable == True
        if not is_enabled: return

        self.renable = False
        self.unsub_read(self.socket)
548

549
    def send(self, data, address, delay = True, callback = None):
        """
        Queues the provided data to be sent to the provided address
        and triggers (or schedules) the flush of the pending data.

        :param data: The data to be sent, converted into bytes.
        :param address: The target address of the data.
        :param delay: If the flush operation should be delayed even \
        when running in the owner thread.
        :param callback: Optional callable to be called (with the \
        server as argument) once the data is completely sent.
        """

        data = legacy.bytes(data)
        data_l = len(data)

        # packs the data with the callback (if any) and then with the
        # target address, this is the structure expected by the sender
        if callback: data = (data, callback)
        data = (data, address)

        # verifies if the current thread is the owner one, only in
        # such case the flush may be performed immediately
        cthread = threading.current_thread()
        tid = cthread.ident or 0
        is_safe = tid == self.tid

        # adds the data to the pending queue and updates the pending
        # size counter, both under the lock as the counter is also
        # changed under the lock by the sender (avoids lost updates
        # when the send operation is called from a different thread)
        with self.pending_lock:
            self.pending.appendleft(data)
            self.pending_s += data_l

        # in case the socket is write ready flushes the pending data
        # (immediately or in a delayed fashion) otherwise makes sure
        # that the write event is going to be notified
        if self.wready:
            if is_safe and not delay: self._flush_write()
            else: self.delay(
                self._flush_write,
                immediately = True,
                verify = True,
                safe = True
            )
        else:
            self.ensure_write()
576

577
    def _send(self, _socket):
        """
        Sends the pending data through the provided socket until
        the pending queue is empty or a send operation fails.

        In case of failure the item is restored to the queue, the
        write ready flag is unset, the write event is requested and
        the exception is re-raised to the caller.

        :param _socket: The socket to be used in the send operations.
        """

        self.wready = True

        with self.pending_lock:
            # iterates while there's pending data to be sent to the
            # client side (queue not empty)
            while self.pending:
                # retrieves the current item from the pending queue
                # saving it (for later usage), sets the callback as not
                # defined and then unpacks the item into data and address
                data_o = self.pending.pop()
                callback = None
                data, address = data_o

                # verifies if the data is a tuple and if that's
                # the case unpacks it as data and callback
                if isinstance(data, tuple): data, callback = data

                # retrieves the length (in bytes) of the data that is
                # going to be sent to the client
                data_l = len(data)

                try:
                    # tries to send the data through the socket and
                    # retrieves the number of bytes that were correctly
                    # sent through the socket, this number may not be
                    # the same as the size of the data in case only
                    # part of the data has been sent
                    if data: count = _socket.sendto(data, address)
                    else: count = 0

                    # in case there's valid data and nothing has been
                    # sent the socket is considered to be in a would
                    # block situation and such an error is raised (is
                    # going to be caught as a normal would block exception)
                    if data and count == 0: raise socket.error(errno.EWOULDBLOCK)
                except BaseException:
                    # sets the current connection write ready flag to false
                    # so that a new level notification must be received
                    self.wready = False

                    # ensures that the write event is going to be triggered
                    # this is required so that the remaining pending data is
                    # going to be correctly written on a new write event,
                    # triggered when the connection is ready for more writing
                    self.ensure_write()

                    # restores the item to the (front of the) pending queue
                    # because the data has not been correctly sent and then
                    # re-raises the exception to be handled by the caller
                    self.pending.append(data_o)
                    raise
                else:
                    # decrements the size of the pending buffer by the number
                    # of bytes that were correctly sent through the socket
                    self.pending_s -= count

                    # verifies if the data has been completely sent through
                    # the socket and for such situations calls the callback
                    # object, otherwise creates a new data object with only
                    # the remaining (partial data) and the callback to be
                    # sent later (only then the callback is called)
                    if count == data_l:
                        if callback: callback(self)
                    else:
                        data_o = ((data[count:], callback), address)
                        self.pending.append(data_o)

        # no more pending data exists, the write event is no longer
        # required and so the socket is removed from the write set
        self.remove_write()
656

657
    def _flush_write(self):
        """
        Flush operation, meant to be called by the delaying controller
        (in ticks), that triggers the write handling for the service
        socket so that the pending data gets sent.
        """

        # runs the write handling only for the service socket, the
        # state flag is unset and forwarded to the parent handling
        sockets = (self.socket,)
        self.writes(sockets, state = False)
665

666
class StreamServer(Server):
    """
    Server implementation for stream (connection oriented) sockets,
    it accepts client sockets on the server socket and wraps each
    of them in a connection object.

    The events received for the server socket are routed to the
    handlers with the `_s` suffix while the ones received for any
    other socket are routed to the "plain" (client) handlers.
    """

    def reads(self, reads, state = True):
        """
        Processes the sockets that are ready for reading, calling
        the parent implementation and then routing each socket to
        the proper (server or client) read handler.

        :type reads: List
        :param reads: The sequence of sockets that are ready for read.
        :type state: bool
        :param state: Value that is passed as is to the parent method.
        """

        Server.reads(self, reads, state = state)
        for read in reads:
            if read == self.socket: self.on_read_s(read)
            else: self.on_read(read)

    def writes(self, writes, state = True):
        """
        Processes the sockets that are ready for writing, calling
        the parent implementation and then routing each socket to
        the proper (server or client) write handler.

        :type writes: List
        :param writes: The sequence of sockets that are ready for write.
        :type state: bool
        :param state: Value that is passed as is to the parent method.
        """

        Server.writes(self, writes, state = state)
        for write in writes:
            if write == self.socket: self.on_write_s(write)
            else: self.on_write(write)

    def errors(self, errors, state = True):
        """
        Processes the sockets that are in an error state, calling
        the parent implementation and then routing each socket to
        the proper (server or client) error handler.

        :type errors: List
        :param errors: The sequence of sockets in an error state.
        :type state: bool
        :param state: Value that is passed as is to the parent method.
        """

        Server.errors(self, errors, state = state)
        for error in errors:
            if error == self.socket: self.on_error_s(error)
            else: self.on_error(error)

    def serve(self, type = TCP_TYPE, *args, **kwargs):
        """
        Starts serving by delegating to the parent implementation,
        the difference being that the type of server defaults to
        the TCP one.

        :type type: int
        :param type: The type of server, forwarded to the parent
        method (defaults to `TCP_TYPE`).
        """

        Server.serve(self, type = type, *args, **kwargs)

    def on_read_s(self, _socket):
        """
        Handler for the read event on the server socket, accepts
        client sockets in a loop handing each one of them to the
        `on_socket_c` method.

        The accept loop has no explicit exit condition, it's left
        only when an exception is raised (by the accept operation
        or by the handling of the client socket), such exception
        is then routed according to its kind.

        :type _socket: Socket
        :param _socket: The server socket that is ready for read.
        """

        try:
            while True:
                socket_c, address = _socket.accept()

                # in case the setup of the client socket fails it must be
                # closed before the exception is re-raised (avoids leak)
                try: self.on_socket_c(socket_c, address)
                except BaseException: socket_c.close(); raise
        except BaseException as exception:
            self._route_error(
                exception,
                self.on_expected_s,
                self.on_exception_s
            )

    def on_write_s(self, _socket):
        """
        Handler for the write event on the server socket, nothing
        is done in this implementation.

        :type _socket: Socket
        :param _socket: The server socket that is ready for write.
        """

        pass

    def on_error_s(self, _socket):
        """
        Handler for the error event on the server socket, nothing
        is done in this implementation.

        :type _socket: Socket
        :param _socket: The server socket in the error state.
        """

        pass

    def on_read(self, _socket):
        """
        Handler for the read event on a client socket, runs the
        callbacks registered for the socket and then reads data
        from the associated connection in chunks, handing each
        chunk to `on_data`.

        An empty read closes the connection, any exception raised
        in the process is routed according to its kind.

        :type _socket: Socket
        :param _socket: The client socket that is ready for read.
        """

        # tries to retrieve a possible callback registered for the socket
        # and if there's one calls it to be able to "append" extra operations
        # to the execution of the read operation in the socket
        callbacks = self.callbacks_m.get(_socket, None)
        if callbacks:
            for callback in callbacks: callback("read", _socket)

        # tries to retrieve the connection from the provided socket
        # object (using the associative map) in case there's no connection
        # or the connection is not ready for reading the control flow is
        # returned to the caller method (nothing to be done)
        connection = self.connections_m.get(_socket, None)
        if not connection: return
        if not connection.status == OPEN: return
        if not connection.renable == True: return

        try:
            # verifies if there's any pending operations in the
            # connection (eg: ssl handshaking) and performs it trying
            # to finish them, if they are still pending at the current
            # state returns immediately (waits for next loop)
            if self._pending(connection): return

            # iterates continuously trying to read as much data as possible
            # when there's a failure to read more data it should raise an
            # exception that should be handled properly
            while True:
                data = connection.recv(CHUNK_SIZE)

                # an empty chunk of data is handled by closing the
                # connection, no more reading is possible after it
                if not data:
                    connection.close()
                    break

                self.on_data(connection, data)

                # the handling of the data may have changed the connection
                # (closed, reading disabled or socket replaced) and for such
                # situations the reading loop must be stopped
                if not connection.status == OPEN: break
                if not connection.renable == True: break
                if not connection.socket == _socket: break
        except BaseException as exception:
            self._route_error(
                exception,
                self.on_expected,
                self.on_exception,
                connection
            )

    def on_write(self, _socket):
        """
        Handler for the write event on a client socket, runs the
        callbacks registered for the socket and then triggers the
        send operation in the associated (open) connection.

        Any exception raised by the send operation is routed
        according to its kind.

        :type _socket: Socket
        :param _socket: The client socket that is ready for write.
        """

        callbacks = self.callbacks_m.get(_socket, None)
        if callbacks:
            for callback in callbacks: callback("write", _socket)

        connection = self.connections_m.get(_socket, None)
        if not connection: return
        if not connection.status == OPEN: return

        try:
            connection._send()
        except BaseException as exception:
            self._route_error(
                exception,
                self.on_expected,
                self.on_exception,
                connection
            )

    def on_error(self, _socket):
        """
        Handler for the error event on a client socket, runs the
        callbacks registered for the socket and then closes the
        associated connection (if it's still open).

        :type _socket: Socket
        :param _socket: The client socket in the error state.
        """

        callbacks = self.callbacks_m.get(_socket, None)
        if callbacks:
            for callback in callbacks: callback("error", _socket)

        connection = self.connections_m.get(_socket, None)
        if not connection: return
        if not connection.status == OPEN: return

        connection.close()

    def on_exception(self, exception, connection):
        """
        Handler for an unexpected exception in a connection, logs
        it as a warning (together with the stack) and closes the
        connection.

        :type exception: BaseException
        :param exception: The exception that was raised.
        :type connection: Connection
        :param connection: The connection where the exception occurred.
        """

        self.warning(exception)
        self.log_stack()
        connection.close()

    def on_exception_s(self, exception):
        """
        Handler for an unexpected exception in the server socket,
        logs it as a warning (together with the stack).

        :type exception: BaseException
        :param exception: The exception that was raised.
        """

        self.warning(exception)
        self.log_stack()

    def on_expected(self, exception, connection):
        """
        Handler for an expected exception in a connection, logs
        it at the debug level and closes the connection.

        :type exception: BaseException
        :param exception: The exception that was raised.
        :type connection: Connection
        :param connection: The connection where the exception occurred.
        """

        self.debug(exception)
        connection.close()

    def on_expected_s(self, exception):
        """
        Handler for an expected exception in the server socket,
        logs it at the debug level.

        :type exception: BaseException
        :param exception: The exception that was raised.
        """

        self.debug(exception)

    def on_upgrade(self, connection):
        """
        Handler called once the upgrade of the connection is
        finished, marks the connection as upgraded.

        :type connection: Connection
        :param connection: The connection that has been upgraded.
        """

        connection.set_upgraded()

    def on_ssl(self, connection):
        """
        Handler called when the ssl handshake is completed for the
        connection, runs the host and fingerprint verifications and
        the certificate dump using the values configured in the
        server (when defined) or no value at all otherwise.

        :type connection: Connection
        :param connection: The connection for which ssl is now enabled.
        """

        # in case an ssl host verification value is defined for the server
        # the client connection is going to be verified against such host
        # to make sure the client represents the expected entity, note that
        # as a fallback the ssl verification process is performed with no
        # value defined, meaning that a possible (ssl) host value set in the
        # connection is going to be used instead for the verification
        if self.ssl_host: connection.ssl_verify_host(self.ssl_host)
        else: connection.ssl_verify_host()

        # in case the ssl fingerprint verification process is enabled for the
        # current server the client certificates are going to be verified for
        # their integrity using this technique, otherwise the default verification
        # process is going to be run instead
        if self.ssl_fingerprint: connection.ssl_verify_fingerprint(self.ssl_fingerprint)
        else: connection.ssl_verify_fingerprint()

        # in case the ssl dump flag is set the dump operation is performed according
        # to that flag, otherwise the default operation is performed, that in most
        # of the cases should prevent the dump of the information
        if self.ssl_dump: connection.ssl_dump_certificate(self.ssl_dump)
        else: connection.ssl_dump_certificate()

        # in case the current connection is under the upgrade
        # status calls the proper event handler so that the
        # connection workflow may proceed accordingly
        if connection.upgrading: self.on_upgrade(connection)

    def on_data(self, connection, data):
        """
        Handler called for each chunk of data that is read from
        a connection, hands the data to the connection.

        :type connection: Connection
        :param connection: The connection from which the data was read.
        :type data: String
        :param data: The chunk of data that was read.
        """

        connection.set_data(data)

    def on_socket_c(self, socket_c, address):
        """
        Handler called for every client socket that is accepted,
        validates the address, configures the socket and creates
        (and opens) the connection that wraps it.

        :type socket_c: Socket
        :param socket_c: The client socket that has been accepted.
        :type address: Tuple
        :param address: The address of the client, its first element
        is used as the host for the allowed list validation.
        :raise NetiusError: In case the host of the client is not
        present in the allowed list of the server.
        """

        # verifies if the current address (host value) is present in
        # the currently defined allowed list and in case that's not
        # the case raises an exception indicating the issue
        host = address[0] if address else ""
        result = netius.common.assert_ip4(host, self.allowed)
        if not result: raise errors.NetiusError(
            "Address '%s' not present in allowed list" % host
        )

        # verifies a series of pre-conditions on the socket so
        # that it's ensured to be in a valid state before it's
        # set as a new connection for the server (validation)
        if self.ssl and not socket_c._sslobj: socket_c.close(); return

        # in case the ssl mode is enabled, "patches" the socket
        # object with an extra pending reference, that is going
        # to be used to store pending callable operations in it
        if self.ssl: socket_c.pending = None

        # verifies if the socket is of type internet (either ipv4
        # or ipv6), this is going to be used for conditional setting
        # of some of the socket options
        is_inet = socket_c.family in (socket.AF_INET, socket.AF_INET6)

        # sets the socket as non blocking and then updates a series
        # of options in it, some of them taking into account if the
        # socket is of type internet (no delay value)
        socket_c.setblocking(0)
        socket_c.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if is_inet: socket_c.setsockopt(
            socket.IPPROTO_TCP,
            socket.TCP_NODELAY,
            1
        )
        if self.receive_buffer_c: socket_c.setsockopt(
            socket.SOL_SOCKET,
            socket.SO_RCVBUF,
            self.receive_buffer_c
        )
        if self.send_buffer_c: socket_c.setsockopt(
            socket.SOL_SOCKET,
            socket.SO_SNDBUF,
            self.send_buffer_c
        )

        # the process creation is considered completed and a new
        # connection is created for it and opened, from this time
        # on a new connection is considered accepted/created for server
        connection = self.new_connection(socket_c, address, ssl = self.ssl)
        connection.open()

        # registers the ssl handshake method as a starter method
        # for the connection, so that the handshake is properly
        # performed on the initial stage of the connection (as expected)
        if self.ssl: connection.add_starter(self._ssl_handshake)

        # runs the initial try for the handshaking process, note that
        # this is an async process and further tries to the handshake
        # may come after this one (async operation) in case an exception
        # is raised the connection is closed (avoids possible errors)
        try: connection.run_starter()
        except BaseException: connection.close(); raise

        # in case there's extraneous data pending to be read from the
        # current connection's internal receive buffer it must be properly
        # handled on the risk of blocking the newly created connection
        if connection.is_pending_data(): self.on_read(connection.socket)

    def on_socket_d(self, socket_c):
        """
        Handler for the deletion of a client socket, it only looks
        up the connection associated with the socket, no further
        action is performed in this implementation.

        :type socket_c: Socket
        :param socket_c: The client socket being deleted.
        """

        connection = self.connections_m.get(socket_c, None)
        if not connection: return

    def _ssl_handshake(self, connection):
        """
        Runs the ssl handshake step of the parent implementation
        and, once the handshake is flagged as finished in the
        connection, notifies it through the `on_ssl` handler.

        :type connection: Connection
        :param connection: The connection under ssl handshaking.
        """

        Server._ssl_handshake(self, connection)

        # verifies if the socket has already finished the ssl handshaking
        # process (by verifying the appropriate flag) and if that's
        # not the case returns immediately (nothing done)
        if not connection.ssl_handshake: return

        # prints a debug information notifying the developer about
        # the finishing of the handshaking process for the connection
        self.debug("SSL Handshaking completed for connection")

        # calls the proper callback on the connection meaning
        # that ssl is now enabled for that socket/connection and so
        # the communication between peers is now secured
        self.on_ssl(connection)

    def _route_error(self, error, on_expected, on_exception, *args):
        """
        Routes an exception raised while handling a socket event
        to the proper handler, according to its type and to its
        error code (first argument of the exception).

        The errors with a "silent" code are sent to the expected
        handler, the ones with a "valid" code (or reason for ssl)
        are ignored and everything else is sent to the exception
        handler.

        :type error: BaseException
        :param error: The exception that is going to be routed.
        :type on_expected: Function
        :param on_expected: The handler called for the silent errors.
        :type on_exception: Function
        :param on_exception: The handler called for the remaining
        (not ignored) errors.
        :param args: Extra arguments passed to the handlers after
        the error (eg: the connection).
        """

        # the ssl errors must be tested before the socket ones as the
        # ssl error class is a sub-class of the socket error one
        if isinstance(error, ssl.SSLError):
            error_v = error.args[0] if error.args else None
            error_m = getattr(error, "reason", None)
            if error_v in SSL_SILENT_ERRORS:
                on_expected(error, *args)
            elif not error_v in SSL_VALID_ERRORS and\
                not error_m in SSL_VALID_REASONS:
                on_exception(error, *args)
        elif isinstance(error, socket.error):
            error_v = error.args[0] if error.args else None
            if error_v in SILENT_ERRORS:
                on_expected(error, *args)
            elif not error_v in VALID_ERRORS:
                on_exception(error, *args)
        else:
            on_exception(error, *args)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc