• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

un33k / python-ipware / 7199953543

13 Dec 2023 07:12PM UTC coverage: 93.333%. Remained the same
7199953543

push

github

web-flow
Remove HTTP_VIA support, support for 3.12 (#14)

* Remove HTTP_VIA
* up version
* add py 3.12

126 of 135 relevant lines covered (93.33%)

6.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.23
/python_ipware/python_ipware.py
1
import ipaddress
7✔
2
import logging
7✔
3

4
from typing import Any, Dict, List, Optional, Tuple, Union
7✔
5

6
IpAddressType = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
7✔
7
OptionalIpAddressType = Optional[IpAddressType]
7✔
8

9

10
class IpWareMeta:
    """
    A class that handles meta data from an HTTP request.

    It records which request headers are inspected, in priority order
    (`precedence`), and whether the client is expected to be the left-most
    entry of a comma separated header value (`leftmost`).
    """

    def __init__(
        self,
        precedence: Optional[Tuple[str, ...]] = None,
        leftmost: bool = True,
    ) -> None:
        """
        @param precedence: header names to inspect, highest priority first;
            a falsy value (None or an empty tuple) selects the default order below
        @param leftmost: True if the client is the left-most ip of a header value
        """
        # A bare string (typically `("REMOTE_ADDR")` written without the trailing comma)
        # would later be iterated one character at a time; treat it as one header name.
        if isinstance(precedence, str):
            precedence = (precedence,)

        self.precedence = precedence or (
            "X_FORWARDED_FOR",  # Load balancers or proxies such as AWS ELB (default client is `left-most` [`<client>, <proxy1>, <proxy2>`])
            "HTTP_X_FORWARDED_FOR",  # Similar to X_FORWARDED_FOR
            "HTTP_CLIENT_IP",  # Standard headers used by providers such as Amazon EC2, Heroku etc.
            "HTTP_X_REAL_IP",  # Standard headers used by providers such as Amazon EC2, Heroku etc.
            "HTTP_X_FORWARDED",  # Squid and others
            "HTTP_X_CLUSTER_CLIENT_IP",  # Rackspace LB and Riverbed Stingray
            "HTTP_FORWARDED_FOR",  # RFC 7239
            "HTTP_FORWARDED",  # RFC 7239
            "X-CLIENT-IP",  # Microsoft Azure
            "X-REAL-IP",  # NGINX
            "X-CLUSTER-CLIENT-IP",  # Rackspace Cloud Load Balancers
            "X_FORWARDED",  # Squid
            "FORWARDED_FOR",  # RFC 7239
            "CF-CONNECTING-IP",  # CloudFlare
            "TRUE-CLIENT-IP",  # CloudFlare Enterprise
            "FASTLY-CLIENT-IP",  # Firebase, Fastly
            "FORWARDED",  # RFC 7239
            "CLIENT-IP",  # Akamai and Cloudflare: True-Client-IP and Fastly: Fastly-Client-IP
            "REMOTE_ADDR",  # Default
        )
        self.leftmost = leftmost
7✔
42

43

44
class IpWareIpAddress:
    """
    A class that handles IP address data from an HTTP request.
    """

    def extract_ipv4_only(self, ip_address: str) -> str:
        """
        Given an IPv4 address or address:port, it extracts the IP address
        @param ip_address: IP address or address:port
        @return: IP address without the port, or an empty string for empty input
        """
        if not ip_address:
            return ""

        # everything before the first ":" is the address; without a ":" the whole string is kept
        host, _, _ = ip_address.partition(":")
        return host.strip()

    def extract_ipv6_only(self, ip_address: str) -> str:
        """
        Given an IPv6 address, [address] or [address]:port, it extracts the IP address
        @param ip_address: IP address, bracketed IP address or [address]:port
        @return: IP address without brackets and port, or an empty string for empty input
        """
        if not ip_address:
            return ""

        candidate = ip_address.strip()
        if "]:" in candidate:
            # "[<address>]:<port>" -> "<address>"
            candidate = candidate.split("]:")[0].replace("[", "")
        elif candidate.startswith("[") and candidate.endswith("]"):
            # "[<address>]" (bracketed literal without a port) -> "<address>"
            candidate = candidate[1:-1]

        return candidate.strip()

    def get_ip_object(
        self,
        ip_str: str,
    ) -> OptionalIpAddressType:
        """
        Given an IP address or address:port, it parses the IP address
        @param ip_str: IP address or address:port
        @return: IP address of type IPv4Address or IPv6Address, or None if it cannot be parsed
        """
        if not ip_str:
            return None

        # try to parse as IPv6 address with optional port
        try:
            ipv6 = ipaddress.IPv6Address(self.extract_ipv6_only(ip_str))
        except ipaddress.AddressValueError:
            pass
        else:
            # an IPv4-mapped address (::ffff:a.b.c.d) is reported as its IPv4 form
            return ipv6.ipv4_mapped or ipv6

        # try to parse as IPv4 address with optional port
        try:
            return ipaddress.IPv4Address(self.extract_ipv4_only(ip_str))
        except ipaddress.AddressValueError:
            # not a valid IP address, return None
            logging.info("Invalid ip address. %s", ip_str)
            return None

    def get_ips_from_string(
        self,
        ip_str: str,
    ) -> Optional[List[IpAddressType]]:
        """
        Given a comma separated list of IP addresses or address:port, it parses the IP addresses
        @param ip_str: comma separated list of IP addresses or address:port
        @return: list of IP addresses of type IPv4Address or IPv6Address (reversed when
            `leftmost` is False), or None if the input is empty or any entry is invalid
        """
        if not ip_str:
            return None

        ip_list: List[IpAddressType] = []

        for ip_address in ip_str.split(","):
            ip = self.get_ip_object(ip_address.strip())
            if ip is None:
                # we have at least one invalid IP address, reject the whole value
                return None
            ip_list.append(ip)

        # `leftmost` is set by IpWareMeta when the classes are combined in IpWare; fall back
        # to its default (True) so this class also works when used on its own.
        if not getattr(self, "leftmost", True):
            ip_list.reverse()

        return ip_list
7✔
135

136

137
class IpWareProxy:
    """
    A class that handles proxy data from an HTTP request.

    It holds the expected number of proxies (`proxy_count`) and the trusted proxy
    address prefixes (`proxy_list`) and validates parsed ip lists against them.
    """

    def __init__(
        self,
        proxy_count: int = 0,
        proxy_list: Optional[List[str]] = None,
    ) -> None:
        """
        @param proxy_count: number of proxies expected in front of the server (0 disables the check)
        @param proxy_list: trusted proxy addresses or address prefixes, in header order
        @raise ValueError: if proxy_count is None or negative, or proxy_list is not a list of strings
        """
        if proxy_count is None or proxy_count < 0:
            raise ValueError("proxy_count must be a positive integer")

        self.proxy_count = proxy_count
        self.proxy_list = self._is_valid_proxy_trusted_list(proxy_list or [])

    def _is_valid_proxy_trusted_list(self, proxy_list: Any) -> List[str]:
        """
        Checks if the proxy list is a valid list of strings
        @param proxy_list: candidate trusted proxy list
        @return: proxy list or raises an exception
        @raise ValueError: if proxy_list is not a list or contains a non-string element
        """
        if not isinstance(proxy_list, list):
            raise ValueError("Parameter must be a list")
        if not all(isinstance(x, str) for x in proxy_list):
            raise ValueError("All elements in list must be strings")

        return proxy_list

    def is_proxy_count_valid(
        self, ip_list: List[IpAddressType], strict: bool = False
    ) -> bool:
        """
        Checks if the proxy count is valid
        @param ip_list: list of ip addresses
        @param strict: if True, we must have exactly proxy_count proxies
        @return: True if the proxy count is valid, False otherwise
        """
        if self.proxy_count < 1:
            # proxy count is not enforced
            return True

        ip_count: int = len(ip_list)
        if ip_count < 1:
            return False

        # one entry is the client itself, the remaining entries are treated as proxies
        hop_count = ip_count - 1

        if strict:
            # our first proxy takes the last ip address and treats it as client ip
            return hop_count == self.proxy_count

        # the client could have gone through their own proxy and included extra ips
        # client could be sending in the header: X-Forwarded-For: <fake_ip>, <client_ip>
        # so extra entries are tolerated, but exactly proxy_count proxies is valid too
        return hop_count >= self.proxy_count

    def is_proxy_trusted_list_valid(
        self,
        ip_list: List[IpAddressType],
        strict: bool = False,
    ) -> bool:
        """
        Checks if the proxy list is valid (all proxies are in the proxy_list)
        @param ip_list: list of ip addresses
        @param strict: if True, we must have exactly len(proxy_list) proxies
        @return: True if the trailing ips match the trusted proxy list, False otherwise
        """
        if not self.proxy_list:
            # trusted proxies are not enforced
            return True

        proxy_list_count = len(self.proxy_list)
        hop_count = len(ip_list) - 1

        # in strict mode, total ip count must be 1 more than proxy count
        if strict and hop_count != proxy_list_count:
            return False

        # total ip count (client + proxies) must be more than proxy count
        if hop_count < proxy_list_count:
            return False

        # start from the end, slice the incoming ip list to the same length as the trusted proxy list
        trailing_ips = ip_list[-proxy_list_count:]

        # each trusted entry is matched as a string prefix of the ip at the same position
        return all(
            str(ip).startswith(trusted_prefix)
            for ip, trusted_prefix in zip(trailing_ips, self.proxy_list)
        )
7✔
226

227

228
class IpWare(IpWareMeta, IpWareProxy, IpWareIpAddress):
    """
    A class that makes best effort to determine the client's IP address.
    """

    def __init__(
        self,
        precedence: Optional[Tuple[str, ...]] = None,
        leftmost: bool = True,
        proxy_count: int = 0,
        proxy_list: Optional[List[str]] = None,
    ) -> None:
        """
        @param precedence: header names to inspect, highest priority first (None for the default order)
        @param leftmost: True if the client is the left-most ip of a header value
        @param proxy_count: number of proxies expected in front of the server (0 disables the check)
        @param proxy_list: trusted proxy addresses or address prefixes (None disables the check)
        """
        # the bases take different arguments, so each one is initialised explicitly
        IpWareMeta.__init__(self, precedence, leftmost)
        IpWareProxy.__init__(self, proxy_count or 0, proxy_list or [])

    def get_meta_value(self, meta: Dict[str, str], key: str) -> str:
        """
        Given a key, it returns a cleaned up version of the value
        @param meta: the request meta data (header name -> value)
        @param key: the key to lookup; the same key with "_" replaced by "-" is tried as a fallback
        @return: the stripped value of the key or empty string
        """
        meta = meta or {}

        value = meta.get(key)
        if value is None:
            value = meta.get(key.replace("_", "-"))

        # a missing, None or non-string value is treated as "header not present"
        if not isinstance(value, str):
            return ""
        return value.strip()

    def get_meta_values(self, meta: Dict[str, str]) -> List[str]:
        """
        Returns the cleaned up value of every key in `self.precedence`
        @param meta: the request meta data (header name -> value)
        @return: a list of values, in precedence order (empty string for absent keys)
        """
        return [self.get_meta_value(meta, key) for key in self.precedence]

    def get_client_ip(
        self,
        meta: Dict[str, str],
        strict: bool = False,
    ) -> Tuple[OptionalIpAddressType, bool]:
        """
        Returns the client's IP address.
        @param meta: the request meta data (header name -> value)
        @param strict: if True, the number of ips must match the proxy configuration exactly
        @return: (ip, trusted_route); the first global ip found in precedence order wins,
            otherwise the first private ip, then the first loopback ip, then (None, False)
        """

        loopback_list: List[IpAddressType] = []
        private_list: List[IpAddressType] = []

        for ip_str in self.get_meta_values(meta):
            if not ip_str:
                continue

            ip_list = self.get_ips_from_string(ip_str)
            if not ip_list:
                continue

            proxy_count_validated = self.is_proxy_count_valid(ip_list, strict)
            if not proxy_count_validated:
                continue

            proxy_list_validated = self.is_proxy_trusted_list_valid(ip_list, strict)
            if not proxy_list_validated:
                continue

            client_ip, trusted_route = self.get_best_ip(
                ip_list, proxy_count_validated, proxy_list_validated
            )

            # no usable ip in this header; never record None as a candidate
            if client_ip is None:
                continue

            # we found a global ip, return it
            if client_ip.is_global:
                return client_ip, trusted_route

            # we found a loopback ip, save it
            if client_ip.is_loopback:
                loopback_list.append(client_ip)
            else:
                # if not global (public) or loopback (local), we treat it as private
                private_list.append(client_ip)

        # we have not been able to locate a global ip
        # it could be the server is running on the intranet
        # we will return the first private ip we found
        if private_list:
            return private_list[0], False

        # we have not been able to locate a global ip, nor a private ip
        # it could be the server is running on a loopback address serving local requests
        if loopback_list:
            return loopback_list[0], False

        # we were unable to find any ip address
        return None, False

    def get_best_ip(
        self,
        ip_list: List[IpAddressType],
        proxy_count_validated: bool = True,
        proxy_list_validated: bool = True,
    ) -> Tuple[OptionalIpAddressType, bool]:
        """
        Returns the best possible ip for the client.
        @param ip_list: list of ip addresses parsed from one header value
        @param proxy_count_validated: True if ip_list passed the proxy count check
        @param proxy_list_validated: True if ip_list passed the trusted proxy list check
        @return: (ip, trusted_route); trusted_route is True when the ip was selected
            relative to the configured proxies, (None, False) when no ip can be selected
        """

        if not ip_list:
            logging.warning("Invalid ip list provided.")
            return None, False

        # the incoming ips match our trusted proxy list
        if len(self.proxy_list) > 0 and proxy_list_validated:
            return self._ip_before_proxies(ip_list, len(self.proxy_list))

        # the incoming ips match our proxy count
        if self.proxy_count > 0 and proxy_count_validated:
            return self._ip_before_proxies(ip_list, self.proxy_count)

        # we don't track proxy related info, so we just return the first ip
        return ip_list[0], False

    @staticmethod
    def _ip_before_proxies(
        ip_list: List[IpAddressType],
        proxy_hops: int,
    ) -> Tuple[OptionalIpAddressType, bool]:
        """
        Returns the ip located immediately before the last `proxy_hops` entries of ip_list
        @return: (ip, True), or (None, False) if ip_list is too short to contain it
        """
        client_position = proxy_hops + 1
        if client_position > len(ip_list):
            # only reachable when get_best_ip is called with a list that was not validated
            logging.warning("Not enough ip addresses for the configured proxies.")
            return None, False

        return ip_list[-client_position], True
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc