• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.91
/localstack-core/localstack/testing/pytest/container.py
1
import logging
1✔
2
import os
1✔
3
import shlex
1✔
4
import threading
1✔
5
from collections.abc import Generator
1✔
6
from typing import Callable, Optional
1✔
7

8
import pytest
1✔
9

10
from localstack import constants
1✔
11
from localstack.utils.bootstrap import Container, RunningContainer, get_docker_image_to_start
1✔
12
from localstack.utils.container_utils.container_client import (
1✔
13
    CancellableStream,
14
    ContainerConfiguration,
15
    ContainerConfigurator,
16
    NoSuchNetwork,
17
    PortMappings,
18
    VolumeMappings,
19
)
20
from localstack.utils.docker_utils import DOCKER_CLIENT
1✔
21
from localstack.utils.strings import short_uid
1✔
22
from localstack.utils.sync import poll_condition
1✔
23

24
# Module-level logger used by the container test helpers in this file.
LOG = logging.getLogger(__name__)

ENV_TEST_CONTAINER_MOUNT_SOURCES = "TEST_CONTAINER_MOUNT_SOURCES"
"""Environment variable used to indicate that we should mount LocalStack source files into the container."""

ENV_TEST_CONTAINER_MOUNT_DEPENDENCIES = "TEST_CONTAINER_MOUNT_DEPENDENCIES"
"""Environment variable used to indicate that we should mount dependencies into the container."""
1✔
31

32

33
class ContainerFactory:
    """
    Callable factory that builds ``Container`` objects for tests and remembers each one it
    created, so that all of them can be shut down together with :meth:`remove_all_containers`.
    """

    def __init__(self):
        # every container handed out by __call__, in creation order
        self._containers: list[Container] = []

    def __call__(
        self,
        # convenience properties
        pro: bool = False,
        publish: Optional[list[int]] = None,
        configurators: Optional[list[ContainerConfigurator]] = None,
        # ContainerConfig properties
        **kwargs,
    ) -> Container:
        """
        Create a new (not yet started) container and register it for later cleanup.

        The base configuration uses the image returned by ``get_docker_image_to_start()``, takes
        the entrypoint from the ``ENTRYPOINT`` environment variable and the command from the
        shell-split ``CMD`` environment variable (``None`` if unset or empty).

        :param pro: if set, adds the ``GATEWAY_LISTEN`` and ``LOCALSTACK_AUTH_TOKEN`` environment
            variables to the container configuration
        :param publish: optional list of ports that are added to the container's port mappings
        :param configurators: optional configurators passed to ``Container.configure``
        :param kwargs: attribute overrides that are set on the ``ContainerConfiguration`` as-is,
            after the convenience options were applied
        :return: the configured container
        """
        port_configuration = PortMappings()
        for port in publish or []:
            port_configuration.add(port)

        container_configuration = ContainerConfiguration(
            image_name=get_docker_image_to_start(),
            name=None,
            volumes=VolumeMappings(),
            remove=True,
            ports=port_configuration,
            entrypoint=os.environ.get("ENTRYPOINT"),
            command=shlex.split(os.environ.get("CMD", "")) or None,
            env_vars={},
        )

        # handle the convenience options
        if pro:
            container_configuration.env_vars["GATEWAY_LISTEN"] = "0.0.0.0:4566,0.0.0.0:443"
            container_configuration.env_vars["LOCALSTACK_AUTH_TOKEN"] = os.environ.get(
                "LOCALSTACK_AUTH_TOKEN", "test"
            )

        # override values from kwargs
        for key, value in kwargs.items():
            setattr(container_configuration, key, value)

        container = Container(container_configuration)

        if configurators:
            container.configure(configurators)

        # track the container so we can remove it later
        self._containers.append(container)
        return container

    def remove_all_containers(self):
        """
        Shut down every tracked container that has a running container attached.

        Shutdown is best-effort: an exception raised for one container does not prevent the
        remaining containers from being shut down. Failures are logged afterwards and never
        re-raised.
        """
        failures = []
        for container in self._containers:
            running_container = container.running_container
            if not running_container:
                # container is not running
                continue

            try:
                running_container.shutdown()
            except Exception as e:
                # keep the running container captured above, so the log line below reports the
                # id of the container whose shutdown actually failed
                failures.append((running_container, e))

        for running_container, ex in failures:
            LOG.error(
                "Failed to remove container %s",
                running_container.id,
                # we are no longer inside the ``except`` block here, so ``exc_info=True`` would
                # log no traceback at all; pass the captured exception instance instead
                exc_info=ex if LOG.isEnabledFor(logging.DEBUG) else None,
            )
101

102

103
class LogStreamFactory:
    """
    Callable factory that starts background threads streaming container logs, and that keeps
    the stop events and open streams needed to terminate all of them through :meth:`close`.
    """

    def __init__(self):
        self.streams: list[CancellableStream] = []
        self.stop_events: list[threading.Event] = []
        # guards the "check stop event, then open and register the stream" step against close()
        self.mutex = threading.RLock()

    def __call__(self, container: Container, callback: Callable[[str], None] = None) -> None:
        """
        Spawn a daemon thread that streams the logs of the given container.

        The thread is started right away and first polls until the container is running (or
        until :meth:`close` was called). It then opens the log stream and forwards each line,
        decoded as UTF-8 with the trailing line separator removed, to the callback. If the
        container is already closed by then, an exception will be raised in the thread and it
        will terminate.

        :param container: the container to stream the logs from
        :param callback: an optional callback called on each log line. Defaults to the ``debug``
            method of a logger named ``container.<container name>``.
        """
        stop_event = threading.Event()
        self.stop_events.append(stop_event)

        def _should_stop_waiting():
            # a stop request ends the wait as well, the thread then exits without streaming
            if stop_event.is_set():
                return True
            return (
                container.running_container.is_running()
                if container.running_container
                else False
            )

        def _stream_logs():
            # wait until either the container is running or the test was terminated
            poll_condition(_should_stop_waiting)

            with self.mutex:
                if stop_event.is_set():
                    return

                log_stream = container.running_container.stream_logs()
                self.streams.append(log_stream)

            if callback is not None:
                emit = callback
            else:
                # create a default logger
                container_logger = logging.getLogger(
                    f"container.{container.running_container.name}"
                )
                container_logger.setLevel(level=logging.DEBUG)
                emit = container_logger.debug

            for raw_line in log_stream:
                emit(raw_line.decode("utf-8").rstrip(os.linesep))

        threading.Thread(
            target=_stream_logs,
            name=threading._newname("log-stream-%d"),
            daemon=True,
        ).start()

    def close(self):
        """
        Set all stop events (while holding the mutex) and then close every registered stream.
        """
        with self.mutex:
            for stop_event in self.stop_events:
                stop_event.set()

        for log_stream in self.streams:
            log_stream.close()
1✔
164

165

166
@pytest.fixture
def container_factory() -> Generator[ContainerFactory, None, None]:
    """
    Provide a fresh ``ContainerFactory`` to the test and call ``remove_all_containers()`` on it
    during fixture teardown.
    """
    tracked_factory = ContainerFactory()
    yield tracked_factory
    tracked_factory.remove_all_containers()
1✔
171

172

173
@pytest.fixture(scope="session")
def wait_for_localstack_ready():
    """
    Session-scoped fixture returning a helper that waits for a running container to be ready.

    The helper first calls ``wait_until_ready(timeout)`` on the container and afterwards polls
    the container logs until one of the log lines equals ``constants.READY_MARKER_OUTPUT``.
    """

    def _wait_for(container: RunningContainer, timeout: Optional[float] = None):
        container.wait_until_ready(timeout)

        def _ready_marker_logged():
            log_lines = container.get_logs().splitlines()
            return constants.READY_MARKER_OUTPUT in log_lines

        # NOTE(review): the result of poll_condition is not inspected here - presumably a
        # timeout is therefore not reported to the caller; confirm against poll_condition
        poll_condition(_ready_marker_logged, timeout=timeout)

    return _wait_for
1✔
184

185

186
@pytest.fixture
def ensure_network():
    """
    Provide a function that makes sure a docker network with the given name exists.

    Networks that did not exist and had to be created are remembered. On fixture teardown,
    the containers listed for each of those networks are disconnected and the network is
    deleted. Networks that already existed are left untouched.
    """
    created_networks = []

    def _ensure_network(name: str):
        try:
            DOCKER_CLIENT.inspect_network(name)
        except NoSuchNetwork:
            DOCKER_CLIENT.create_network(name)
            # only networks created here are cleaned up again
            created_networks.append(name)

    yield _ensure_network

    for created_network in created_networks:
        # detach attached containers
        network_details = DOCKER_CLIENT.inspect_network(created_network)
        attached_containers = network_details.get("Containers", [])
        for attached_id in attached_containers:
            DOCKER_CLIENT.disconnect_container_from_network(
                network_name=created_network, container_name_or_id=attached_id
            )
        DOCKER_CLIENT.delete_network(created_network)
1✔
207

208

209
@pytest.fixture
def docker_network(ensure_network):
    """
    Make sure a network with a generated name (``net-<short uid>``) exists and return that name.
    """
    generated_name = f"net-{short_uid()}"
    ensure_network(generated_name)
    return generated_name
1✔
214

215

216
@pytest.fixture
def dns_query_from_container(container_factory: ContainerFactory, monkeypatch):
    """
    Provide a function that runs a ``dig`` DNS query from inside a fresh container.

    Each call starts a ``localstack/localstack`` container that just sleeps, installs the
    ``dnsutils`` package in it and runs ``dig +short`` against the given server. All containers
    started this way are shut down on fixture teardown.
    """
    started_containers: list[RunningContainer] = []

    def query(name: str, ip_address: str, port: int = 53, **kwargs) -> tuple[bytes, bytes]:
        """
        Resolve ``name`` using the DNS server at ``ip_address:port``.

        :param name: the name to look up
        :param ip_address: address of the DNS server to query
        :param port: port of the DNS server
        :param kwargs: additional arguments passed to the container factory
        :return: stdout and stderr of the command executed in the container
        """
        dig_container = container_factory(
            image_name="localstack/localstack",
            command=["infinity"],
            entrypoint="sleep",
            **kwargs,
        )
        running = dig_container.start()
        started_containers.append(running)

        shell_script = f"apt-get install -y --no-install-recommends dnsutils >/dev/null && dig +short @{ip_address} -p {port} {name}"
        # The CmdDockerClient has its output set to a logfile. We must patch
        # the client to ensure the output of the command goes to stdout. We use
        # a monkeypatch.context here to make sure the scope of the patching is
        # minimal.
        with monkeypatch.context() as patcher:
            patcher.setattr(running.container_client, "default_run_outfile", None)
            out, err = running.exec_in_container(command=["bash", "-c", shell_script])
        return out, err

    yield query

    for started in started_containers:
        started.shutdown()
1✔
251

252

253
@pytest.fixture
def stream_container_logs() -> Generator[LogStreamFactory, None, None]:
    """
    Factory fixture for streaming logs of containers in the background. Invoke as follows::

        def test_container(container_factory, stream_container_logs):
            container: Container = container_factory(...)

            with container.start() as running_container:
                stream_container_logs(container)

    This will start a background thread that streams the container logs to a python logger
    ``container.<container-name>``. You can find it in the logs as::

        2023-09-03T18:49:06.236 DEBUG --- [log-stream-1] container.localstack-5a4c3678 : foobar
        2023-09-03T18:49:06.236 DEBUG --- [log-stream-1] container.localstack-5a4c3678 : hello world

    The function ``stream_container_logs`` also accepts a ``callback`` argument that can be used to
    overwrite the default logging mechanism. For example, to print every log line directly to stdout, call::

        stream_container_logs(container, callback=print)

    All log streams started through the factory are closed on fixture teardown.

    :return: a factory to start log streams
    """
    factory = LogStreamFactory()
    yield factory
    # sets the stop events and closes all streams that were opened by the factory
    factory.close()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc