• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

un33k / python-ipware / 6641404654

25 Oct 2023 01:51PM CUT coverage: 93.333%. Remained the same
6641404654

push

github

un33k
github action

126 of 135 relevant lines covered (93.33%)

5.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.23
/python_ipware/python_ipware.py
1
import ipaddress
6✔
2
import logging
6✔
3

4
from typing import Any, Dict, List, Optional, Tuple, Union
6✔
5

6
IpAddressType = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
6✔
7
OptionalIpAddressType = Optional[IpAddressType]
6✔
8

9

10
class IpWareMeta:
6✔
11
    """
12
    A class that handles meta data frm an HTTP request.
13
    """
14

15
    def __init__(
6✔
16
        self,
17
        precedence: Optional[Tuple[str, ...]] = None,
18
        leftmost: bool = True,
19
    ) -> None:
20
        self.precedence = precedence or (
6✔
21
            "X_FORWARDED_FOR",  # Load balancers or proxies such as AWS ELB (default client is `left-most` [`<client>, <proxy1>, <proxy2>`])
22
            "HTTP_X_FORWARDED_FOR",  # Similar to X_FORWARDED_TO
23
            "HTTP_CLIENT_IP",  # Standard headers used by providers such as Amazon EC2, Heroku etc.
24
            "HTTP_X_REAL_IP",  # Standard headers used by providers such as Amazon EC2, Heroku etc.
25
            "HTTP_X_FORWARDED",  # Squid and others
26
            "HTTP_X_CLUSTER_CLIENT_IP",  # Rackspace LB and Riverbed Stingray
27
            "HTTP_FORWARDED_FOR",  # RFC 7239
28
            "HTTP_FORWARDED",  # RFC 7239
29
            "HTTP_VIA",  # Squid and others
30
            "X-CLIENT-IP",  # Microsoft Azure
31
            "X-REAL-IP",  # NGINX
32
            "X-CLUSTER-CLIENT-IP",  # Rackspace Cloud Load Balancers
33
            "X_FORWARDED",  # Squid
34
            "FORWARDED_FOR",  # RFC 7239
35
            "CF-CONNECTING-IP",  # CloudFlare
36
            "TRUE-CLIENT-IP",  # CloudFlare Enterprise,
37
            "FASTLY-CLIENT-IP",  # Firebase, Fastly
38
            "FORWARDED",  # RFC 7239
39
            "CLIENT-IP",  # Akamai and Cloudflare: True-Client-IP and Fastly: Fastly-Client-IP
40
            "REMOTE_ADDR",  # Default
41
        )
42
        self.leftmost = leftmost
6✔
43

44

45
class IpWareIpAddress:
6✔
46
    """
47
    A class that handles IP address data from an HTTP request.
48
    """
49

50
    def extract_ipv4_only(self, ip_address: str) -> str:
6✔
51
        """
52
        Given an IPv4 address or address:port, it extracts the IP address
53
        @param ip_str: IP address or address:port
54
        @return: IP address
55
        """
56

57
        if ip_address:
6✔
58
            # handle ipv4 addresses with port
59
            if ":" in ip_address:
6✔
60
                ip_address = ip_address.split(":")[0]
6✔
61
                return ip_address.strip()
6✔
62

63
            return ip_address.strip()
6✔
64

65
        return ""
×
66

67
    def extract_ipv6_only(self, ip_address: str) -> str:
6✔
68
        """
69
        Given an IPv6 address or address:port, it extracts the IP address
70
        @param ip_str: IP address or address:port
71
        @return: IP address
72
        """
73

74
        if ip_address:
6✔
75
            # handle ipv6 addresses with port
76
            if "]:" in ip_address:
6✔
77
                ip_address = ip_address.split("]:")[0]
6✔
78
                ip_address = ip_address.replace("[", "")
6✔
79
                return ip_address.strip()
6✔
80

81
            return ip_address.strip()
6✔
82

83
        return ""
×
84

85
    def get_ip_object(
6✔
86
        self,
87
        ip_str: str,
88
    ) -> OptionalIpAddressType:
89
        """
90
        Given an IP address or address:port, it parses the IP address
91
        @param ip_str: IP address or address:port
92
        @return: IP address of type IPv4Address or IPv6Address
93
        """
94

95
        ip: OptionalIpAddressType = None
6✔
96
        if ip_str:
6✔
97
            try:
6✔
98
                # try to parse as IPv6 address with optional port
99
                ipv6 = self.extract_ipv6_only(ip_str)
6✔
100
                ip = ipaddress.IPv6Address(ipv6)
6✔
101
                ip = ip.ipv4_mapped or ip
6✔
102
            except ipaddress.AddressValueError:
6✔
103
                try:
6✔
104
                    # try to parse as IPv4 address with optional port
105
                    ipv4 = self.extract_ipv4_only(ip_str)
6✔
106
                    ip = ipaddress.IPv4Address(ipv4)
6✔
107
                except ipaddress.AddressValueError:
6✔
108
                    # not a valid IP address, return None
109
                    logging.exception("Invalid ip address. {0}".format(ip_str))
6✔
110
                    ip = None
6✔
111
        return ip
6✔
112

113
    def get_ips_from_string(
6✔
114
        self,
115
        ip_str: str,
116
    ) -> Optional[List[IpAddressType]]:
117
        """
118
        Given a comma separated list of IP addresses or address:port, it parses the IP addresses
119
        @param ip_str: comma separated list of IP addresses or address:port
120
        @return: list of IP addresses of type IPv4Address or IPv6Address
121
        """
122
        ip_list: List[IpAddressType] = []
6✔
123

124
        for ip_address in ip_str.split(","):
6✔
125
            ip = self.get_ip_object(ip_address.strip())
6✔
126
            if ip:
6✔
127
                ip_list.append(ip)
6✔
128
            else:
129
                # we have at least one invalid IP address, return empty list, instead
130
                return None
6✔
131

132
        if not self.leftmost:
6✔
133
            ip_list.reverse()
6✔
134

135
        return ip_list
6✔
136

137

138
class IpWareProxy:
6✔
139
    """
140
    A class that handles proxy data from an HTTP request.
141
    """
142

143
    def __init__(
6✔
144
        self,
145
        proxy_count: int = 0,
146
        proxy_list: Optional[List[str]] = None,
147
    ) -> None:
148
        if proxy_count is None or proxy_count < 0:
6✔
149
            raise ValueError("proxy_count must be a positive integer")
×
150

151
        self.proxy_count = proxy_count
6✔
152
        self.proxy_list = self._is_valid_proxy_trusted_list(proxy_list or [])
6✔
153

154
    def _is_valid_proxy_trusted_list(self, proxy_list: Any) -> List[str]:
6✔
155
        """
156
        Checks if the proxy list is a valid list of strings
157
        @return: proxy list or raises an exception
158
        """
159

160
        if not isinstance(proxy_list, list):
6✔
161
            raise ValueError("Parameter must be a list")
×
162
        if not all(isinstance(x, str) for x in proxy_list):
6✔
163
            raise ValueError("All elements in list must be strings")
×
164

165
        return proxy_list
6✔
166

167
    def is_proxy_count_valid(
6✔
168
        self, ip_list: List[IpAddressType], strict: bool = False
169
    ) -> bool:
170
        """
171
        Checks if the proxy count is valid
172
        @param ip_list: list of ip addresses
173
        @param strict: if True, we must have exactly proxy_count proxies
174
        @return: True if the proxy count is valid, False otherwise
175
        """
176
        if self.proxy_count < 1:
6✔
177
            return True
6✔
178

179
        ip_count: int = len(ip_list)
6✔
180
        if ip_count < 1:
6✔
181
            return False
×
182

183
        if strict:
6✔
184
            # our first proxy takes the last ip address and treats it as client ip
185
            return self.proxy_count == ip_count - 1
6✔
186

187
        # the client could have gone through their own proxy and included extra ips
188
        # client could be sending in the header: X-Forwarded-For: <fake_ip>, <client_ip>
189
        return ip_count - 1 > self.proxy_count
6✔
190

191
    def is_proxy_trusted_list_valid(
6✔
192
        self,
193
        ip_list: List[IpAddressType],
194
        strict: bool = False,
195
    ) -> bool:
196
        """
197
        Checks if the proxy list is valid (all proxies are in the proxy_list)
198
        @param ip_list: list of ip addresses
199
        @param strict: if True, we must have exactly proxy_count proxies
200
        @return: client's best match ip address or False
201
        """
202
        if not self.proxy_list:
6✔
203
            return True
6✔
204

205
        ip_count = len(ip_list)
6✔
206
        proxy_list_count = len(self.proxy_list)
6✔
207

208
        # in strict mode, total ip count must be 1 more than proxy count
209
        if strict and ip_count - 1 != proxy_list_count:
6✔
210
            return False
6✔
211

212
        # total ip count (client + proxies) must be more than proxy count
213
        if ip_count - 1 < proxy_list_count:
6✔
214
            return False
×
215

216
        # start from the end, slice the incoming ip list to the same length as the trusted proxy list
217
        ip_list_slice = ip_list[-proxy_list_count:]
6✔
218
        for index, value in enumerate(ip_list_slice):
6✔
219
            if not str(value).startswith(self.proxy_list[index]):
6✔
220
                return False
6✔
221

222
        # now all we need is to return the first ip in the list that is not in the trusted proxy list
223
        # best_client_ip_index = proxy_list_count + 1
224
        # best_client_ip = ip_list[-best_client_ip_index]
225

226
        return True
6✔
227

228

229
class IpWare(IpWareMeta, IpWareProxy, IpWareIpAddress):
6✔
230
    """
231
    A class that makes best effort to determine the client's IP address.
232
    """
233

234
    def __init__(
6✔
235
        self,
236
        precedence: Optional[Tuple[str, ...]] = None,
237
        leftmost: bool = True,
238
        proxy_count: int = 0,
239
        proxy_list: Optional[List[str]] = None,
240
    ) -> None:
241
        IpWareMeta.__init__(self, precedence, leftmost)
6✔
242
        IpWareProxy.__init__(self, proxy_count or 0, proxy_list or [])
6✔
243

244
    def get_meta_value(self, meta: Dict[str, str], key: str) -> str:
6✔
245
        """
246
        Given a key, it returns a cleaned up version of the value
247
        @param key: the key to lookup
248
        @return: the value of the key or empty string
249
        """
250
        meta = meta or {}
6✔
251
        return meta.get(key, meta.get(key.replace("_", "-"), "")).strip()
6✔
252

253
    def get_meta_values(self, meta: Dict[str, str]) -> List[str]:
6✔
254
        """
255
        Given a list of keys, it returns a list of cleaned up values
256
        @return: a list of values
257
        """
258
        return [self.get_meta_value(meta, key) for key in self.precedence]
6✔
259

260
    def get_client_ip(
6✔
261
        self,
262
        meta: Dict[str, str],
263
        strict: bool = False,
264
    ) -> Tuple[OptionalIpAddressType, bool]:
265
        """
266
        Returns the client's IP address.
267
        """
268

269
        loopback_list: List[IpAddressType] = []
6✔
270
        private_list: List[OptionalIpAddressType] = []
6✔
271

272
        for ip_str in self.get_meta_values(meta):
6✔
273
            if not ip_str:
6✔
274
                continue
6✔
275

276
            ip_list = self.get_ips_from_string(ip_str)
6✔
277
            if not ip_list:
6✔
278
                continue
6✔
279

280
            proxy_count_validated = self.is_proxy_count_valid(ip_list, strict)
6✔
281
            if not proxy_count_validated:
6✔
282
                continue
6✔
283

284
            proxy_list_validated = self.is_proxy_trusted_list_valid(ip_list, strict)
6✔
285
            if not proxy_list_validated:
6✔
286
                continue
6✔
287

288
            client_ip, trusted_route = self.get_best_ip(
6✔
289
                ip_list, proxy_count_validated, proxy_list_validated
290
            )
291

292
            # we found a global ip, return it
293
            if client_ip is not None and client_ip.is_global:
6✔
294
                return client_ip, trusted_route
6✔
295

296
            # we found a private ip, save it
297
            if client_ip is not None and client_ip.is_loopback:
6✔
298
                loopback_list.append(client_ip)
6✔
299
            else:
300
                # if not global (public) or loopback (local), we treat it asd private
301
                private_list.append(client_ip)
6✔
302

303
        # we have not been able to locate a global ip
304
        # it could be the server is running on the intranet
305
        # we will return the first private ip we found
306
        if private_list:
6✔
307
            return private_list[0], False
6✔
308

309
        # we have not been able to locate a global ip, nor a private ip
310
        # it could be the server is running on a loopback address serving local requests
311
        if loopback_list:
6✔
312
            return loopback_list[0], False
6✔
313

314
        # we were unable to find any ip address
315
        return None, False
6✔
316

317
    def get_best_ip(
6✔
318
        self,
319
        ip_list: List[IpAddressType],
320
        proxy_count_validated: bool = True,
321
        proxy_list_validated: bool = True,
322
    ) -> Tuple[OptionalIpAddressType, bool]:
323
        """
324
        Returns the best possible ip for the client.
325
        """
326

327
        if not ip_list:
6✔
328
            logging.warning("Invalid ip list provided.")
×
329
            return None, False
×
330

331
        # the incoming ips match our trusted proxy list
332
        if len(self.proxy_list) > 0 and proxy_list_validated:
6✔
333
            best_client_ip_index = len(self.proxy_list) + 1
6✔
334
            best_client_ip = ip_list[-best_client_ip_index]
6✔
335
            return best_client_ip, True
6✔
336

337
        # the incoming ips match our proxy count
338
        if self.proxy_count > 0 and proxy_count_validated:
6✔
339
            best_client_ip_index = self.proxy_count + 1
6✔
340
            best_client_ip = ip_list[-best_client_ip_index]
6✔
341
            return best_client_ip, True
6✔
342

343
        # we don't track proxy related info, so we just return the first ip
344
        return ip_list[0], False
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc