• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22209548116

19 Feb 2026 02:08PM UTC coverage: 86.964% (-0.04%) from 87.003%
22209548116

push

github

web-flow
Logs: fix snapshot region from tests (#13792)

69755 of 80211 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.11
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from collections.abc import Callable
1✔
10
from functools import cache
1✔
11
from time import sleep
1✔
12
from typing import cast
1✔
13
from urllib.parse import quote
1✔
14

15
import docker
1✔
16
from docker import DockerClient
1✔
17
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
18
from docker.models.containers import Container
1✔
19
from docker.types import LogConfig as DockerLogConfig
1✔
20
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
21

22
from localstack.config import LS_LOG
1✔
23
from localstack.constants import TRACE_LOG_LEVELS
1✔
24
from localstack.utils.collections import ensure_list
1✔
25
from localstack.utils.container_utils.container_client import (
1✔
26
    AccessDenied,
27
    CancellableStream,
28
    ContainerClient,
29
    ContainerException,
30
    DockerContainerStats,
31
    DockerContainerStatus,
32
    DockerNotAvailable,
33
    DockerPlatform,
34
    LogConfig,
35
    NoSuchContainer,
36
    NoSuchImage,
37
    NoSuchNetwork,
38
    PortMappings,
39
    RegistryConnectionError,
40
    SimpleVolumeBind,
41
    Ulimit,
42
    Util,
43
)
44
from localstack.utils.strings import to_bytes, to_str
1✔
45
from localstack.utils.threads import start_worker_thread
1✔
46

47
LOG = logging.getLogger(__name__)
1✔
48
SDK_ISDIR = 1 << 31
1✔
49

50

51
class SdkDockerClient(ContainerClient):
1✔
52
    """
53
    Class for managing Docker (or Podman) using the Python Docker SDK.
54

55
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
56
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
57
    is doing some of the heavy lifting for us to support both target platforms.
58
    """
59

60
    docker_client: DockerClient | None
1✔
61

62
    def __init__(self):
        """
        Eagerly try to create the Docker SDK client.

        If the client cannot be created (``DockerNotAvailable``), ``docker_client`` is set to
        ``None``; ``client()`` then retries the creation on demand.
        """
        sdk_client = None
        try:
            sdk_client = self._create_client()
            # raise the urllib3 log level to INFO (presumably to reduce HTTP debug noise)
            logging.getLogger("urllib3").setLevel(logging.INFO)
        except DockerNotAvailable:
            sdk_client = None
        self.docker_client = sdk_client
1✔
68

69
    def client(self):
1✔
70
        if self.docker_client:
1✔
71
            return self.docker_client
1✔
72
        # if the initialization failed before, try to initialize on-demand
73
        self.docker_client = self._create_client()
1✔
74
        return self.docker_client
1✔
75

76
    @staticmethod
    def _create_client():
        """
        Create a Docker SDK client from the environment, retrying on failure.

        Up to ``DOCKER_SDK_DEFAULT_RETRIES + 1`` attempts are made, with a one second pause
        between attempts.

        :return: the client returned by ``docker.from_env``
        :raises DockerNotAvailable: if the last attempt fails with a ``DockerException``
        """
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS

        remaining_attempts = DOCKER_SDK_DEFAULT_RETRIES + 1
        while remaining_attempts > 0:
            remaining_attempts -= 1
            try:
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
            except DockerException as error:
                LOG.debug(
                    "Creating Docker SDK client failed: %s. "
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
                    error,
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
                )
                if remaining_attempts == 0:
                    # we are out of attempts
                    raise DockerNotAvailable("Docker not available") from error
            # wait for a second before retrying
            sleep(1)
1✔
96

97
    def _read_from_sock(self, sock: socket, tty: bool):
        """
        Read all multiplexed frames from a socket returned by ``attach_socket``.

        Uses the protocol specified here:
        https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach

        :param sock: socket to read the frames from
        :param tty: passed through to ``frames_iter``
        :return: Tuple (stdout, stderr) with the concatenated payload of each stream
        :raises ContainerException: if a frame is neither of type STDOUT nor STDERR
        """
        collected = {STDOUT: [], STDERR: []}
        for stream_type, payload in frames_iter(sock, tty):
            if stream_type not in collected:
                raise ContainerException("Invalid frame type when reading from socket")
            collected[stream_type].append(payload)
        return b"".join(collected[STDOUT]), b"".join(collected[STDERR])
1✔
112

113
    def _container_path_info(self, container: Container, container_path: str):
        """
        Look up whether a path exists in a container and whether it is a directory.

        Sends a HEAD request to the container's ``/archive`` endpoint and decodes the
        ``X-Docker-Container-Path-Stat`` response header.

        :param container: Container to be inspected
        :param container_path: Path in container
        :return: Tuple (path_exists, path_is_directory)
        """
        # Docker CLI copy uses go FileMode to determine if target is a directory or not
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
        # The isDir Bit is the most significant bit in the 32bit struct:
        # https://golang.org/src/os/types.go?s=2650:2683
        low_level_api = self.client().api
        container_ref = quote(container.id, safe="/:")
        response = low_level_api.head(
            low_level_api.base_url + f"/containers/{container_ref}/archive",
            **low_level_api._set_request_timeout({"params": {"path": container_path}}),
        )
        encoded_stat = response.headers.get("X-Docker-Container-Path-Stat")
        if not response.ok:
            # path does not exist, hence it cannot be a directory either
            return False, False

        # the header carries a base64-encoded JSON document
        path_stat = json.loads(base64.b64decode(encoded_stat).decode("utf-8"))
        return True, bool(path_stat["mode"] & SDK_ISDIR)
1✔
140

141
    def get_system_info(self) -> dict:
1✔
142
        return self.client().info()
1✔
143

144
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
        """
        Map the SDK status of a container to a ``DockerContainerStatus``.

        :param container_name: container to look up
        :return: UP for "running", PAUSED for "paused", DOWN for any other status,
            NON_EXISTENT if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        # intentionally no debug log here - it would be too verbose
        try:
            sdk_status = self.client().containers.get(container_name).status
        except NotFound:
            return DockerContainerStatus.NON_EXISTENT
        except APIError as api_error:
            raise ContainerException() from api_error

        status_by_sdk_state = {
            "running": DockerContainerStatus.UP,
            "paused": DockerContainerStatus.PAUSED,
        }
        return status_by_sdk_state.get(sdk_status, DockerContainerStatus.DOWN)
×
158

159
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
        """
        Take a single (non-streaming) stats sample of a container and summarize it.

        :param container_name: container to sample
        :return: ``DockerContainerStats`` with block IO, CPU/memory percentages, memory usage,
            network IO, PID count and the raw SDK stats (``SDKStats``)
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            container = self.client().containers.get(container_name)
            sdk_stats = container.stats(stream=False)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e

        # Sections may be missing or explicitly null; treat both as empty via `or {}` / `or []`
        # instead of relying on .get() defaults (which do not apply to present-but-null values).

        # BlockIO: (Read, Write) bytes
        read_bytes = 0
        write_bytes = 0
        blkio_stats = sdk_stats.get("blkio_stats") or {}
        for entry in blkio_stats.get("io_service_bytes_recursive") or []:
            # compare case-insensitively: the op name may be capitalized ("Read"/"Write")
            # or lowercase ("read"/"write"), depending on the cgroup version of the host
            op = str(entry.get("op") or "").lower()
            if op == "read":
                read_bytes += entry.get("value") or 0
            elif op == "write":
                write_bytes += entry.get("value") or 0

        # CPU percentage
        cpu_stats = sdk_stats.get("cpu_stats") or {}
        precpu_stats = sdk_stats.get("precpu_stats") or {}
        cpu_usage = cpu_stats.get("cpu_usage") or {}
        precpu_usage = precpu_stats.get("cpu_usage") or {}

        cpu_delta = (cpu_usage.get("total_usage") or 0) - (precpu_usage.get("total_usage") or 0)
        system_delta = (cpu_stats.get("system_cpu_usage") or 0) - (
            precpu_stats.get("system_cpu_usage") or 0
        )

        # fall back to the number of per-CPU entries (and finally 1) if online_cpus is missing/0
        online_cpus = (
            cpu_stats.get("online_cpus") or len(cpu_usage.get("percpu_usage") or []) or 1
        )
        cpu_percent = (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0

        # Memory (usage, limit) bytes
        memory_stats = sdk_stats.get("memory_stats") or {}
        mem_usage = memory_stats.get("usage") or 0
        mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
        mem_inactive = (memory_stats.get("stats") or {}).get("inactive_file") or 0
        used_memory = max(0, mem_usage - mem_inactive)
        mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0

        # Network IO, summed over all interfaces
        net_rx = 0
        net_tx = 0
        for iface in (sdk_stats.get("networks") or {}).values():
            net_rx += iface.get("rx_bytes") or 0
            net_tx += iface.get("tx_bytes") or 0

        # Container ID (shortened to 12 characters) and name without leading slash
        container_id = (sdk_stats.get("id") or "")[:12]
        name = (sdk_stats.get("name") or "").lstrip("/")

        return DockerContainerStats(
            Container=container_id,
            ID=container_id,
            Name=name,
            BlockIO=(read_bytes, write_bytes),
            CPUPerc=round(cpu_percent, 2),
            MemPerc=round(mem_percent, 2),
            MemUsage=(used_memory, mem_limit),
            NetIO=(net_rx, net_tx),
            PIDs=(sdk_stats.get("pids_stats") or {}).get("current", 0),
            SDKStats=sdk_stats,  # keep the raw stats for more detailed information
        )
×
227

228
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Stop a container.

        :param container_name: container to stop
        :param timeout: forwarded as ``timeout`` to the SDK's ``stop`` call
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Stopping container: %s", container_name)
        try:
            self.client().containers.get(container_name).stop(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
237

238
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Restart a container.

        :param container_name: container to restart
        :param timeout: forwarded as ``timeout`` to the SDK's ``restart`` call
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Restarting container: %s", container_name)
        try:
            self.client().containers.get(container_name).restart(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
247

248
    def pause_container(self, container_name: str) -> None:
        """
        Pause a container.

        :param container_name: container to pause
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Pausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).pause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
257

258
    def unpause_container(self, container_name: str) -> None:
        """
        Unpause a container.

        :param container_name: container to unpause
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Unpausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).unpause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
267

268
    def remove_container(
        self, container_name: str, force=True, check_existence=False, volumes=False
    ) -> None:
        """
        Remove a container.

        :param container_name: container to remove
        :param force: forwarded to the SDK's ``remove`` call; if truthy, a container that is
            not found is silently ignored
        :param check_existence: if truthy, return early when the name is not contained in
            ``get_all_container_names()``
        :param volumes: forwarded as ``v`` to the SDK's ``remove`` call
        :raises NoSuchContainer: if the container is not found and ``force`` is falsy
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Removing container: %s, with volumes: %s", container_name, volumes)
        if check_existence:
            if container_name not in self.get_all_container_names():
                LOG.debug("Aborting removing due to check_existence check")
                return
        try:
            self.client().containers.get(container_name).remove(force=force, v=volumes)
        except NotFound:
            if force:
                return
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
283

284
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
        """
        List containers, optionally restricted by filters.

        :param filter: a single ``key=value`` filter or a list of them. If the same key occurs
            more than once, all of its values are passed to the SDK as a list (previously only
            the last value survived).
        :param all: forwarded as ``all`` to the SDK's ``containers.list`` call
        :return: one dict per container with the keys id, image, name, status and labels;
            containers whose details cannot be read are logged and skipped
        :raises ValueError: if a filter does not contain ``=``
        :raises ContainerException: on a Docker API error
        """
        filters = filter
        if filter:
            filters = {}
            for raw_filter in [filter] if isinstance(filter, str) else filter:
                key, value = raw_filter.split("=", 1)
                if key not in filters:
                    # keep a plain string for the common single-value case
                    filters[key] = value
                elif isinstance(filters[key], list):
                    filters[key].append(value)
                else:
                    # second occurrence of this key: switch to a list of values
                    filters[key] = [filters[key], value]
        LOG.debug("Listing containers with filters: %s", filters)
        try:
            container_list = self.client().containers.list(filters=filters, all=all)
            result = []
            for container in container_list:
                try:
                    result.append(
                        {
                            "id": container.id,
                            "image": container.image,
                            "name": container.name,
                            "status": container.status,
                            "labels": container.labels,
                        }
                    )
                except Exception as e:
                    # best effort: a single unreadable container must not fail the listing
                    LOG.error("Error checking container %s: %s", container, e)
            return result
        except APIError as e:
            raise ContainerException() from e
1✔
308

309
    def copy_into_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
        """
        Copy a local path into a container.

        If ``container_path`` is an existing directory in the container, the archive is put
        into that directory; otherwise it is put into the parent directory of ``container_path``.

        :param container_name: target container
        :param local_path: local path to copy from
        :param container_path: destination path inside the container
        :raises NoSuchContainer: if the SDK raises ``NotFound``
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
        try:
            container = self.client().containers.get(container_name)
            _, is_directory = self._container_path_info(container, container_path)
            if is_directory:
                destination = container_path
            else:
                destination = os.path.dirname(container_path)
            with Util.tar_path(local_path, container_path, is_dir=is_directory) as archive:
                container.put_archive(destination, archive)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
323

324
    def copy_from_container(
        self,
        container_name: str,
        local_path: str,
        container_path: str,
    ) -> None:
        """
        Copy a path from a container to the local file system.

        :param container_name: source container
        :param local_path: local destination, passed to ``Util.untar_to_path``
        :param container_path: path inside the container to fetch as archive
        :raises NoSuchContainer: if the SDK raises ``NotFound``
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
        try:
            source = self.client().containers.get(container_name)
            # get_archive returns a tuple; only its first element (the archive data) is used
            archive_data, _ = source.get_archive(container_path)
            Util.untar_to_path(archive_data, local_path)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
339

340
    def pull_image(
        self,
        docker_image: str,
        platform: DockerPlatform | None = None,
        log_handler: Callable[[str], None] | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> None:
        """
        Pull an image.

        :param docker_image: image to pull; resolved via ``registry_resolver_strategy`` first
        :param platform: forwarded as ``platform`` to the SDK
        :param log_handler: if given, called with every line of the streamed pull output
        :param auth_config: if given, forwarded as ``auth_config`` to the SDK
        :raises NoSuchImage: if the SDK raises ``ImageNotFound``
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Pulling Docker image: %s", docker_image)
        # some path in the docker image string indicates a custom repository

        docker_image = self.registry_resolver_strategy.resolve(docker_image)
        pull_args: dict[str, str | bool | dict[str, str]] = {"platform": platform}
        if auth_config:
            pull_args["auth_config"] = auth_config
        try:
            if not log_handler:
                self.client().images.pull(docker_image, **pull_args)
                return
            # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
            pull_args["stream"] = True
            for output_line in self.client().api.pull(docker_image, **pull_args):
                log_handler(to_str(output_line))
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as api_error:
            raise ContainerException() from api_error
1✔
367

368
    def push_image(self, docker_image: str, auth_config: dict[str, str] | None = None) -> None:
        """
        Push an image.

        The SDK may return an error document instead of raising, so a str/bytes result that
        contains ``"errorDetail"`` is scanned for known messages and mapped to an exception.

        :param docker_image: image to push
        :param auth_config: if given, forwarded as ``auth_config`` to the SDK
        :raises NoSuchImage: if the image does not exist locally
        :raises AccessDenied: if the result indicates an authentication/authorization failure
        :raises RegistryConnectionError: if the result indicates a connection failure
        :raises ContainerException: for any other reported error or Docker API error
        """
        LOG.debug("Pushing Docker image: %s", docker_image)
        push_args: dict[str, dict[str, str]] = {"auth_config": auth_config} if auth_config else {}

        access_denied_markers = (
            "is denied",
            "requesting higher privileges than access token allows",
            "access token has insufficient scopes",
            "authorization failed: no basic auth credentials",
            "401 Unauthorized",
            "no basic auth credentials",
            "unauthorized: authentication required",
            "insufficient_scope: authorization failed",
        )
        connection_error_markers = (
            "connection refused",
            "failed to do request:",
        )
        try:
            result = self.client().images.push(docker_image, **push_args)
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
            if not isinstance(result, (str, bytes)):
                return
            result_text = to_str(result)
            if '"errorDetail"' not in result_text:
                return
            if "image does not exist locally" in result_text:
                raise NoSuchImage(docker_image)
            if any(marker in result_text for marker in access_denied_markers):
                raise AccessDenied(docker_image)
            if any(marker in result_text for marker in connection_error_markers):
                raise RegistryConnectionError(result)
            raise ContainerException(result)
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as api_error:
            # note: error message 'image not known' raised by Podman API
            if "image not known" in str(api_error):
                raise NoSuchImage(docker_image)
            raise ContainerException() from api_error
×
407

408
    def build_image(
        self,
        dockerfile_path: str,
        image_name: str,
        context_path: str | None = None,
        platform: DockerPlatform | None = None,
    ):
        """
        Build an image from a Dockerfile and return the collected build output.

        :param dockerfile_path: passed through ``Util.resolve_dockerfile_path``
        :param image_name: tag for the resulting image
        :param context_path: build context; defaults to the directory of the Dockerfile
        :param platform: forwarded as ``platform`` to the SDK
        :return: concatenation of all "stream" (or, failing that, "error") log messages
        :raises ContainerException: on a Docker API error
        """
        try:
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
            context_path = context_path or os.path.dirname(dockerfile_path)
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
            _, logs_iterator = self.client().images.build(
                path=context_path,
                dockerfile=dockerfile_path,
                tag=image_name,
                rm=True,
                platform=platform,
            )
            # logs_iterator is a stream of dicts. Example content:
            # {'stream': 'Step 1/4 : FROM alpine'}
            # ... other build steps
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
            output_parts = []
            for log in logs_iterator:
                if not isinstance(log, dict):
                    continue
                # use .get() for both keys: an entry with an empty "stream" and no "error"
                # must be skipped instead of failing with a KeyError
                message = log.get("stream") or log.get("error")
                if message:
                    output_parts.append(message)
            return "".join(output_parts)
        except APIError as e:
            raise ContainerException("Unable to build Docker image") from e
×
438

439
    def tag_image(self, source_ref: str, target_name: str) -> None:
        """
        Add a tag to an existing image.

        :param source_ref: reference of the image to tag
        :param target_name: passed to the SDK image's ``tag`` call
        :raises NoSuchImage: if the Docker API responds with status code 404
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
        try:
            self.client().images.get(source_ref).tag(target_name)
        except APIError as api_error:
            if api_error.status_code == 404:
                raise NoSuchImage(source_ref)
            raise ContainerException("Unable to tag Docker image") from api_error
×
448

449
    def get_docker_image_names(
        self,
        strip_latest: bool = True,
        include_tags: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ):
        """
        Collect the tags of all images known to the SDK client.

        :param strip_latest: if set, the list is passed to ``Util.append_without_latest``
        :param include_tags: if unset, everything from the last ``:`` on is cut off each name
        :param strip_wellknown_repo_prefixes: if set, the names are passed through
            ``Util.strip_wellknown_repo_prefixes``
        :return: list of image names
        :raises ContainerException: on a Docker API error
        """
        try:
            names = []
            for image in self.client().images.list():
                names.extend(image.tags)
            if not include_tags:
                names = [name.rpartition(":")[0] for name in names]
            if strip_wellknown_repo_prefixes:
                names = Util.strip_wellknown_repo_prefixes(names)
            if strip_latest:
                # works on the list in place (return value is not used)
                Util.append_without_latest(names)
            return names
        except APIError as api_error:
            raise ContainerException() from api_error
×
467

468
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
        """
        Return the logs of a container as string.

        :param container_name_or_id: container to read the logs from
        :param safe: if set, return an empty string instead of raising
        :raises NoSuchContainer: if the container is not found and ``safe`` is unset
        :raises ContainerException: on any other Docker API error if ``safe`` is unset
        """
        try:
            raw_logs = self.client().containers.get(container_name_or_id).logs()
            return to_str(raw_logs)
        except NotFound:
            if not safe:
                raise NoSuchContainer(container_name_or_id)
            return ""
        except APIError as api_error:
            if not safe:
                raise ContainerException() from api_error
            return ""
1✔
480

481
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
        """
        Return a stream that follows the logs of a container.

        :param container_name_or_id: container to stream the logs from
        :return: result of the SDK's ``logs(stream=True, follow=True)`` call
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            target = self.client().containers.get(container_name_or_id)
            log_stream = target.logs(stream=True, follow=True)
            return log_stream
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
489

490
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
491
        try:
1✔
492
            return self.client().containers.get(container_name_or_id).attrs
1✔
493
        except NotFound:
1✔
494
            raise NoSuchContainer(container_name_or_id)
1✔
495
        except APIError as e:
×
496
            raise ContainerException() from e
×
497

498
    def inspect_image(
        self,
        image_name: str,
        pull: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ) -> dict[str, dict | list | str]:
        """
        Return the ``attrs`` of an image, optionally pulling it first if it is not found.

        :param image_name: image to inspect; resolved via ``registry_resolver_strategy`` first
        :param pull: if set, pull the image once when it is not found and inspect it again
        :param strip_wellknown_repo_prefixes: if set, "RepoDigests" and "RepoTags" are passed
            through ``Util.strip_wellknown_repo_prefixes``
        :raises NoSuchImage: if the image is not found and ``pull`` is unset
        :raises ContainerException: on any other Docker API error
        """
        image_name = self.registry_resolver_strategy.resolve(image_name)
        try:
            result = self.client().images.get(image_name).attrs
            if strip_wellknown_repo_prefixes:
                for key in ("RepoDigests", "RepoTags"):
                    if result.get(key):
                        result[key] = Util.strip_wellknown_repo_prefixes(result[key])
            return result
        except NotFound:
            if pull:
                self.pull_image(image_name)
                # forward the caller's choice - the retry must not silently fall back to the
                # default of strip_wellknown_repo_prefixes
                # NOTE(review): the already-resolved name is resolved again in the retry (and
                # in pull_image) - presumably resolve() is idempotent; confirm
                return self.inspect_image(
                    image_name,
                    pull=False,
                    strip_wellknown_repo_prefixes=strip_wellknown_repo_prefixes,
                )
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
×
522

523
    def create_network(self, network_name: str) -> None:
1✔
524
        try:
1✔
525
            return self.client().networks.create(name=network_name).id
1✔
526
        except APIError as e:
×
527
            raise ContainerException() from e
×
528

529
    def delete_network(self, network_name: str) -> None:
1✔
530
        try:
1✔
531
            return self.client().networks.get(network_name).remove()
1✔
532
        except NotFound:
×
533
            raise NoSuchNetwork(network_name)
×
534
        except APIError as e:
×
535
            raise ContainerException() from e
×
536

537
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
538
        try:
1✔
539
            return self.client().networks.get(network_name).attrs
1✔
540
        except NotFound:
1✔
541
            raise NoSuchNetwork(network_name)
1✔
542
        except APIError as e:
×
543
            raise ContainerException() from e
×
544

545
    def connect_container_to_network(
        self,
        network_name: str,
        container_name_or_id: str,
        aliases: list | None = None,
        link_local_ips: list[str] = None,
    ) -> None:
        """
        Connect a container to a network.

        :param network_name: network to connect to
        :param container_name_or_id: container to connect
        :param aliases: forwarded as ``aliases`` to the SDK's ``connect`` call
        :param link_local_ips: forwarded as ``link_local_ips`` to the SDK's ``connect`` call
        :raises NoSuchNetwork: if the network lookup raises ``NotFound``
        :raises NoSuchContainer: if connecting raises ``NotFound``
        :raises ContainerException: on any other Docker API error while connecting
        """
        LOG.debug(
            "Connecting container '%s' to network '%s' with aliases '%s'",
            container_name_or_id,
            network_name,
            aliases,
        )
        # step 1: look up the network (only NotFound is translated here)
        try:
            target_network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)

        # step 2: attach the container - a NotFound at this point refers to the container
        connect_args = {
            "container": container_name_or_id,
            "aliases": aliases,
            "link_local_ips": link_local_ips,
        }
        try:
            target_network.connect(**connect_args)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
572

573
    def disconnect_container_from_network(
        self, network_name: str, container_name_or_id: str
    ) -> None:
        """
        Disconnect a container from a network.

        :param network_name: network to disconnect from
        :param container_name_or_id: container to disconnect
        :raises NoSuchNetwork: if the network lookup raises ``NotFound``
        :raises NoSuchContainer: if disconnecting raises ``NotFound``
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug(
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
        )
        # step 1: look up the network
        try:
            target_network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as api_error:
            raise ContainerException() from api_error

        # step 2: detach the container - a NotFound at this point refers to the container
        try:
            target_network.disconnect(container_name_or_id)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
590

591
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
592
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
593
        network_names = list(networks)
1✔
594
        if len(network_names) > 1:
1✔
595
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
596
        return networks[network_names[0]]["IPAddress"]
1✔
597

598
    @cache
1✔
599
    def has_docker(self) -> bool:
1✔
600
        try:
1✔
601
            if not self.docker_client:
1✔
602
                return False
×
603
            self.client().ping()
1✔
604
            return True
1✔
605
        except APIError:
×
606
            return False
×
607

608
    def remove_image(self, image: str, force: bool = True):
        """
        Remove an image.

        :param image: image to remove
        :param force: forwarded to the SDK's ``remove`` call; if set, an ``ImageNotFound``
            error is silently ignored
        :raises NoSuchImage: if the image is not found and ``force`` is unset, or if the API
            error message contains "image not known"
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
        try:
            self.client().images.remove(image=image, force=force)
        except ImageNotFound:
            if force:
                return
            raise NoSuchImage(image)
        except APIError as api_error:
            if "image not known" in str(api_error):
                raise NoSuchImage(image)
            raise ContainerException() from api_error
×
619

620
    def commit(
        self,
        container_name_or_id: str,
        image_name: str,
        image_tag: str,
    ):
        """
        Create an image from a container.

        :param container_name_or_id: container to commit
        :param image_name: forwarded as ``repository`` to the SDK's ``commit`` call
        :param image_tag: forwarded as ``tag`` to the SDK's ``commit`` call
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug(
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
        )
        try:
            source = self.client().containers.get(container_name_or_id)
            source.commit(repository=image_name, tag=image_tag)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
636

637
    def start_container(
        self,
        container_name_or_id: str,
        stdin=None,
        interactive: bool = False,
        attach: bool = False,
        flags: str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Start a container, optionally attaching to its I/O streams and awaiting its exit.

        Without ``interactive``/``attach`` the container is started and the method returns
        immediately. Otherwise a socket is attached before the start, ``stdin`` (if any) is
        sent, all output is read and the exit code of the container is awaited.

        :param container_name_or_id: container to start
        :param stdin: data to send to the container after the start (attached mode only)
        :param interactive: attach to the container, including its stdin
        :param attach: attach to stdout/stderr of the container
        :param flags: not used by this implementation
        :return: Tuple (stdout, stderr); in detached mode stdout is the container name/id as
            bytes and stderr is empty
        :raises NoSuchContainer: if the SDK raises ``NotFound``
        :raises ContainerException: if the container exits with a non-zero (truthy) exit code,
            or on any other Docker API error
        """
        LOG.debug("Starting container %s", container_name_or_id)
        try:
            container = self.client().containers.get(container_name_or_id)
            # defaults for the detached case (overwritten when output is read from the socket)
            stdout = to_bytes(container_name_or_id)
            stderr = b""
            if interactive or attach:
                params = {"stdout": 1, "stderr": 1, "stream": 1}
                if interactive:
                    params["stdin"] = 1
                sock = container.attach_socket(params=params)
                # unwrap the underlying socket if the returned object wraps one in `_sock`
                sock = sock._sock if hasattr(sock, "_sock") else sock
                # hands the exit code over from the waiter thread to this thread
                result_queue = queue.Queue()
                # set by the waiter thread once it is running
                thread_started = threading.Event()
                # set by this thread once container.start() has been attempted
                start_waiting = threading.Event()

                # Note: We need to be careful about potential race conditions here - .wait() should happen right
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
                def wait_for_result(*_):
                    _exit_code = -1
                    try:
                        thread_started.set()
                        start_waiting.wait()
                        _exit_code = container.wait()["StatusCode"]
                    except APIError as e:
                        _exit_code = 1
                        raise ContainerException(str(e))
                    finally:
                        # always publish a result, otherwise result_queue.get() below would block
                        result_queue.put(_exit_code)

                # start listener thread
                start_worker_thread(wait_for_result)
                thread_started.wait()
                try:
                    # start container
                    container.start()
                finally:
                    # start awaiting container result
                    # (in `finally` so that the waiter thread is released even if start() fails)
                    start_waiting.set()

                # handle container input/output
                # under windows, the socket has no __enter__ / cannot be used as context manager
                # therefore try/finally instead of with here
                try:
                    if stdin:
                        sock.sendall(to_bytes(stdin))
                        # close the write side of the socket after all input has been sent
                        sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                except TimeoutError:
                    # only logged: stdout/stderr keep the default values assigned above
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
                        container_name_or_id,
                    )
                finally:
                    sock.close()

                # get container exit code
                exit_code = result_queue.get()
                if exit_code:
                    raise ContainerException(
                        f"Docker container returned with exit code {exit_code}",
                        stdout=stdout,
                        stderr=stderr,
                    )
            else:
                container.start()
            return stdout, stderr
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
715

716
    def attach_to_container(self, container_name_or_id: str):
        """Attach to the given container through the Docker SDK.

        Looks the container up and calls ``Container.attach()`` with its default
        arguments; whatever ``attach()`` returns is discarded.

        :param container_name_or_id: name or ID of the container to attach to
        :raises NoSuchContainer: if the Docker API reports the container as not found
        :raises ContainerException: if the Docker API reports any other error
        """
        try:
            client: DockerClient = self.client()
            container = cast(Container, client.containers.get(container_name_or_id))
            container.attach()
        except NotFound:
            # Translate SDK errors the same way the other container operations of this client do,
            # so callers do not have to know about docker-py exception types.
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
×
720

721
    def create_container(
        self,
        image_name: str,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        privileged: bool | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> str:
        """Create (without starting) a container through the Docker SDK and return its ID.

        If the image is not available, it is pulled once via ``pull_image`` and the creation
        is retried.

        :param image_name: image reference; passed through ``registry_resolver_strategy.resolve``
            before use
        :param name: container name
        :param entrypoint: entrypoint override
        :param remove: forwarded as ``auto_remove``
        :param interactive: forwarded as ``stdin_open``
        :param tty: forwarded as ``tty``
        :param detach: forwarded as ``detach``
        :param command: command to run in the container
        :param volumes: volume binds, converted with ``Util.convert_mount_list_to_dict``
        :param ports: port mappings, merged into the SDK ``ports`` argument via ``to_dict()``
        :param exposed_ports: ports to expose without a host binding (empty binding list)
        :param env_vars: forwarded as ``environment``
        :param user: user to run as
        :param cap_add: capabilities to add (only forwarded when non-empty)
        :param cap_drop: capabilities to drop (only forwarded when non-empty)
        :param security_opt: security options (only forwarded when non-empty)
        :param network: network to attach the container to
        :param dns: one or more DNS servers; normalised with ``ensure_list``
        :param additional_flags: extra CLI-style flags, parsed with ``Util.parse_additional_flags``;
            the parsed result replaces ``env_vars``, ``volumes``, ``network``, ``platform``,
            ``privileged``, ``ports``, ``ulimits``, ``user`` and ``dns``, supplies ``extra_hosts``,
            and its labels are merged into ``labels``
        :param workdir: forwarded as ``working_dir``
        :param privileged: run the container privileged when truthy
        :param labels: container labels
        :param platform: target platform
        :param ulimits: ulimits, converted to ``docker.types.Ulimit``
        :param init: forwarded as ``init=True`` when truthy
        :param log_config: logging driver configuration, converted to the SDK ``LogConfig``
        :param cpu_shares: CPU shares (only forwarded when truthy)
        :param mem_limit: memory limit (only forwarded when truthy)
        :param auth_config: registry credentials handed to ``pull_image`` if a pull is needed
        :return: the ID of the created container
        :raises NoSuchImage: if the image is still not found after the pull attempt
        :raises ContainerException: if the Docker API reports any other error
        """
        # Dump the call arguments for debugging, but never write registry credentials to the log.
        log_attributes = dict(locals())
        if log_attributes.get("auth_config"):
            log_attributes["auth_config"] = "<redacted>"
        LOG.debug("Creating container with attributes: %s", log_attributes)

        extra_hosts = None
        if additional_flags:
            parsed_flags = Util.parse_additional_flags(
                additional_flags,
                env_vars=env_vars,
                volumes=volumes,
                network=network,
                platform=platform,
                privileged=privileged,
                ports=ports,
                ulimits=ulimits,
                user=user,
                dns=dns,
            )
            env_vars = parsed_flags.env_vars
            extra_hosts = parsed_flags.extra_hosts
            volumes = parsed_flags.volumes
            # `labels` is not handed to the flag parser, so its result only contains labels that
            # came from the flags. Merge instead of overwriting so explicitly passed labels are
            # not silently dropped; flag-derived labels win on key conflicts.
            if parsed_flags.labels:
                labels = {**(labels or {}), **parsed_flags.labels}
            network = parsed_flags.network
            platform = parsed_flags.platform
            privileged = parsed_flags.privileged
            ports = parsed_flags.ports
            ulimits = parsed_flags.ulimits
            user = parsed_flags.user
            dns = parsed_flags.dns

        try:
            # Optional settings are only forwarded when set, so the SDK defaults apply otherwise.
            kwargs = {}
            if cap_add:
                kwargs["cap_add"] = cap_add
            if cap_drop:
                kwargs["cap_drop"] = cap_drop
            if security_opt:
                kwargs["security_opt"] = security_opt
            if dns:
                kwargs["dns"] = ensure_list(dns)
            if exposed_ports:
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
                # but the behavior should be identical
                kwargs["ports"] = {port: [] for port in exposed_ports}
            if ports:
                kwargs.setdefault("ports", {})
                kwargs["ports"].update(ports.to_dict())
            if workdir:
                kwargs["working_dir"] = workdir
            if privileged:
                kwargs["privileged"] = True
            if init:
                kwargs["init"] = True
            if labels:
                kwargs["labels"] = labels
            if log_config:
                kwargs["log_config"] = DockerLogConfig(
                    type=log_config.type, config=log_config.config
                )
            if ulimits:
                kwargs["ulimits"] = [
                    docker.types.Ulimit(
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
                    )
                    for ulimit in ulimits
                ]
            if cpu_shares:
                kwargs["cpu_shares"] = cpu_shares
            if mem_limit:
                kwargs["mem_limit"] = mem_limit
            mounts = None
            if volumes:
                mounts = Util.convert_mount_list_to_dict(volumes)

            image_name = self.registry_resolver_strategy.resolve(image_name)

            def _create():
                # Wrapped in a closure because the exact same call is retried after a pull.
                return self.client().containers.create(
                    image=image_name,
                    command=command,
                    auto_remove=remove,
                    name=name,
                    stdin_open=interactive,
                    tty=tty,
                    entrypoint=entrypoint,
                    environment=env_vars,
                    detach=detach,
                    user=user,
                    network=network,
                    volumes=mounts,
                    extra_hosts=extra_hosts,
                    platform=platform,
                    **kwargs,
                )

            try:
                container = _create()
            except ImageNotFound:
                LOG.debug("Image not found. Pulling image %s", image_name)
                self.pull_image(image_name, platform, auth_config=auth_config)
                container = _create()
            return container.id
        except ImageNotFound:
            # only reached when the image is still missing after the pull attempt
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
×
857

858
    def run_container(
        self,
        image_name: str,
        stdin=None,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        privileged: bool | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> tuple[bytes, bytes]:
        """Create a container with ``create_container`` and start it with ``start_container``.

        All container settings are forwarded unchanged to ``create_container``; ``entrypoint``
        and ``dns`` therefore accept the same types as there.

        :param image_name: image to run
        :param stdin: data handed to ``start_container`` as the container's standard input
        :param remove: remove the container afterwards. Docker's auto-remove is only requested
            when ``detach`` is also set; otherwise the container is removed explicitly once
            ``start_container`` has returned or raised.
        :param interactive: forwarded to both ``create_container`` and ``start_container``
        :param detach: when false, the container is started with ``attach=True``
        :return: the ``(stdout, stderr)`` tuple returned by ``start_container``
        """
        LOG.debug("Running container with image: %s", image_name)
        container = None
        try:
            container = self.create_container(
                image_name,
                name=name,
                entrypoint=entrypoint,
                interactive=interactive,
                tty=tty,
                detach=detach,
                # auto-remove only for detached containers; attached ones are removed in `finally`
                remove=remove and detach,
                command=command,
                volumes=volumes,
                ports=ports,
                exposed_ports=exposed_ports,
                env_vars=env_vars,
                user=user,
                cap_add=cap_add,
                cap_drop=cap_drop,
                security_opt=security_opt,
                network=network,
                dns=dns,
                additional_flags=additional_flags,
                workdir=workdir,
                privileged=privileged,
                platform=platform,
                init=init,
                labels=labels,
                ulimits=ulimits,
                log_config=log_config,
                cpu_shares=cpu_shares,
                mem_limit=mem_limit,
                auth_config=auth_config,
            )
            result = self.start_container(
                container_name_or_id=container,
                stdin=stdin,
                interactive=interactive,
                attach=not detach,
            )
        finally:
            # `container` is still None if creation itself failed, in which case there is
            # nothing to clean up.
            if remove and container and not detach:
                self.remove_container(container)
        return result
1✔
936

937
    def exec_in_container(
        self,
        container_name_or_id: str,
        command: list[str] | str,
        interactive=False,
        detach=False,
        env_vars: dict[str, str | None] | None = None,
        stdin: bytes | None = None,
        user: str | None = None,
        workdir: str | None = None,
    ) -> tuple[bytes, bytes]:
        """Run a command inside an existing container via the Docker SDK's ``exec_run``.

        :param container_name_or_id: name or ID of the target container
        :param command: command to execute
        :param interactive: together with a non-empty ``stdin``, switches to socket mode: the
            input is written to the exec socket and the output is read back from it. The exit
            code of the command is not inspected in this mode.
        :param detach: forwarded to ``exec_run``; outside socket mode, ``(b"", b"")`` is
            returned without looking at the result
        :param env_vars: environment variables for the command
        :param stdin: input sent to the command (only used when ``interactive`` is set)
        :param user: user to run the command as
        :param workdir: working directory for the command
        :return: ``(stdout, stderr)`` of the command; ``(b"", b"")`` if the socket timed out
            before any output could be read
        :raises NoSuchContainer: if the container cannot be found
        :raises ContainerException: if the command exits with a non-zero code (outside socket
            mode) or the Docker API reports any other error
        """
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
        # stdin is only streamed over a raw socket for interactive calls that actually have input
        use_socket = interactive and bool(stdin)
        try:
            try:
                container: Container = self.client().containers.get(container_name_or_id)
            except NotFound:
                # NotFound is an APIError; handle it here so a missing container is reported as
                # NoSuchContainer instead of a generic ContainerException.
                raise NoSuchContainer(container_name_or_id)
            result = container.exec_run(
                cmd=command,
                environment=env_vars,
                user=user,
                detach=detach,
                stdin=use_socket,
                socket=use_socket,
                stdout=True,
                stderr=True,
                demux=True,
                workdir=workdir,
            )
            if use_socket:  # result is a socket
                sock = result[1]
                sock = sock._sock if hasattr(sock, "_sock") else sock
                # under windows, the socket has no __enter__ / cannot be used as context manager
                # therefore try/finally instead of with here
                try:
                    sock.sendall(to_bytes(stdin))
                    sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                    return stdout, stderr
                except TimeoutError:
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
                        container_name_or_id,
                    )
                    # keep the declared return contract instead of implicitly returning None
                    return b"", b""
                finally:
                    sock.close()

            if detach:
                return b"", b""
            return_code = result[0]
            if isinstance(result[1], bytes):
                stdout = result[1]
                stderr = b""
            else:
                stdout, stderr = result[1]
            if return_code != 0:
                raise ContainerException(
                    f"Exec command returned with exit code {return_code}", stdout, stderr
                )
            return stdout, stderr
        except ContainerError:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
993

994
    def login(self, username: str, password: str, registry: str | None = None) -> None:
        """Authenticate the Docker SDK client against a registry.

        The SDK is always asked to re-authenticate (``reauth=True``).

        :param username: registry user name
        :param password: registry password
        :param registry: registry to log in to; passed to the SDK as given, including ``None``
        :raises ContainerException: if the Docker API rejects the login
        """
        LOG.debug("Docker login for %s", username)
        login_options = {"password": password, "registry": registry, "reauth": True}
        try:
            self.client().login(username, **login_options)
        except APIError as api_error:
            raise ContainerException() from api_error
×
1000

1001

1002
# apply patches required for podman API compatibility
1003

1004

1005
@property
def _container_image(self):
    """Podman-tolerant variant of the Docker SDK's ``Container.image`` property.

    Reads the image identifier from the container attributes (``ImageID`` if that key is
    present, otherwise ``Image``) and resolves it through ``self.client.images.get``.

    :return: the result of ``images.get`` for the container's image, or ``None`` if the
        image identifier is ``None``
    :raises KeyError: if the attributes contain neither ``ImageID`` nor ``Image``
    """
    attrs = self.attrs
    # Only touch "Image" when "ImageID" is absent: passing attrs["Image"] as the default of
    # .get() would evaluate it eagerly and raise KeyError even though "ImageID" is available.
    image_id = attrs["ImageID"] if "ImageID" in attrs else attrs["Image"]
    if image_id is None:
        return None
    image_ref = image_id
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
    # then lead to "no such image" errors.
    if re.match(r"sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
        image_ref = image_id.split(":")[1]
    return self.client.images.get(image_ref)
1✔
1018

1019

1020
# Monkey-patch the Docker SDK: `Container.image` is replaced class-wide by the
# `_container_image` property defined above.
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc