• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hivesolutions / netius / #620889952

23 Apr 2024 08:23PM UTC coverage: 45.051%. First build
#620889952

Pull #26

travis-ci

Pull Request #26: Server support for protocols

110 of 405 new or added lines in 21 files covered. (27.16%)

7733 of 17165 relevant lines covered (45.05%)

0.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.81
/src/netius/base/poll.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3

4
# Hive Netius System
5
# Copyright (c) 2008-2024 Hive Solutions Lda.
6
#
7
# This file is part of Hive Netius System.
8
#
9
# Hive Netius System is free software: you can redistribute it and/or modify
10
# it under the terms of the Apache License as published by the Apache
11
# Foundation, either version 2.0 of the License, or (at your option) any
12
# later version.
13
#
14
# Hive Netius System is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# Apache License for more details.
18
#
19
# You should have received a copy of the Apache License along with
20
# Hive Netius System. If not, see <http://www.apache.org/licenses/>.
21

22
__author__ = "João Magalhães <joamag@hive.pt>"
1✔
23
""" The author(s) of the module """
24

25
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
1✔
26
""" The copyright for the module """
27

28
__license__ = "Apache License, Version 2.0"
1✔
29
""" The license for the module """
30

31
import time
1✔
32
import select
33

34
POLL_TIMEOUT = 0.25
1✔
35
""" The timeout to be used under all of the poll methods
36
this should be considered the maximum amount of time a
37
thread waits for a poll request """
1✔
38

39

40
class Poll(object):
    """
    Base (abstract) poll implementation, defines the api that
    is shared by the concrete poll strategies and keeps, for each
    of the three kinds of operation (read, write and error), a map
    that associates a subscribed socket with its owner.

    The base implementation does no real polling, its poll
    operation always returns three empty lists.
    """

    def __init__(self):
        self._open = False
        self.timeout = POLL_TIMEOUT
        self.read_o, self.write_o, self.error_o = {}, {}, {}

    @classmethod
    def name(cls):
        """
        Builds the name of the poll from the name of the class,
        by removing its last four characters (the "Poll" suffix
        of the classes in this module) and lower casing the rest.
        """

        return cls.__name__[:-4].lower()

    @classmethod
    def test(cls):
        """
        Verifies if the poll is usable in the current environment,
        the base implementation is always considered usable.
        """

        return True

    def open(self, timeout=POLL_TIMEOUT):
        """
        Opens the poll storing the provided timeout and resetting
        the owner maps, does nothing if the poll is already open.
        """

        if not self._open:
            self._open = True
            self.timeout = timeout
            for owners in (self.read_o, self.write_o, self.error_o):
                owners.clear()

    def close(self):
        """
        Closes the poll emptying the owner maps, does nothing
        if the poll is not currently open.
        """

        if self._open:
            self._open = False
            for owners in (self.read_o, self.write_o, self.error_o):
                owners.clear()

    def poll(self):
        """
        Runs one poll operation returning a tuple with the lists
        of sockets ready for read, write and error (in this order),
        the base implementation returns three empty lists.
        """

        return [], [], []

    def poll_owner(self):
        """
        Runs one poll operation and groups the resulting sockets by
        owner, the returned dictionary maps each owner to a tuple of
        three lists (read, write and error sockets of that owner).
        """

        reads, writes, errors = self.poll()
        grouped = dict()

        # pairs each list of ready sockets with the map holding the
        # owners for that kind of operation, the position of the pair
        # is the index of the target list inside the owner's tuple
        pairs = (
            (reads, self.read_o),
            (writes, self.write_o),
            (errors, self.error_o),
        )
        for index, (sockets, owners) in enumerate(pairs):
            for socket in sockets:
                lists = grouped.setdefault(owners[socket], ([], [], []))
                lists[index].append(socket)

        return grouped

    def is_open(self):
        """Returns if the poll is currently open."""

        return self._open

    def is_edge(self):
        """
        Returns if the poll is edge triggered, the base
        implementation is not.
        """

        return False

    def is_empty(self):
        """Returns if no socket is subscribed for any operation."""

        return not (self.read_o or self.write_o or self.error_o)

    def sub_all(self, socket, owner=None):
        """
        Subscribes the socket for the read, write and error
        operations (in this order) under the provided owner.
        """

        for subscribe in (self.sub_read, self.sub_write, self.sub_error):
            subscribe(socket, owner=owner)

    def unsub_all(self, socket):
        """
        Removes the socket subscriptions for the error, write
        and read operations (in this order).
        """

        for unsubscribe in (self.unsub_error, self.unsub_write, self.unsub_read):
            unsubscribe(socket)

    def is_sub_read(self, socket):
        """Returns if the socket is subscribed for read."""

        return socket in self.read_o

    def is_sub_write(self, socket):
        """Returns if the socket is subscribed for write."""

        return socket in self.write_o

    def is_sub_error(self, socket):
        """Returns if the socket is subscribed for error."""

        return socket in self.error_o

    def sub_read(self, socket, owner=None):
        """
        Subscribes the socket for read, an existing subscription
        (and its owner) is left untouched.
        """

        self.read_o.setdefault(socket, owner)

    def sub_write(self, socket, owner=None):
        """
        Subscribes the socket for write, an existing subscription
        (and its owner) is left untouched.
        """

        self.write_o.setdefault(socket, owner)

    def sub_error(self, socket, owner=None):
        """
        Subscribes the socket for error, an existing subscription
        (and its owner) is left untouched.
        """

        self.error_o.setdefault(socket, owner)

    def unsub_read(self, socket):
        """Removes the read subscription of the socket (if any)."""

        self.read_o.pop(socket, None)

    def unsub_write(self, socket):
        """Removes the write subscription of the socket (if any)."""

        self.write_o.pop(socket, None)

    def unsub_error(self, socket):
        """Removes the error subscription of the socket (if any)."""

        self.error_o.pop(socket, None)
1✔
175

176

1✔
177
class EpollPoll(Poll):
    """
    Poll implementation built on top of the epoll object of the
    select module, available only where the module exposes it.

    A socket is registered a single time (on the read subscription)
    for input, output, error and hang-up events using the edge
    triggered flag, for that reason the write and error related
    (un)subscription operations are empty.
    """

    def __init__(self, *args, **kwargs):
        Poll.__init__(self, *args, **kwargs)
        self._open = False

    @classmethod
    def test(cls):
        """Returns if the select module provides the epoll object."""

        return hasattr(select, "epoll")

    def open(self, timeout=POLL_TIMEOUT):
        """
        Opens the poll creating the underlying epoll object and the
        (empty) structures that map file descriptors to sockets and
        sockets to owners, does nothing if the poll is already open.
        """

        if self._open:
            return
        self._open = True
        self.timeout = timeout

        self.epoll = select.epoll()  # @UndefinedVariable pylint: disable=E1101

        # map that associates the file descriptor of each of the
        # registered sockets with the socket object itself
        self.fd_m = {}

        self.read_o = {}
        self.write_o = {}
        self.error_o = {}

    def close(self):
        """
        Closes the poll, unregistering the known file descriptors,
        closing the epoll object and emptying the internal maps,
        does nothing if the poll is not currently open.

        If the unregister operation raises an exception (eg: the
        descriptor has been closed in the meantime) the exception is
        propagated but the epoll object is still closed and the
        maps are still emptied, so that no resource is leaked.
        """

        if not self._open:
            return
        self._open = False

        try:
            for fd in self.fd_m:
                self.epoll.unregister(fd)
        finally:
            # the release of the resources must always take place
            # as the open flag is already unset and a further close
            # call would return immediately (leaking the epoll)
            self.epoll.close()
            self.epoll = None

            self.fd_m.clear()

            self.read_o.clear()
            self.write_o.clear()
            self.error_o.clear()

    def poll(self):
        """
        Waits (up to the configured timeout) for events and returns
        a tuple with the lists of sockets ready for read, write and
        error, events for unknown file descriptors are ignored.
        """

        result = ([], [], [])

        events = self.epoll.poll(self.timeout)
        for fd, event in events:
            # resolves the socket once for the event, skipping
            # the descriptors that are no longer (or never were)
            # associated with a socket
            socket = self.fd_m.get(fd, None)
            if not socket:
                continue

            if event & select.EPOLLIN:  # @UndefinedVariable pylint: disable=E1101
                result[0].append(socket)
            if event & select.EPOLLOUT:  # @UndefinedVariable pylint: disable=E1101
                result[1].append(socket)
            if (
                event & select.EPOLLERR  # @UndefinedVariable pylint: disable=E1101
                or event & select.EPOLLHUP  # @UndefinedVariable pylint: disable=E1101
            ):
                result[2].append(socket)

        return result

    def is_edge(self):
        """Returns true as the sockets are registered as edge triggered."""

        return True

    def sub_read(self, socket, owner=None):
        """
        Registers the socket in the epoll object for every kind of
        event and stores the owner for the read, write and error
        operations, does nothing if the socket is already subscribed.
        """

        if socket in self.read_o:
            return
        socket_fd = socket.fileno()
        self.fd_m[socket_fd] = socket
        self.read_o[socket] = owner
        self.write_o[socket] = owner
        self.error_o[socket] = owner
        self.epoll.register(  # @UndefinedVariable pylint: disable=E1101
            socket_fd,
            select.EPOLLIN  # @UndefinedVariable pylint: disable=E1101
            | select.EPOLLOUT
            | select.EPOLLERR
            | select.EPOLLHUP
            | select.EPOLLET,
        )

    def sub_write(self, socket, owner=None):
        """Empty operation, the write events are handled by sub_read."""

    def sub_error(self, socket, owner=None):
        """Empty operation, the error events are handled by sub_read."""

    def unsub_read(self, socket):
        """
        Unregisters the socket from the epoll object and removes it
        from every internal map, does nothing if the socket is not
        subscribed.
        """

        if socket not in self.read_o:
            return
        socket_fd = socket.fileno()
        self.epoll.unregister(socket_fd)  # @UndefinedVariable pylint: disable=E1101
        del self.fd_m[socket_fd]
        del self.read_o[socket]
        del self.write_o[socket]
        del self.error_o[socket]

    def unsub_write(self, socket):
        """Empty operation, the write events are handled by unsub_read."""

    def unsub_error(self, socket):
        """Empty operation, the error events are handled by unsub_read."""
×
277

278

1✔
279
class KqueuePoll(Poll):
    """
    Poll implementation built on top of the kqueue object of the
    select module, available only where the module exposes it.

    A socket is registered a single time (on the read subscription)
    with both the read and the write filters, for that reason the
    write and error related (un)subscription operations are empty.
    """

    def __init__(self, *args, **kwargs):
        Poll.__init__(self, *args, **kwargs)
        self._open = False

    @classmethod
    def test(cls):
        """Returns if the select module provides the kqueue object."""

        return hasattr(select, "kqueue")

    def open(self, timeout=POLL_TIMEOUT):
        """
        Opens the poll creating the underlying kqueue object and the
        (empty) internal maps, a negative timeout is stored as an
        unset (none) value, does nothing if the poll is already open.
        """

        if self._open:
            return
        self._open = True

        self.timeout = timeout
        if self.timeout < 0:
            self.timeout = None

        self.kqueue = select.kqueue()  # @UndefinedVariable pylint: disable=E1101

        # map that associates the file descriptor of each of the
        # registered sockets with the socket object itself
        self.fd_m = {}

        self.read_o = {}
        self.write_o = {}
        self.error_o = {}

    def close(self):
        """
        Closes the poll, closing the kqueue object and emptying
        the internal maps, does nothing if the poll is not open.
        """

        if not self._open:
            return
        self._open = False

        self.kqueue.close()
        self.kqueue = None

        for mapping in (self.fd_m, self.read_o, self.write_o, self.error_o):
            mapping.clear()

    def poll(self):
        """
        Retrieves (at most 32) events from the kqueue waiting up to
        the configured timeout and returns a tuple with the lists
        of sockets ready for read, write and error.

        An event flagged as error, or a read/write event flagged as
        end of file, places the socket in the error list.
        """

        result = ([], [], [])

        for event in self.kqueue.control(None, 32, self.timeout):
            flags = event.flags

            # determines the index of the target list for the event,
            # events that have an unexpected filter are ignored
            if flags & select.KQ_EV_ERROR:  # @UndefinedVariable pylint: disable=E1101
                index = 2
            elif (
                event.filter
                == select.KQ_FILTER_READ  # @UndefinedVariable pylint: disable=E1101
            ):
                index = 2 if flags & select.KQ_EV_EOF else 0
            elif (
                event.filter
                == select.KQ_FILTER_WRITE  # @UndefinedVariable pylint: disable=E1101
            ):
                index = 2 if flags & select.KQ_EV_EOF else 1
            else:
                continue

            # the user data of the event holds the file descriptor
            # that has been set on the registration of the socket
            socket = self.fd_m.get(event.udata, None)
            if socket:
                result[index].append(socket)

        return result

    def is_edge(self):
        """Returns true, this poll is handled as edge triggered."""

        return True

    def sub_read(self, socket, owner=None):
        """
        Adds the read and the write filters for the socket to the
        kqueue (using the clear flag) and stores the owner for the
        read, write and error operations, does nothing if the socket
        is already subscribed.
        """

        if socket in self.read_o:
            return
        socket_fd = socket.fileno()
        self.fd_m[socket_fd] = socket
        self.read_o[socket] = owner
        self.write_o[socket] = owner
        self.error_o[socket] = owner

        # registers each of the filters using its own control
        # operation, the read filter is the first one to be added
        filters = (
            select.KQ_FILTER_READ,  # @UndefinedVariable pylint: disable=E1101
            select.KQ_FILTER_WRITE,  # @UndefinedVariable pylint: disable=E1101
        )
        for kind in filters:
            event = select.kevent(  # @UndefinedVariable pylint: disable=E1101
                socket_fd,
                filter=kind,
                flags=select.KQ_EV_ADD  # @UndefinedVariable pylint: disable=E1101
                | select.KQ_EV_CLEAR,
                udata=socket_fd,
            )
            self.kqueue.control([event], 0)

    def sub_write(self, socket, owner=None):
        """Empty operation, the write filter is handled by sub_read."""

    def sub_error(self, socket, owner=None):
        """Empty operation, the error events are handled by sub_read."""

    def unsub_read(self, socket):
        """
        Deletes the read and the write filters of the socket from
        the kqueue and removes the socket from every internal map,
        does nothing if the socket is not subscribed.
        """

        if socket not in self.read_o:
            return
        socket_fd = socket.fileno()

        # removes each of the filters using its own control
        # operation, the read filter is the first one to be deleted
        filters = (
            select.KQ_FILTER_READ,  # @UndefinedVariable pylint: disable=E1101
            select.KQ_FILTER_WRITE,  # @UndefinedVariable pylint: disable=E1101
        )
        for kind in filters:
            event = select.kevent(  # @UndefinedVariable pylint: disable=E1101
                socket_fd,
                filter=kind,
                flags=select.KQ_EV_DELETE,  # @UndefinedVariable pylint: disable=E1101
            )
            self.kqueue.control([event], 0)

        del self.fd_m[socket_fd]
        del self.read_o[socket]
        del self.write_o[socket]
        del self.error_o[socket]

    def unsub_write(self, socket):
        """Empty operation, the write filter is handled by unsub_read."""

    def unsub_error(self, socket):
        """Empty operation, the error events are handled by unsub_read."""
×
408

409

×
410
class PollPoll(Poll):
    """
    Poll implementation built on top of the poll object of the
    select module, available only where the module exposes it.

    The read and the write subscriptions are independent from each
    other, the event mask registered for a file descriptor is always
    computed from the subscriptions that are currently active for it.
    The error subscriptions only keep track of the owner, as the
    error and hang-up events are reported for any registered
    descriptor.
    """

    def __init__(self, *args, **kwargs):
        Poll.__init__(self, *args, **kwargs)
        self._open = False

    @classmethod
    def test(cls):
        """Returns if the select module provides the poll object."""

        return hasattr(select, "poll")

    def open(self, timeout=POLL_TIMEOUT):
        """
        Opens the poll creating the underlying poll object and the
        (empty) internal maps, does nothing if the poll is already
        open.
        """

        if self._open:
            return
        self._open = True
        self.timeout = timeout

        self._poll = select.poll()  # @UndefinedVariable pylint: disable=E1101

        # maps that associate the file descriptor of the sockets
        # subscribed for read and for write with the socket object
        self.read_fd = {}
        self.write_fd = {}

        self.read_o = {}
        self.write_o = {}
        self.error_o = {}

    def close(self):
        """
        Closes the poll, unregistering every known file descriptor
        (read or write subscribed) and emptying the internal maps,
        does nothing if the poll is not currently open.

        If the unregister operation raises an exception it is
        propagated, but the internal state is still released.
        """

        if not self._open:
            return
        self._open = False

        try:
            for fd in set(self.read_fd) | set(self.write_fd):
                self._poll.unregister(fd)
        finally:
            # the release must always take place as the open flag
            # is already unset and a further close would do nothing
            self._poll = None

            self.read_fd.clear()
            self.write_fd.clear()

            self.read_o.clear()
            self.write_o.clear()
            self.error_o.clear()

    def poll(self):
        """
        Waits (up to the configured timeout) for events and returns
        a tuple with the lists of sockets ready for read, write and
        error, events for unknown file descriptors are ignored.
        """

        result = ([], [], [])

        # the poll object expects the timeout in milliseconds, an
        # unset timeout is passed as such (blocking operation) as
        # it's not possible to multiply it
        timeout = None if self.timeout is None else self.timeout * 1000

        events = self._poll.poll(timeout)
        for fd, event in events:
            if event & select.POLLIN:  # @UndefinedVariable pylint: disable=E1101
                socket = self.read_fd.get(fd, None)
                if socket:
                    result[0].append(socket)
            if event & select.POLLOUT:  # @UndefinedVariable pylint: disable=E1101
                socket = self.write_fd.get(fd, None)
                if socket:
                    result[1].append(socket)
            if (
                event & select.POLLERR  # @UndefinedVariable pylint: disable=E1101
                or event & select.POLLHUP  # @UndefinedVariable pylint: disable=E1101
            ):
                # the socket may be subscribed only for write and
                # so both of the descriptor maps must be verified
                socket = self.read_fd.get(fd, None) or self.write_fd.get(fd, None)
                if socket:
                    result[2].append(socket)

        return result

    def is_edge(self):
        """Returns false, this poll is not edge triggered."""

        return False

    def sub_read(self, socket, owner=None):
        """
        Subscribes the socket for read updating the registration of
        its file descriptor, does nothing if already subscribed.
        """

        if socket in self.read_o:
            return
        socket_fd = socket.fileno()
        self.read_fd[socket_fd] = socket
        self.read_o[socket] = owner
        self._sync(socket_fd)

    def sub_write(self, socket, owner=None):
        """
        Subscribes the socket for write updating the registration
        of its file descriptor, does nothing if already subscribed.
        The socket is not required to be subscribed for read.
        """

        if socket in self.write_o:
            return
        socket_fd = socket.fileno()
        self.write_fd[socket_fd] = socket
        self.write_o[socket] = owner
        self._sync(socket_fd)

    def sub_error(self, socket, owner=None):
        """
        Stores the owner of the socket for the error operation, an
        existing subscription is left untouched.
        """

        if socket in self.error_o:
            return
        self.error_o[socket] = owner

    def unsub_read(self, socket):
        """
        Removes the read subscription of the socket, the file
        descriptor is kept registered (for write only) if the
        socket is still subscribed for write, does nothing if
        the socket is not subscribed for read.
        """

        if socket not in self.read_o:
            return
        socket_fd = socket.fileno()
        del self.read_fd[socket_fd]
        del self.read_o[socket]
        self._sync(socket_fd)

    def unsub_write(self, socket):
        """
        Removes the write subscription of the socket, the file
        descriptor is kept registered (for read only) if the
        socket is still subscribed for read, does nothing if
        the socket is not subscribed for write.
        """

        if socket not in self.write_o:
            return
        socket_fd = socket.fileno()
        del self.write_fd[socket_fd]
        del self.write_o[socket]
        self._sync(socket_fd)

    def unsub_error(self, socket):
        """Removes the error subscription of the socket (if any)."""

        if socket not in self.error_o:
            return
        del self.error_o[socket]

    def _sync(self, socket_fd):
        """
        Aligns the registration of the file descriptor in the poll
        object with the read and write subscriptions that currently
        exist for it, unregistering it when none is left.

        Should be called right after a subscription change, meaning
        that an empty mask implies a previously registered descriptor.
        """

        mask = 0
        if socket_fd in self.read_fd:
            mask |= select.POLLIN  # @UndefinedVariable pylint: disable=E1101
        if socket_fd in self.write_fd:
            mask |= select.POLLOUT  # @UndefinedVariable pylint: disable=E1101

        # registering an already registered descriptor is valid
        # and replaces the event mask that is associated with it
        if mask:
            self._poll.register(socket_fd, mask)
        else:
            self._poll.unregister(socket_fd)
×
521

522

1✔
523
class SelectPoll(Poll):
    """
    Poll implementation built on top of the select function of
    the select module.

    For each kind of operation (read, write and error) a list of
    the subscribed sockets is kept (to be passed to the select
    call) together with the map that associates them with owners.
    """

    def __init__(self, *args, **kwargs):
        Poll.__init__(self, *args, **kwargs)
        self._open = False

    def open(self, timeout=POLL_TIMEOUT):
        """
        Opens the poll creating the (empty) selection lists and
        owner maps, a negative timeout is stored as an unset (none)
        value, does nothing if the poll is already open.
        """

        if self._open:
            return
        self._open = True

        self.timeout = timeout
        if self.timeout < 0:
            self.timeout = None

        self.read_l, self.write_l, self.error_l = [], [], []
        self.read_o, self.write_o, self.error_o = {}, {}, {}

    def close(self):
        """
        Closes the poll emptying the selection lists and the owner
        maps, does nothing if the poll is not currently open.
        """

        if not self._open:
            return
        self._open = False

        # empties (in place) the lists used in the select calls
        # so that no further selection operations are issued
        for sockets in (self.read_l, self.write_l, self.error_l):
            sockets[:] = []

        # empties the maps that associate a socket with its owner
        for owners in (self.read_o, self.write_o, self.error_o):
            owners.clear()

    def poll(self):
        """
        Runs the select call over the subscribed sockets returning
        its result (sockets ready for read, write and error).

        If no socket is subscribed the select call is avoided, the
        method sleeps instead and returns three empty lists.
        """

        if self.is_empty():
            # an empty selection is avoided (may raise an error), the
            # sleep uses the default timeout if the current one is unset
            time.sleep(self.timeout or POLL_TIMEOUT)
            return ([], [], [])

        # waits for the sockets using the configured timeout, the
        # result is a sequence of three lists (one per operation)
        ready = select.select(self.read_l, self.write_l, self.error_l, self.timeout)
        return ready

    def is_edge(self):
        """Returns false, this poll is not edge triggered."""

        return False

    def sub_read(self, socket, owner=None):
        """
        Subscribes the socket for read under the provided owner,
        does nothing if the socket is already subscribed.
        """

        if socket not in self.read_o:
            self.read_o[socket] = owner
            self.read_l.append(socket)

    def sub_write(self, socket, owner=None):
        """
        Subscribes the socket for write under the provided owner,
        does nothing if the socket is already subscribed.
        """

        if socket not in self.write_o:
            self.write_o[socket] = owner
            self.write_l.append(socket)

    def sub_error(self, socket, owner=None):
        """
        Subscribes the socket for error under the provided owner,
        does nothing if the socket is already subscribed.
        """

        if socket not in self.error_o:
            self.error_o[socket] = owner
            self.error_l.append(socket)

    def unsub_read(self, socket):
        """
        Removes the read subscription of the socket, does
        nothing if the socket is not subscribed.
        """

        if socket in self.read_o:
            self.read_l.remove(socket)
            del self.read_o[socket]

    def unsub_write(self, socket):
        """
        Removes the write subscription of the socket, does
        nothing if the socket is not subscribed.
        """

        if socket in self.write_o:
            self.write_l.remove(socket)
            del self.write_o[socket]

    def unsub_error(self, socket):
        """
        Removes the error subscription of the socket, does
        nothing if the socket is not subscribed.
        """

        if socket in self.error_o:
            self.error_l.remove(socket)
            del self.error_o[socket]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc