• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18505123992

14 Oct 2025 05:30PM UTC coverage: 86.888% (-0.01%) from 86.899%
18505123992

push

github

web-flow
S3: fix `aws-global` validation in CreateBucket (#13250)

10 of 10 new or added lines in 4 files covered. (100.0%)

831 existing lines in 40 files now uncovered.

68028 of 78294 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.85
/localstack-core/localstack/testing/pytest/container.py
1
import logging
1✔
2
import os
1✔
3
import shlex
1✔
4
import threading
1✔
5
from collections.abc import Callable, Generator
1✔
6

7
import pytest
1✔
8

9
from localstack import constants
1✔
10
from localstack.utils.bootstrap import Container, RunningContainer, get_docker_image_to_start
1✔
11
from localstack.utils.container_utils.container_client import (
1✔
12
    CancellableStream,
13
    ContainerConfiguration,
14
    ContainerConfigurator,
15
    NoSuchNetwork,
16
    PortMappings,
17
    VolumeMappings,
18
)
19
from localstack.utils.docker_utils import DOCKER_CLIENT
1✔
20
from localstack.utils.strings import short_uid
1✔
21
from localstack.utils.sync import poll_condition
1✔
22

23
LOG = logging.getLogger(__name__)
1✔
24

25
ENV_TEST_CONTAINER_MOUNT_SOURCES = "TEST_CONTAINER_MOUNT_SOURCES"
1✔
26
"""Environment variable used to indicate that we should mount LocalStack  source files into the container."""
1✔
27

28
ENV_TEST_CONTAINER_MOUNT_DEPENDENCIES = "TEST_CONTAINER_MOUNT_DEPENDENCIES"
1✔
29
"""Environment variable used to indicate that we should mount dependencies into the container."""
1✔
30

31

32
class ContainerFactory:
1✔
33
    def __init__(self):
1✔
34
        self._containers: list[Container] = []
1✔
35

36
    def __call__(
1✔
37
        self,
38
        # convenience properties
39
        pro: bool = False,
40
        publish: list[int] | None = None,
41
        configurators: list[ContainerConfigurator] | None = None,
42
        # ContainerConfig properties
43
        **kwargs,
44
    ) -> Container:
45
        port_configuration = PortMappings()
1✔
46
        if publish:
1✔
UNCOV
47
            for port in publish:
×
48
                port_configuration.add(port)
×
49

50
        container_configuration = ContainerConfiguration(
1✔
51
            image_name=get_docker_image_to_start(),
52
            name=None,
53
            volumes=VolumeMappings(),
54
            remove=True,
55
            ports=port_configuration,
56
            entrypoint=os.environ.get("ENTRYPOINT"),
57
            command=shlex.split(os.environ.get("CMD", "")) or None,
58
            env_vars={},
59
        )
60

61
        # handle the convenience options
62
        if pro:
1✔
UNCOV
63
            container_configuration.env_vars["GATEWAY_LISTEN"] = "0.0.0.0:4566,0.0.0.0:443"
×
64
            container_configuration.env_vars["LOCALSTACK_AUTH_TOKEN"] = os.environ.get(
×
65
                "LOCALSTACK_AUTH_TOKEN", "test"
66
            )
67

68
        # override values from kwargs
69
        for key, value in kwargs.items():
1✔
70
            setattr(container_configuration, key, value)
1✔
71

72
        container = Container(container_configuration)
1✔
73

74
        if configurators:
1✔
75
            container.configure(configurators)
1✔
76

77
        # track the container so we can remove it later
78
        self._containers.append(container)
1✔
79
        return container
1✔
80

81
    def remove_all_containers(self):
1✔
82
        failures = []
1✔
83
        for container in self._containers:
1✔
84
            if not container.running_container:
1✔
85
                # container is not running
86
                continue
1✔
87

88
            try:
1✔
89
                container.running_container.shutdown()
1✔
UNCOV
90
            except Exception as e:
×
91
                failures.append((container, e))
×
92

93
        if failures:
1✔
UNCOV
94
            for container, ex in failures:
×
95
                LOG.error(
×
96
                    "Failed to remove container %s",
97
                    container.running_container.id,
98
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
99
                )
100

101

102
class LogStreamFactory:
1✔
103
    def __init__(self):
1✔
104
        self.streams: list[CancellableStream] = []
1✔
105
        self.stop_events: list[threading.Event] = []
1✔
106
        self.mutex = threading.RLock()
1✔
107

108
    def __call__(self, container: Container, callback: Callable[[str], None] = None) -> None:
1✔
109
        """
110
        Create and start a new log stream thread. The thread starts immediately and waits for the container
111
        to move into a running state. Once it's running, it will attempt to stream the container logs. If
112
        the container is already closed by then, an exception will be raised in the thread and it will
113
        terminate.
114

115
        :param container: the container to stream the logs from
116
        :param callback: an optional callback called on each log line.
117
        """
118
        stop = threading.Event()
1✔
119
        self.stop_events.append(stop)
1✔
120

121
        def _can_continue():
1✔
122
            if stop.is_set():
1✔
UNCOV
123
                return True
×
124
            if not container.running_container:
1✔
125
                return False
1✔
126
            return container.running_container.is_running()
1✔
127

128
        def _run_stream_container_logs():
1✔
129
            # wait until either the container is running or the test was terminated
130
            poll_condition(_can_continue)
1✔
131
            with self.mutex:
1✔
132
                if stop.is_set():
1✔
UNCOV
133
                    return
×
134

135
                stream = container.running_container.stream_logs()
1✔
136
                self.streams.append(stream)
1✔
137

138
            # create a default logger
139
            if callback is None:
1✔
140
                log = logging.getLogger(f"container.{container.running_container.name}")
1✔
141
                log.setLevel(level=logging.DEBUG)
1✔
142
                _callback = log.debug
1✔
143
            else:
UNCOV
144
                _callback = callback
×
145

146
            for line in stream:
1✔
147
                _callback(line.decode("utf-8").rstrip(os.linesep))
1✔
148

149
        t = threading.Thread(
1✔
150
            target=_run_stream_container_logs,
151
            name=threading._newname("log-stream-%d"),
152
            daemon=True,
153
        )
154
        t.start()
1✔
155

156
    def close(self):
1✔
157
        with self.mutex:
1✔
158
            for _event in self.stop_events:
1✔
159
                _event.set()
1✔
160

161
        for _stream in self.streams:
1✔
162
            _stream.close()
1✔
163

164

165
@pytest.fixture
1✔
166
def container_factory() -> Generator[ContainerFactory, None, None]:
1✔
167
    factory = ContainerFactory()
1✔
168
    yield factory
1✔
169
    factory.remove_all_containers()
1✔
170

171

172
@pytest.fixture(scope="session")
1✔
173
def wait_for_localstack_ready():
1✔
174
    def _wait_for(container: RunningContainer, timeout: float | None = None):
1✔
175
        container.wait_until_ready(timeout)
1✔
176

177
        poll_condition(
1✔
178
            lambda: constants.READY_MARKER_OUTPUT in container.get_logs().splitlines(),
179
            timeout=timeout,
180
        )
181

182
    return _wait_for
1✔
183

184

185
@pytest.fixture
1✔
186
def ensure_network():
1✔
187
    networks = []
1✔
188

189
    def _ensure_network(name: str):
1✔
190
        try:
1✔
191
            DOCKER_CLIENT.inspect_network(name)
1✔
192
        except NoSuchNetwork:
1✔
193
            DOCKER_CLIENT.create_network(name)
1✔
194
            networks.append(name)
1✔
195

196
    yield _ensure_network
1✔
197

198
    for network_name in networks:
1✔
199
        # detach attached containers
200
        details = DOCKER_CLIENT.inspect_network(network_name)
1✔
201
        for container_id in details.get("Containers", []):
1✔
202
            DOCKER_CLIENT.disconnect_container_from_network(
1✔
203
                network_name=network_name, container_name_or_id=container_id
204
            )
205
        DOCKER_CLIENT.delete_network(network_name)
1✔
206

207

208
@pytest.fixture
1✔
209
def docker_network(ensure_network):
1✔
210
    network_name = f"net-{short_uid()}"
1✔
211
    ensure_network(network_name)
1✔
212
    return network_name
1✔
213

214

215
@pytest.fixture
1✔
216
def dns_query_from_container(container_factory: ContainerFactory, monkeypatch):
1✔
217
    """
218
    Run the LocalStack container after installing dig
219
    """
220
    containers: list[RunningContainer] = []
1✔
221

222
    def query(name: str, ip_address: str, port: int = 53, **kwargs) -> tuple[bytes, bytes]:
1✔
223
        container = container_factory(
1✔
224
            image_name="localstack/localstack",
225
            command=["infinity"],
226
            entrypoint="sleep",
227
            **kwargs,
228
        )
229
        running_container = container.start()
1✔
230
        containers.append(running_container)
1✔
231

232
        command = [
1✔
233
            "bash",
234
            "-c",
235
            f"apt-get install -y --no-install-recommends dnsutils >/dev/null && dig +short @{ip_address} -p {port} {name}",
236
        ]
237
        # The CmdDockerClient has its output set to a logfile. We must patch
238
        # the client to ensure the output of the command goes to stdout. We use
239
        # a monkeypatch.context here to make sure the scope of the patching is
240
        # minimal.
241
        with monkeypatch.context() as m:
1✔
242
            m.setattr(running_container.container_client, "default_run_outfile", None)
1✔
243
            stdout, stderr = running_container.exec_in_container(command=command)
1✔
244
        return stdout, stderr
1✔
245

246
    yield query
1✔
247

248
    for container in containers:
1✔
249
        container.shutdown()
1✔
250

251

252
@pytest.fixture
1✔
253
def stream_container_logs() -> Generator[LogStreamFactory, None, None]:
1✔
254
    """
255
    Factory fixture for streaming logs of containers in the background. Invoke as follows::
256

257
        def test_container(container_factory, stream_container_logs):
258
            container: Container = container_factory(...)
259

260
            with container.start() as running_container:
261
                stream_container_logs(container)
262

263
    This will start a background thread that streams the container logs to a python logger
264
    ``containers.<container-name>``. You can find it in the logs as::
265

266
        2023-09-03T18:49:06.236 DEBUG --- [log-stream-1] container.localstack-5a4c3678 : foobar
267
        2023-09-03T18:49:06.236 DEBUG --- [log-stream-1] container.localstack-5a4c3678 : hello world
268

269
    The function ``stream_container_logs`` also accepts a ``callback`` argument that can be used to
270
    overwrite the default logging mechanism. For example, to print every log line directly to stdout, call::
271

272
        stream_container_logs(container, callback=print)
273

274
    :return: a factory to start log streams
275
    """
276
    factory = LogStreamFactory()
1✔
277
    yield factory
1✔
278
    factory.close()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc