• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20014894797

06 Dec 2025 01:32PM UTC coverage: 86.869% (-0.04%) from 86.904%
20014894797

push

github

web-flow
CFn: handle updates with empty resource properties (#13471)

2 of 2 new or added lines in 1 file covered. (100.0%)

419 existing lines in 20 files now uncovered.

69891 of 80456 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.27
/localstack-core/localstack/utils/net.py
1
import logging
1✔
2
import random
1✔
3
import re
1✔
4
import socket
1✔
5
import threading
1✔
6
from collections.abc import MutableMapping
1✔
7
from contextlib import closing
1✔
8
from typing import Any, NamedTuple
1✔
9
from urllib.parse import urlparse
1✔
10

11
import dns.resolver
1✔
12
from dnslib import DNSRecord
1✔
13

14
from localstack import config, constants
1✔
15

16
from .collections import CustomExpiryTTLCache
1✔
17
from .numbers import is_number
1✔
18
from .objects import singleton_factory
1✔
19
from .sync import retry
1✔
20

21
LOG = logging.getLogger(__name__)
1✔
22

23
# regular expression for IPv4 addresses
24
IP_REGEX = (
1✔
25
    r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
26
)
27

28
# many linux kernels use 32768-60999, RFC 6335 is 49152-65535, so we use a mix here
29
DYNAMIC_PORT_RANGE_START = 32768
1✔
30
DYNAMIC_PORT_RANGE_END = 65536
1✔
31

32
DEFAULT_PORT_RESERVED_SECONDS = 6
1✔
33
"""Default nuber of seconds a port is reserved in a PortRange."""
1✔
34

35

36
class Port(NamedTuple):
    """A network port: the numeric port paired with the name of its protocol (TCP/UDP)."""

    port: int
    """the numeric port"""
    protocol: str
    """name of the network protocol (usually 'tcp' or 'udp')"""

    @classmethod
    def wrap(cls, port: "IntOrPort") -> "Port":
        """Coerce ``port`` into a Port object; anything that is not already a Port gets protocol 'tcp'."""
        return port if isinstance(port, Port) else Port(port=port, protocol="tcp")
50

51

52
# simple helper type to encapsulate int/Port argument types
53
IntOrPort = int | Port
1✔
54

55

56
def is_port_open(
    port_or_url: int | str,
    http_path: str | None = None,
    expect_success: bool = True,
    protocols: str | list[str] | None = None,
    quiet: bool = True,
) -> bool:
    """
    Check whether a port accepts traffic, optionally followed by an HTTP GET probe.

    :param port_or_url: a port number (the host is then ``localhost``), or a URL from which host, port and
        scheme are taken. If the URL has no explicit port, the default port of an ``http``/``https`` scheme
        is used.
    :param http_path: if set (and 'tcp' is among the protocols), additionally send a GET request to this path
    :param expect_success: whether the GET request has to return a status code below 400
    :param protocols: protocol name(s) to probe, 'tcp' and/or 'udp' (defaults to 'tcp')
    :param quiet: if False, failed connection attempts are logged
    :return: True if all requested probes succeeded, False otherwise
    """
    from localstack.utils.http import safe_requests

    if not protocols:
        protocols = ["tcp"]
    elif not isinstance(protocols, list):
        protocols = [protocols]

    host = "localhost"
    protocol = "http"
    port = int(port_or_url) if is_number(port_or_url) else port_or_url
    if isinstance(port, str):
        url = urlparse(port_or_url)
        host = url.hostname
        protocol = url.scheme
        port = url.port
        if port is None:
            # no explicit port in the URL: fall back to the well-known port of the scheme
            port = {"http": 80, "https": 443}.get(protocol)

    nw_protocols = []
    if "tcp" in protocols:
        nw_protocols.append(socket.SOCK_STREAM)
    if "udp" in protocols:
        nw_protocols.append(socket.SOCK_DGRAM)

    for nw_protocol in nw_protocols:
        family = socket.AF_INET6 if ":" in host else socket.AF_INET
        with closing(socket.socket(family, nw_protocol)) as sock:
            sock.settimeout(1)
            if nw_protocol == socket.SOCK_DGRAM:
                try:
                    if port == 53:
                        # for the DNS port, send an actual DNS query instead of an empty datagram
                        dnshost = "127.0.0.1" if host == "localhost" else host
                        resolver = dns.resolver.Resolver()
                        resolver.nameservers = [dnshost]
                        resolver.timeout = 1
                        resolver.lifetime = 1
                        # NOTE(review): dnspython 2.x deprecates Resolver.query in favor of
                        # Resolver.resolve - consider switching once older versions are out of scope
                        answers = resolver.query("google.com", "A")
                        if len(answers) == 0:
                            # explicit check instead of `assert`, which is stripped under `python -O`
                            raise ValueError("DNS query returned no answers")
                    else:
                        sock.sendto(b"", (host, port))
                        sock.recvfrom(1024)
                except Exception:
                    if not quiet:
                        LOG.error(
                            "Error connecting to UDP port %s:%s",
                            host,
                            port,
                            exc_info=LOG.isEnabledFor(logging.DEBUG),
                        )
                    return False
            elif nw_protocol == socket.SOCK_STREAM:
                try:
                    result = sock.connect_ex((host, port))
                except OSError as e:
                    # connect_ex only returns an error code for failures of the connect() call itself;
                    # other problems (e.g., an unresolvable host name) still raise - treat them as "closed"
                    result = e
                if result != 0:
                    if not quiet:
                        LOG.warning(
                            "Error connecting to TCP port %s:%s (result=%s)", host, port, result
                        )
                    return False

    if "tcp" not in protocols or not http_path:
        return True

    # IPv6 literals need to be enclosed in brackets within a URL
    host = f"[{host}]" if ":" in host else host
    url = f"{protocol}://{host}:{port}{http_path}"
    try:
        response = safe_requests.get(url, verify=False)
        return not expect_success or response.status_code < 400
    except Exception:
        return False
124

125

126
def wait_for_port_open(
    port: int, http_path: str = None, expect_success=True, retries=10, sleep_time=0.5
):
    """
    Repeatedly probe the given TCP network port until it becomes available (for a given number of retries).
    If 'http_path' is set, a GET request is made to this path and a non-error response is asserted.
    """
    probe_options = {
        "http_path": http_path,
        "expect_success": expect_success,
        "retries": retries,
        "sleep_time": sleep_time,
    }
    return wait_for_port_status(port, **probe_options)
138

139

140
def wait_for_port_closed(
    port: int, http_path: str = None, expect_success=True, retries=10, sleep_time=0.5
):
    """
    Repeatedly probe the given TCP network port until it is no longer open (for a given number of
    retries). All arguments are forwarded to ``wait_for_port_status`` with ``expect_closed=True``.
    """
    probe_options = {
        "http_path": http_path,
        "expect_success": expect_success,
        "retries": retries,
        "sleep_time": sleep_time,
        "expect_closed": True,
    }
    return wait_for_port_status(port, **probe_options)
151

152

153
def wait_for_port_status(
    port: int,
    http_path: str = None,
    expect_success=True,
    retries=10,
    sleep_time=0.5,
    expect_closed=False,
):
    """
    Probe the given TCP network port until it reaches the desired state - open by default, or closed
    if ``expect_closed`` is set - retrying for a given number of times.
    """
    desired_state = "closed" if expect_closed else "open"

    def check():
        port_is_open = bool(
            is_port_open(port, http_path=http_path, expect_success=expect_success)
        )
        # the port is in the wrong state if it is open while we expect it closed, or vice versa
        if port_is_open == bool(expect_closed):
            raise Exception(f"Port {port} (path: {http_path}) was not {desired_state}")

    return retry(check, sleep=sleep_time, retries=retries)
173

174

175
def port_can_be_bound(port: IntOrPort, address: str = "") -> bool:
    """
    Return whether a local port (TCP or UDP) can be bound to. Note that this is a stricter check
    than is_port_open(...) above: is_port_open() may return False because the port does not
    respond, even though the port is in use and cannot be bound to.

    :param port: the port to check; plain ints are treated as TCP ports
    :param address: the local address to bind to (defaults to ``""``)
    :return: True if a socket could be bound to the port, False otherwise (including unsupported protocols)
    """
    try:
        port = Port.wrap(port)
        if port.protocol == "tcp":
            sock_type = socket.SOCK_STREAM
        elif port.protocol == "udp":
            sock_type = socket.SOCK_DGRAM
        else:
            LOG.debug("Unsupported network protocol '%s' for port check", port.protocol)
            return False
        # the context manager closes the probe socket, so the port is released again right after the check
        with socket.socket(socket.AF_INET, sock_type) as sock:
            sock.bind((address, port.port))
        return True
    except OSError:
        # either the port is used or we don't have permission to bind it
        return False
    except Exception:
        LOG.error("cannot bind port %s", port, exc_info=LOG.isEnabledFor(logging.DEBUG))
        return False
198

199

200
def get_free_udp_port(blocklist: list[int] = None) -> int:
1✔
201
    blocklist = blocklist or []
1✔
202
    for i in range(10):
1✔
203
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1✔
204
        udp.bind(("", 0))
1✔
205
        addr, port = udp.getsockname()
1✔
206
        udp.close()
1✔
207
        if port not in blocklist:
1✔
208
            return port
1✔
209
    raise Exception(f"Unable to determine free UDP port with blocklist {blocklist}")
×
210

211

212
def get_free_tcp_port(blocklist: list[int] | None = None) -> int:
    """
    Tries to bind a socket to port 0 and returns the port that was assigned by the system. If the port is
    in the given ``blocklist``, or the port is marked as reserved in ``dynamic_port_range``, the procedure
    is repeated for up to 50 times.

    :param blocklist: an optional list of ports that are not allowed as random ports
    :return: a free TCP port
    :raises Exception: if none of the attempts yielded an admissible port
    """
    blocklist = blocklist or []
    for _ in range(50):
        # the context manager makes sure the socket is closed, even if bind() raises
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp:
            tcp.bind(("", 0))
            port = tcp.getsockname()[1]
        if port in blocklist or dynamic_port_range.is_port_reserved(port):
            continue
        try:
            dynamic_port_range.mark_reserved(port)
        except ValueError:
            # depending on the ephemeral port range of the system, the allocated port may be outside what
            # we defined as dynamic port range
            pass
        return port
    raise Exception(f"Unable to determine free TCP port with blocklist {blocklist}")
236

237

238
def get_free_tcp_port_range(num_ports: int, max_attempts: int = 50) -> "PortRange":
    """
    Attempts to get a contiguous range of free ports from the dynamic port range. For instance,
    ``get_free_tcp_port_range(4)`` may return the following result: ``PortRange(44000:44003)``
    (port ranges are inclusive).

    :param num_ports: the number of ports in the range (at least 2)
    :param max_attempts: the number of times to retry if a contiguous range was not found
    :return: a port range of free TCP ports
    :raises ValueError: if ``num_ports`` is smaller than 2
    :raises PortNotAvailableException: if max_attempts was reached to re-try
    """
    if num_ports < 2:
        raise ValueError(f"invalid number of ports {num_ports}")

    def _is_available(_port: int) -> bool:
        # a port qualifies if it has not been reserved and can actually be bound
        return not dynamic_port_range.is_port_reserved(_port) and port_can_be_bound(_port)

    for _ in range(max_attempts):
        # pick a random starting point that leaves enough space at the end of the dynamic range
        first_port = random.randint(
            dynamic_port_range.start, dynamic_port_range.end - num_ports - 1
        )
        candidate = PortRange(first_port, first_port + num_ports - 1)

        # Availability is checked for the whole range before anything gets reserved. We don't use
        # dynamic_port_range.reserve_port here, because if the check failed at some port, all ports
        # up until then would already be reserved.
        if all(_is_available(_port) for _port in candidate):
            # port range found! mark the ports as reserved in the dynamic port range and return
            for _port in candidate:
                dynamic_port_range.mark_reserved(_port)
            return candidate

    raise PortNotAvailableException("reached max_attempts when trying to find port range")
276

277

278
def resolve_hostname(hostname: str) -> str | None:
1✔
279
    """Resolve the given hostname and return its IP address, or None if it cannot be resolved."""
280
    try:
1✔
281
        return socket.gethostbyname(hostname)
1✔
282
    except OSError:
1✔
283
        return None
1✔
284

285

286
def is_ip_address(addr: str) -> bool:
    """
    Checks whether the given string is accepted as an IPv4 address by ``socket.inet_aton``.

    :param addr: the string to check
    :return: True if ``socket.inet_aton`` can parse the string, False otherwise
    """
    try:
        socket.inet_aton(addr)
    except OSError:
        return False
    return True
292

293

294
def is_ipv4_address(address: str) -> bool:
    """
    Checks if passed string looks like an IPv4 address

    :param address: Possible IPv4 address
    :return: True if the whole string matches ``IP_REGEX``, False otherwise
    """
    # fullmatch (rather than match) is required here: the trailing `$` of the pattern also matches
    # right before a final newline, so `re.match` would accept strings like "1.2.3.4\n"
    return re.fullmatch(IP_REGEX, address) is not None
301

302

303
class PortNotAvailableException(Exception):
    """Raised to signal that a port (or port range) could not be reserved in a PortRange."""
307

308

309
class PortRange:
    """Manages a range of ports that can be reserved and requested."""

    def __init__(self, start: int, end: int):
        """
        Create a new port range. The port range is inclusive, meaning ``PortRange(5000,5005)`` is 6 ports
        including both 5000 and 5005. This is different from ``range`` which is not inclusive, i.e.::

            PortRange(5000, 5005).as_range() == range(5000, 5005 + 1)

        :param start: the start port (inclusive)
        :param end: the end of the range (inclusive).
        """
        self.start = start
        self.end = end

        # cache for locally available ports (ports are reserved for a short period of a few seconds)
        self._ports_cache: MutableMapping[Port, Any] = CustomExpiryTTLCache(
            maxsize=len(self),
            ttl=DEFAULT_PORT_RESERVED_SECONDS,
        )
        self._ports_lock = threading.RLock()

    def subrange(self, start: int | None = None, end: int | None = None) -> "PortRange":
        """
        Creates a new PortRange object from this range which is a sub-range of this port range. The new
        object will use the same port cache and locks of this port range, so you can constrain port
        reservations of an existing port range but have reservations synced between them.

        :param start: the start of the subrange (defaults to the start of this range)
        :param end: the end of the subrange (defaults to the end of this range)
        :raises ValueError: if start or end are outside the current port range
        :return: a new PortRange object synced to this one
        """
        start = start if start is not None else self.start
        end = end if end is not None else self.end

        if start < self.start:
            raise ValueError(f"start not in range ({start} < {self.start})")
        if end > self.end:
            raise ValueError(f"end not in range ({end} > {self.end})")

        # ensures that we return an instance of a subclass
        port_range = type(self)(start, end)
        # share cache and lock, so that reservations are synced between both objects
        port_range._ports_cache = self._ports_cache
        port_range._ports_lock = self._ports_lock
        return port_range

    def as_range(self) -> range:
        """
        Returns a ``range(start, end+1)`` object representing this port range.

        :return: a range
        """
        return range(self.start, self.end + 1)

    def reserve_port(self, port: IntOrPort | None = None, duration: int | None = None) -> int:
        """
        Reserves the given port (if it is still free). If the given port is None, it reserves the first
        free port found in this port range. If a port is given, it has to be within this port range
        (i.e., in the inclusive range [self.start, self.end]).

        :param port: explicit port to check or None if a free port from the range should be selected
        :param duration: the time in seconds the port is reserved for (defaults to a few seconds)
        :return: reserved, free port number (int)
        :raises PortNotAvailableException: if the given port is outside the range, it is already bound or
                    reserved, or if the given port is None and there is no free port in the range.
        """
        ports_range = self.as_range()
        if port is not None:
            port = Port.wrap(port)
            if port.port not in ports_range:
                raise PortNotAvailableException(
                    f"The requested port ({port}) is not in the port range ({ports_range})."
                )

        with self._ports_lock:
            if port is not None:
                return self._try_reserve_port(port, duration=duration)

            for port_in_range in ports_range:
                try:
                    return self._try_reserve_port(port_in_range, duration=duration)
                except PortNotAvailableException:
                    # We ignore the fact that this single port is reserved, we just check the next one
                    continue

        raise PortNotAvailableException(
            f"No free network ports available in {self!r} "
            f"(currently reserved: {list(self._ports_cache.keys())})"
        )

    def is_port_reserved(self, port: IntOrPort) -> bool:
        """
        Checks whether the port has been reserved in this PortRange. Does not check whether the port can be
        bound or not, and does not check whether the port is in range.

        :param port: the port to check
        :return: true if the port is reserved within the range
        """
        port = Port.wrap(port)
        return self._ports_cache.get(port) is not None

    def mark_reserved(self, port: IntOrPort, duration: int | None = None):
        """
        Marks the given port as reserved for the given duration, regardless of whether it is free or not.

        :param port: the port to reserve
        :param duration: the duration of the reservation; if not set, the default expiry of the cache applies
        :raises ValueError: if the port is not in this port range
        """
        port = Port.wrap(port)

        if port.port not in self.as_range():
            raise ValueError(f"port {port} not in {self!r}")

        with self._ports_lock:
            # reserve the port for a short period of time
            self._ports_cache[port] = "__reserved__"
            if duration:
                self._ports_cache.set_expiry(port, duration)

    def _try_reserve_port(self, port: IntOrPort, duration: int | None) -> int:
        """
        Reserves the given port if it is currently not reserved and can be bound.

        :param port: the port to reserve
        :param duration: the duration of the reservation, passed on to ``mark_reserved``
        :return: the reserved port number
        :raises PortNotAvailableException: if the port is already reserved or cannot be bound
        """
        port = Port.wrap(port)

        if self.is_port_reserved(port):
            raise PortNotAvailableException(f"The given port ({port}) is already reserved.")
        if not self._port_can_be_bound(port):
            raise PortNotAvailableException(f"The given port ({port}) is already in use.")

        self.mark_reserved(port, duration)
        return port.port

    def _port_can_be_bound(self, port: IntOrPort) -> bool:
        """
        Internal check whether the port can be bound. Will open a socket connection and see if the port is
        available. Can be overwritten by subclasses to provide a custom implementation.

        :param port: the port to check
        :return: true if the port is free on the system
        """
        return port_can_be_bound(port)

    def __len__(self) -> int:
        # both ends are inclusive, hence the +1
        return self.end - self.start + 1

    def __iter__(self):
        return iter(self.as_range())

    def __repr__(self):
        return f"PortRange({self.start}:{self.end})"
458

459

460
@singleton_factory
def get_docker_host_from_container() -> str:
    """
    Determine the hostname/IP to reach the host from within a Docker container (e.g., Lambda function).
    The decision is made roughly as follows:
      1. if we're inside Docker, return the IP address that `host.docker.internal` (or alternatively
        `host.containers.internal`) resolves to
      2. if we're running in host mode, in a non-Linux OS, return `host.docker.internal`
      3. otherwise (including when the lookups of option (1) fail), return the Docker bridge IP
        (config.DOCKER_BRIDGE_IP) as a fallback
    """
    if config.is_in_docker:
        for candidate in ("host.docker.internal", "host.containers.internal"):
            try:
                return socket.gethostbyname(candidate)
            except OSError:
                continue
        # TODO if neither host resolves, we might be in linux. We could just use the default gateway then
        return config.DOCKER_BRIDGE_IP

    if not config.is_in_linux:
        # If we're running outside Docker (in host mode), and would like the Lambda containers to be able
        # to access services running on the local machine, return `host.docker.internal` accordingly
        return "host.docker.internal"

    return config.DOCKER_BRIDGE_IP
485

486

487
def get_addressable_container_host(default_local_hostname: str = None) -> str:
    """
    Determine the target host under which endpoints exposed by Docker containers can be addressed,
    depending on the current execution context.

    When executing within Docker, the result of get_docker_host_from_container() is returned. Otherwise,
    the given local hostname (or `LOCALHOST_HOSTNAME`) is returned, assuming that container endpoints
    are exposed and accessible under localhost.

    :param default_local_hostname: local hostname to return, if running outside Docker (defaults to LOCALHOST_HOSTNAME)
    """
    if config.is_in_docker:
        return get_docker_host_from_container()
    return default_local_hostname or constants.LOCALHOST_HOSTNAME
499

500

501
def send_dns_query(
    name: str,
    port: int = 53,
    ip_address: str = "127.0.0.1",
    qtype: str = "A",
    timeout: float = 1.0,
    tcp: bool = False,
) -> DNSRecord:
    """
    Send a single DNS question to the server at ``ip_address:port`` and return the parsed reply.

    :param name: the name to query
    :param port: the port of the DNS server
    :param ip_address: the address of the DNS server
    :param qtype: the query type of the question
    :param timeout: timeout passed on to the send operation
    :param tcp: passed on to the send operation as its ``tcp`` flag
    :return: the reply, parsed into a DNSRecord
    """
    LOG.debug("querying %s:%d for name %s", ip_address, port, name)
    question = DNSRecord.question(qname=name, qtype=qtype)
    raw_reply = question.send(
        dest=ip_address,
        port=port,
        tcp=tcp,
        timeout=timeout,
        ipv6=False,
    )
    return DNSRecord.parse(raw_reply)
513

514

515
dynamic_port_range = PortRange(DYNAMIC_PORT_RANGE_START, DYNAMIC_PORT_RANGE_END)
1✔
516
"""The dynamic port range."""
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc