• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 17046250819

18 Aug 2025 04:12PM UTC coverage: 96.648% (+1.9%) from 94.721%
17046250819

push

github

funilrys
Bump verstion to v4.3.0

1 of 1 new or added line in 1 file covered. (100.0%)

154 existing lines in 14 files now uncovered.

11967 of 12382 relevant lines covered (96.65%)

8.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.3
/PyFunceble/query/requests/adapter/base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all our adapter.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://docs.pyfunceble.com
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024, 2025 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        https://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
import secrets
9✔
54
from typing import Optional
9✔
55

56
import requests.adapters
9✔
57
import requests.exceptions
9✔
58
import requests.models
9✔
59

60
from PyFunceble.checker.syntax.ip import IPSyntaxChecker
9✔
61
from PyFunceble.query.dns.query_tool import DNSQueryTool
9✔
62

63

64
class RequestAdapterBase(requests.adapters.HTTPAdapter):
9✔
65
    """
66
    Extends the built-in HTTP adapter and acts as a base for all our own
67
    adapter.
68
    """
69

70
    NOT_RESOLVED_STD_HOSTNAME: str = (
9✔
71
        f"{secrets.token_hex(12)}.mock-resolver.pyfunceble.com"
72
    )
73

74
    resolving_cache: dict = {}
9✔
75
    resolving_use_cache: bool = False
9✔
76
    timeout: float = 5.0
9✔
77
    proxy_pattern: dict = {}
9✔
78
    ssl_context: Optional[dict] = None
9✔
79

80
    def __init__(self, *args, **kwargs):
81
        if "timeout" in kwargs:
82
            self.timeout = float(kwargs["timeout"])
83
            del kwargs["timeout"]
84

85
        if "max_retries" in kwargs:
86
            kwargs["max_retries"] = requests.adapters.Retry(
87
                total=kwargs["max_retries"], respect_retry_after_header=False
88
            )
89

90
        if "dns_query_tool" in kwargs:
91
            self.dns_query_tool = kwargs["dns_query_tool"]
92
            del kwargs["dns_query_tool"]
93
        else:
94
            self.dns_query_tool = DNSQueryTool()
95

96
        if "proxy_pattern" in kwargs:
97
            self.proxy_pattern = kwargs["proxy_pattern"]
98
            del kwargs["proxy_pattern"]
99
        else:
100
            self.proxy_pattern = {}
101

102
        if "ssl_context" in kwargs:
103
            self.ssl_context = kwargs["ssl_context"]
104
            del kwargs["ssl_context"]
105

106
        super().__init__(*args, **kwargs)
107

108
    @staticmethod
9✔
109
    def fake_response() -> requests.models.Response:
9✔
110
        """
111
        Provides the fake response that is provided when we couldn't resolve the
112
        given domain.
113
        """
114

UNCOV
115
        raise requests.exceptions.ConnectionError("Could not resolve.")
×
116

117
    @staticmethod
9✔
118
    def extract_extension(subject: str) -> Optional[str]:
9✔
119
        """
120
        Provides the extension of the given subject.
121

122
        .. versionchanged:: 4.1.1.dev
123
            Handle the case that the given subject does not have a `.` (point).
124

125
        :param str subject:
126
            The subject to get extract the extension from.
127

128
        :raise TypeError:
129
            When the given :code:`subject` is not a :py:class:`str`.
130
        :raise ValueError:
131
            When the given :code:`subject` is an empty :py:class:`str`.
132
        """
133

134
        if not subject or "." not in subject:
9✔
135
            return None
×
136

137
        if subject.endswith("."):
9✔
138
            # Absolute needs a little correction.
139
            last_point = subject[:-1].rfind(".")
×
140
        else:
141
            last_point = subject.rindex(".")
9✔
142

143
        extension = subject[last_point + 1 :]
9✔
144

145
        if extension.endswith("."):
9✔
UNCOV
146
            return extension[:-1]
×
147
        return extension
9✔
148

149
    def fetch_proxy_from_pattern(self, subject: str) -> dict:
9✔
150
        """
151
        Provides the proxy settings to use for the given subject.
152

153
        .. versionchanged:: 4.1.1.dev
154
            Handle the case that the given subject has no extension/TLD.
155

156
        :param str subject:
157
            The subject to work with.
158

159
        :raise TypeError:
160
            When the given :code:`subject` is not a :py:class:`str`.
161
        :raise ValueError:
162
            When the given :code:`subject` is an empty :py:class:`str`.
163
        """
164

165
        def correct_input(pattern_input: dict) -> dict:
9✔
166
            result = {}
9✔
167

168
            if "http" in pattern_input and pattern_input["http"]:
9✔
169
                result["http"] = pattern_input["http"]
×
170

171
            if "https" in pattern_input and pattern_input["https"]:
9✔
172
                result["https"] = pattern_input["https"]
×
173

174
            if "http" in result and "https" not in result:
9✔
UNCOV
175
                result["https"] = result["http"]
×
176

177
            if "https" in result and "http" not in result:
9✔
178
                result["http"] = result["https"]
×
179

180
            return result
9✔
181

182
        extension = self.extract_extension(subject)
9✔
183

184
        proxies = {}
9✔
185

186
        if extension and "rules" in self.proxy_pattern:
9✔
187
            for rule in self.proxy_pattern["rules"]:
9✔
188
                local_proxy = {}
×
189

190
                if "http" in rule and rule["http"]:
×
191
                    local_proxy["http"] = rule["http"]
×
192
                if "https" in rule and rule["https"]:
×
UNCOV
193
                    local_proxy["https"] = rule["https"]
×
194

195
                if not local_proxy:
×
UNCOV
196
                    continue
×
197

UNCOV
198
                if "tld" in rule and extension in rule["tld"]:
×
UNCOV
199
                    proxies = correct_input(local_proxy)
×
UNCOV
200
                    break
×
201

202
        if not proxies and "global" in self.proxy_pattern:
9✔
203
            proxies = correct_input(self.proxy_pattern["global"])
9✔
204

205
        return proxies
9✔
206

207
    def resolve_with_cache(self, hostname: str) -> Optional[str]:
9✔
208
        """
209
        Try to resolve using an internal cache.
210
        """
211

UNCOV
212
        if hostname not in self.resolving_cache:
×
UNCOV
213
            self.resolving_cache[hostname] = self.resolve_without_cache(hostname)
×
214

UNCOV
215
        return self.resolving_cache[hostname]
×
216

217
    def resolve_without_cache(self, hostname: str) -> Optional[str]:
9✔
218
        """
219
        Resolves the IP of the given hostname.
220

221
        :param hostname:
222
            The hostname to get resolve.
223
        """
224

225
        def get_last_cname(subject: str, recursion_depth: int = 60) -> Optional[str]:
9✔
226
            """
227
            Given a subject, this function tries to query the CNAME until there
228
            is none.
229

230
            :param subject:
231
                The first subject.
232
            """
233

234
            last_cname_result = []
9✔
235
            last_cname_new_subject = subject
9✔
236

237
            depth = 0
9✔
238

239
            while depth < recursion_depth:
9✔
240
                local_last_cname_result = (
9✔
241
                    self.dns_query_tool.set_query_record_type("CNAME")
242
                    .set_subject(last_cname_new_subject)
243
                    .query()
244
                )
245

246
                depth += 1
9✔
247

248
                if any(x in last_cname_result for x in local_last_cname_result):
9✔
UNCOV
249
                    break
×
250

251
                last_cname_result.extend(local_last_cname_result)
9✔
252

253
                if local_last_cname_result:
9✔
UNCOV
254
                    last_cname_new_subject = local_last_cname_result[0]
×
255
                else:
256
                    break
7✔
257

258
            try:
9✔
259
                return last_cname_result[-1]
9✔
260
            except IndexError:
9✔
261
                return None
9✔
262

263
        result = set()
9✔
264

265
        if not IPSyntaxChecker(hostname).is_valid():
9✔
266
            last_cname = get_last_cname(hostname)
9✔
267

268
            if last_cname:
9✔
UNCOV
269
                result.update(
×
270
                    self.dns_query_tool.set_query_record_type("A")
271
                    .set_subject(last_cname)
272
                    .query()
273
                )
274
            else:
275
                result.update(
9✔
276
                    self.dns_query_tool.set_query_record_type("A")
277
                    .set_subject(hostname)
278
                    .query()
279
                )
280
        else:
UNCOV
281
            result.add(hostname)
×
282

283
        if result:
9✔
284
            return result.pop()
9✔
285
        return None
×
286

287
    def resolve(self, hostname: str) -> Optional[str]:
9✔
288
        """
289
        Resolves with the preferred method.
290
        """
291

292
        if hostname:
9✔
293
            if self.resolving_use_cache:
9✔
UNCOV
294
                return self.resolve_with_cache(hostname)
×
295
            return self.resolve_without_cache(hostname)
9✔
UNCOV
296
        return None
×
297

298
    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
9✔
299
        """
300
        Overwrite the upstream :code:`init_poolmanager` method to ensure that we
301
        use our own ssl context - when given.
302
        """
303

304
        pool_kwargs.pop("ssl_context", None)
9✔
305

306
        return super().init_poolmanager(
9✔
307
            connections, maxsize, block, ssl_context=self.ssl_context, **pool_kwargs
308
        )
309

310
    def proxy_manager_for(self, proxy, **proxy_kwargs):
9✔
311
        """
312
        Overwrite the upstream :code:`proxy_manager_for` method to ensure that we
313
        use our own ssl context - when given.
314
        """
315

UNCOV
316
        _ = proxy_kwargs.pop("ssl_context", None)
×
317

UNCOV
318
        return super().proxy_manager_for(
×
319
            proxy, ssl_context=self.ssl_context, **proxy_kwargs
320
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc