• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.27
/localstack-core/localstack/utils/net.py
1
import logging
1✔
2
import random
1✔
3
import re
1✔
4
import socket
1✔
5
import threading
1✔
6
from collections.abc import MutableMapping
1✔
7
from contextlib import closing
1✔
8
from typing import Any, NamedTuple, Optional, Union
1✔
9
from urllib.parse import urlparse
1✔
10

11
import dns.resolver
1✔
12
from dnslib import DNSRecord
1✔
13

14
from localstack import config, constants
1✔
15

16
from .collections import CustomExpiryTTLCache
1✔
17
from .numbers import is_number
1✔
18
from .objects import singleton_factory
1✔
19
from .sync import retry
1✔
20

21
LOG = logging.getLogger(__name__)
1✔
22

23
# regular expression for IPv4 addresses
24
IP_REGEX = (
1✔
25
    r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
26
)
27

28
# many linux kernels use 32768-60999, RFC 6335 is 49152-65535, so we use a mix here
29
DYNAMIC_PORT_RANGE_START = 32768
1✔
30
DYNAMIC_PORT_RANGE_END = 65536
1✔
31

32
DEFAULT_PORT_RESERVED_SECONDS = 6
1✔
33
"""Default nuber of seconds a port is reserved in a PortRange."""
1✔
34

35

36
class Port(NamedTuple):
1✔
37
    """Represents a network port, with port number and protocol (TCP/UDP)"""
38

39
    port: int
1✔
40
    """the port number"""
1✔
41
    protocol: str
1✔
42
    """network protocol name (usually 'tcp' or 'udp')"""
1✔
43

44
    @classmethod
1✔
45
    def wrap(cls, port: "IntOrPort") -> "Port":
1✔
46
        """Return the given port as a Port object, using 'tcp' as the default protocol."""
47
        if isinstance(port, Port):
1✔
48
            return port
1✔
49
        return Port(port=port, protocol="tcp")
1✔
50

51

52
# simple helper type to encapsulate int/Port argument types
53
IntOrPort = Union[int, Port]
1✔
54

55

56
def is_port_open(
1✔
57
    port_or_url: Union[int, str],
58
    http_path: str = None,
59
    expect_success: bool = True,
60
    protocols: Optional[Union[str, list[str]]] = None,
61
    quiet: bool = True,
62
):
63
    from localstack.utils.http import safe_requests
1✔
64

65
    protocols = protocols or ["tcp"]
1✔
66
    port = port_or_url
1✔
67
    if is_number(port):
1✔
68
        port = int(port)
1✔
69
    host = "localhost"
1✔
70
    protocol = "http"
1✔
71
    protocols = protocols if isinstance(protocols, list) else [protocols]
1✔
72
    if isinstance(port, str):
1✔
73
        url = urlparse(port_or_url)
1✔
74
        port = url.port
1✔
75
        host = url.hostname
1✔
76
        protocol = url.scheme
1✔
77
    nw_protocols = []
1✔
78
    nw_protocols += [socket.SOCK_STREAM] if "tcp" in protocols else []
1✔
79
    nw_protocols += [socket.SOCK_DGRAM] if "udp" in protocols else []
1✔
80
    for nw_protocol in nw_protocols:
1✔
81
        with closing(
1✔
82
            socket.socket(socket.AF_INET if ":" not in host else socket.AF_INET6, nw_protocol)
83
        ) as sock:
84
            sock.settimeout(1)
1✔
85
            if nw_protocol == socket.SOCK_DGRAM:
1✔
86
                try:
×
87
                    if port == 53:
×
88
                        dnshost = "127.0.0.1" if host == "localhost" else host
×
89
                        resolver = dns.resolver.Resolver()
×
90
                        resolver.nameservers = [dnshost]
×
91
                        resolver.timeout = 1
×
92
                        resolver.lifetime = 1
×
93
                        answers = resolver.query("google.com", "A")
×
UNCOV
94
                        assert len(answers) > 0
×
95
                    else:
96
                        sock.sendto(b"", (host, port))
×
97
                        sock.recvfrom(1024)
×
98
                except Exception:
×
99
                    if not quiet:
×
100
                        LOG.exception("Error connecting to UDP port %s:%s", host, port)
×
UNCOV
101
                    return False
×
102
            elif nw_protocol == socket.SOCK_STREAM:
1✔
103
                result = sock.connect_ex((host, port))
1✔
104
                if result != 0:
1✔
105
                    if not quiet:
1✔
UNCOV
106
                        LOG.warning(
×
107
                            "Error connecting to TCP port %s:%s (result=%s)", host, port, result
108
                        )
109
                    return False
1✔
110
    if "tcp" not in protocols or not http_path:
1✔
111
        return True
1✔
112
    host = f"[{host}]" if ":" in host else host
×
113
    url = f"{protocol}://{host}:{port}{http_path}"
×
114
    try:
×
115
        response = safe_requests.get(url, verify=False)
×
116
        return not expect_success or response.status_code < 400
×
117
    except Exception:
×
UNCOV
118
        return False
×
119

120

121
def wait_for_port_open(
1✔
122
    port: int, http_path: str = None, expect_success=True, retries=10, sleep_time=0.5
123
):
124
    """Ping the given TCP network port until it becomes available (for a given number of retries).
125
    If 'http_path' is set, make a GET request to this path and assert a non-error response."""
126
    return wait_for_port_status(
1✔
127
        port,
128
        http_path=http_path,
129
        expect_success=expect_success,
130
        retries=retries,
131
        sleep_time=sleep_time,
132
    )
133

134

135
def wait_for_port_closed(
1✔
136
    port: int, http_path: str = None, expect_success=True, retries=10, sleep_time=0.5
137
):
138
    return wait_for_port_status(
1✔
139
        port,
140
        http_path=http_path,
141
        expect_success=expect_success,
142
        retries=retries,
143
        sleep_time=sleep_time,
144
        expect_closed=True,
145
    )
146

147

148
def wait_for_port_status(
1✔
149
    port: int,
150
    http_path: str = None,
151
    expect_success=True,
152
    retries=10,
153
    sleep_time=0.5,
154
    expect_closed=False,
155
):
156
    """Ping the given TCP network port until it becomes (un)available (for a given number of retries)."""
157

158
    def check():
1✔
159
        status = is_port_open(port, http_path=http_path, expect_success=expect_success)
1✔
160
        if bool(status) != (not expect_closed):
1✔
UNCOV
161
            raise Exception(
×
162
                "Port %s (path: %s) was not %s"
163
                % (port, http_path, "closed" if expect_closed else "open")
164
            )
165

166
    return retry(check, sleep=sleep_time, retries=retries)
1✔
167

168

169
def port_can_be_bound(port: IntOrPort, address: str = "") -> bool:
1✔
170
    """
171
    Return whether a local port (TCP or UDP) can be bound to. Note that this is a stricter check
172
    than is_port_open(...) above, as is_port_open() may return False if the port is
173
    not accessible (i.e., does not respond), yet cannot be bound to.
174
    """
175
    try:
1✔
176
        port = Port.wrap(port)
1✔
177
        if port.protocol == "tcp":
1✔
178
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1✔
179
        elif port.protocol == "udp":
1✔
180
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1✔
181
        else:
182
            LOG.debug("Unsupported network protocol '%s' for port check", port.protocol)
×
UNCOV
183
            return False
×
184
        sock.bind((address, port.port))
1✔
185
        return True
1✔
186
    except OSError:
1✔
187
        # either the port is used or we don't have permission to bind it
188
        return False
1✔
189
    except Exception:
1✔
190
        LOG.error("cannot bind port %s", port, exc_info=LOG.isEnabledFor(logging.DEBUG))
1✔
191
        return False
1✔
192

193

194
def get_free_udp_port(blocklist: list[int] = None) -> int:
1✔
195
    blocklist = blocklist or []
1✔
196
    for i in range(10):
1✔
197
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1✔
198
        udp.bind(("", 0))
1✔
199
        addr, port = udp.getsockname()
1✔
200
        udp.close()
1✔
201
        if port not in blocklist:
1✔
202
            return port
1✔
UNCOV
203
    raise Exception(f"Unable to determine free UDP port with blocklist {blocklist}")
×
204

205

206
def get_free_tcp_port(blocklist: list[int] = None) -> int:
1✔
207
    """
208
    Tries to bind a socket to port 0 and returns the port that was assigned by the system. If the port is
209
    in the given ``blocklist``, or the port is marked as reserved in ``dynamic_port_range``, the procedure
210
    is repeated for up to 50 times.
211

212
    :param blocklist: an optional list of ports that are not allowed as random ports
213
    :return: a free TCP port
214
    """
215
    blocklist = blocklist or []
1✔
216
    for i in range(50):
1✔
217
        tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1✔
218
        tcp.bind(("", 0))
1✔
219
        addr, port = tcp.getsockname()
1✔
220
        tcp.close()
1✔
221
        if port not in blocklist and not dynamic_port_range.is_port_reserved(port):
1✔
222
            try:
1✔
223
                dynamic_port_range.mark_reserved(port)
1✔
UNCOV
224
            except ValueError:
×
225
                # depending on the ephemeral port range of the system, the allocated port may be outside what
226
                # we defined as dynamic port range
UNCOV
227
                pass
×
228
            return port
1✔
229
    raise Exception(f"Unable to determine free TCP port with blocklist {blocklist}")
1✔
230

231

232
def get_free_tcp_port_range(num_ports: int, max_attempts: int = 50) -> "PortRange":
1✔
233
    """
234
    Attempts to get a contiguous range of free ports from the dynamic port range. For instance,
235
    ``get_free_tcp_port_range(4)`` may return the following result: ``PortRange(44000:44004)``.
236

237
    :param num_ports: the number of ports in the range
238
    :param max_attempts: the number of times to retry if a contiguous range was not found
239
    :return: a port range of free TCP ports
240
    :raises PortNotAvailableException: if max_attempts was reached to re-try
241
    """
242
    if num_ports < 2:
1✔
UNCOV
243
        raise ValueError(f"invalid number of ports {num_ports}")
×
244

245
    def _is_port_range_free(_range: PortRange):
1✔
246
        for _port in _range:
1✔
247
            if dynamic_port_range.is_port_reserved(_port) or not port_can_be_bound(_port):
1✔
248
                return False
1✔
249
        return True
1✔
250

251
    for _ in range(max_attempts):
1✔
252
        # try to find a suitable starting point (leave enough space at the end)
253
        port_range_start = random.randint(
1✔
254
            dynamic_port_range.start, dynamic_port_range.end - num_ports - 1
255
        )
256
        port_range = PortRange(port_range_start, port_range_start + num_ports - 1)
1✔
257

258
        # check that each port in the range is available (has not been reserved and can be bound)
259
        # we don't use dynamic_port_range.reserve_port because in case the port range check fails at some port
260
        # all ports up until then would be reserved
261
        if not _is_port_range_free(port_range):
1✔
262
            continue
1✔
263

264
        # port range found! mark them as reserved in the dynamic port range and return
265
        for port in port_range:
1✔
266
            dynamic_port_range.mark_reserved(port)
1✔
267
        return port_range
1✔
268

269
    raise PortNotAvailableException("reached max_attempts when trying to find port range")
1✔
270

271

272
def resolve_hostname(hostname: str) -> Optional[str]:
1✔
273
    """Resolve the given hostname and return its IP address, or None if it cannot be resolved."""
274
    try:
1✔
275
        return socket.gethostbyname(hostname)
1✔
276
    except OSError:
1✔
277
        return None
1✔
278

279

280
def is_ip_address(addr: str) -> bool:
1✔
281
    try:
1✔
282
        socket.inet_aton(addr)
1✔
283
        return True
1✔
284
    except OSError:
1✔
285
        return False
1✔
286

287

288
def is_ipv4_address(address: str) -> bool:
1✔
289
    """
290
    Checks if passed string looks like an IPv4 address
291
    :param address: Possible IPv4 address
292
    :return: True if string looks like IPv4 address, False otherwise
293
    """
294
    return bool(re.match(IP_REGEX, address))
1✔
295

296

297
class PortNotAvailableException(Exception):
1✔
298
    """Exception which indicates that the PortRange could not reserve a port."""
299

300
    pass
1✔
301

302

303
class PortRange:
1✔
304
    """Manages a range of ports that can be reserved and requested."""
305

306
    def __init__(self, start: int, end: int):
1✔
307
        """
308
        Create a new port range. The port range is inclusive, meaning ``PortRange(5000,5005)`` is 6 ports
309
        including both 5000 and 5005. This is different from ``range`` which is not inclusive, i.e.::
310

311
            PortRange(5000, 5005).as_range() == range(5000, 5005 + 1)
312

313
        :param start: the start port (inclusive)
314
        :param end: the end of the range (inclusive).
315
        """
316
        self.start = start
1✔
317
        self.end = end
1✔
318

319
        # cache for locally available ports (ports are reserved for a short period of a few seconds)
320
        self._ports_cache: MutableMapping[Port, Any] = CustomExpiryTTLCache(
1✔
321
            maxsize=len(self),
322
            ttl=DEFAULT_PORT_RESERVED_SECONDS,
323
        )
324
        self._ports_lock = threading.RLock()
1✔
325

326
    def subrange(self, start: int = None, end: int = None) -> "PortRange":
1✔
327
        """
328
        Creates a new PortRange object from this range which is a sub-range of this port range. The new
329
        object will use the same port cache and locks of this port range, so you can constrain port
330
        reservations of an existing port range but have reservations synced between them.
331

332
        :param start: the start of the subrange
333
        :param end: the end of the subrange
334
        :raises ValueError: if start or end are outside the current port range
335
        :return: a new PortRange object synced to this one
336
        """
337
        start = start if start is not None else self.start
1✔
338
        end = end if end is not None else self.end
1✔
339

340
        if start < self.start:
1✔
UNCOV
341
            raise ValueError(f"start not in range ({start} < {self.start})")
×
342
        if end > self.end:
1✔
UNCOV
343
            raise ValueError(f"end not in range ({end} < {self.end})")
×
344

345
        port_range = PortRange(start, end)
1✔
346
        port_range._ports_cache = self._ports_cache
1✔
347
        port_range._ports_lock = self._ports_lock
1✔
348
        return port_range
1✔
349

350
    def as_range(self) -> range:
1✔
351
        """
352
        Returns a ``range(start, end+1)`` object representing this port range.
353

354
        :return: a range
355
        """
356
        return range(self.start, self.end + 1)
1✔
357

358
    def reserve_port(self, port: Optional[IntOrPort] = None, duration: Optional[int] = None) -> int:
1✔
359
        """
360
        Reserves the given port (if it is still free). If the given port is None, it reserves a free port from the
361
        configured port range for external services. If a port is given, it has to be within the configured
362
        range of external services (i.e., in the range [self.start, self.end)).
363

364
        :param port: explicit port to check or None if a random port from the configured range should be selected
365
        :param duration: the time in seconds the port is reserved for (defaults to a few seconds)
366
        :return: reserved, free port number (int)
367
        :raises PortNotAvailableException: if the given port is outside the configured range, it is already bound or
368
                    reserved, or if the given port is none and there is no free port in the configured service range.
369
        """
370
        ports_range = self.as_range()
1✔
371
        port = Port.wrap(port) if port is not None else port
1✔
372
        if port is not None and port.port not in ports_range:
1✔
373
            raise PortNotAvailableException(
1✔
374
                f"The requested port ({port}) is not in the port range ({ports_range})."
375
            )
376
        with self._ports_lock:
1✔
377
            if port is not None:
1✔
378
                return self._try_reserve_port(port, duration=duration)
1✔
379
            else:
380
                for port_in_range in ports_range:
1✔
381
                    try:
1✔
382
                        return self._try_reserve_port(port_in_range, duration=duration)
1✔
383
                    except PortNotAvailableException:
1✔
384
                        # We ignore the fact that this single port is reserved, we just check the next one
385
                        pass
1✔
386
        raise PortNotAvailableException(
1✔
387
            f"No free network ports available in {self!r} (currently reserved: %s)",
388
            list(self._ports_cache.keys()),
389
        )
390

391
    def is_port_reserved(self, port: IntOrPort) -> bool:
1✔
392
        """
393
        Checks whether the port has been reserved in this PortRange. Does not check whether the port can be
394
        bound or not, and does not check whether the port is in range.
395

396
        :param port: the port to check
397
        :return: true if the port is reserved within the range
398
        """
399
        port = Port.wrap(port)
1✔
400
        return self._ports_cache.get(port) is not None
1✔
401

402
    def mark_reserved(self, port: IntOrPort, duration: int = None):
1✔
403
        """
404
        Marks the given port as reserved for the given duration, regardless of whether it is free for not.
405

406
        :param port: the port to reserve
407
        :param duration: the duration
408
        :raises ValueError: if the port is not in this port range
409
        """
410
        port = Port.wrap(port)
1✔
411

412
        if port.port not in self.as_range():
1✔
UNCOV
413
            raise ValueError(f"port {port} not in {self!r}")
×
414

415
        with self._ports_lock:
1✔
416
            # reserve the port for a short period of time
417
            self._ports_cache[port] = "__reserved__"
1✔
418
            if duration:
1✔
419
                self._ports_cache.set_expiry(port, duration)
1✔
420

421
    def _try_reserve_port(self, port: IntOrPort, duration: int) -> int:
1✔
422
        """Checks if the given port is currently not reserved and can be bound."""
423
        port = Port.wrap(port)
1✔
424

425
        if self.is_port_reserved(port):
1✔
426
            raise PortNotAvailableException(f"The given port ({port}) is already reserved.")
1✔
427
        if not self._port_can_be_bound(port):
1✔
UNCOV
428
            raise PortNotAvailableException(f"The given port ({port}) is already in use.")
×
429

430
        self.mark_reserved(port, duration)
1✔
431
        return port.port
1✔
432

433
    def _port_can_be_bound(self, port: IntOrPort) -> bool:
1✔
434
        """
435
        Internal check whether the port can be bound. Will open a socket connection and see if the port is
436
        available. Can be overwritten by subclasses to provide a custom implementation.
437

438
        :param port: the port to check
439
        :return: true if the port is free on the system
440
        """
441
        return port_can_be_bound(port)
1✔
442

443
    def __len__(self):
1✔
444
        return self.end - self.start + 1
1✔
445

446
    def __iter__(self):
1✔
447
        return self.as_range().__iter__()
1✔
448

449
    def __repr__(self):
450
        return f"PortRange({self.start}:{self.end})"
451

452

453
@singleton_factory
1✔
454
def get_docker_host_from_container() -> str:
1✔
455
    """
456
    Get the hostname/IP to connect to the host from within a Docker container (e.g., Lambda function).
457
    The logic is roughly as follows:
458
      1. return `host.docker.internal` if we're running in host mode, in a non-Linux OS
459
      2. return the IP address that `host.docker.internal` (or alternatively `host.containers.internal`)
460
        resolves to, if we're inside Docker
461
      3. return the Docker bridge IP (config.DOCKER_BRIDGE_IP) as a fallback, if option (2) fails
462
    """
463
    result = config.DOCKER_BRIDGE_IP
1✔
464
    try:
1✔
465
        if not config.is_in_docker and not config.is_in_linux:
1✔
466
            # If we're running outside Docker (in host mode), and would like the Lambda containers to be able
467
            # to access services running on the local machine, return `host.docker.internal` accordingly
UNCOV
468
            result = "host.docker.internal"
×
469
        if config.is_in_docker:
1✔
470
            try:
1✔
471
                result = socket.gethostbyname("host.docker.internal")
1✔
472
            except OSError:
1✔
473
                result = socket.gethostbyname("host.containers.internal")
1✔
474
    except OSError:
1✔
475
        # TODO if neither host resolves, we might be in linux. We could just use the default gateway then
476
        pass
1✔
477
    return result
1✔
478

479

480
def get_addressable_container_host(default_local_hostname: str = None) -> str:
1✔
481
    """
482
    Return the target host to address endpoints exposed by Docker containers, depending on
483
    the current execution context.
484

485
    If we're currently executing within Docker, then return get_docker_host_from_container(); otherwise, return
486
    the value of `LOCALHOST_HOSTNAME`, assuming that container endpoints are exposed and accessible under localhost.
487

488
    :param default_local_hostname: local hostname to return, if running outside Docker (defaults to LOCALHOST_HOSTNAME)
489
    """
490
    default_local_hostname = default_local_hostname or constants.LOCALHOST_HOSTNAME
1✔
491
    return get_docker_host_from_container() if config.is_in_docker else default_local_hostname
1✔
492

493

494
def send_dns_query(
1✔
495
    name: str,
496
    port: int = 53,
497
    ip_address: str = "127.0.0.1",
498
    qtype: str = "A",
499
    timeout: float = 1.0,
500
    tcp: bool = False,
501
) -> DNSRecord:
502
    LOG.debug("querying %s:%d for name %s", ip_address, port, name)
×
503
    request = DNSRecord.question(qname=name, qtype=qtype)
×
504
    reply_bytes = request.send(dest=ip_address, port=port, tcp=tcp, timeout=timeout, ipv6=False)
×
UNCOV
505
    return DNSRecord.parse(reply_bytes)
×
506

507

508
dynamic_port_range = PortRange(DYNAMIC_PORT_RANGE_START, DYNAMIC_PORT_RANGE_END)
1✔
509
"""The dynamic port range."""
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc