• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 17046250819

18 Aug 2025 04:12PM UTC coverage: 96.648% (+1.9%) from 94.721%
17046250819

push

github

funilrys
Bump verstion to v4.3.0

1 of 1 new or added line in 1 file covered. (100.0%)

154 existing lines in 14 files now uncovered.

11967 of 12382 relevant lines covered (96.65%)

8.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.66
/PyFunceble/query/dns/query_tool.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides an interface for the query.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://docs.pyfunceble.com
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024, 2025 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        https://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
# pylint: disable=too-many-lines
54

55
import copy
9✔
56
import functools
9✔
57
import ipaddress
9✔
58
import random
9✔
59
import socket
9✔
60
import time
9✔
61
from typing import Dict, List, Optional, Union
9✔
62

63
import dns.exception
9✔
64
import dns.message
9✔
65
import dns.name
9✔
66
import dns.query
9✔
67
import dns.rdataclass
9✔
68
import dns.rdatatype
9✔
69

70
import PyFunceble.facility
9✔
71
import PyFunceble.storage
9✔
72
from PyFunceble.query.dns.nameserver import Nameservers
9✔
73
from PyFunceble.query.record.dns import DNSQueryToolRecord
9✔
74

75

76
class DNSQueryTool:
9✔
77
    """
78
    Provides our query tool.
79
    """
80

81
    # pylint: disable=too-many-public-methods
82

83
    STD_PROTOCOL: str = "UDP"
9✔
84
    STD_TIMEOUT: float = 5.0
9✔
85
    STD_FOLLOW_NAMESERVER_ORDER: bool = True
9✔
86
    STD_TRUST_SERVER: bool = False
9✔
87
    STD_DELAY: float = 0.0
9✔
88

89
    SUPPORTED_PROTOCOL: List[str] = ["TCP", "UDP", "HTTPS", "TLS"]
9✔
90

91
    value2rdata_type: Dict[int, str] = {
9✔
92
        x.value: x.name for x in dns.rdatatype.RdataType
93
    }
94
    rdata_type2value: Dict[str, int] = {
9✔
95
        x.name: x.value for x in dns.rdatatype.RdataType
96
    }
97

98
    nameservers: Optional[Nameservers] = None
9✔
99
    _query_record_type: int = dns.rdatatype.RdataType.ANY
9✔
100

101
    _subject: Optional[str] = None
9✔
102
    _follow_nameserver_order: bool = True
9✔
103
    _preferred_protocol: str = "UDP"
9✔
104
    _query_timeout: float = 5.0
9✔
105
    _trust_server: bool = False
9✔
106
    _delay: float = 0.0
9✔
107

108
    dns_name: Optional[str] = None
9✔
109

110
    query_message: Optional[dns.message.QueryMessage] = None
9✔
111
    lookup_record: Optional[DNSQueryToolRecord] = None
9✔
112

113
    def __init__(
114
        self,
115
        *,
116
        nameservers: Optional[List[str]] = None,
117
        follow_nameserver_order: Optional[bool] = None,
118
        preferred_protocol: Optional[str] = None,
119
        trust_server: Optional[bool] = None,
120
        delay: Optional[bool] = None,
121
    ) -> None:
122
        self.nameservers = Nameservers()
1✔
123

124
        if preferred_protocol is not None:
1✔
125
            self.preferred_protocol = preferred_protocol
1✔
126
        else:
127
            self.guess_and_set_preferred_protocol()
1✔
128

129
        if nameservers is not None:
1✔
130
            self.nameservers.set_nameservers(nameservers)
1✔
131
        else:  # pragma: no cover ## I'm not playing with system resolver.
132
            self.nameservers.guess_and_set_nameservers()
133

134
        if follow_nameserver_order is not None:
1✔
135
            self.follow_nameserver_order = follow_nameserver_order
1✔
136
        else:
137
            self.guess_and_set_follow_nameserver_order()
1✔
138

139
        if trust_server is not None:
1✔
140
            self.trust_server = trust_server
1✔
141
        else:
142
            self.guess_and_set_trust_server()
1✔
143

144
        if delay is not None:
1✔
145
            self.delay = delay
1✔
146
        else:
147
            self.guess_and_set_delay()
1✔
148

149
    def prepare_query(func):  # pylint: disable=no-self-argument
9✔
150
        """
151
        Prepare the query after running the decorated method.
152
        """
153

154
        @functools.wraps(func)
9✔
155
        def wrapper(self, *args, **kwargs):
9✔
156
            result = func(self, *args, **kwargs)  # pylint: disable=not-callable
9✔
157

158
            if self.subject and self.query_record_type:
9✔
159
                self.dns_name = self.get_dns_name_from_subject_and_query_type()
9✔
160

161
                if self.dns_name:
9✔
162
                    self.query_message = dns.message.make_query(
9✔
163
                        self.dns_name, self.query_record_type
164
                    )
165
                else:
166
                    self.query_message = None
9✔
167

168
            return result
9✔
169

170
        return wrapper
9✔
171

172
    def update_lookup_record(func):  # pylint: disable=no-self-argument
9✔
173
        """
174
        Ensures that a clean record is generated after the execution of
175
        the decorated method.
176
        """
177

178
        @functools.wraps(func)
9✔
179
        def wrapper(self, *args, **kwargs):
9✔
180
            result = func(self, *args, **kwargs)  # pylint: disable=not-callable
9✔
181

182
            if self.lookup_record is None or self.subject != self.lookup_record.subject:
9✔
183
                self.lookup_record = DNSQueryToolRecord()
9✔
184
                self.lookup_record.subject = self.subject
9✔
185

186
            if self.dns_name != self.lookup_record.dns_name:
9✔
187
                self.lookup_record.dns_name = self.dns_name
9✔
188

189
            if (
9✔
190
                self.get_human_query_record_type()
191
                != self.lookup_record.query_record_type
192
            ):
193
                self.lookup_record.query_record_type = (
9✔
194
                    self.get_human_query_record_type()
195
                )
196

197
            if (
9✔
198
                self.follow_nameserver_order
199
                != self.lookup_record.follow_nameserver_order
200
            ):
201
                self.lookup_record.follow_nameserver_order = (
9✔
202
                    self.follow_nameserver_order
203
                )
204

205
            if self.query_timeout != self.lookup_record.query_timeout:
9✔
206
                self.lookup_record.query_timeout = self.query_timeout
9✔
207

208
            if self.preferred_protocol != self.lookup_record.preferred_protocol:
9✔
209
                self.lookup_record.preferred_protocol = self.preferred_protocol
9✔
210

211
            return result
9✔
212

213
        return wrapper
9✔
214

215
    def ensure_subject_is_given(func):  # pylint: disable=no-self-argument
9✔
216
        """
217
        Ensures that the subject to work with is given before running the
218
        decorated method.
219

220
        :raise TypeError:
221
            If :code:`self.subject` is not a :py:class:`str`.
222
        """
223

224
        @functools.wraps(func)
9✔
225
        def wrapper(self, *args, **kwargs):
9✔
226
            if not isinstance(self.subject, str):
9✔
227
                raise TypeError(
9✔
228
                    f"<self.subject> should be {str}, {type(self.subject)} given."
229
                )
230

231
            return func(self, *args, **kwargs)  # pylint: disable=not-callable
9✔
232

233
        return wrapper
9✔
234

235
    def update_lookup_record_response(func):  # pylint: disable=no-self-argument
9✔
236
        """
237
        Ensures that the response of the decorated method is set as response
238
        in our record.
239
        """
240

241
        @functools.wraps(func)
9✔
242
        def wrapper(self, *args, **kwargs):
9✔
243
            result = func(self, *args, **kwargs)  # pylint: disable=not-callable
9✔
244

245
            if result != self.lookup_record.response:
9✔
246
                self.lookup_record.response = result
9✔
247

248
            return result
9✔
249

250
        return wrapper
9✔
251

252
    def ignore_if_query_message_is_missing(func):  # pylint: disable=no-self-argument
9✔
253
        """
254
        Ignores the call to the decorated method if the query message is
255
        missing. Otherwise, return an empty list.
256
        """
257

258
        @functools.wraps(func)
9✔
259
        def wrapper(self, *args, **kwargs):
9✔
260
            if self.query_message:
9✔
261
                return func(self, *args, **kwargs)  # pylint: disable=not-callable
9✔
262
            return []  # pragma: no cover ## Safety
263

264
        return wrapper
9✔
265

266
    @ensure_subject_is_given
9✔
267
    def get_dns_name_from_subject_and_query_type(self):
9✔
268
        """
269
        Provides the dns name based on the current subject and query type.
270
        """
271

272
        try:
9✔
273
            if self.get_human_query_record_type().lower() == "ptr":
9✔
274
                try:
9✔
275
                    return dns.name.from_text(
9✔
276
                        ipaddress.ip_address(self.subject).reverse_pointer
277
                    )
278
                except ValueError:
9✔
279
                    return dns.name.from_text(self.subject)
9✔
280
            return dns.name.from_text(self.subject)
9✔
281
        except (
9✔
282
            dns.name.LabelTooLong,
283
            dns.name.EmptyLabel,
284
            dns.name.BadEscape,
285
            dns.name.NameTooLong,
286
        ):
287
            return None
9✔
288

289
    @property
9✔
290
    def subject(self) -> Optional[str]:
9✔
291
        """
292
        Provides the current state of the :code:`_subject` attribute.
293
        """
294

295
        return self._subject
9✔
296

297
    @subject.setter
9✔
298
    @prepare_query
9✔
299
    @update_lookup_record
9✔
300
    def subject(self, value: str) -> None:
9✔
301
        """
302
        Sets the subject to work with.
303

304
        :param value:
305
            The subject to set.
306

307
        :raise TypeError:
308
            When the given :code:`value` is not a :py:class:`str`.
309
        :raise ValueError:
310
            When the given :code:`value` is empty.
311
        """
312

313
        if not isinstance(value, str):
9✔
314
            raise TypeError(f"<value> should be {str}, {type(value)} given.")
9✔
315

316
        if not value:
9✔
317
            raise ValueError("<value> should not be empty.")
9✔
318

319
        self._subject = value
9✔
320

321
    def set_subject(self, value: str) -> "DNSQueryTool":
9✔
322
        """
323
        Sets the subject to work with.
324

325
        :param value:
326
            The subject to set.
327
        """
328

329
        self.subject = value
9✔
330

331
        return self
9✔
332

333
    def set_nameservers(
334
        self, value: List[str]
335
    ) -> "DNSQueryTool":  # pragma: no cover ## Underlying already tested.
336
        """
337
        Sets the nameservers to work with.
338

339
        :raise TypeError:
340
            When the given :code:`value` is not a list of :py:class:`str`.
341
        :raise ValueError:
342
            When the given :code:`value` is empty.
343
        """
344

345
        self.nameservers.set_nameservers(value)
346

347
        return self
348

349
    @property
9✔
350
    def follow_nameserver_order(self) -> bool:
9✔
351
        """
352
        Provides the current state of the :code:`_follow_nameserver_order`
353
        attribute.
354
        """
355

356
        return self._follow_nameserver_order
9✔
357

358
    @follow_nameserver_order.setter
9✔
359
    @update_lookup_record
9✔
360
    def follow_nameserver_order(self, value: bool) -> None:
9✔
361
        """
362
        Updates the :code:`follow_nameserver_order` variable.
363

364
        :param value:
365
            The value to set.
366

367
        :raise TypeError:
368
            When the given :code:`value` is not a :py:class:`bool`.
369
        """
370

371
        if not isinstance(value, bool):
9✔
372
            raise TypeError(f"<value> should be {bool}, {type(value)} given.")
9✔
373

374
        self._follow_nameserver_order = value
9✔
375

376
    def set_follow_nameserver_order(self, value: bool) -> "DNSQueryTool":
9✔
377
        """
378
        Updates the :code:`follow_nameserver_order` variable.
379

380
        :param value:
381
            The value to set.
382
        """
383

384
        self.follow_nameserver_order = value
9✔
385

386
        return self
9✔
387

388
    @property
9✔
389
    def query_record_type(self) -> int:
9✔
390
        """
391
        Provides the current state of the :code:`_query_record_type` attribute.
392
        """
393

394
        return self._query_record_type
9✔
395

396
    @query_record_type.setter
9✔
397
    @prepare_query
9✔
398
    @update_lookup_record
9✔
399
    def query_record_type(self, value: Union[str, int]) -> None:
9✔
400
        """
401
        Sets the DNS record type to query.
402

403
        :param value:
404
            The value to set. It can be the human version (e.g AAAA) or an
405
            integer as registered in the :code:`value2rdata_type` attribute.
406

407
        :raise TypeError:
408
            When the given :code:`value` is not a :py:class:`str` nor
409
            :py:class:`int`.
410
        :raise ValueError:
411
            When the given :code:`value` is unknown or unsupported.
412
        """
413

414
        if not isinstance(value, (str, int)):
9✔
415
            raise TypeError(f"<value> should be {int} or {str}, {type(value)} given.")
9✔
416

417
        if value in self.rdata_type2value:
9✔
418
            self._query_record_type = self.rdata_type2value[value]
9✔
419
        elif value in self.value2rdata_type:
9✔
420
            self._query_record_type = value
9✔
421
        else:
422
            raise ValueError(f"<value> ({value!r}) is unknown or unsupported.")
9✔
423

424
    def set_query_record_type(self, value: Union[str, int]) -> "DNSQueryTool":
9✔
425
        """
426
        Sets the DNS record type to query.
427

428
        :param value:
429
            The value to set. It can be the human version (e.g AAAA) or an
430
            integer as registered in the :code:`value2rdata_type` attribute.
431
        """
432

433
        self.query_record_type = value
9✔
434

435
        return self
9✔
436

437
    def get_human_query_record_type(self) -> str:
9✔
438
        """
439
        Provides the currently set record type.
440
        """
441

442
        return self.value2rdata_type[self.query_record_type]
9✔
443

444
    @property
9✔
445
    def query_timeout(self) -> float:
9✔
446
        """
447
        Provides the current state of the :code:`_query_timeout` attribute.
448
        """
449

450
        return self._query_timeout
9✔
451

452
    @query_timeout.setter
9✔
453
    @update_lookup_record
9✔
454
    def query_timeout(self, value: Union[int, float]) -> None:
9✔
455
        """
456
        Sets the timeout to apply.
457

458
        :param value:
459
            The timeout to apply.
460

461
        :raise TypeError:
462
            When the given :code:`value` is not a :py:class:`float`
463
            nor :py:class.`int`.
464
        """
465

466
        if not isinstance(value, (float, int)):
9✔
467
            raise TypeError(f"<value> should be {float} or {int}, {type(value)} given.")
9✔
468

469
        self._query_timeout = float(value)
9✔
470

471
    def set_timeout(self, value: Union[int, float]) -> "DNSQueryTool":
9✔
472
        """
473
        Sets the timeout to apply.
474

475
        :param value:
476
            The timeout to apply.
477
        """
478

479
        self.query_timeout = value
9✔
480

481
        return self
9✔
482

483
    @property
9✔
484
    def trust_server(self) -> Optional[bool]:
9✔
485
        """
486
        Provides the current state of the :code:`trust_server` attribute.
487
        """
488

489
        return self._trust_server
9✔
490

491
    @trust_server.setter
9✔
492
    def trust_server(self, value: bool) -> None:
9✔
493
        """
494
        Sets the value to apply.
495

496
        :param value:
497
            The value to apply.
498

499
        :raise TypeError:
500
            When the given :code:`value` is not a :py:class:`bool`.
501
        """
502

503
        if not isinstance(value, bool):
9✔
504
            raise TypeError(f"<value> should be {bool}, {type(value)} given.")
9✔
505

506
        self._trust_server = value
9✔
507

508
    def set_trust_server(self, value: bool) -> "DNSQueryTool":
9✔
509
        """
510
        Sets the value to apply.
511

512
        :param value:
513
            The value to apply.
514
        """
515

516
        self.trust_server = value
9✔
517

518
        return self
9✔
519

520
    def guess_and_set_timeout(self) -> "DNSQueryTool":
9✔
521
        """
522
        Try to guess and set the timeout.
523
        """
524

525
        if PyFunceble.facility.ConfigLoader.is_already_loaded():
9✔
526
            if PyFunceble.storage.CONFIGURATION.lookup.timeout:
9✔
527
                self.query_timeout = PyFunceble.storage.CONFIGURATION.lookup.timeout
9✔
528
            else:
529
                self.query_timeout = self.STD_TIMEOUT
9✔
530
        else:
531
            self.query_timeout = self.STD_TIMEOUT
9✔
532

533
        return self
9✔
534

535
    @property
9✔
536
    def preferred_protocol(self) -> Optional[str]:
9✔
537
        """
538
        Provides the current state of the :code:`_preferred_protocol` attribute.
539
        """
540

541
        return self._preferred_protocol
9✔
542

543
    @preferred_protocol.setter
9✔
544
    def preferred_protocol(self, value: str) -> None:
9✔
545
        """
546
        Sets the preferred protocol.
547

548
        :param value:
549
            The protocol to use.
550

551
        :raise TypeError:
552
            When the given :code:`value` is not a :py:class:`str`.
553
        :raise ValueError:
554
            When the given :code:`value` is unknown or unsupported.
555
        """
556

557
        if not isinstance(value, str):
9✔
558
            raise TypeError(f"<value> should be {str}, {type(value)} given.")
9✔
559

560
        value = value.upper()
9✔
561

562
        if value not in self.SUPPORTED_PROTOCOL:
9✔
563
            raise ValueError(
9✔
564
                f"<value> {value!r} is unknown or unsupported "
565
                f"(supported: {self.SUPPORTED_PROTOCOL!r})."
566
            )
567

568
        self._preferred_protocol = self.nameservers.protocol = value
9✔
569

570
    def set_preferred_protocol(self, value: str) -> "DNSQueryTool":
9✔
571
        """
572
        Sets the preferred protocol.
573

574
        :param value:
575
            The protocol to use.
576
        """
577

578
        self.preferred_protocol = value
9✔
579

580
        return self
9✔
581

582
    @property
9✔
583
    def delay(self) -> float:
9✔
584
        """
585
        Provides the current state of the :code:`_delay` attribute.
586
        """
587

588
        return self._delay
9✔
589

590
    @delay.setter
9✔
591
    @update_lookup_record
9✔
592
    def delay(self, value: Union[int, float]) -> None:
9✔
593
        """
594
        Sets the delay to apply.
595

596
        :param value:
597
            The delay to apply.
598

599
        :raise TypeError:
600
            When the given :code:`value` is not a :py:class:`float`
601
            nor :py:class.`int`.
602
        :raise ValueError:
603
            When the given :code:`value` is not a positive.
604
        """
605

606
        if not isinstance(value, (float, int)):
9✔
607
            raise TypeError(f"<value> should be {float} or {int}, {type(value)} given.")
9✔
608

609
        if value < 0:
9✔
610
            raise ValueError(f"<value> should be positive, {value} given.")
9✔
611

612
        self._delay = float(value)
9✔
613

614
    def set_delay(self, value: Union[int, float]) -> "DNSQueryTool":
9✔
615
        """
616
        Sets the delay to apply.
617

618
        :param value:
619
            The delay to apply.
620
        """
621

622
        self.delay = value
9✔
623

624
        return self
9✔
625

626
    def guess_and_set_preferred_protocol(self) -> "DNSQueryTool":
9✔
627
        """
628
        Try to guess and set the preferred procol.
629
        """
630

631
        if PyFunceble.facility.ConfigLoader.is_already_loaded():
9✔
632
            if isinstance(PyFunceble.storage.CONFIGURATION.dns.protocol, str):
9✔
633
                self.preferred_protocol = PyFunceble.storage.CONFIGURATION.dns.protocol
9✔
634
            else:
635
                self.preferred_protocol = self.STD_PROTOCOL
9✔
636
        else:
637
            self.preferred_protocol = self.STD_PROTOCOL
9✔
638

639
        return self
9✔
640

641
    def guess_and_set_follow_nameserver_order(self) -> "DNSQueryTool":
9✔
642
        """
643
        Try to guess and authorize the mix of the nameserver before each
644
        query.
645
        """
646

647
        if PyFunceble.facility.ConfigLoader.is_already_loaded():
9✔
648
            if isinstance(
9✔
649
                PyFunceble.storage.CONFIGURATION.dns.follow_server_order, bool
650
            ):
651
                self.follow_nameserver_order = (
9✔
652
                    PyFunceble.storage.CONFIGURATION.dns.follow_server_order
653
                )
654
            else:
655
                self.follow_nameserver_order = self.STD_FOLLOW_NAMESERVER_ORDER
9✔
656
        else:
657
            self.follow_nameserver_order = self.STD_FOLLOW_NAMESERVER_ORDER
9✔
658

659
        return self
9✔
660

661
    def guess_and_set_trust_server(self) -> "DNSQueryTool":
9✔
662
        """
663
        Try to guess and set the trust flag.
664
        """
665

666
        if PyFunceble.facility.ConfigLoader.is_already_loaded():
9✔
667
            if isinstance(PyFunceble.storage.CONFIGURATION.dns.trust_server, bool):
9✔
668
                self.trust_server = PyFunceble.storage.CONFIGURATION.dns.trust_server
9✔
669
            else:
670
                self.trust_server = self.STD_TRUST_SERVER
9✔
671
        else:
672
            self.trust_server = self.STD_TRUST_SERVER
9✔
673

674
        return self
9✔
675

676
    def guess_and_set_delay(self) -> "DNSQueryTool":
9✔
677
        """
678
        Try to guess and set the delay to apply.
679
        """
680

681
        if PyFunceble.facility.ConfigLoader.is_already_loaded():
9✔
682
            if PyFunceble.storage.CONFIGURATION.dns.delay:
9✔
683
                self.delay = PyFunceble.storage.CONFIGURATION.dns.delay
9✔
684
            else:
685
                self.delay = self.STD_DELAY
9✔
686
        else:
687
            self.delay = self.STD_DELAY
9✔
688

689
    def guess_all_settings(
690
        self,
691
    ) -> "DNSQueryTool":  # pragma: no cover ## Method themselves are more important
692
        """
693
        Try to guess all settings.
694
        """
695

696
        to_ignore = ["guess_all_settings"]
697

698
        for method in dir(self):
699
            if method in to_ignore or not method.startswith("guess_"):
700
                continue
701

702
            getattr(self, method)()
703

704
        return self
705

706
    def get_lookup_record(
9✔
707
        self,
708
    ) -> Optional[DNSQueryToolRecord]:
709
        """
710
        Provides the current query record.
711
        """
712

713
        return self.lookup_record
9✔
714

715
    def _get_result_from_response(
716
        self, response: dns.message.Message
717
    ) -> List[str]:  # pragma: no cover ## This just reads upstream result
718
        """
719
        Given a response, we return the best possible result.
720
        """
721

722
        result = []
723

724
        rrset = response.get_rrset(
725
            response.answer,
726
            self.dns_name,
727
            dns.rdataclass.RdataClass.IN,
728
            self.query_record_type,
729
        )
730

731
        if rrset:
732
            result.extend([x.to_text() for x in rrset])
733

734
        PyFunceble.facility.Logger.debug("Result from response:\r%r", result)
735

736
        return result
737

738
    def _mix_order(
739
        self, data: Union[dict, List[str]]
740
    ) -> Union[dict, List[str]]:  # pragma: no cover ## Just a shuffle :-)
741
        """
742
        Given a dataset, we mix its order.
743
        """
744

745
        dataset = copy.deepcopy(data)
746

747
        if not self.follow_nameserver_order:
748
            if isinstance(dataset, list):
749
                random.shuffle(dataset)
750

751
                return dataset
752

753
            if isinstance(dataset, dict):
754
                temp = list(dataset.items())
755
                random.shuffle(temp)
756

757
                return dict(temp)
758

759
        PyFunceble.facility.Logger.debug("Mixed data:\n%r", dataset)
760
        return dataset
761

762
    def _query_protocol(self, protocol: str) -> Optional[List[str]]:
9✔
763
        """
764
        Given a protocol, we query the nameservers with or through it.
765

766
        :param str protocol:
767
            The protocol to use for the query.
768

769
            It can be one of the following:
770
                - "TCP"
771
                - "UDP"
772
                - "HTTPS"
773
                - "TLS"
774
        :return:
775
            The result of the query, or an empty list if the query failed.
776
        """
777

778
        self.lookup_record.used_protocol = protocol.upper()
9✔
779

780
        if self.lookup_record.used_protocol not in self.SUPPORTED_PROTOCOL:
9✔
UNCOV
781
            raise ValueError(
×
782
                f"<protocol> ({self.lookup_record.used_protocol!r}) is unknown or "
783
                f"unsupported (supported: {self.SUPPORTED_PROTOCOL!r})."
784
            )
785

786
        result = set()
9✔
787

788
        for nameserver, port in self._mix_order(
9✔
789
            self.nameservers.get_nameserver_ports()
790
        ).items():
791
            PyFunceble.facility.Logger.debug(
792
                "Started to query information of %r from %r (%s)",
793
                self.subject,
794
                nameserver,
795
                self.lookup_record.used_protocol,
796
            )
797

798
            if self.lookup_record.used_protocol == "HTTPS" and port == 53:
9✔
799
                # Default port for nameserver class is 53. So we ensure we
800
                # overwrite with our own default.
801
                port = 443
9✔
802

803
            if self.lookup_record.used_protocol == "TLS" and port == 53:
9✔
804
                # Default port for nameserver class is 53. So we ensure we
805
                # overwrite with our own default.
806
                port = 853
9✔
807

808
            try:
9✔
809
                response = getattr(dns.query, protocol.lower())(
9✔
810
                    self.query_message,
811
                    nameserver,
812
                    port=port,
813
                    timeout=self.query_timeout,
814
                )
815

816
                local_result = self._get_result_from_response(response)
9✔
817

818
                if local_result:
9✔
819
                    result.update(local_result)
9✔
820

821
                    self.lookup_record.nameserver = nameserver
9✔
822
                    self.lookup_record.port = port
9✔
823

824
                    PyFunceble.facility.Logger.debug(
825
                        "Successfully queried information of %r from %r. (%s)",
826
                        self.subject,
827
                        nameserver,
828
                        self.lookup_record.used_protocol,
829
                    )
830

831
                if self.trust_server:  # pragma: no cover: Per case.
832
                    break
833
            except (dns.exception.Timeout, socket.error):
9✔
834
                # Example: Resource temporarily unavailable.
835
                pass
9✔
836
            except dns.query.UnexpectedSource:
9✔
837
                # Example: got a response from XXX instead of XXX.
838
                pass
9✔
839
            except dns.query.BadResponse:
9✔
840
                # Example: A DNS query response does not respond to the question
841
                # asked.
842
                pass
9✔
843
            except ValueError:
9✔
844
                # Example: Input is malformed.
845
                break
9✔
846

847
            PyFunceble.facility.Logger.debug(
848
                "Unsuccessfully queried information of %r from %r (%s). Sleeping %fs.",
849
                self.subject,
850
                nameserver,
851
                self.lookup_record.used_protocol,
852
                self.delay,
853
            )
854

855
            time.sleep(self.delay)
9✔
856

857
        return list(result)
9✔
858

859
    @ensure_subject_is_given
9✔
860
    @ignore_if_query_message_is_missing
9✔
861
    @update_lookup_record_response
9✔
862
    def tcp(
9✔
863
        self,
864
    ) -> Optional[List[str]]:
865
        """
866
        Request the chosen record through the TCP protocol.
867
        """
868

869
        return self._query_protocol("TCP")
9✔
870

871
    @ensure_subject_is_given
9✔
872
    @ignore_if_query_message_is_missing
9✔
873
    @update_lookup_record_response
9✔
874
    def udp(
9✔
875
        self,
876
    ) -> Optional[List[str]]:
877
        """
878
        Request the chosen record through the UTP protocol.
879
        """
880

881
        return self._query_protocol("UDP")
9✔
882

883
    @ensure_subject_is_given
9✔
884
    @ignore_if_query_message_is_missing
9✔
885
    @update_lookup_record_response
9✔
886
    def https(
9✔
887
        self,
888
    ) -> Optional[List[str]]:
889
        """
890
        Request the chosen record through the https protocol.
891
        """
892

893
        return self._query_protocol("HTTPS")
9✔
894

895
    @ensure_subject_is_given
9✔
896
    @ignore_if_query_message_is_missing
9✔
897
    @update_lookup_record_response
9✔
898
    def tls(
9✔
899
        self,
900
    ) -> Optional[List[str]]:
901
        """
902
        Request the chosen record through the TLS protocol.
903
        """
904

905
        return self._query_protocol("TLS")
9✔
906

907
    def query(
9✔
908
        self,
909
    ) -> Optional[List[str]]:
910
        """
911
        Process the query based on the preferred protocol.
912
        """
913

914
        return getattr(self, self.preferred_protocol.lower())()
9✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc