• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 5099208467

pending completion
5099208467

push

github-actions

funilrys
Reintroduce accidentally removed method.

1 of 3 new or added lines in 1 file covered. (33.33%)

11297 of 11857 relevant lines covered (95.28%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.19
/PyFunceble/checker/availability/extras/base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all extra handlers.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://pyfunceble.readthedocs.io/en/dev/
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        http://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53

54
import functools
1✔
55
import socket
1✔
56
from typing import Callable, Dict, List, Optional, Union
1✔
57

58
import requests
1✔
59

60
import PyFunceble.factory
1✔
61
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus
1✔
62
from PyFunceble.helpers.regex import RegexHelper
1✔
63
from PyFunceble.query.dns.query_tool import DNSQueryTool
1✔
64

65

66
class ExtraRuleHandlerBase:
1✔
67
    """
68
    Provides the base of all extra rules handler.
69

70
    :param status:
71
        The previously gathered status.
72
    :type status:
73
        :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`
74
    """
75

76
    _status: Optional[AvailabilityCheckerStatus] = None
1✔
77
    req: Optional[requests.Response] = None
1✔
78
    dns_query_tool: Optional[DNSQueryTool] = None
1✔
79
    regex_helper: Optional[RegexHelper] = None
1✔
80

81
    def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None:
        """
        Prepares the handler and the helpers it works with.

        :param status:
            The previously gathered status. When given, it is assigned
            through the :code:`status` setter.
        """

        # The assignment goes through the property setter, which validates
        # the type of the given value.
        if status is not None:
            self.status = status

        # Be sure that all settings are loaded properly!
        PyFunceble.factory.Requester.guess_all_settings()

        # Fresh helpers for this very instance.
        self.dns_query_tool = DNSQueryTool()
        self.regex_helper = RegexHelper()
89

90
    def ensure_status_is_given(
        func: Callable[..., "ExtraRuleHandlerBase"]
    ):  # pylint: disable=no-self-argument
        """
        Ensures that the status is given before running the decorated method.

        :param func:
            The method to decorate.

        :raise TypeError:
            When the decorated method is called while :code:`self.status`
            is not set (falsy).
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
            # Happy path first: a status is available, simply delegate.
            if self.status:
                return func(self, *args, **kwargs)  # pylint: disable=not-callable

            raise TypeError(
                f"<self.status> should be {AvailabilityCheckerStatus}, "
                f"{type(self.status)} given."
            )

        return wrapper
1✔
111

112
    def setup_status_before(
        func: Callable[..., "ExtraRuleHandlerBase"]
    ):  # pylint: disable=no-self-argument
        """
        Saves the current status and status source into their
        :code:`*_before_extra_rules` counterparts before running the
        decorated method.

        :param func:
            The method to decorate.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
            current = self.status

            # Snapshot of what was known before any extra rule is applied.
            current.status_before_extra_rules = current.status
            current.status_source_before_extra_rules = current.status_source

            return func(self, *args, **kwargs)  # pylint: disable=not-callable

        return wrapper
1✔
130

131
    def setup_status_after(
        func: Callable[..., "ExtraRuleHandlerBase"]
    ):  # pylint: disable=no-self-argument
        """
        Finalizes the status once the decorated method has been executed.

        When the decorated method defined a :code:`status_after_extra_rules`,
        it becomes the status (along with its source) and the event is
        logged. Otherwise, all :code:`*_before_extra_rules` and
        :code:`*_after_extra_rules` attributes are reset to :code:`None`.

        :param func:
            The method to decorate.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
            outcome = func(self, *args, **kwargs)  # pylint: disable=not-callable

            # Read after the call: the decorated method is the one that may
            # have filled the "after extra rules" attributes.
            current = self.status

            if not current.status_after_extra_rules:
                # No rule matched: forget everything about the extra rules.
                current.status_before_extra_rules = None
                current.status_source_before_extra_rules = None
                current.status_after_extra_rules = None
                current.status_source_after_extra_rules = None

                return outcome

            current.status = current.status_after_extra_rules
            current.status_source = current.status_source_after_extra_rules

            PyFunceble.facility.Logger.info(
                "Could define the status of %r from our own set of rules.",
                current.idna_subject,
            )

            return outcome

        return wrapper
1✔
162

163
    @property
    def req_url(self) -> Optional[str]:
        """
        Provides a viable request URL.

        A subject that already starts with :code:`http:` or :code:`https:`
        is returned as is. Any other subject is turned into a plain HTTP URL
        that targets the port 80.
        """

        subject = self.status.idna_subject

        # str.startswith accepts a tuple of prefixes to try.
        if subject.startswith(("http:", "https:")):
            return subject

        return f"http://{subject}:80"
×
172

173
    @property
    def req_url_https(self) -> Optional[str]:
        """
        Provides a viable request URL that defaults to an HTTPS URL.

        A subject that already starts with :code:`http:` or :code:`https:`
        is returned as is. Any other subject is turned into an HTTPS URL
        that targets the port 443.
        """

        subject = self.status.idna_subject

        # str.startswith accepts a tuple of prefixes to try.
        if subject.startswith(("http:", "https:")):
            return subject

        return f"https://{subject}:443"
×
182

183
    @property
    def status(self) -> Optional[AvailabilityCheckerStatus]:
        """
        Provides the current state of the :code:`_status` attribute.
        """

        return self._status

    @status.setter
    def status(self, value: AvailabilityCheckerStatus) -> None:
        """
        Sets the status to work with.

        :param value:
            The status to work with.

        :raise TypeError:
            When the given :code:`value` is not a
            :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`.
        """

        # Accept early, complain otherwise.
        if isinstance(value, AvailabilityCheckerStatus):
            self._status = value
            return

        raise TypeError(
            f"<value> should be {AvailabilityCheckerStatus}, {type(value)} given."
        )
×
210

211
    def set_status(self, value: AvailabilityCheckerStatus) -> "ExtraRuleHandlerBase":
        """
        Sets the status to work with - the chainable way.

        The assignment is delegated to the :code:`status` setter.

        :param value:
            The status to work with.

        :return:
            The handler itself.
        """

        self.status = value
        return self
×
222

223
    def do_request(self, *, allow_redirects: bool = True) -> "ExtraRuleHandlerBase":
        """
        Do a request and store its response into the :code:`req` attribute.

        The queried URL is the one provided by the :code:`req_url` property.

        :param bool allow_redirects:
            Whether we should follow the redirection - or not.

        :return:
            The handler itself. The response is NOT returned, it has to be
            read from the :code:`req` attribute.
        """

        self.req = PyFunceble.factory.Requester.get(
            self.req_url, allow_redirects=allow_redirects
        )

        return self
×
236

237
    def do_on_body_match(
        self,
        url: str,
        matches: List[str],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = False,
    ) -> "ExtraRuleHandlerBase":
        """
        Make a request to the given :code:`url` and run the given :code:`method`,
        if one of the given :code:`matches` matches.

        The request related exceptions listed at the end of this method are
        silently ignored.

        :param url:
            The URL to query.
        :param matches:
            A list of strings to match.
        :param method:
            The callable to run - without any argument - on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
        :param allow_redirects:
            Whether we should allow redirect.
        """

        quantifier = all if strict else any

        def body_matches_regex(response: requests.Response):
            # Each entry is used as a regular expression against the body.
            return quantifier(
                self.regex_helper.set_regex(pattern).match(
                    response.text, return_match=False
                )
                for pattern in matches
            )

        def body_contains_string(response: requests.Response):
            # Each entry is searched as a plain substring of the body.
            return quantifier(needle in response.text for needle in matches)

        try:
            response = PyFunceble.factory.Requester.get(
                url, allow_redirects=allow_redirects
            )

            body_matches = (
                body_matches_regex if match_mode == "regex" else body_contains_string
            )

            if body_matches(response):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            pass

        return self
×
293

294
    def do_on_header_match(
        self,
        url: str,
        matches: Dict[str, List[str]],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = True,
    ) -> "ExtraRuleHandlerBase":
        """
        Make a request to the given :code:`url` and run the given :code:`method`,
        if one of the chosen header matches any of the given matches.

        The request related exceptions listed at the end of this method are
        silently ignored.

        :param url:
            The URL to query.
        :param matches:
            A dict representing the match.

            .. example::

                {
                    "Location": ["foo", "bar"] // try to match foo or bar
                }
        :param method:
            The callable to run - without any argument - on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
            The same rule is applied to the values of a single header and
            to the per-header results.
        :param allow_redirects:
            Whether we should allow redirect.
        """

        matcher = any if not strict else all

        def regex_found(pattern: str, header_value: str):
            return self.regex_helper.set_regex(pattern).match(
                header_value, return_match=False
            )

        def string_found(needle: str, header_value: str):
            return needle in header_value

        # Both modes share the exact same walk-through, only the way a single
        # value is compared against a header differs.
        found = regex_found if match_mode == "regex" else string_found

        try:
            req = PyFunceble.factory.Requester.get(url, allow_redirects=allow_redirects)

            matches2search_result = {}

            # :code:`matches` is a dict, it must be walked through with
            # :code:`items()`. Iterating over the dict itself only yields
            # the header names.
            for header, loc_matches in matches.items():
                if header not in req.headers:
                    # A missing header can never match.
                    matches2search_result[header] = False
                    continue

                header_value = req.headers[header]

                matches2search_result[header] = bool(
                    matcher(found(x, header_value) for x in loc_matches)
                )

            if matcher(matches2search_result.values()):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            pass

        return self
×
384

385
    def do_dns_lookup(self, *, subject: str, query_type: str) -> List[str]:
        """
        Do a DNS lookup and return its response.

        :param subject:
            The subject to query.
        :param query_type:
            The query type.

        :return:
            Whatever the DNS query tool gives back for the query.
        """

        # The setters of the query tool are chainable, each step works on
        # what the previous one returned.
        query_tool = self.dns_query_tool.set_query_record_type(query_type)
        query_tool = query_tool.set_subject(subject)

        return query_tool.query()
400

401
    def start(self) -> "ExtraRuleHandlerBase":
        """
        Starts the gathering process.

        This base class does not provide any implementation.

        :raise NotImplementedError:
            Always.
        """

        raise NotImplementedError()
407

408
    def switch_to_down(self) -> "ExtraRuleHandlerBase":
        """
        Switches the status to inactive.

        Only the :code:`*_after_extra_rules` attributes of the status are
        written, along with the :code:`SPECIAL` source.

        :return:
            The handler itself.
        """

        down_status = PyFunceble.storage.STATUS.down

        self.status.status_after_extra_rules = down_status
        self.status.status_source_after_extra_rules = "SPECIAL"

        return self
×
417

418
    def switch_to_down_if_status_code(
        self, status_code: Union[int, List[int]]
    ) -> "ExtraRuleHandlerBase":
        """
        Switches the status to inactive if the caught status code matches one
        of the given one.

        :param status_code:
            A single status code, or a list (or tuple) of status codes to
            compare the caught one against.

        :return:
            The handler itself.
        """

        # Normalize: a lonely status code is handled as a one-item list.
        candidates = (
            status_code if isinstance(status_code, (list, tuple)) else [status_code]
        )

        if any(self.status.http_status_code == candidate for candidate in candidates):
            self.switch_to_down()

        return self
×
433

434
    def switch_down_if_dns_match(
        self, query_type: str, matches: list
    ) -> "ExtraRuleHandlerBase":
        """
        Switches the status to inactive if the DNS query of the type :code:`query_type`
        matches any of the given :code:`matches`.

        The network location (:code:`netloc`) of the status is the queried
        subject.

        :param query_type:
            A DNS query type.
        :param matches:
            A list of string (not regex) to match.

        :return:
            The handler itself.
        """

        query_tool = self.dns_query_tool.set_query_record_type(query_type)
        records = query_tool.set_subject(self.status.netloc).query()

        for record in records:
            # any() stops at the first hit, every matching record triggers
            # the switch once.
            if any(match in record for match in matches):
                self.switch_to_down()

        return self
×
458

459
    def switch_to_up(self) -> "ExtraRuleHandlerBase":
1✔
460
        """
461
        Switches the status to active.
462
        """
463

464
        self.status.status_after_extra_rules = PyFunceble.storage.STATUS.up
×
465
        self.status.status_source_after_extra_rules = "SPECIAL"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc