• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / ed350f2e-b6c3-4bef-a7fa-04255aec4056

03 Jun 2025 05:34PM UTC coverage: 86.768% (+0.04%) from 86.729%
ed350f2e-b6c3-4bef-a7fa-04255aec4056

push

circleci

web-flow
CloudFormation v2 Engine: Base Support for Fn::Base64 (#12700)

20 of 22 new or added lines in 3 files covered. (90.91%)

185 existing lines in 14 files now uncovered.

65077 of 75001 relevant lines covered (86.77%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.61
/localstack-core/localstack/dns/server.py
1
import argparse
1✔
2
import copy
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
import textwrap
1✔
7
import threading
1✔
8
from datetime import datetime
1✔
9
from functools import cache
1✔
10
from ipaddress import IPv4Address, IPv4Interface
1✔
11
from pathlib import Path
1✔
12
from socket import AddressFamily
1✔
13
from typing import Iterable, Literal, Tuple
1✔
14

15
import psutil
1✔
16
from cachetools import TTLCache, cached
1✔
17
from dnslib import (
1✔
18
    AAAA,
19
    CNAME,
20
    MX,
21
    NS,
22
    QTYPE,
23
    RCODE,
24
    RD,
25
    RDMAP,
26
    RR,
27
    SOA,
28
    TXT,
29
    A,
30
    DNSHeader,
31
    DNSLabel,
32
    DNSQuestion,
33
    DNSRecord,
34
)
35
from dnslib.server import DNSHandler, DNSServer
1✔
36
from psutil._common import snicaddr
1✔
37

38
import dns.flags
1✔
39
import dns.message
1✔
40
import dns.query
1✔
41
from dns.exception import Timeout
1✔
42

43
# Note: avoid adding additional imports here, to avoid import issues when running the CLI
44
from localstack import config
1✔
45
from localstack.constants import LOCALHOST_HOSTNAME, LOCALHOST_IP
1✔
46
from localstack.dns.models import (
1✔
47
    AliasTarget,
48
    DnsServerProtocol,
49
    DynamicRecord,
50
    NameRecord,
51
    RecordType,
52
    SOARecord,
53
    TargetRecord,
54
)
55
from localstack.services.edge import run_module_as_sudo
1✔
56
from localstack.utils import iputils
1✔
57
from localstack.utils.net import Port, port_can_be_bound
1✔
58
from localstack.utils.platform import in_docker
1✔
59
from localstack.utils.serving import Server
1✔
60
from localstack.utils.strings import to_bytes, to_str
1✔
61
from localstack.utils.sync import sleep_forever
1✔
62

63
# SOA serial number: whole seconds elapsed since the Unix epoch, computed once at import time
EPOCH = datetime(1970, 1, 1)
SERIAL = int((datetime.utcnow() - EPOCH).total_seconds())

# Name server that is always appended to the list of fallback candidates
DEFAULT_FALLBACK_DNS_SERVER = "8.8.8.8"
# Held while probing for a usable fallback DNS server
FALLBACK_DNS_LOCK = threading.RLock()
# Domain that a candidate fallback DNS server has to resolve to be accepted
VERIFICATION_DOMAIN = config.DNS_VERIFICATION_DOMAIN

# Numeric value of the DNS response code REFUSED
RCODE_REFUSED = 5

# Module-wide DNS server instance; stays None until one is assigned
DNS_SERVER: "DnsServerProtocol" = None
PREVIOUS_RESOLV_CONF_FILE: str | None = None

# Timeout (in seconds) for queries forwarded to the upstream DNS server
REQUEST_TIMEOUT_SECS = 7

# dnslib rdata class -> numeric query/record type
TYPE_LOOKUP = {
    A: QTYPE.A,
    AAAA: QTYPE.AAAA,
    CNAME: QTYPE.CNAME,
    MX: QTYPE.MX,
    NS: QTYPE.NS,
    SOA: QTYPE.SOA,
    TXT: QTYPE.TXT,
}

LOG = logging.getLogger(__name__)

# Per-thread storage; NonLoggingHandler stores the details of the request being handled here
THREAD_LOCAL = threading.local()

# Type of the value given by DNSHandler.client_address
# in the form (ip, port) e.g. ("127.0.0.1", 58291)
ClientAddress = Tuple[str, int]

# Backing store for the cached network interface listing; entries expire after 10 seconds
psutil_cache = TTLCache(maxsize=100, ttl=10)
96

97

98
# TODO: update route53 provider to use this util
99
def normalise_dns_name(name: DNSLabel | str) -> str:
    """
    Return ``name`` as a string that is guaranteed to end with a dot.

    :param name: DNS name, either as a string or as an object whose ``str()`` is the name
    :return: the textual name, with a trailing ``.`` appended if it was missing
    """
    text = str(name)
    return text if text.endswith(".") else f"{text}."
105

106

107
@cached(cache=psutil_cache)
def list_network_interface_details() -> dict[str, list[snicaddr]]:
    """
    Return the address details of the network interfaces as reported by ``psutil.net_if_addrs()``.

    The result is memoised in ``psutil_cache`` through the ``cached`` decorator.
    """
    interface_details = psutil.net_if_addrs()
    return interface_details
110

111

112
class Record:
    """
    Template for a DNS resource record.

    Stores the rdata, TTL and record type of an answer and builds the dnslib ``RR`` on demand, using
    either the explicitly configured record name or a name supplied by the caller.
    """

    def __init__(self, rdata_type, *args, **kwargs):
        """
        :param rdata_type: a dnslib ``RD`` subclass listed in ``TYPE_LOOKUP`` (it is instantiated with
            ``args``), or an already constructed ``RD`` instance (``args`` is ignored in that case)
        :param args: positional arguments for the ``rdata_type`` constructor; if ``rdata_type`` is
            ``SOA`` and exactly two arguments are given, a tuple of default times is appended
        :param kwargs: optional ``rtype`` (overrides the type derived from ``rdata_type``), ``rname``
            (fixed record name) and ``ttl``; all remaining entries are forwarded to ``RR`` by ``as_rr``
        """
        # pop (not get) the keywords handled here: they are passed explicitly below and in as_rr(),
        # leaving them in kwargs would fail with "got multiple values for keyword argument"
        rtype = kwargs.pop("rtype", None)
        rname = kwargs.pop("rname", None)
        ttl = kwargs.pop("ttl", None)

        if isinstance(rdata_type, RD):
            # actually an instance, not a type
            self._rtype = TYPE_LOOKUP[rdata_type.__class__]
            rdata = rdata_type
        else:
            self._rtype = TYPE_LOOKUP[rdata_type]
            if rdata_type == SOA and len(args) == 2:
                # add sensible times to SOA
                args += (
                    (
                        SERIAL,  # serial number
                        60 * 60 * 1,  # refresh
                        60 * 60 * 3,  # retry
                        60 * 60 * 24,  # expire
                        60 * 60 * 1,  # minimum
                    ),
                )
            rdata = rdata_type(*args)

        if rtype:
            self._rtype = rtype
        self._rname = rname
        # sensible_ttl() depends on self._rtype, so it has to be evaluated after the override above
        self.kwargs = dict(rdata=rdata, ttl=self.sensible_ttl() if ttl is None else ttl, **kwargs)

    def try_rr(self, q):
        """
        Build the resource record for question ``q`` if the record type matches.

        :param q: question with ``qtype`` and ``qname`` attributes
        :return: the ``RR`` if ``q`` asks for ``ANY`` or for this record's type, otherwise ``None``
        """
        if q.qtype == QTYPE.ANY or q.qtype == self._rtype:
            return self.as_rr(q.qname)

    def as_rr(self, alt_rname):
        """
        Build the dnslib ``RR`` for this record.

        :param alt_rname: record name to use if no ``rname`` was configured on this record
        :return: the resource record
        """
        return RR(rname=self._rname or alt_rname, rtype=self._rtype, **self.kwargs)

    def sensible_ttl(self):
        """Return the default TTL in seconds: one day for NS and SOA records, 300 otherwise."""
        if self._rtype in (QTYPE.NS, QTYPE.SOA):
            return 60 * 60 * 24
        else:
            return 300

    @property
    def is_soa(self):
        """Whether this record is of type SOA."""
        return self._rtype == QTYPE.SOA

    def __str__(self):
        return f"{QTYPE[self._rtype]}({self.kwargs})"

    def __repr__(self):
        return self.__str__()
164

165

166
class RecordConverter:
1✔
167
    """
168
    Handles returning the correct DNS record for the stored name_record.
169

170
    Particularly, if the record is a DynamicRecord, then perform dynamic IP address lookup.
171
    """
172

173
    def __init__(self, request: DNSRecord, client_address: ClientAddress):
1✔
174
        self.request = request
1✔
175
        self.client_address = client_address
1✔
176

177
    def to_record(self, name_record: NameRecord) -> Record:
1✔
178
        """
179
        :param name_record: Internal representation of the name entry
180
        :return: Record type for the associated name record
181
        """
182
        match name_record:
1✔
183
            case TargetRecord(target=target, record_type=record_type):
1✔
184
                return Record(RDMAP.get(record_type.name), target)
1✔
185
            case SOARecord(m_name=m_name, r_name=r_name, record_type=_):
1✔
186
                return Record(SOA, m_name, r_name)
1✔
187
            case DynamicRecord(record_type=record_type):
1✔
188
                # Marker indicating that the target of the domain name lookup should be resolved
189
                # dynamically at query time to the most suitable LocalStack container IP address
190
                ip = self._determine_best_ip()
1✔
191
                # TODO: be more dynamic with IPv6
192
                if record_type == RecordType.AAAA:
1✔
193
                    ip = "::1"
1✔
194
                return Record(RDMAP.get(record_type.name), ip)
1✔
195
            case _:
×
196
                raise NotImplementedError(f"Record type '{type(name_record)}' not implemented")
197

198
    def _determine_best_ip(self) -> str:
1✔
199
        client_ip, _ = self.client_address
1✔
200
        # allow for overriding if required
201
        if config.DNS_RESOLVE_IP != LOCALHOST_IP:
1✔
202
            return config.DNS_RESOLVE_IP
1✔
203

204
        # Look up best matching ip address for the client
205
        interfaces = self._fetch_interfaces()
1✔
206
        for interface in interfaces:
1✔
207
            subnet = interface.network
1✔
208
            ip_address = IPv4Address(client_ip)
1✔
209
            if ip_address in subnet:
1✔
210
                # check if the request has come from the gateway or not. If so
211
                # assume the request has come from the host, and return
212
                # 127.0.0.1
213
                if config.is_in_docker and self._is_gateway(ip_address):
1✔
214
                    return LOCALHOST_IP
×
215

216
                return str(interface.ip)
1✔
217

218
        # no best solution found
219
        LOG.warning(
×
220
            "could not determine subnet-matched IP address for %s, falling back to %s",
221
            self.request.q.qname,
222
            LOCALHOST_IP,
223
        )
224
        return LOCALHOST_IP
×
225

226
    @staticmethod
1✔
227
    def _is_gateway(ip: IPv4Address) -> bool:
1✔
228
        """
229
        Look up the gateways that this contianer has, and return True if the
230
        supplied ip address is in that list.
231
        """
232
        return ip == iputils.get_default_gateway()
1✔
233

234
    @staticmethod
1✔
235
    def _fetch_interfaces() -> Iterable[IPv4Interface]:
1✔
236
        interfaces = list_network_interface_details()
1✔
237
        for _, addresses in interfaces.items():
1✔
238
            for address in addresses:
1✔
239
                if address.family != AddressFamily.AF_INET:
1✔
240
                    # TODO: IPv6
241
                    continue
1✔
242

243
                # argument is of the form e.g. 127.0.0.1/255.0.0.0
244
                net = IPv4Interface(f"{address.address}/{address.netmask}")
1✔
245
                yield net
1✔
246

247

248
class NonLoggingHandler(DNSHandler):
    """DNSHandler subclass that swallows errors instead of letting them be logged to stdout"""

    def handle(self, *args, **kwargs):
        """
        Handle a request like ``DNSHandler.handle``, after storing the client address, server and
        request of this handler on ``THREAD_LOCAL``. Any ``Exception`` is suppressed.
        """
        try:
            for attribute in ("client_address", "server", "request"):
                setattr(THREAD_LOCAL, attribute, getattr(self, attribute))
            return super().handle(*args, **kwargs)
        except Exception:
            # intentionally ignored, see class docstring
            return None
259

260

261
# List of unique non-subdomain prefixes (e.g., data-) from endpoint.hostPrefix in the botocore specs.
262
# Subdomain-prefixes (e.g., api.) work properly unless DNS rebind protection blocks DNS resolution, but
263
# these `-` dash-prefixes require special consideration.
264
# IMPORTANT: Adding a new host prefix here requires deploying a public DNS entry to ensure proper DNS resolution for
265
# such non-dot prefixed domains (e.g., data-localhost.localstack.cloud)
266
# LIMITATION: As of 2025-05-26, only used prefixes are deployed to our public DNS, including `sync-` and `data-`
267
HOST_PREFIXES_NO_SUBDOMAIN = [
    "analytics-",
    "control-storage-",
    "data-",
    "query-",
    "runtime-",
    "storage-",
    "streaming-",
    "sync-",
    "tags-",
    "workflows-",
]
# each dash-prefix directly followed by the LocalStack hostname
HOST_PREFIX_NAME_PATTERNS = [
    f"{dash_prefix}{LOCALHOST_HOSTNAME}" for dash_prefix in HOST_PREFIXES_NO_SUBDOMAIN
]

# every name ending in the LocalStack hostname, plus the dash-prefixed names
NAME_PATTERNS_POINTING_TO_LOCALSTACK = [f".*{LOCALHOST_HOSTNAME}"] + HOST_PREFIX_NAME_PATTERNS
287

288

289
def exclude_from_resolution(domain_regex: str):
    """
    Registers the given domain pattern as a skip pattern on the module-level DNS server, so that
    it is excluded from being resolved to LocalStack. Does nothing if no DNS server is set.
    Currently only works in docker, since in host mode dns is started as separate process

    :param domain_regex: Domain regex string
    """
    if not DNS_SERVER:
        return
    DNS_SERVER.add_skip(domain_regex)
297

298

299
def revert_exclude_from_resolution(domain_regex: str):
    """
    Removes the given domain pattern from the skip patterns of the module-level DNS server.
    Does nothing if no DNS server is set; a ``ValueError`` raised by the removal is ignored.

    :param domain_regex: Domain regex string
    """
    if not DNS_SERVER:
        return
    try:
        DNS_SERVER.delete_skip(domain_regex)
    except ValueError:
        # nothing to revert
        return
309

310

311
def _should_delete_zone(record_to_delete: NameRecord, record_to_check: NameRecord):
    """
    Decide whether ``record_to_check`` is removed by a request to delete ``record_to_delete``.

    :param record_to_delete: Record taken from the delete request
    :param record_to_check: Stored record that is a candidate for removal
    :return: truthy if both records compare equal, or if they share record type and record id
    """
    if not (record_to_delete == record_to_check):
        same_type = record_to_delete.record_type == record_to_check.record_type
        return same_type and record_to_delete.record_id == record_to_check.record_id
    return True
324

325

326
def _should_delete_alias(alias_to_delete: AliasTarget, alias_to_check: AliasTarget):
    """
    Decide whether ``alias_to_check`` is removed by a request to delete ``alias_to_delete``.

    :param alias_to_delete: Alias taken from the delete request
    :param alias_to_check: Stored alias that is a candidate for removal
    :return: result of comparing the ``alias_id`` of both aliases for equality
    """
    requested_id = alias_to_delete.alias_id
    candidate_id = alias_to_check.alias_id
    return requested_id == candidate_id
334

335

336
class NoopLogger:
    """
    Logger replacement for dnslib that discards everything.

    The constructor and every ``log_*`` method accept arbitrary arguments, do nothing and
    return ``None``, which avoids any logging of dns records by dnslib.
    """

    def __init__(self, *args, **kwargs):
        return None

    def log_pass(self, *args, **kwargs):
        return None

    def log_prefix(self, *args, **kwargs):
        return None

    def log_recv(self, *args, **kwargs):
        return None

    def log_send(self, *args, **kwargs):
        return None

    def log_request(self, *args, **kwargs):
        return None

    def log_reply(self, *args, **kwargs):
        return None

    def log_truncated(self, *args, **kwargs):
        return None

    def log_error(self, *args, **kwargs):
        return None

    def log_data(self, *args, **kwargs):
        return None
370

371

372
class Resolver(DnsServerProtocol):
    """
    Resolver that answers from locally registered zones and aliases, and forwards every other
    request to an upstream DNS server.
    """

    # Upstream DNS server
    upstream_dns: str
    # List of patterns which will be skipped for local resolution and always forwarded to upstream
    skip_patterns: list[str]
    # Dict of zones: (domain name or pattern) -> list[dns records]
    zones: dict[str, list[NameRecord]]
    # Alias map (source_name, record_type) => target_name (target name then still has to be resolved!)
    aliases: dict[tuple[DNSLabel, RecordType], list[AliasTarget]]
    # Lock to prevent issues due to concurrent modifications
    lock: threading.RLock

    def __init__(self, upstream_dns: str):
        """
        :param upstream_dns: address of the DNS server that unresolved requests are forwarded to
        """
        self.upstream_dns = upstream_dns
        self.skip_patterns = []
        self.zones = {}
        self.aliases = {}
        self.lock = threading.RLock()

    def resolve(self, request: DNSRecord, handler: DNSHandler) -> DNSRecord | None:
        """
        Resolve a given request, by either checking locally registered records, or forwarding to the defined
        upstream DNS server.

        :param request: DNS Request
        :param handler: handler of the request; its ``client_address`` is used for local resolution
        :return: DNS Reply; if neither local resolution nor the upstream server produced a result,
            the (empty) reply is returned with response code SERVFAIL
        """
        reply = request.reply()
        found = False

        try:
            if not self._skip_local_resolution(request):
                found = self._resolve_name(request, reply, handler.client_address)
        except Exception as e:
            # best effort: a failing local lookup is treated like "not found" and forwarded upstream
            LOG.info("Unable to get DNS result: %s", e)

        if found:
            return reply

        # If we did not find a matching record in our local zones, we forward to our upstream dns
        try:
            req_parsed = dns.message.from_wire(bytes(request.pack()))
            r = dns.query.udp(req_parsed, self.upstream_dns, timeout=REQUEST_TIMEOUT_SECS)
            return self._map_response_dnspython_to_dnslib(r)
        except Exception as e:
            LOG.info(
                "Unable to get DNS result from upstream server %s for domain %s: %s",
                self.upstream_dns,
                str(request.q.qname),
                e,
            )

        # if we cannot reach upstream dns, return SERVFAIL
        # (get_rcode is a method: comparing the unbound attribute with NOERROR is always False)
        if not reply.rr and reply.header.get_rcode() == RCODE.NOERROR:
            # setting this return code will cause commands like 'host' to try the next nameserver
            reply.header.set_rcode(RCODE.SERVFAIL)

        return reply

    def _skip_local_resolution(self, request) -> bool:
        """
        Check whether we should skip local resolution for the given request, and directly contact upstream

        :param request: DNS Request
        :return: True if the requested name matches (``re.match``) any of the skip patterns
        """
        request_name = to_str(str(request.q.qname))
        return any(re.match(pattern, request_name) for pattern in self.skip_patterns)

    def _resolve_alias(
        self, request: DNSRecord, reply: DNSRecord, client_address: ClientAddress
    ) -> bool:
        """
        Try to answer an A, AAAA or CNAME request through a registered alias.

        The first alias without health check, or with a passing health check, is used: its target
        is resolved from the local zones and the answers are renamed to the requested name. If the
        target cannot be resolved, the response code of ``reply`` is set to REFUSED.

        :return: True if an alias was selected (even if its target could not be resolved)
        """
        if request.q.qtype not in (QTYPE.A, QTYPE.AAAA, QTYPE.CNAME):
            return False

        key = (DNSLabel(to_bytes(request.q.qname)), RecordType[QTYPE[request.q.qtype]])
        # check if we have aliases defined for our given qname/qtype pair
        for alias in self.aliases.get(key) or []:
            # if there is no health check, or the healthcheck is successful, we will consider this alias
            # take the first alias passing this check
            if alias.health_check and not alias.health_check():
                continue

            request_copy: DNSRecord = copy.deepcopy(request)
            request_copy.q.qname = alias.target
            # check if we can resolve the alias
            if self._resolve_name_from_zones(request_copy, reply, client_address):
                LOG.debug("Found entry for AliasTarget '%s' ('%s')", request.q.qname, alias)
                # change the replaced rr-DNS names back to the original request
                for rr in reply.rr:
                    rr.set_rname(request.q.qname)
            else:
                reply.header.set_rcode(RCODE.REFUSED)
            return True
        return False

    def _resolve_name(
        self, request: DNSRecord, reply: DNSRecord, client_address: ClientAddress
    ) -> bool:
        """
        Resolve the request locally, first through the aliases and then through the zones.

        :return: True if ``reply`` has been populated and should be sent to the client
        """
        if alias_found := self._resolve_alias(request, reply, client_address):
            LOG.debug("Alias found: %s", request.q.qname)
            return alias_found
        return self._resolve_name_from_zones(request, reply, client_address)

    def _resolve_name_from_zones(
        self, request: DNSRecord, reply: DNSRecord, client_address: ClientAddress
    ) -> bool:
        """
        Add the answers for the request from the registered zones to ``reply``.

        A zone registered under exactly the requested name takes precedence. Otherwise every zone
        label is tried as a pattern against the requested name; for labels that do not match as a
        pattern but are a suffix of the requested name, the SOA record of the zone is returned.

        :return: True if at least one answer has been added to ``reply``
        """
        found = False
        converter = RecordConverter(request, client_address)

        # check for direct (not regex based) response
        zone = self.zones.get(normalise_dns_name(request.q.qname))
        if zone is not None:
            for zone_record in zone:
                rr = converter.to_record(zone_record).try_rr(request.q)
                if rr:
                    found = True
                    reply.add_answer(rr)
            return found

        # no direct zone so look for an SOA record for a higher level zone
        for zone_label, zone_records in self.zones.items():
            # try regex match: a "*" that is not already preceded by "." is a glob wildcard and is
            # turned into ".*"; \1 keeps the character in front of the "*", which is part of the match
            pattern = re.sub(r"(^|[^.])\*", r"\1.*", str(zone_label))
            if re.match(pattern, str(request.q.qname)):
                for record in zone_records:
                    rr = converter.to_record(record).try_rr(request.q)
                    if rr:
                        found = True
                        reply.add_answer(rr)
            # try suffix match
            elif request.q.qname.matchSuffix(to_bytes(zone_label)):
                try:
                    soa_record = next(r for r in zone_records if converter.to_record(r).is_soa)
                except StopIteration:
                    continue
                found = True
                reply.add_answer(converter.to_record(soa_record).as_rr(zone_label))
                break
        return found

    def _parse_section(self, section: str) -> list[RR]:
        """
        Parse the resource records at the beginning of a textual response section.

        :param section: text following a section header; parsing stops at the first line starting
            with ``;``, empty lines are ignored
        :return: parsed resource records
        """
        result = []
        for line in section.split("\n"):
            line = line.strip()
            if not line:
                continue
            if line.startswith(";"):
                # section ended, stop parsing
                break
            result += RR.fromZone(line)
        return result

    def _map_response_dnspython_to_dnslib(self, response):
        """Map response object from dnspython to dnslib (looks like we cannot
        simply export/import the raw messages from the wire)"""
        flags = dns.flags.to_text(response.flags)

        def flag(f):
            return 1 if f.upper() in flags else 0

        questions = [
            DNSQuestion(qname=str(q.name), qtype=q.rdtype, qclass=q.rdclass)
            for q in response.question
        ]

        result = DNSRecord(
            DNSHeader(
                qr=flag("qr"), aa=flag("aa"), ra=flag("ra"), id=response.id, rcode=response.rcode()
            ),
            q=questions[0],
        )

        # the sections are extracted from the textual form of the response
        response_text = str(response)
        # extract answers
        result.add_answer(*self._parse_section(response_text.partition(";ANSWER")[2]))
        # extract authority information
        result.add_auth(*self._parse_section(response_text.partition(";AUTHORITY")[2]))
        return result

    def add_host(self, name: str, record: NameRecord):
        """Register ``record`` in the zone of the (normalised) ``name``."""
        LOG.debug("Adding host %s with record %s", name, record)
        name = normalise_dns_name(name)
        with self.lock:
            self.zones.setdefault(name, []).append(record)

    def delete_host(self, name: str, record: NameRecord):
        """
        Remove all records matching ``record`` from the zone of the (normalised) ``name``.

        :raises ValueError: if the zone does not exist or contains no matching record
        """
        LOG.debug("Deleting host %s with record %s", name, record)
        name = normalise_dns_name(name)
        with self.lock:
            current_zones = self.zones.get(name)
            if not current_zones:
                raise ValueError(f"Could not find entry {record} for name {name} in zones")
            remaining = [zone for zone in current_zones if not _should_delete_zone(record, zone)]
            if len(remaining) == len(current_zones):
                # nothing was filtered out
                raise ValueError(f"Could not find entry {record} for name {name} in zones")
            if remaining:
                self.zones[name] = remaining
            else:
                # if we deleted the last entry, clean up
                del self.zones[name]

    def add_alias(self, source_name: str, record_type: RecordType, target: AliasTarget):
        """Register ``target`` as alias for the pair of ``source_name`` and ``record_type``."""
        LOG.debug("Adding alias %s with record type %s target %s", source_name, record_type, target)
        label = (DNSLabel(to_bytes(source_name)), record_type)
        with self.lock:
            self.aliases.setdefault(label, []).append(target)

    def delete_alias(self, source_name: str, record_type: RecordType, target: AliasTarget):
        """
        Remove all aliases matching ``target`` for the pair of ``source_name`` and ``record_type``.

        :raises ValueError: if no alias is registered for the pair or none matches ``target``
        """
        LOG.debug(
            "Deleting alias %s with record type %s",
            source_name,
            record_type,
        )
        label = (DNSLabel(to_bytes(source_name)), record_type)
        not_found_message = (
            f"Could not find entry {target} for name {source_name}, "
            f"record type {record_type} in aliases"
        )
        with self.lock:
            current_aliases = self.aliases.get(label)
            if not current_aliases:
                raise ValueError(not_found_message)
            remaining = [
                alias for alias in current_aliases if not _should_delete_alias(target, alias)
            ]
            if len(remaining) == len(current_aliases):
                # nothing was filtered out
                raise ValueError(not_found_message)
            if remaining:
                self.aliases[label] = remaining
            else:
                # if we deleted the last entry, clean up
                del self.aliases[label]

    def add_host_pointing_to_localstack(self, name: str):
        """
        Register a dynamic A record for ``name``, and additionally a dynamic AAAA record if
        ``config.DNS_RESOLVE_IP`` equals ``config.LOCALHOST_IP``.
        """
        LOG.debug("Adding host %s pointing to LocalStack", name)
        self.add_host(name, DynamicRecord(record_type=RecordType.A))
        if config.DNS_RESOLVE_IP == config.LOCALHOST_IP:
            self.add_host(name, DynamicRecord(record_type=RecordType.AAAA))

    def delete_host_pointing_to_localstack(self, name: str):
        """
        Remove the dynamic A record for ``name``, and additionally the dynamic AAAA record if
        ``config.DNS_RESOLVE_IP`` equals ``config.LOCALHOST_IP``.

        :raises ValueError: if a record to delete is not registered
        """
        LOG.debug("Deleting host %s pointing to LocalStack", name)
        self.delete_host(name, DynamicRecord(record_type=RecordType.A))
        if config.DNS_RESOLVE_IP == config.LOCALHOST_IP:
            self.delete_host(name, DynamicRecord(record_type=RecordType.AAAA))

    def add_skip(self, skip_pattern: str):
        """Add a pattern for names that are always forwarded to the upstream DNS server."""
        LOG.debug("Adding skip pattern %s", skip_pattern)
        self.skip_patterns.append(skip_pattern)

    def delete_skip(self, skip_pattern: str):
        """
        Remove a previously added skip pattern.

        :raises ValueError: if the pattern is not registered
        """
        LOG.debug("Deleting skip pattern %s", skip_pattern)
        self.skip_patterns.remove(skip_pattern)

    def clear(self):
        """Remove all skip patterns, zones and aliases."""
        LOG.debug("Clearing DNS zones")
        self.skip_patterns.clear()
        self.zones.clear()
        self.aliases.clear()
646

647

648
class DnsServer(Server, DnsServerProtocol):
    """
    DNS server running in the current process: one dnslib ``DNSServer`` per configured protocol,
    all of them answering through the same ``Resolver``.
    """

    servers: list[DNSServer]
    resolver: Resolver | None

    def __init__(
        self,
        port: int,
        protocols: list[Literal["udp", "tcp"]],
        upstream_dns: str,
        host: str = "0.0.0.0",
    ) -> None:
        """
        :param port: port to listen on
        :param protocols: protocols to start a server for
        :param upstream_dns: upstream DNS server handed to the resolver
        :param host: address to bind to
        """
        super().__init__(port, host)
        self.protocols = protocols
        self.handler_class = NonLoggingHandler
        self.resolver = Resolver(upstream_dns=upstream_dns)
        self.servers = []

    def _get_servers(self) -> list[DNSServer]:
        """Create (but do not start) one dnslib server for each configured protocol."""
        # TODO add option to use normal logger instead of NoopLogger for verbose debug mode
        return [
            DNSServer(
                self.resolver,
                handler=self.handler_class,
                logger=NoopLogger(),
                port=self.port,
                address=self.host,
                tcp=protocol == "tcp",
            )
            for protocol in self.protocols
        ]

    @property
    def protocol(self):
        return "udp"

    def health(self):
        """
        Runs a health check on the server: sends an A query for localhost.localstack.cloud over UDP
        to 127.0.0.1 on the port of this server.

        :return: True if the response contains at least one answer, False otherwise (including
            any error, e.g. a timeout)
        """
        try:
            query = dns.message.make_query("localhost.localstack.cloud", "A")
            response = dns.query.udp(query, "127.0.0.1", port=self.port, timeout=0.5)
            return len(response.answer) > 0
        except Exception:
            return False

    def do_run(self):
        """Start all protocol servers in their own threads and block until these threads end."""
        self.servers = self._get_servers()
        for dns_server in self.servers:
            dns_server.start_thread()
        LOG.debug("DNS Server started")
        # only join once every server has been started
        for dns_server in self.servers:
            dns_server.thread.join()

    def do_shutdown(self):
        """Stop all protocol servers."""
        for dns_server in self.servers:
            dns_server.stop()

    # The methods below delegate to the resolver

    def add_host(self, name: str, record: NameRecord):
        self.resolver.add_host(name, record)

    def delete_host(self, name: str, record: NameRecord):
        self.resolver.delete_host(name, record)

    def add_alias(self, source_name: str, record_type: RecordType, target: AliasTarget):
        self.resolver.add_alias(source_name, record_type, target)

    def delete_alias(self, source_name: str, record_type: RecordType, target: AliasTarget):
        self.resolver.delete_alias(source_name, record_type, target)

    def add_host_pointing_to_localstack(self, name: str):
        self.resolver.add_host_pointing_to_localstack(name)

    def delete_host_pointing_to_localstack(self, name: str):
        self.resolver.delete_host_pointing_to_localstack(name)

    def add_skip(self, skip_pattern: str):
        self.resolver.add_skip(skip_pattern)

    def delete_skip(self, skip_pattern: str):
        self.resolver.delete_skip(skip_pattern)

    def clear(self):
        self.resolver.clear()
734

735

736
class SeparateProcessDNSServer(Server, DnsServerProtocol):
    """DNS server that is started by running the ``localstack.dns.server`` module via sudo."""

    def __init__(
        self,
        port: int = 53,
        host: str = "0.0.0.0",
    ) -> None:
        super().__init__(port, host)

    @property
    def protocol(self):
        return "udp"

    def health(self):
        """
        Runs a health check on the server: sends an A query for localhost.localstack.cloud over UDP
        to 127.0.0.1 on the port of this server.

        :return: True if the response contains at least one answer, False otherwise (including
            any error, e.g. a timeout)
        """
        try:
            query = dns.message.make_query("localhost.localstack.cloud", "A")
            response = dns.query.udp(query, "127.0.0.1", port=self.port, timeout=0.5)
            return len(response.answer) > 0
        except Exception:
            return False

    def do_start_thread(self):
        """
        Start the DNS server module asynchronously through ``run_module_as_sudo``, passing on the
        port and all ``DNS_*`` configuration variables that are set in the environment.

        :return: the value returned by ``run_module_as_sudo``
        """
        # For host mode
        env_vars = {
            env_var: value
            for env_var in config.CONFIG_ENV_VARS
            if env_var.startswith("DNS_") and (value := os.environ.get(env_var, None)) is not None
        }

        # note: running in a separate process breaks integration with Route53 (to be fixed for local dev mode!)
        return run_module_as_sudo(
            "localstack.dns.server",
            asynchronous=True,
            env_vars=env_vars,
            arguments=["-p", str(self.port)],
        )
776

777

778
def get_fallback_dns_server():
    """Return ``config.DNS_SERVER`` when it is truthy, otherwise the auto-detected DNS server."""
    configured_server = config.DNS_SERVER
    if configured_server:
        return configured_server
    return get_available_dns_server()
780

781

782
@cache
def get_available_dns_server():
    """
    Determine a DNS server that is able to resolve ``VERIFICATION_DOMAIN``.

    The candidates are the nameservers of a default ``dns.resolver.Resolver`` (without
    ``127.0.0.1``), followed by ``DEFAULT_FALLBACK_DNS_SERVER``. They are tried in order with an
    ``A`` query (lifetime 3 seconds); the first one that returns a non-empty answer is selected.

    The return value (including ``None``) is memoized by ``functools.cache``.

    :return: the address of the first working candidate, or None if no candidate answered
    """
    #  TODO check if more loop-checks are necessary than just not using our own DNS server
    with FALLBACK_DNS_LOCK:
        resolver = dns.resolver.Resolver()
        # we do not want to include localhost here, or a loop might happen
        candidates = [r for r in resolver.nameservers if r != "127.0.0.1"]
        candidates.append(DEFAULT_FALLBACK_DNS_SERVER)
        result = None
        for ns in candidates:
            resolver.nameservers = [ns]
            try:
                answer = resolver.resolve(VERIFICATION_DOMAIN, "a", lifetime=3)
                records = [
                    res.to_text() for answers in answer.response.answer for res in answers.items
                ]
            except Exception as e:
                # best effort: a timeout or any other failure only disqualifies this candidate
                LOG.debug(
                    "DNS server %s could not resolve '%s': %s", ns, VERIFICATION_DOMAIN, e
                )
                continue
            if records:
                result = ns
                break

        if result:
            LOG.debug("Determined fallback dns: %s", result)
        else:
            LOG.info(
                "Unable to determine fallback DNS. Please check if '%s' is reachable by your configured DNS servers. "
                "DNS fallback will be disabled.",
                VERIFICATION_DOMAIN,
            )
        return result
817

818

819
# ###### LEGACY METHODS ######
820
def add_resolv_entry(file_path: Path | str = Path("/etc/resolv.conf")):
    """
    Rewrite the resolv.conf at ``file_path`` so that ``127.0.0.1`` is its only nameserver.

    The previous file content is stored in the module-level ``PREVIOUS_RESOLV_CONF_FILE``. The new
    file consists of the LocalStack nameserver entry followed by all previous lines that do not
    start with ``nameserver``. Nothing is done (apart from a warning) when not running in docker;
    failures while updating the file are logged and swallowed.

    :param file_path: path of the resolv.conf file to modify
    """
    global PREVIOUS_RESOLV_CONF_FILE
    # never overwrite the host configuration without the user's permission
    if not in_docker():
        LOG.warning("Incorrectly attempted to alter host networking config")
        return

    LOG.debug("Overwriting container DNS server to point to localhost")
    content = textwrap.dedent(
        """
    # The following line is required by LocalStack
    nameserver 127.0.0.1
    """
    )
    resolv_conf = Path(file_path)
    try:
        with resolv_conf.open("r+") as resolv_file:
            PREVIOUS_RESOLV_CONF_FILE = resolv_file.read()
            kept_lines = [
                entry
                for entry in PREVIOUS_RESOLV_CONF_FILE.splitlines()
                if not entry.startswith("nameserver")
            ]
            # rewind and replace the content in place, then cut off any leftover of the old content
            resolv_file.seek(0)
            resolv_file.write(content + "\n".join(kept_lines))
            resolv_file.truncate()
    except Exception:
        show_traceback = LOG.isEnabledFor(logging.DEBUG)
        LOG.warning("Could not update container DNS settings", exc_info=show_traceback)
851

852

853
def revert_resolv_entry(file_path: Path | str = Path("/etc/resolv.conf")):
    """
    Write the content stored in ``PREVIOUS_RESOLV_CONF_FILE`` back to ``file_path``.

    Nothing is written (apart from a warning) when not running in docker or when no previous
    content is stored; failures while writing are logged and swallowed.

    :param file_path: path of the resolv.conf file to restore
    """
    # never overwrite the host configuration without the user's permission
    if not in_docker():
        LOG.warning("Incorrectly attempted to alter host networking config")
        return

    if not PREVIOUS_RESOLV_CONF_FILE:
        LOG.warning("resolv.conf file to restore not found.")
        return

    LOG.debug("Reverting container DNS config")
    try:
        Path(file_path).write_text(PREVIOUS_RESOLV_CONF_FILE)
    except Exception:
        show_traceback = LOG.isEnabledFor(logging.DEBUG)
        LOG.warning("Could not revert container DNS settings", exc_info=show_traceback)
872

873

874
def setup_network_configuration():
    """Add the LocalStack nameserver entry to /etc/resolv.conf if custom DNS is enabled and we are in docker."""
    # nothing to do if DNS is disabled; the resolv.conf entry is only added inside docker
    if config.use_custom_dns() and in_docker():
        add_resolv_entry()
882

883

884
def revert_network_configuration():
    """Restore the previous /etc/resolv.conf if custom DNS is enabled and we are in docker."""
    # nothing to do if DNS is disabled; the resolv.conf entry is only reverted inside docker
    if config.use_custom_dns() and in_docker():
        revert_resolv_entry()
892

893

894
def start_server(upstream_dns: str, host: str, port: int = config.DNS_PORT):
    """
    Create, configure and start the in-process DNS server (tcp and udp) and register it globally.

    The server is configured with all ``NAME_PATTERNS_POINTING_TO_LOCALSTACK`` (plus a pattern for
    ``config.LOCALSTACK_HOST.host`` if it differs from ``LOCALHOST_HOSTNAME``) and with the skip
    patterns from ``config.DNS_NAME_PATTERNS_TO_RESOLVE_UPSTREAM`` / ``config.DNS_LOCAL_NAME_PATTERNS``.
    On success the server is stored in the module-level ``DNS_SERVER``. If the server is not up
    within 5 seconds it is shut down again and ``DNS_SERVER`` is left untouched.

    :param upstream_dns: upstream DNS server passed to ``DnsServer``
    :param host: address to bind the server to
    :param port: port to bind the server to
    """
    global DNS_SERVER

    if DNS_SERVER:
        # already started - bail
        LOG.debug("DNS servers are already started. Avoid starting again.")
        return

    LOG.debug("Starting DNS servers (tcp/udp port %s on %s)...", port, host)
    dns_server = DnsServer(port, protocols=["tcp", "udp"], host=host, upstream_dns=upstream_dns)

    for name in NAME_PATTERNS_POINTING_TO_LOCALSTACK:
        dns_server.add_host_pointing_to_localstack(name)
    if config.LOCALSTACK_HOST.host != LOCALHOST_HOSTNAME:
        dns_server.add_host_pointing_to_localstack(f".*{config.LOCALSTACK_HOST.host}")

    # support both DNS_NAME_PATTERNS_TO_RESOLVE_UPSTREAM and DNS_LOCAL_NAME_PATTERNS
    # until the next major version change
    # TODO(srw): remove the usage of DNS_LOCAL_NAME_PATTERNS
    skip_local_resolution = " ".join(
        [
            config.DNS_NAME_PATTERNS_TO_RESOLVE_UPSTREAM,
            config.DNS_LOCAL_NAME_PATTERNS,
        ]
    ).strip()
    for raw_pattern in re.split(r"[,;\s]+", skip_local_resolution):
        skip_pattern = raw_pattern.strip(" \"'")
        # A leading/trailing separator (e.g. "foo,") or a token consisting only of quotes yields an
        # empty string here. Do not register it as a skip pattern.
        # NOTE(review): an empty pattern would presumably match every name - confirm against the resolver.
        if skip_pattern:
            dns_server.add_skip(skip_pattern)

    dns_server.start()
    if not dns_server.wait_is_up(timeout=5):
        LOG.warning("DNS server did not come up within 5 seconds.")
        dns_server.shutdown()
        return
    DNS_SERVER = dns_server
    LOG.debug("DNS server startup finished.")
930

931

932
def stop_servers():
    """Shut down the globally registered DNS server, if there is one."""
    if not DNS_SERVER:
        return
    DNS_SERVER.shutdown()
935

936

937
def start_dns_server_as_sudo(port: int):
    """
    Start the DNS server in a separate process via ``SeparateProcessDNSServer`` and register it globally.

    On success the server is stored in the module-level ``DNS_SERVER``. If the server is not up
    within 5 seconds it is shut down again and ``DNS_SERVER`` is left untouched.

    :param port: port the DNS server should listen on
    """
    global DNS_SERVER
    # log the port that is actually used for the server, which is not necessarily config.DNS_PORT
    LOG.debug(
        "Starting the DNS on its privileged port (%s) needs root permissions. Trying to start DNS with sudo.",
        port,
    )

    dns_server = SeparateProcessDNSServer(port)
    dns_server.start()

    if not dns_server.wait_is_up(timeout=5):
        LOG.warning("DNS server did not come up within 5 seconds.")
        dns_server.shutdown()
        return

    DNS_SERVER = dns_server
    LOG.debug("DNS server startup finished (as sudo).")
954

955

956
def start_dns_server(port: int, asynchronous: bool = False, standalone: bool = False):
    """
    Start the DNS server, in-process if the port can be bound and via sudo otherwise.

    Returns without starting anything if a server is already registered, if custom DNS is disabled,
    or if no upstream DNS server could be determined.

    :param port: port the DNS server should listen on
    :param asynchronous: if False, block forever after the in-process server was started
    :param standalone: if True, do not fall back to starting the server via sudo when the port
        cannot be bound
    """
    if DNS_SERVER:
        # already started - bail
        LOG.error("DNS servers are already started. Avoid starting again.")
        return

    # check if DNS server is disabled
    if not config.use_custom_dns():
        LOG.debug("Not starting DNS. DNS_ADDRESS=%s", config.DNS_ADDRESS)
        return

    upstream_dns = get_fallback_dns_server()
    if not upstream_dns:
        LOG.warning("Error starting the DNS server: No upstream dns server found.")
        return

    # host to bind the DNS server to. In docker we always want to bind to "0.0.0.0"
    bind_host = "0.0.0.0" if in_docker() else config.DNS_ADDRESS

    if not port_can_be_bound(Port(port, "udp"), address=bind_host):
        if standalone:
            LOG.debug("Already in standalone mode and port binding still fails.")
            return
        start_dns_server_as_sudo(port)
        return

    start_server(port=port, host=bind_host, upstream_dns=upstream_dns)
    if not asynchronous:
        sleep_forever()
988

989

990
def get_dns_server() -> DnsServerProtocol:
    """Return the value of the module-level ``DNS_SERVER`` (the server registered on startup)."""
    server = DNS_SERVER
    return server
992

993

994
def is_server_running() -> bool:
    """Return True if the module-level ``DNS_SERVER`` is set, i.e. is not None."""
    return not (DNS_SERVER is None)
996

997

998
if __name__ == "__main__":
    # script entry point: start the DNS server in standalone mode on the given port (blocking)
    cli_parser = argparse.ArgumentParser()
    cli_parser.add_argument("-p", "--port", required=False, default=53, type=int)
    cli_args = cli_parser.parse_args()

    start_dns_server(port=cli_args.port, asynchronous=False, standalone=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc