• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.0
/localstack-core/localstack/aws/serving/twisted.py
1
"""
2
Bindings to serve LocalStack using twisted.
3
"""
4

5
import logging
1✔
6
import time
1✔
7

8
from rolo.gateway import Gateway
1✔
9
from rolo.serving.twisted import TwistedGateway
1✔
10
from twisted.internet import endpoints, interfaces, reactor, ssl
1✔
11
from twisted.protocols.policies import ProtocolWrapper, WrappingFactory
1✔
12
from twisted.protocols.tls import BufferingTLSTransport, TLSMemoryBIOFactory
1✔
13
from twisted.python.threadpool import ThreadPool
1✔
14

15
from localstack import config
1✔
16
from localstack.config import HostAndPort
1✔
17
from localstack.runtime.shutdown import ON_AFTER_SERVICE_SHUTDOWN_HANDLERS
1✔
18
from localstack.utils.patch import patch
1✔
19
from localstack.utils.ssl import create_ssl_cert, install_predefined_cert_if_available
1✔
20
from localstack.utils.threads import start_worker_thread
1✔
21

22
LOG = logging.getLogger(__name__)
1✔
23

24

25
class TLSMultiplexer(ProtocolWrapper):
    """
    Custom protocol to multiplex HTTPS and HTTP connections over the same port. This is the equivalent of
    ``DuplexSocket``, but since Twisted uses its own SSL layer and doesn't use ``ssl.SSLSocket``, we need to
    implement the multiplexing behavior in the Twisted layer.

    The basic idea is to defer the ``makeConnection`` call until the first data are received, and then re-configure
    the underlying ``wrappedProtocol`` if needed with a TLS wrapper.
    """

    # protocol class used to wrap ``wrappedProtocol`` once a TLS handshake has been detected
    tlsProtocol = BufferingTLSTransport

    def __init__(
        self,
        factory: "WrappingFactory",
        wrappedProtocol: interfaces.IProtocol,
    ):
        """
        :param factory: the factory this protocol registers itself with
        :param wrappedProtocol: the protocol that receives the (possibly TLS-wrapped) connection
        """
        super().__init__(factory, wrappedProtocol)
        # flips to True once the first chunk of data has been inspected
        self._isInitialized = False
        # None until the first chunk of data has been inspected, afterwards True/False
        self._isTLS: bool | None = None
        # only set for plain-text connections where the protocol was sniffed from the first bytes
        self._negotiatedProtocol: bytes | None = None

    def makeConnection(self, transport):
        """
        Record the transport and register with the factory, without connecting the wrapped protocol yet.

        :param transport: the transport of the new connection
        """
        self.connected = 1
        self.transport = transport
        self.factory.registerProtocol(self)  # this is idempotent
        # we defer the actual makeConnection call to the first invocation of dataReceived

    def dataReceived(self, data: bytes) -> None:
        """
        Forward data to the wrapped protocol. The first non-empty chunk is used to decide whether the connection
        is TLS or plain text, after which the deferred ``makeConnection`` procedure is run.

        :param data: the bytes received from the transport
        """
        if self._isInitialized:
            super().dataReceived(data)
            return

        if not data:
            # there is no first byte to inspect yet: keep waiting instead of failing on ``data[0]`` and leaving
            # the protocol marked as initialized without ever having run makeConnection
            return

        # once the first data have been received, we can check whether it's a TLS handshake, then we need to run the
        # actual makeConnection procedure.
        self._isInitialized = True
        self._isTLS = data[0] == 22  # 0x16 is the marker byte identifying a TLS handshake

        if self._isTLS:
            # wrap protocol again in tls protocol
            self.wrappedProtocol = self.tlsProtocol(self.factory, self.wrappedProtocol)
        elif data.startswith(b"PRI * HTTP/2"):
            # TODO: can we do proper protocol negotiation like in ALPN?
            # in the TLS case, this is determined by the ALPN procedure by OpenSSL.
            self._negotiatedProtocol = b"h2"

        # now that we've set the real wrapped protocol, run the make connection procedure
        super().makeConnection(self.transport)
        super().dataReceived(data)

    @property
    def negotiatedProtocol(self) -> bytes | None:
        """
        The protocol sniffed from a plain-text connection if there is one, otherwise whatever the wrapped protocol
        reports as its ``negotiatedProtocol``.
        """
        if self._negotiatedProtocol:
            return self._negotiatedProtocol
        return self.wrappedProtocol.negotiatedProtocol
1✔
81

82

83
class TLSMultiplexerFactory(TLSMemoryBIOFactory):
    """
    A ``TLSMemoryBIOFactory`` that uses ``TLSMultiplexer`` as its protocol class.
    """

    protocol = TLSMultiplexer
1✔
85

86

87
def stop_thread_pool(self: ThreadPool, stop, timeout: float = None):
1✔
88
    """
89
    Patch for a custom shutdown procedure for a ThreadPool that waits a given amount of time for all threads.
90

91
    :param self: the pool to shut down
92
    :param stop: the original function
93
    :param timeout: the maximum amount of time to wait
94
    """
95
    # copied from ThreadPool.stop()
96
    if self.joined:
1✔
97
        return
1✔
98
    if not timeout:
1✔
UNCOV
99
        stop()
×
100
        return
×
101

102
    self.joined = True
1✔
103
    self.started = False
1✔
104
    self._team.quit()
1✔
105

106
    # our own joining logic with timeout
107
    remaining = timeout
1✔
108
    total_waited = 0
1✔
109

110
    for thread in self.threads:
1✔
111
        then = time.time()
1✔
112

113
        # LOG.info("[shutdown] Joining thread %s", thread)
114
        thread.join(remaining)
1✔
115

116
        waited = time.time() - then
1✔
117
        total_waited += waited
1✔
118
        remaining -= waited
1✔
119

120
        if thread.is_alive():
1✔
UNCOV
121
            LOG.warning(
×
122
                "[shutdown] Request thread %s still alive after %.2f seconds",
123
                thread,
124
                total_waited,
125
            )
126

127
        if remaining <= 0:
1✔
UNCOV
128
            remaining = 0
×
129

130

131
def serve_gateway(
    gateway: Gateway, listen: list[HostAndPort], use_ssl: bool, asynchronous: bool = False
):
    """
    Start a twisted reactor that serves the given Gateway on every port in ``listen``.

    :param gateway: the Gateway to wrap into a ``TwistedGateway``
    :param listen: the host/port combinations to serve; one TCP4 server endpoint is created per entry using its
        port, and the port of the first entry is used as the serial number of the SSL certificate
    :param use_ssl: whether to create an SSL certificate and serve through a ``TLSMultiplexerFactory``
    :param asynchronous: if set, ``reactor.run`` is handed to ``start_worker_thread`` instead of being called
        directly
    :return: the result of ``start_worker_thread`` when ``asynchronous`` is set, otherwise that of ``reactor.run()``
    """
    # size the reactor's thread pool and patch its stop method with stop_thread_pool
    reactor.suggestThreadPoolSize(config.GATEWAY_WORKER_COUNT)
    pool = reactor.getThreadPool()
    patch(pool.stop)(stop_thread_pool)

    def _shutdown_reactor():
        LOG.debug("[shutdown] Shutting down twisted reactor serving the gateway")
        pool.stop(timeout=10)
        reactor.stop()

    ON_AFTER_SERVICE_SHUTDOWN_HANDLERS.register(_shutdown_reactor)

    # the twisted Site wrapping the gateway is served as-is unless SSL is requested
    gateway_site = TwistedGateway(gateway)
    server_factory = gateway_site

    if use_ssl:
        install_predefined_cert_if_available()
        _, cert_path, key_path = create_ssl_cert(serial_number=listen[0].port)
        ssl_context_factory = ssl.DefaultOpenSSLContextFactory(key_path, cert_path)
        ssl_context_factory.getContext().use_certificate_chain_file(cert_path)
        server_factory = TLSMultiplexerFactory(ssl_context_factory, False, gateway_site)

    # one listening endpoint per configured port
    for target in listen:
        # TODO: interface = host?
        endpoints.TCP4ServerEndpoint(reactor, target.port).listen(server_factory)

    if not asynchronous:
        return reactor.run()
    return start_worker_thread(reactor.run)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc