• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 3797606986

pending completion
3797606986

push

github-actions

GitHub
Merge branch 'dev' into rop_raw_list

3931 of 6599 branches covered (59.57%)

102 of 102 new or added lines in 15 files covered. (100.0%)

12074 of 16876 relevant lines covered (71.55%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.22
/pwnlib/tubes/listen.py
1
from __future__ import absolute_import
1✔
2
from __future__ import division
1✔
3

4
import errno
1✔
5
import socket
1✔
6

7
from pwnlib.context import context
1✔
8
from pwnlib.log import getLogger
1✔
9
from pwnlib.timeout import Timeout
1✔
10
from pwnlib.tubes.sock import sock
1✔
11

12
log = getLogger(__name__)
1✔
13

14
class listen(sock):
1✔
15
    r"""Creates a TCP or UDP socket to receive data on. It supports
16
    both IPv4 and IPv6. You need to call :meth:`wait_for_connection`
17
    before using the listen socket.
18

19
    The returned object supports all the methods from
20
    :class:`pwnlib.tubes.sock` and :class:`pwnlib.tubes.tube`.
21

22
    Arguments:
23
        port(int): The port to listen on.
24
            Defaults to a port auto-selected by the operating system.
25
        bindaddr(str): The address to bind to.
26
            Defaults to ``0.0.0.0`` / ``::``.
27
        fam: The string "any", "ipv4" or "ipv6" or an integer to pass to :func:`socket.getaddrinfo`.
28
        typ: The string "tcp" or "udp" or an integer to pass to :func:`socket.getaddrinfo`.
29

30
    Examples:
31

32
        >>> l = listen(1234)
33
        >>> r = remote('localhost', l.lport)
34
        >>> _ = l.wait_for_connection()
35
        >>> l.sendline(b'Hello')
36
        >>> r.recvline()
37
        b'Hello\n'
38

39
        >>> # It works with ipv4 by default
40
        >>> l = listen()
41
        >>> l.spawn_process('/bin/sh')
42
        >>> r = remote('127.0.0.1', l.lport)
43
        >>> r.sendline(b'echo Goodbye')
44
        >>> r.recvline()
45
        b'Goodbye\n'
46

47
        >>> # and it works with ipv6 by default, too!
48
        >>> l = listen()
49
        >>> r = remote('::1', l.lport)
50
        >>> _ = l.wait_for_connection()
51
        >>> r.sendline(b'Bye-bye')
52
        >>> l.recvline()
53
        b'Bye-bye\n'
54
    """
55

56
    #: Local port
57
    lport = 0
1✔
58

59
    #: Local host
60
    lhost = None
1✔
61

62
    #: Socket type (e.g. socket.SOCK_STREAM)
63
    type = None
1✔
64

65
    #: Socket family
66
    family = None
1✔
67

68
    #: Socket protocol
69
    protocol = None
1✔
70

71
    #: Canonical name of the listening interface
72
    canonname = None
1✔
73

74
    #: Sockaddr structure that is being listened on
75
    sockaddr = None
1✔
76

77
    _accepter = None
1✔
78

79
    def __init__(self, port=0, bindaddr='::',
1✔
80
                 fam='any', typ='tcp', *args, **kwargs):
81
        super(listen, self).__init__(*args, **kwargs)
1✔
82

83
        fam = self._get_family(fam)
1✔
84
        typ = self._get_type(typ)
1✔
85

86
        if fam == socket.AF_INET and bindaddr == '::':
1!
87
            bindaddr = '0.0.0.0'
×
88

89
        h = self.waitfor('Trying to bind to %s on port %s' % (bindaddr, port))
1✔
90

91
        for res in socket.getaddrinfo(bindaddr, port, fam, typ, 0, socket.AI_PASSIVE):
1!
92
            self.family, self.type, self.proto, self.canonname, self.sockaddr = res
1✔
93

94
            if self.type not in [socket.SOCK_STREAM, socket.SOCK_DGRAM]:
1!
95
                continue
×
96

97
            h.status("Trying %s" % self.sockaddr[0])
1✔
98
            listen_sock = socket.socket(self.family, self.type, self.proto)
1✔
99
            listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1✔
100
            if self.family == socket.AF_INET6:
1!
101
                try:
1✔
102
                    listen_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, fam == socket.AF_INET6)
1✔
103
                except (socket.error, AttributeError):
×
104
                    self.warn("could not set socket to accept also IPV4")
×
105
            listen_sock.bind(self.sockaddr)
1✔
106
            self.lhost, self.lport = listen_sock.getsockname()[:2]
1✔
107
            if self.type == socket.SOCK_STREAM:
1!
108
                listen_sock.listen(1)
1✔
109
            break
1✔
110
        else:
111
            h.failure()
×
112
            self.error("Could not bind to %s on port %s" % (bindaddr, port))
×
113

114
        h.success()
1✔
115

116
        h = self.waitfor('Waiting for connections on %s:%s' % (self.lhost, self.lport))
1✔
117

118
        def accepter():
1✔
119
            while True:
120
                try:
1✔
121
                    if self.type == socket.SOCK_STREAM:
1!
122
                        self.sock, rhost = listen_sock.accept()
1✔
123
                        listen_sock.close()
1✔
124
                    else:
125
                        data, rhost = listen_sock.recvfrom(4096)
×
126
                        listen_sock.connect(rhost)
×
127
                        self.sock = listen_sock
×
128
                        self.unrecv(data)
×
129
                    self.settimeout(self.timeout)
1✔
130
                    break
1✔
131
                except socket.error as e:
×
132
                    if e.errno == errno.EINTR:
×
133
                        continue
×
134
                    h.failure()
×
135
                    self.exception("Socket failure while waiting for connection")
×
136
                    self.sock = None
×
137
                    return
×
138

139
            self.rhost, self.rport = rhost[:2]
1✔
140
            h.success('Got connection from %s on port %d' % (self.rhost, self.rport))
1✔
141

142
        self._accepter = context.Thread(target = accepter)
1✔
143
        self._accepter.daemon = True
1✔
144
        self._accepter.start()
1✔
145

146
    def spawn_process(self, *args, **kwargs):
1✔
147
        def accepter():
1✔
148
            self.wait_for_connection()
1✔
149
            self.sock.setblocking(1)
1✔
150
            p = super(listen, self).spawn_process(*args, **kwargs)
1✔
151
            p.wait()
1✔
152
            self.close()
×
153
        t = context.Thread(target = accepter)
1✔
154
        t.daemon = True
1✔
155
        t.start()
1✔
156

157
    def wait_for_connection(self):
1✔
158
        """Blocks until a connection has been established."""
159
        self.sock
1✔
160
        return self
1✔
161

162
    @property
1✔
163
    def sock(self):
1✔
164
        try:
1✔
165
            return self.__dict__['sock']
1✔
166
        except KeyError:
1✔
167
            pass
1✔
168
        self._accepter.join(timeout=self.timeout)
1✔
169
        return self.__dict__.get('sock')
1✔
170

171
    @sock.setter
1✔
172
    def sock(self, s):
1✔
173
        self.__dict__['sock'] = s
1✔
174

175
    def close(self):
1✔
176
        # since `close` is scheduled to run on exit we must check that we got
177
        # a connection or the program will hang in the `join` call above
178
        if self._accepter and self._accepter.is_alive():
1!
179
            return
×
180
        super(listen, self).close()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc