• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

romis2012 / python-socks / 88

pending completion
88

push

travis-ci-com

romis2012
Read socks5 server bound address

26 of 26 new or added lines in 3 files covered. (100.0%)

1588 of 1596 relevant lines covered (99.5%)

3.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.35
/python_socks/_proto/socks5.py
1
import enum
4✔
2
import ipaddress
4✔
3
import typing
4✔
4
from .._helpers import is_ip_address
4✔
5
from .._errors import ProxyError
4✔
6

7
RSV = NULL = 0x00
4✔
8
SOCKS_VER = 0x05
4✔
9

10

11
class AuthMethod(enum.IntEnum):
4✔
12
    ANONYMOUS = 0x00
4✔
13
    GSSAPI = 0x01
4✔
14
    USERNAME_PASSWORD = 0x02
4✔
15
    NO_ACCEPTABLE = 0xff
4✔
16

17

18
class AddressType(enum.IntEnum):
4✔
19
    IPV4 = 0x01
4✔
20
    DOMAIN = 0x03
4✔
21
    IPV6 = 0x04
4✔
22

23
    @classmethod
4✔
24
    def from_ip_ver(cls, ver: int):
4✔
25
        if ver == 4:
4✔
26
            return cls.IPV4
4✔
27
        if ver == 6:
×
28
            return cls.IPV6
×
29

30
        raise ValueError('Invalid IP version')
31

32

33
class Command(enum.IntEnum):
4✔
34
    CONNECT = 0x01
4✔
35
    BIND = 0x02
4✔
36
    UDP_ASSOCIATE = 0x03
4✔
37

38

39
class ReplyCode(enum.IntEnum):
4✔
40
    GRANTED = 0x00
4✔
41
    GENERAL_FAILURE = 0x01
4✔
42
    CONNECTION_NOT_ALLOWED = 0x02
4✔
43
    NETWORK_UNREACHABLE = 0x03
4✔
44
    HOST_UNREACHABLE = 0x04
4✔
45
    CONNECTION_REFUSED = 0x05
4✔
46
    TTL_EXPIRED = 0x06
4✔
47
    COMMAND_NOT_SUPPORTED = 0x07
4✔
48
    ADDRESS_TYPE_NOT_SUPPORTED = 0x08
4✔
49

50

51
ReplyMessages = {
4✔
52
    ReplyCode.GRANTED: 'Request granted',
53
    ReplyCode.GENERAL_FAILURE: 'General SOCKS server failure',
54
    ReplyCode.CONNECTION_NOT_ALLOWED: 'Connection not allowed by ruleset',
55
    ReplyCode.NETWORK_UNREACHABLE: 'Network unreachable',
56
    ReplyCode.HOST_UNREACHABLE: 'Host unreachable',
57
    ReplyCode.CONNECTION_REFUSED: 'Connection refused by destination host',
58
    ReplyCode.TTL_EXPIRED: 'TTL expired',
59
    ReplyCode.COMMAND_NOT_SUPPORTED: 'Command not supported or protocol error',
60
    ReplyCode.ADDRESS_TYPE_NOT_SUPPORTED: 'Address type not supported'
61
}
62

63

64
class AuthMethodsRequest:
4✔
65
    def __init__(self, username: str, password: str):
4✔
66
        auth_methods = bytearray([AuthMethod.ANONYMOUS])
4✔
67

68
        if username and password:
4✔
69
            auth_methods.append(AuthMethod.USERNAME_PASSWORD)
4✔
70

71
        self.auth_methods = auth_methods
4✔
72

73
    def __bytes__(self):
4✔
74
        return bytes([SOCKS_VER, len(self.auth_methods)]) + self.auth_methods
4✔
75

76

77
class AuthMethodsResponse:
4✔
78
    socks_ver: int
4✔
79
    auth_method: AuthMethod
4✔
80

81
    def __init__(self, data: bytes):
4✔
82
        assert len(data) == 2
4✔
83
        self.socks_ver = data[0]
4✔
84
        self.auth_method = data[1]  # noqa
4✔
85

86
    def validate(self, request: AuthMethodsRequest):
4✔
87
        if self.socks_ver != SOCKS_VER:
4✔
88
            raise ProxyError('Unexpected '  # pragma: no cover
89
                             'SOCKS version number: '
90
                             '{}'.format(self.socks_ver))
91

92
        if self.auth_method == AuthMethod.NO_ACCEPTABLE:
4✔
93
            raise ProxyError('No acceptable '  # pragma: no cover
94
                             'authentication methods were offered')
95

96
        if self.auth_method not in request.auth_methods:
4✔
97
            raise ProxyError('Unexpected SOCKS '  # pragma: no cover
98
                             'authentication method: '
99
                             '{}'.format(self.auth_method))
100

101

102
class AuthRequest(typing.SupportsBytes):
4✔
103
    VER = 0x01
4✔
104

105
    def __init__(self, username: str, password: str):
4✔
106
        self.username = username
4✔
107
        self.password = password
4✔
108

109
    def __bytes__(self):
4✔
110
        data = bytearray()
4✔
111
        data.append(self.VER)
4✔
112
        data.append(len(self.username))
4✔
113
        data += self.username.encode('ascii')
4✔
114
        data.append(len(self.password))
4✔
115
        data += self.password.encode('ascii')
4✔
116
        return bytes(data)
4✔
117

118

119
class AuthResponse:
4✔
120
    ver: int
4✔
121
    reply: ReplyCode
4✔
122

123
    def __init__(self, data: bytes):
4✔
124
        assert len(data) == 2
4✔
125
        self.ver = data[0]
4✔
126
        self.reply = data[1]  # noqa
4✔
127

128
    def validate(self):
4✔
129
        if self.ver != AuthRequest.VER:
4✔
130
            raise ProxyError('Invalid '  # pragma: no cover
131
                             'authentication response')
132

133
        if self.reply != ReplyCode.GRANTED:
4✔
134
            raise ProxyError('Username and password '  # pragma: no cover
135
                             'authentication failure')
136

137

138
class ConnectRequest:
4✔
139
    def __init__(self, host: str, port: int, rdns: bool):
4✔
140
        self.host = host
4✔
141
        self.port = port
4✔
142
        self.rdns = rdns
4✔
143
        self._resolved_host = None
4✔
144

145
    def __bytes__(self):
4✔
146
        data = bytearray([SOCKS_VER, Command.CONNECT, RSV])
4✔
147
        data += self._build_addr_request()
4✔
148
        return bytes(data)
4✔
149

150
    @property
4✔
151
    def need_resolve(self):
3✔
152
        return not is_ip_address(self.host) and not self.rdns
4✔
153

154
    def set_resolved_host(self, value):
4✔
155
        self._resolved_host = value
4✔
156

157
    def _build_addr_request(self) -> bytes:
4✔
158
        port = self.port.to_bytes(2, 'big')
4✔
159

160
        # destination address provided is an IPv4 or IPv6 address
161
        if is_ip_address(self.host):
4✔
162
            ip = ipaddress.ip_address(self.host)
4✔
163
            address_type = AddressType.from_ip_ver(ip.version)
4✔
164
            return bytes([address_type]) + ip.packed + port
4✔
165

166
        # not IP address, probably a DNS name
167
        if self.rdns:
4✔
168
            # resolve remotely
169
            address_type = AddressType.DOMAIN
4✔
170
            host = self.host.encode('idna')
4✔
171
            host_len = len(host)
4✔
172
            return bytes([address_type, host_len]) + host + port
4✔
173
        else:
174
            assert self._resolved_host is not None
4✔
175
            addr = self._resolved_host
4✔
176
            ip = ipaddress.ip_address(addr)
4✔
177
            address_type = AddressType.from_ip_ver(ip.version)
4✔
178
            return bytes([address_type]) + ip.packed + port
4✔
179

180

181
class ConnectResponse:
4✔
182
    socks_ver: int
4✔
183
    reply: ReplyCode
4✔
184
    rsv: int
4✔
185

186
    def __init__(self, data: bytes):
4✔
187
        assert len(data) == 3
4✔
188
        self.socks_ver = data[0]
4✔
189
        self.reply = data[1]  # noqa
4✔
190
        self.rsv = data[2]
4✔
191

192
    def validate(self):
4✔
193
        if self.socks_ver != SOCKS_VER:
4✔
194
            raise ProxyError('Unexpected SOCKS '  # pragma: no cover
195
                             'version number: {:#02X}'.format(self.socks_ver))
196

197
        if self.reply != ReplyCode.GRANTED:  # pragma: no cover
198
            msg = ReplyMessages.get(self.reply, 'Unknown error')
199
            raise ProxyError(msg, self.reply)
200

201
        if self.rsv != RSV:
4✔
202
            raise ProxyError('The reserved byte '  # pragma: no cover
203
                             'must be {:#02X}'.format(RSV))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc