• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / bbb43a10-8331-4ff8-99f6-d3e1ef33b454

15 May 2025 09:38PM UTC coverage: 86.622% (-0.02%) from 86.646%
bbb43a10-8331-4ff8-99f6-d3e1ef33b454

push

circleci

web-flow
S3: fix IfMatch/IfNoneMatch in pre-signed URLs (#12624)

1 of 1 new or added line in 1 file covered. (100.0%)

48 existing lines in 14 files now uncovered.

64360 of 74300 relevant lines covered (86.62%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.21
/localstack-core/localstack/utils/net.py
1
import logging
1✔
2
import random
1✔
3
import re
1✔
4
import socket
1✔
5
import threading
1✔
6
from contextlib import closing
1✔
7
from typing import Any, List, MutableMapping, NamedTuple, Optional, Union
1✔
8
from urllib.parse import urlparse
1✔
9

10
import dns.resolver
1✔
11
from dnslib import DNSRecord
1✔
12

13
from localstack import config, constants
1✔
14

15
from .collections import CustomExpiryTTLCache
1✔
16
from .numbers import is_number
1✔
17
from .objects import singleton_factory
1✔
18
from .sync import retry
1✔
19

20
LOG = logging.getLogger(__name__)
1✔
21

22
# regular expression for IPv4 addresses
23
IP_REGEX = (
1✔
24
    r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
25
)
26

27
# many linux kernels use 32768-60999, RFC 6335 is 49152-65535, so we use a mix here
28
DYNAMIC_PORT_RANGE_START = 32768
1✔
29
DYNAMIC_PORT_RANGE_END = 65536
1✔
30

31
DEFAULT_PORT_RESERVED_SECONDS = 6
1✔
32
"""Default nuber of seconds a port is reserved in a PortRange."""
1✔
33

34

35
class Port(NamedTuple):
1✔
36
    """Represents a network port, with port number and protocol (TCP/UDP)"""
37

38
    port: int
1✔
39
    """the port number"""
1✔
40
    protocol: str
1✔
41
    """network protocol name (usually 'tcp' or 'udp')"""
1✔
42

43
    @classmethod
1✔
44
    def wrap(cls, port: "IntOrPort") -> "Port":
1✔
45
        """Return the given port as a Port object, using 'tcp' as the default protocol."""
46
        if isinstance(port, Port):
1✔
47
            return port
1✔
48
        return Port(port=port, protocol="tcp")
1✔
49

50

51
# simple helper type to encapsulate int/Port argument types
52
IntOrPort = Union[int, Port]
1✔
53

54

55
def is_port_open(
1✔
56
    port_or_url: Union[int, str],
57
    http_path: str = None,
58
    expect_success: bool = True,
59
    protocols: Optional[Union[str, List[str]]] = None,
60
    quiet: bool = True,
61
):
62
    from localstack.utils.http import safe_requests
1✔
63

64
    protocols = protocols or ["tcp"]
1✔
65
    port = port_or_url
1✔
66
    if is_number(port):
1✔
67
        port = int(port)
1✔
68
    host = "localhost"
1✔
69
    protocol = "http"
1✔
70
    protocols = protocols if isinstance(protocols, list) else [protocols]
1✔
71
    if isinstance(port, str):
1✔
72
        url = urlparse(port_or_url)
1✔
73
        port = url.port
1✔
74
        host = url.hostname
1✔
75
        protocol = url.scheme
1✔
76
    nw_protocols = []
1✔
77
    nw_protocols += [socket.SOCK_STREAM] if "tcp" in protocols else []
1✔
78
    nw_protocols += [socket.SOCK_DGRAM] if "udp" in protocols else []
1✔
79
    for nw_protocol in nw_protocols:
1✔
80
        with closing(
1✔
81
            socket.socket(socket.AF_INET if ":" not in host else socket.AF_INET6, nw_protocol)
82
        ) as sock:
83
            sock.settimeout(1)
1✔
84
            if nw_protocol == socket.SOCK_DGRAM:
1✔
85
                try:
×
86
                    if port == 53:
×
87
                        dnshost = "127.0.0.1" if host == "localhost" else host
×
88
                        resolver = dns.resolver.Resolver()
×
89
                        resolver.nameservers = [dnshost]
×
90
                        resolver.timeout = 1
×
91
                        resolver.lifetime = 1
×
92
                        answers = resolver.query("google.com", "A")
×
93
                        assert len(answers) > 0
×
94
                    else:
95
                        sock.sendto(bytes(), (host, port))
×
96
                        sock.recvfrom(1024)
×
97
                except Exception:
×
98
                    if not quiet:
×
99
                        LOG.exception("Error connecting to UDP port %s:%s", host, port)
×
100
                    return False
×
101
            elif nw_protocol == socket.SOCK_STREAM:
1✔
102
                result = sock.connect_ex((host, port))
1✔
103
                if result != 0:
1✔
104
                    if not quiet:
1✔
105
                        LOG.warning(
×
106
                            "Error connecting to TCP port %s:%s (result=%s)", host, port, result
107
                        )
108
                    return False
1✔
109
    if "tcp" not in protocols or not http_path:
1✔
110
        return True
1✔
111
    host = f"[{host}]" if ":" in host else host
×
112
    url = f"{protocol}://{host}:{port}{http_path}"
×
113
    try:
×
114
        response = safe_requests.get(url, verify=False)
×
115
        return not expect_success or response.status_code < 400
×
116
    except Exception:
×
117
        return False
×
118

119

120
def wait_for_port_open(
1✔
121
    port: int, http_path: str = None, expect_success=True, retries=10, sleep_time=0.5
122
):
123
    """Ping the given TCP network port until it becomes available (for a given number of retries).
124
    If 'http_path' is set, make a GET request to this path and assert a non-error response."""
125
    return wait_for_port_status(
1✔
126
        port,
127
        http_path=http_path,
128
        expect_success=expect_success,
129
        retries=retries,
130
        sleep_time=sleep_time,
131
    )
132

133

134
def wait_for_port_closed(
1✔
135
    port: int, http_path: str = None, expect_success=True, retries=10, sleep_time=0.5
136
):
137
    return wait_for_port_status(
1✔
138
        port,
139
        http_path=http_path,
140
        expect_success=expect_success,
141
        retries=retries,
142
        sleep_time=sleep_time,
143
        expect_closed=True,
144
    )
145

146

147
def wait_for_port_status(
1✔
148
    port: int,
149
    http_path: str = None,
150
    expect_success=True,
151
    retries=10,
152
    sleep_time=0.5,
153
    expect_closed=False,
154
):
155
    """Ping the given TCP network port until it becomes (un)available (for a given number of retries)."""
156

157
    def check():
1✔
158
        status = is_port_open(port, http_path=http_path, expect_success=expect_success)
1✔
159
        if bool(status) != (not expect_closed):
1✔
160
            raise Exception(
×
161
                "Port %s (path: %s) was not %s"
162
                % (port, http_path, "closed" if expect_closed else "open")
163
            )
164

165
    return retry(check, sleep=sleep_time, retries=retries)
1✔
166

167

168
def port_can_be_bound(port: IntOrPort, address: str = "") -> bool:
1✔
169
    """
170
    Return whether a local port (TCP or UDP) can be bound to. Note that this is a stricter check
171
    than is_port_open(...) above, as is_port_open() may return False if the port is
172
    not accessible (i.e., does not respond), yet cannot be bound to.
173
    """
174
    try:
1✔
175
        port = Port.wrap(port)
1✔
176
        if port.protocol == "tcp":
1✔
177
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1✔
178
        elif port.protocol == "udp":
1✔
179
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1✔
180
        else:
181
            LOG.debug("Unsupported network protocol '%s' for port check", port.protocol)
×
182
            return False
×
183
        sock.bind((address, port.port))
1✔
184
        return True
1✔
185
    except OSError:
1✔
186
        # either the port is used or we don't have permission to bind it
187
        return False
1✔
188
    except Exception:
1✔
189
        LOG.error("cannot bind port %s", port, exc_info=LOG.isEnabledFor(logging.DEBUG))
1✔
190
        return False
1✔
191

192

193
def get_free_udp_port(blocklist: List[int] = None) -> int:
1✔
194
    blocklist = blocklist or []
1✔
195
    for i in range(10):
1✔
196
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1✔
197
        udp.bind(("", 0))
1✔
198
        addr, port = udp.getsockname()
1✔
199
        udp.close()
1✔
200
        if port not in blocklist:
1✔
201
            return port
1✔
202
    raise Exception(f"Unable to determine free UDP port with blocklist {blocklist}")
×
203

204

205
def get_free_tcp_port(blocklist: List[int] = None) -> int:
1✔
206
    """
207
    Tries to bind a socket to port 0 and returns the port that was assigned by the system. If the port is
208
    in the given ``blocklist``, or the port is marked as reserved in ``dynamic_port_range``, the procedure
209
    is repeated for up to 50 times.
210

211
    :param blocklist: an optional list of ports that are not allowed as random ports
212
    :return: a free TCP port
213
    """
214
    blocklist = blocklist or []
1✔
215
    for i in range(50):
1✔
216
        tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1✔
217
        tcp.bind(("", 0))
1✔
218
        addr, port = tcp.getsockname()
1✔
219
        tcp.close()
1✔
220
        if port not in blocklist and not dynamic_port_range.is_port_reserved(port):
1✔
221
            try:
1✔
222
                dynamic_port_range.mark_reserved(port)
1✔
223
            except ValueError:
×
224
                # depending on the ephemeral port range of the system, the allocated port may be outside what
225
                # we defined as dynamic port range
226
                pass
×
227
            return port
1✔
228
    raise Exception(f"Unable to determine free TCP port with blocklist {blocklist}")
1✔
229

230

231
def get_free_tcp_port_range(num_ports: int, max_attempts: int = 50) -> "PortRange":
1✔
232
    """
233
    Attempts to get a contiguous range of free ports from the dynamic port range. For instance,
234
    ``get_free_tcp_port_range(4)`` may return the following result: ``PortRange(44000:44004)``.
235

236
    :param num_ports: the number of ports in the range
237
    :param max_attempts: the number of times to retry if a contiguous range was not found
238
    :return: a port range of free TCP ports
239
    :raises PortNotAvailableException: if max_attempts was reached to re-try
240
    """
241
    if num_ports < 2:
1✔
242
        raise ValueError(f"invalid number of ports {num_ports}")
×
243

244
    def _is_port_range_free(_range: PortRange):
1✔
245
        for _port in _range:
1✔
246
            if dynamic_port_range.is_port_reserved(_port) or not port_can_be_bound(_port):
1✔
247
                return False
1✔
248
        return True
1✔
249

250
    for _ in range(max_attempts):
1✔
251
        # try to find a suitable starting point (leave enough space at the end)
252
        port_range_start = random.randint(
1✔
253
            dynamic_port_range.start, dynamic_port_range.end - num_ports - 1
254
        )
255
        port_range = PortRange(port_range_start, port_range_start + num_ports - 1)
1✔
256

257
        # check that each port in the range is available (has not been reserved and can be bound)
258
        # we don't use dynamic_port_range.reserve_port because in case the port range check fails at some port
259
        # all ports up until then would be reserved
260
        if not _is_port_range_free(port_range):
1✔
261
            continue
1✔
262

263
        # port range found! mark them as reserved in the dynamic port range and return
264
        for port in port_range:
1✔
265
            dynamic_port_range.mark_reserved(port)
1✔
266
        return port_range
1✔
267

268
    raise PortNotAvailableException("reached max_attempts when trying to find port range")
1✔
269

270

271
def resolve_hostname(hostname: str) -> Optional[str]:
1✔
272
    """Resolve the given hostname and return its IP address, or None if it cannot be resolved."""
273
    try:
1✔
274
        return socket.gethostbyname(hostname)
1✔
275
    except socket.error:
1✔
276
        return None
1✔
277

278

279
def is_ip_address(addr: str) -> bool:
1✔
280
    try:
1✔
281
        socket.inet_aton(addr)
1✔
282
        return True
1✔
283
    except socket.error:
1✔
284
        return False
1✔
285

286

287
def is_ipv4_address(address: str) -> bool:
1✔
288
    """
289
    Checks if passed string looks like an IPv4 address
290
    :param address: Possible IPv4 address
291
    :return: True if string looks like IPv4 address, False otherwise
292
    """
293
    return bool(re.match(IP_REGEX, address))
1✔
294

295

296
class PortNotAvailableException(Exception):
1✔
297
    """Exception which indicates that the PortRange could not reserve a port."""
298

299
    pass
1✔
300

301

302
class PortRange:
1✔
303
    """Manages a range of ports that can be reserved and requested."""
304

305
    def __init__(self, start: int, end: int):
1✔
306
        """
307
        Create a new port range. The port range is inclusive, meaning ``PortRange(5000,5005)`` is 6 ports
308
        including both 5000 and 5005. This is different from ``range`` which is not inclusive, i.e.::
309

310
            PortRange(5000, 5005).as_range() == range(5000, 5005 + 1)
311

312
        :param start: the start port (inclusive)
313
        :param end: the end of the range (inclusive).
314
        """
315
        self.start = start
1✔
316
        self.end = end
1✔
317

318
        # cache for locally available ports (ports are reserved for a short period of a few seconds)
319
        self._ports_cache: MutableMapping[Port, Any] = CustomExpiryTTLCache(
1✔
320
            maxsize=len(self),
321
            ttl=DEFAULT_PORT_RESERVED_SECONDS,
322
        )
323
        self._ports_lock = threading.RLock()
1✔
324

325
    def subrange(self, start: int = None, end: int = None) -> "PortRange":
1✔
326
        """
327
        Creates a new PortRange object from this range which is a sub-range of this port range. The new
328
        object will use the same port cache and locks of this port range, so you can constrain port
329
        reservations of an existing port range but have reservations synced between them.
330

331
        :param start: the start of the subrange
332
        :param end: the end of the subrange
333
        :raises ValueError: if start or end are outside the current port range
334
        :return: a new PortRange object synced to this one
335
        """
336
        start = start if start is not None else self.start
1✔
337
        end = end if end is not None else self.end
1✔
338

339
        if start < self.start:
1✔
340
            raise ValueError(f"start not in range ({start} < {self.start})")
×
341
        if end > self.end:
1✔
342
            raise ValueError(f"end not in range ({end} < {self.end})")
×
343

344
        port_range = PortRange(start, end)
1✔
345
        port_range._ports_cache = self._ports_cache
1✔
346
        port_range._ports_lock = self._ports_lock
1✔
347
        return port_range
1✔
348

349
    def as_range(self) -> range:
1✔
350
        """
351
        Returns a ``range(start, end+1)`` object representing this port range.
352

353
        :return: a range
354
        """
355
        return range(self.start, self.end + 1)
1✔
356

357
    def reserve_port(self, port: Optional[IntOrPort] = None, duration: Optional[int] = None) -> int:
1✔
358
        """
359
        Reserves the given port (if it is still free). If the given port is None, it reserves a free port from the
360
        configured port range for external services. If a port is given, it has to be within the configured
361
        range of external services (i.e., in the range [self.start, self.end)).
362

363
        :param port: explicit port to check or None if a random port from the configured range should be selected
364
        :param duration: the time in seconds the port is reserved for (defaults to a few seconds)
365
        :return: reserved, free port number (int)
366
        :raises PortNotAvailableException: if the given port is outside the configured range, it is already bound or
367
                    reserved, or if the given port is none and there is no free port in the configured service range.
368
        """
369
        ports_range = self.as_range()
1✔
370
        port = Port.wrap(port) if port is not None else port
1✔
371
        if port is not None and port.port not in ports_range:
1✔
372
            raise PortNotAvailableException(
1✔
373
                f"The requested port ({port}) is not in the port range ({ports_range})."
374
            )
375
        with self._ports_lock:
1✔
376
            if port is not None:
1✔
377
                return self._try_reserve_port(port, duration=duration)
1✔
378
            else:
379
                for port_in_range in ports_range:
1✔
380
                    try:
1✔
381
                        return self._try_reserve_port(port_in_range, duration=duration)
1✔
382
                    except PortNotAvailableException:
1✔
383
                        # We ignore the fact that this single port is reserved, we just check the next one
384
                        pass
1✔
385
        raise PortNotAvailableException(
1✔
386
            f"No free network ports available in {self!r} (currently reserved: %s)",
387
            list(self._ports_cache.keys()),
388
        )
389

390
    def is_port_reserved(self, port: IntOrPort) -> bool:
1✔
391
        """
392
        Checks whether the port has been reserved in this PortRange. Does not check whether the port can be
393
        bound or not, and does not check whether the port is in range.
394

395
        :param port: the port to check
396
        :return: true if the port is reserved within the range
397
        """
398
        port = Port.wrap(port)
1✔
399
        return self._ports_cache.get(port) is not None
1✔
400

401
    def mark_reserved(self, port: IntOrPort, duration: int = None):
1✔
402
        """
403
        Marks the given port as reserved for the given duration, regardless of whether it is free for not.
404

405
        :param port: the port to reserve
406
        :param duration: the duration
407
        :raises ValueError: if the port is not in this port range
408
        """
409
        port = Port.wrap(port)
1✔
410

411
        if port.port not in self.as_range():
1✔
412
            raise ValueError(f"port {port} not in {self!r}")
×
413

414
        with self._ports_lock:
1✔
415
            # reserve the port for a short period of time
416
            self._ports_cache[port] = "__reserved__"
1✔
417
            if duration:
1✔
418
                self._ports_cache.set_expiry(port, duration)
1✔
419

420
    def _try_reserve_port(self, port: IntOrPort, duration: int) -> int:
1✔
421
        """Checks if the given port is currently not reserved and can be bound."""
422
        port = Port.wrap(port)
1✔
423

424
        if self.is_port_reserved(port):
1✔
425
            raise PortNotAvailableException(f"The given port ({port}) is already reserved.")
1✔
426
        if not self._port_can_be_bound(port):
1✔
UNCOV
427
            raise PortNotAvailableException(f"The given port ({port}) is already in use.")
×
428

429
        self.mark_reserved(port, duration)
1✔
430
        return port.port
1✔
431

432
    def _port_can_be_bound(self, port: IntOrPort) -> bool:
1✔
433
        """
434
        Internal check whether the port can be bound. Will open a socket connection and see if the port is
435
        available. Can be overwritten by subclasses to provide a custom implementation.
436

437
        :param port: the port to check
438
        :return: true if the port is free on the system
439
        """
440
        return port_can_be_bound(port)
1✔
441

442
    def __len__(self):
1✔
443
        return self.end - self.start + 1
1✔
444

445
    def __iter__(self):
1✔
446
        return self.as_range().__iter__()
1✔
447

448
    def __repr__(self):
449
        return f"PortRange({self.start}:{self.end})"
450

451

452
@singleton_factory
1✔
453
def get_docker_host_from_container() -> str:
1✔
454
    """
455
    Get the hostname/IP to connect to the host from within a Docker container (e.g., Lambda function).
456
    The logic is roughly as follows:
457
      1. return `host.docker.internal` if we're running in host mode, in a non-Linux OS
458
      2. return the IP address that `host.docker.internal` (or alternatively `host.containers.internal`)
459
        resolves to, if we're inside Docker
460
      3. return the Docker bridge IP (config.DOCKER_BRIDGE_IP) as a fallback, if option (2) fails
461
    """
462
    result = config.DOCKER_BRIDGE_IP
1✔
463
    try:
1✔
464
        if not config.is_in_docker and not config.is_in_linux:
1✔
465
            # If we're running outside Docker (in host mode), and would like the Lambda containers to be able
466
            # to access services running on the local machine, return `host.docker.internal` accordingly
467
            result = "host.docker.internal"
×
468
        if config.is_in_docker:
1✔
469
            try:
1✔
470
                result = socket.gethostbyname("host.docker.internal")
1✔
471
            except socket.error:
1✔
472
                result = socket.gethostbyname("host.containers.internal")
1✔
473
    except socket.error:
1✔
474
        # TODO if neither host resolves, we might be in linux. We could just use the default gateway then
475
        pass
1✔
476
    return result
1✔
477

478

479
def get_addressable_container_host(default_local_hostname: str = None) -> str:
1✔
480
    """
481
    Return the target host to address endpoints exposed by Docker containers, depending on
482
    the current execution context.
483

484
    If we're currently executing within Docker, then return get_docker_host_from_container(); otherwise, return
485
    the value of `LOCALHOST_HOSTNAME`, assuming that container endpoints are exposed and accessible under localhost.
486

487
    :param default_local_hostname: local hostname to return, if running outside Docker (defaults to LOCALHOST_HOSTNAME)
488
    """
489
    default_local_hostname = default_local_hostname or constants.LOCALHOST_HOSTNAME
1✔
490
    return get_docker_host_from_container() if config.is_in_docker else default_local_hostname
1✔
491

492

493
def send_dns_query(
1✔
494
    name: str,
495
    port: int = 53,
496
    ip_address: str = "127.0.0.1",
497
    qtype: str = "A",
498
    timeout: float = 1.0,
499
    tcp: bool = False,
500
) -> DNSRecord:
501
    LOG.debug("querying %s:%d for name %s", ip_address, port, name)
×
502
    request = DNSRecord.question(qname=name, qtype=qtype)
×
503
    reply_bytes = request.send(dest=ip_address, port=port, tcp=tcp, timeout=timeout, ipv6=False)
×
504
    return DNSRecord.parse(reply_bytes)
×
505

506

507
dynamic_port_range = PortRange(DYNAMIC_PORT_RANGE_START, DYNAMIC_PORT_RANGE_END)
1✔
508
"""The dynamic port range."""
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc