• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 5109997145

pending completion
5109997145

push

github-actions

funilrys
Merge remote-tracking branch 'origin/dev' into mtest

296 of 461 new or added lines in 39 files covered. (64.21%)

11297 of 11853 relevant lines covered (95.31%)

11.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.19
/PyFunceble/checker/availability/extras/base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all extra handlers.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://pyfunceble.readthedocs.io/en/dev/
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        http://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53

54
import functools
12✔
55
import socket
12✔
56
from typing import Callable, Dict, List, Optional, Union
12✔
57

58
import requests
12✔
59

60
import PyFunceble.factory
12✔
61
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus
12✔
62
from PyFunceble.helpers.regex import RegexHelper
12✔
63
from PyFunceble.query.dns.query_tool import DNSQueryTool
12✔
64

65

66
class ExtraRuleHandlerBase:
12✔
67
    """
68
    Provides the base of all extra rules handler.
69

70
    :param status:
71
        The previously gathered status.
72
    :type status:
73
        :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`
74
    """
75

76
    _status: Optional[AvailabilityCheckerStatus] = None
12✔
77
    req: Optional[requests.Response] = None
12✔
78
    dns_query_tool: Optional[DNSQueryTool] = None
12✔
79
    regex_helper: Optional[RegexHelper] = None
12✔
80

81
    def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None:
        """
        Prepares the handler.

        :param status:
            The previously gathered status. It is stored through the
            :code:`status` setter, unless it is :code:`None`.
        """

        if status is not None:
            self.status = status

        # Make sure that all the settings of the requester are properly loaded.
        PyFunceble.factory.Requester.guess_all_settings()

        # Helpers shared by the rules of this handler.
        self.dns_query_tool = DNSQueryTool()
        self.regex_helper = RegexHelper()
89

90
    def ensure_status_is_given(
12✔
91
        func: Callable[..., "ExtraRuleHandlerBase"]
92
    ):  # pylint: disable=no-self-argument
93
        """
94
        Ensures that the status is given before running the decorated method.
95

96
        :raise TypeError:
97
            If the subject is not a string.
98
        """
99

100
        @functools.wraps(func)
12✔
101
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
102
            if not self.status:
103
                raise TypeError(
104
                    f"<self.status> should be {AvailabilityCheckerStatus}, "
105
                    f"{type(self.status)} given."
106
                )
107

108
            return func(self, *args, **kwargs)  # pylint: disable=not-callable
109

110
        return wrapper
12✔
111

112
    def setup_status_before(
12✔
113
        func: Callable[..., "ExtraRuleHandlerBase"]
114
    ):  # pylint: disable=no-self-argument
115
        """
116
        Ensures that the status is given before running the decorated method.
117

118
        :raise TypeError:
119
            If the subject is not a string.
120
        """
121

122
        @functools.wraps(func)
12✔
123
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
124
            self.status.status_before_extra_rules = self.status.status
125
            self.status.status_source_before_extra_rules = self.status.status_source
126

127
            return func(self, *args, **kwargs)  # pylint: disable=not-callable
128

129
        return wrapper
12✔
130

131
    def setup_status_after(
12✔
132
        func: Callable[..., "ExtraRuleHandlerBase"]
133
    ):  # pylint: disable=no-self-argument
134
        """
135
        Ensures that the status is given before running the decorated method.
136

137
        :raise TypeError:
138
            If the subject is not a string.
139
        """
140

141
        @functools.wraps(func)
12✔
142
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
143
            result = func(self, *args, **kwargs)  # pylint: disable=not-callable
144

145
            if self.status.status_after_extra_rules:
146
                self.status.status = self.status.status_after_extra_rules
147
                self.status.status_source = self.status.status_source_after_extra_rules
148

149
                PyFunceble.facility.Logger.info(
150
                    "Could define the status of %r from our own set of rules.",
151
                    self.status.idna_subject,
152
                )
153
            else:
154
                self.status.status_before_extra_rules = None
155
                self.status.status_source_before_extra_rules = None
156
                self.status.status_after_extra_rules = None
157
                self.status.status_source_after_extra_rules = None
158

159
            return result
160

161
        return wrapper
12✔
162

163
    @property
12✔
164
    def req_url(self) -> Optional[str]:
12✔
165
        """
166
        Provides a viable request URL.
167
        """
168

NEW
169
        if any(self.status.idna_subject.startswith(x) for x in ("http:", "https:")):
×
NEW
170
            return self.status.idna_subject
×
NEW
171
        return f"http://{self.status.idna_subject}:80"
×
172

173
    @property
12✔
174
    def req_url_https(self) -> Optional[str]:
12✔
175
        """
176
        Provides a viable request URL that default to an HTTPS URL.
177
        """
178

NEW
179
        if any(self.status.idna_subject.startswith(x) for x in ("http:", "https:")):
×
NEW
180
            return self.status.idna_subject
×
NEW
181
        return f"https://{self.status.idna_subject}:443"
×
182

183
    @property
12✔
184
    def status(self) -> Optional[AvailabilityCheckerStatus]:
12✔
185
        """
186
        Provides the current state of the :code:`_status` attribute.
187
        """
188

NEW
189
        return self._status
×
190

191
    @status.setter
12✔
192
    def status(self, value: AvailabilityCheckerStatus) -> None:
12✔
193
        """
194
        Sets the status to work with.
195

196
        :param value:
197
            The status to work with.
198

199
        :raise TypeError:
200
            When the given :code:`value` is not a
201
            :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`.
202
        """
203

NEW
204
        if not isinstance(value, AvailabilityCheckerStatus):
×
NEW
205
            raise TypeError(
×
206
                f"<value> should be {AvailabilityCheckerStatus}, {type(value)} given."
207
            )
208

NEW
209
        self._status = value
×
210

211
    def set_status(self, value: AvailabilityCheckerStatus) -> "ExtraRuleHandlerBase":
        """
        Chainable counterpart of the :code:`status` setter.

        :param value:
            The status to work with.

        :return:
            The handler itself.
        """

        # Goes through the property setter, hence through its type check.
        self.status = value
        return self
222

223
    def do_request(self, *, allow_redirects: bool = True) -> "ExtraRuleHandlerBase":
        """
        Do a request and store its response into the `req` attribute.

        The queried URL is the one provided by the :code:`req_url` property.

        :param bool allow_redirects:
            Whether we should follow the redirection - or not.

        :return:
            The handler itself. The response is not returned, it is available
            through the :code:`req` attribute.
        """

        self.req = PyFunceble.factory.Requester.get(
            self.req_url, allow_redirects=allow_redirects
        )

        return self
236

237
    def do_on_body_match(
        self,
        url: str,
        matches: List[str],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = False,
    ) -> "ExtraRuleHandlerBase":
        """
        Queries the given :code:`url` and calls the given :code:`method`
        (without any argument) when the body of the response matches the
        given :code:`matches`.

        :param url:
            The URL to query.
        :param matches:
            A list of strings to match.
        :param method:
            The callable to run on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
        :param allow_redirects:
            Whether we should allow redirect.

        :return:
            The handler itself.
        """

        quantifier = all if strict else any

        try:
            response = PyFunceble.factory.Requester.get(
                url, allow_redirects=allow_redirects
            )

            if match_mode == "regex":
                found = quantifier(
                    self.regex_helper.set_regex(pattern).match(
                        response.text, return_match=False
                    )
                    for pattern in matches
                )
            else:
                found = quantifier(needle in response.text for needle in matches)

            if found:
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            # Those failures are ignored on purpose: nothing is run.
            pass

        return self
293

294
    def do_on_header_match(
        self,
        url: str,
        matches: Dict[str, List[str]],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = True,
    ) -> "ExtraRuleHandlerBase":
        """
        Make a request to the given :code:`url` and run the given :code:`method`
        (without any argument), if the chosen headers match the given matches.

        :param url:
            The URL to query.
        :param matches:
            A dict mapping a header name to the list of strings to look for
            in the value of that header. Example::

                {
                    "Location": ["foo", "bar"]  # try to match foo or bar
                }

        :param method:
            The callable to run on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
            This applies both to the strings of a single header and to the
            headers themselves.
        :param allow_redirects:
            Whether we should allow redirect.

        :return:
            The handler itself.
        """

        matcher = all if strict else any

        def header_matches(value: str, candidates: List[str]) -> bool:
            """
            Checks the value of a single header against its candidates.
            """

            if match_mode == "regex":
                return matcher(
                    self.regex_helper.set_regex(x).match(value, return_match=False)
                    for x in candidates
                )

            return matcher(x in value for x in candidates)

        try:
            req = PyFunceble.factory.Requester.get(url, allow_redirects=allow_redirects)

            # One result per requested header. A header which is missing from
            # the response is a non-match. Iterating over `.items()` is what
            # the regex mode was previously missing.
            results = [
                header in req.headers
                and header_matches(req.headers[header], candidates)
                for header, candidates in matches.items()
            ]

            if matcher(results):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            # Those failures are ignored on purpose: nothing is run.
            pass

        return self
384

385
    def do_dns_lookup(self, *, subject: str, query_type: str) -> List[str]:
12✔
386
        """
387
        Do a DNS lookup and return its response.
388

389
        :param subject:
390
            The subject to query.
391
        :param query_type:
392
            The query type.
393
        """
394

NEW
395
        return (
×
396
            self.dns_query_tool.set_query_record_type(query_type)
397
            .set_subject(subject)
398
            .query()
399
        )
400

401
    def start(self) -> "ExtraRuleHandlerBase":
12✔
402
        """
403
        Starts the gathering process.
404
        """
405

406
        raise NotImplementedError()
407

408
    def switch_to_down(self) -> "ExtraRuleHandlerBase":
        """
        Switches the status to inactive: the status after the extra rules
        becomes :code:`PyFunceble.storage.STATUS.down` and its source
        :code:`SPECIAL`.

        :return:
            The handler itself.
        """

        current = self.status

        current.status_after_extra_rules = PyFunceble.storage.STATUS.down
        current.status_source_after_extra_rules = "SPECIAL"

        return self
417

418
    def switch_to_down_if_status_code(
12✔
419
        self, status_code: Union[int, List[int]]
420
    ) -> "ExtraRuleHandlerBase":
421
        """
422
        Switches the status to inactive if the caught status code matches one
423
        of the given one.
424
        """
425

NEW
426
        if not isinstance(status_code, (list, tuple)):
×
NEW
427
            status_code = [status_code]
×
428

NEW
429
        if any(self.status.http_status_code == x for x in status_code):
×
NEW
430
            self.switch_to_down()
×
431

NEW
432
        return self
×
433

434
    def switch_down_if_dns_match(
12✔
435
        self, query_type: str, matches: list
436
    ) -> "ExtraRuleHandlerBase":
437
        """
438
        Switches the status to inactive if the DNS query of the type :code:`query_type`
439
        matches any of the given :code:`matches`.
440

441
        :param query_type:
442
            A DNS query type.
443
        :param matches:
444
            A list of string (not regex) to match.
445
        """
446

NEW
447
        for record in (
×
448
            self.dns_query_tool.set_query_record_type(query_type)
449
            .set_subject(self.status.netloc)
450
            .query()
451
        ):
NEW
452
            for match in matches:
×
NEW
453
                if match in record:
×
NEW
454
                    self.switch_to_down()
×
NEW
455
                    break
×
456

NEW
457
        return self
×
458

459
    def switch_to_up(self) -> "ExtraRuleHandlerBase":
12✔
460
        """
461
        Switches the status to active.
462
        """
463

NEW
464
        self.status.status_after_extra_rules = PyFunceble.storage.STATUS.up
×
NEW
465
        self.status.status_source_after_extra_rules = "SPECIAL"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc