• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 8439871604

26 Mar 2024 04:55PM UTC coverage: 94.856% (-0.04%) from 94.896%
8439871604

push

github

funilrys
Merge remote-tracking branch 'origin/dev' into col20

37 of 49 new or added lines in 10 files covered. (75.51%)

28 existing lines in 1 file now uncovered.

11415 of 12034 relevant lines covered (94.86%)

11.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.19
/PyFunceble/checker/availability/extras/base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all extra handlers.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://pyfunceble.readthedocs.io/en/dev/
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        https://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
import functools
12✔
54
import socket
12✔
55
from typing import Callable, Dict, List, Optional, Union
12✔
56

57
import requests
12✔
58

59
import PyFunceble.factory
12✔
60
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus
12✔
61
from PyFunceble.helpers.regex import RegexHelper
12✔
62
from PyFunceble.query.dns.query_tool import DNSQueryTool
12✔
63

64

65
class ExtraRuleHandlerBase:
12✔
66
    """
67
    Provides the base of all extra rules handler.
68

69
    :param status:
70
        The previously gathered status.
71
    :type status:
72
        :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`
73
    """
74

75
    _status: Optional[AvailabilityCheckerStatus] = None
12✔
76
    req: Optional[requests.Response] = None
12✔
77
    dns_query_tool: Optional[DNSQueryTool] = None
12✔
78
    regex_helper: Optional[RegexHelper] = None
12✔
79

80
    def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None:
        """
        Prepares the handler along with its DNS and regex helpers.

        :param status:
            The previously gathered status. It is only assigned when it is
            not :code:`None`.
        """

        if status is not None:
            # Goes through the property setter, which validates the type.
            self.status = status

        # The requester settings have to be loaded before we go any further.
        PyFunceble.factory.Requester.guess_all_settings()

        self.dns_query_tool = DNSQueryTool()
        self.regex_helper = RegexHelper()
88

89
    def ensure_status_is_given(
12✔
90
        func: Callable[..., "ExtraRuleHandlerBase"]
91
    ):  # pylint: disable=no-self-argument
92
        """
93
        Ensures that the status is given before running the decorated method.
94

95
        :raise TypeError:
96
            If the subject is not a string.
97
        """
98

99
        @functools.wraps(func)
12✔
100
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
101
            if not self.status:
102
                raise TypeError(
103
                    f"<self.status> should be {AvailabilityCheckerStatus}, "
104
                    f"{type(self.status)} given."
105
                )
106

107
            return func(self, *args, **kwargs)  # pylint: disable=not-callable
108

109
        return wrapper
12✔
110

111
    def setup_status_before(
12✔
112
        func: Callable[..., "ExtraRuleHandlerBase"]
113
    ):  # pylint: disable=no-self-argument
114
        """
115
        Ensures that the status is given before running the decorated method.
116

117
        :raise TypeError:
118
            If the subject is not a string.
119
        """
120

121
        @functools.wraps(func)
12✔
122
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
123
            self.status.status_before_extra_rules = self.status.status
124
            self.status.status_source_before_extra_rules = self.status.status_source
125

126
            return func(self, *args, **kwargs)  # pylint: disable=not-callable
127

128
        return wrapper
12✔
129

130
    def setup_status_after(
12✔
131
        func: Callable[..., "ExtraRuleHandlerBase"]
132
    ):  # pylint: disable=no-self-argument
133
        """
134
        Ensures that the status is given before running the decorated method.
135

136
        :raise TypeError:
137
            If the subject is not a string.
138
        """
139

140
        @functools.wraps(func)
12✔
141
        def wrapper(self, *args, **kwargs):  # pragma: no cover ## Safety!
142
            result = func(self, *args, **kwargs)  # pylint: disable=not-callable
143

144
            if self.status.status_after_extra_rules:
145
                self.status.status = self.status.status_after_extra_rules
146
                self.status.status_source = self.status.status_source_after_extra_rules
147

148
                PyFunceble.facility.Logger.info(
149
                    "Could define the status of %r from our own set of rules.",
150
                    self.status.idna_subject,
151
                )
152
            else:
153
                self.status.status_before_extra_rules = None
154
                self.status.status_source_before_extra_rules = None
155
                self.status.status_after_extra_rules = None
156
                self.status.status_source_after_extra_rules = None
157

158
            return result
159

160
        return wrapper
12✔
161

162
    @property
12✔
163
    def req_url(self) -> Optional[str]:
12✔
164
        """
165
        Provides a viable request URL.
166
        """
167

168
        if any(self.status.idna_subject.startswith(x) for x in ("http:", "https:")):
×
169
            return self.status.idna_subject
×
170
        return f"http://{self.status.idna_subject}:80"
×
171

172
    @property
12✔
173
    def req_url_https(self) -> Optional[str]:
12✔
174
        """
175
        Provides a viable request URL that default to an HTTPS URL.
176
        """
177

178
        if any(self.status.idna_subject.startswith(x) for x in ("http:", "https:")):
×
179
            return self.status.idna_subject
×
180
        return f"https://{self.status.idna_subject}:443"
×
181

182
    @property
    def status(self) -> Optional[AvailabilityCheckerStatus]:
        """
        Provides the status we are currently working with, as stored in the
        :code:`_status` attribute.
        """

        return self._status
×
189

190
    @status.setter
    def status(self, value: AvailabilityCheckerStatus) -> None:
        """
        Overwrites the status to work with.

        :param value:
            The status to work with.

        :raise TypeError:
            When the given :code:`value` is not a
            :class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`.
        """

        if isinstance(value, AvailabilityCheckerStatus):
            self._status = value
            return

        raise TypeError(
            f"<value> should be {AvailabilityCheckerStatus}, {type(value)} given."
        )
×
209

210
    def set_status(self, value: AvailabilityCheckerStatus) -> "ExtraRuleHandlerBase":
        """
        Chainable counterpart of the :code:`status` setter.

        :param value:
            The status to work with.

        :return:
            The handler itself.
        """

        # Delegates to the property setter, so that the same checks apply.
        self.status = value
        return self
×
221

222
    def do_request(self, *, allow_redirects: bool = True) -> "ExtraRuleHandlerBase":
        """
        Do a request against :code:`req_url` and store its response into the
        `req` attribute.

        :param bool allow_redirects:
            Whether we should follow the redirection - or not.

        :return:
            The handler itself (not the response). The response is available
            through the `req` attribute.
        """

        self.req = PyFunceble.factory.Requester.get(
            self.req_url, allow_redirects=allow_redirects
        )

        return self
×
235

236
    def do_on_body_match(
        self,
        url: str,
        matches: List[str],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = False,
    ) -> "ExtraRuleHandlerBase":
        """
        Queries the given :code:`url` and calls the given :code:`method`
        (without any argument) when the body of the response matches the
        given :code:`matches`.

        Request related errors are silently ignored.

        :param url:
            The URL to query.
        :param matches:
            A list of strings to match.
        :param method:
            The callable to run on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
        :param allow_redirects:
            Whether we should allow redirect.
        """

        matcher = all if strict else any

        def body_matches(_req: requests.Response):
            # The body is read lazily, once per entry of the given matches.
            if match_mode == "regex":
                return matcher(
                    self.regex_helper.set_regex(x).match(_req.text, return_match=False)
                    for x in matches
                )

            return matcher(x in _req.text for x in matches)

        try:
            response = PyFunceble.factory.Requester.get(
                url, allow_redirects=allow_redirects
            )

            if body_matches(response):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            pass

        return self
×
292

293
    def do_on_header_match(
        self,
        url: str,
        matches: Dict[str, List[str]],
        *,
        method: Callable[..., "ExtraRuleHandlerBase"],
        match_mode: str = "regex",
        strict: bool = False,
        allow_redirects: bool = True,
    ) -> "ExtraRuleHandlerBase":
        """
        Make a request to the given :code:`url` and run the given :code:`method`
        (without any argument), if the chosen headers match the given matches.

        Request related errors are silently ignored.

        :param url:
            The URL to query.
        :param matches:
            A dict mapping each header name to the list of values to look for.

            .. example::

                {
                    "Location": ["foo", "bar"] // try to match foo or bar
                }
        :param method:
            The callable to run on match.
        :param match_mode:
            A matching mode. Use :code:`regex` for a regex match, and anything
            else for a string match.
        :param strict:
            Whether we should match any (:code:`False`) or all (:code:`True`).
            This applies to the values of a single header as well as to the
            set of headers.
        :param allow_redirects:
            Whether we should allow redirect.
        """

        matcher = all if strict else any

        def value_matches(header_value: str, expected: str):
            # Compares a single header value against a single expected entry.
            if match_mode == "regex":
                return self.regex_helper.set_regex(expected).match(
                    header_value, return_match=False
                )

            return expected in header_value

        def headers_match(_req: requests.Response):
            # One verdict per requested header. A header that is missing from
            # the response never matches.
            #
            # We iterate over `.items()`: iterating over the dict itself only
            # yields the header names, which can't be unpacked into a
            # (header, values) pair.
            verdicts = [
                header in _req.headers
                and matcher(
                    value_matches(_req.headers[header], x) for x in loc_matches
                )
                for header, loc_matches in matches.items()
            ]

            return matcher(verdicts)

        try:
            req = PyFunceble.factory.Requester.get(url, allow_redirects=allow_redirects)

            if headers_match(req):
                method()
        except (
            PyFunceble.factory.Requester.exceptions.RequestException,
            PyFunceble.factory.Requester.exceptions.InvalidURL,
            PyFunceble.factory.Requester.exceptions.Timeout,
            PyFunceble.factory.Requester.exceptions.ConnectionError,
            PyFunceble.factory.Requester.urllib3_exceptions.InvalidHeader,
            socket.timeout,
        ):
            # Best effort: when the request fails, nothing is switched.
            pass

        return self
×
383

384
    def do_dns_lookup(self, *, subject: str, query_type: str) -> List[str]:
12✔
385
        """
386
        Do a DNS lookup and return its response.
387

388
        :param subject:
389
            The subject to query.
390
        :param query_type:
391
            The query type.
392
        """
393

394
        return (
×
395
            self.dns_query_tool.set_query_record_type(query_type)
396
            .set_subject(subject)
397
            .query()
398
        )
399

400
    def start(self) -> "ExtraRuleHandlerBase":
        """
        Starts the gathering process.

        :raise NotImplementedError:
            Always. The base class does not provide any implementation.
        """

        raise NotImplementedError()
406

407
    def switch_to_down(self) -> "ExtraRuleHandlerBase":
        """
        Marks the subject as inactive: the status after the extra rules
        becomes the "down" status, with :code:`SPECIAL` as source.
        """

        down_status = PyFunceble.storage.STATUS.down
        current = self.status

        current.status_after_extra_rules = down_status
        current.status_source_after_extra_rules = "SPECIAL"

        return self
×
416

417
    def switch_to_down_if_status_code(
12✔
418
        self, status_code: Union[int, List[int]]
419
    ) -> "ExtraRuleHandlerBase":
420
        """
421
        Switches the status to inactive if the caught status code matches one
422
        of the given one.
423
        """
424

NEW
425
        if not isinstance(status_code, (list, tuple, set)):
×
426
            status_code = [status_code]
×
427

428
        if any(self.status.http_status_code == x for x in status_code):
×
429
            self.switch_to_down()
×
430

431
        return self
×
432

433
    def switch_down_if_dns_match(
12✔
434
        self, query_type: str, matches: list
435
    ) -> "ExtraRuleHandlerBase":
436
        """
437
        Switches the status to inactive if the DNS query of the type :code:`query_type`
438
        matches any of the given :code:`matches`.
439

440
        :param query_type:
441
            A DNS query type.
442
        :param matches:
443
            A list of string (not regex) to match.
444
        """
445

446
        for record in (
×
447
            self.dns_query_tool.set_query_record_type(query_type)
448
            .set_subject(self.status.netloc)
449
            .query()
450
        ):
451
            for match in matches:
×
452
                if match in record:
×
453
                    self.switch_to_down()
×
454
                    break
×
455

456
        return self
×
457

458
    def switch_to_up(self) -> "ExtraRuleHandlerBase":
12✔
459
        """
460
        Switches the status to active.
461
        """
462

463
        self.status.status_after_extra_rules = PyFunceble.storage.STATUS.up
×
464
        self.status.status_source_after_extra_rules = "SPECIAL"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc