• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21810945741

06 Feb 2026 05:38PM UTC coverage: 86.871% (-0.01%) from 86.883%
21810945741

push

github

web-flow
APIGW: fix model typing / update custom id logic (#13694)

5 of 5 new or added lines in 2 files covered. (100.0%)

70 existing lines in 4 files now uncovered.

69960 of 80533 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.22
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from collections.abc import Callable
1✔
10
from functools import cache
1✔
11
from time import sleep
1✔
12
from typing import cast
1✔
13
from urllib.parse import quote
1✔
14

15
import docker
1✔
16
from docker import DockerClient
1✔
17
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
18
from docker.models.containers import Container
1✔
19
from docker.types import LogConfig as DockerLogConfig
1✔
20
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
21

22
from localstack.config import LS_LOG
1✔
23
from localstack.constants import TRACE_LOG_LEVELS
1✔
24
from localstack.utils.collections import ensure_list
1✔
25
from localstack.utils.container_utils.container_client import (
1✔
26
    AccessDenied,
27
    CancellableStream,
28
    ContainerClient,
29
    ContainerException,
30
    DockerContainerStats,
31
    DockerContainerStatus,
32
    DockerNotAvailable,
33
    DockerPlatform,
34
    LogConfig,
35
    NoSuchContainer,
36
    NoSuchImage,
37
    NoSuchNetwork,
38
    PortMappings,
39
    RegistryConnectionError,
40
    SimpleVolumeBind,
41
    Ulimit,
42
    Util,
43
)
44
from localstack.utils.strings import to_bytes, to_str
1✔
45
from localstack.utils.threads import start_worker_thread
1✔
46

47
# Module-level logger, named after this module
LOG = logging.getLogger(__name__)
1✔
48
# Directory flag of a Go FileMode value: the most significant bit of the 32-bit mode.
# Used to decide whether a path inside a container is a directory.
SDK_ISDIR = 1 << 31
1✔
49

50

51
class SdkDockerClient(ContainerClient):
1✔
52
    """
53
    Class for managing Docker (or Podman) using the Python Docker SDK.
54

55
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
56
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
57
    is doing some of the heavy lifting for us to support both target platforms.
58
    """
59

60
    docker_client: DockerClient | None
1✔
61

62
    def __init__(self):
1✔
63
        try:
1✔
64
            self.docker_client = self._create_client()
1✔
65
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
66
        except DockerNotAvailable:
1✔
67
            self.docker_client = None
1✔
68

69
    def client(self):
1✔
70
        if self.docker_client:
1✔
71
            return self.docker_client
1✔
72
        # if the initialization failed before, try to initialize on-demand
73
        self.docker_client = self._create_client()
1✔
74
        return self.docker_client
1✔
75

76
    @staticmethod
    def _create_client():
        """
        Create a Docker SDK client from the environment, retrying on failure.

        Up to DOCKER_SDK_DEFAULT_RETRIES retries are made after the first attempt, sleeping one
        second between attempts.

        :return: the client returned by ``docker.from_env``
        :raises DockerNotAvailable: if the last attempt fails as well
        """
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS

        max_retries = DOCKER_SDK_DEFAULT_RETRIES
        for attempt in range(max_retries + 1):
            try:
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
            except DockerException as e:
                LOG.debug(
                    "Creating Docker SDK client failed: %s. "
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
                    e,
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
                )
                if attempt >= max_retries:
                    # we are out of attempts
                    raise DockerNotAvailable("Docker not available") from e
                # wait for a second before retrying
                sleep(1)
1✔
96

97
    def _read_from_sock(self, sock: socket.socket, tty: bool):
        """Reads multiplexed messages from a socket returned by attach_socket.

        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach

        :param sock: socket to read the frames from
        :param tty: passed through to ``frames_iter``
        :return: Tuple (stdout, stderr) with the concatenated payload of each stream
        :raises ContainerException: if a frame is neither of type STDOUT nor STDERR
        """
        # collect the frames and join once at the end, instead of re-copying on every `+=`
        stdout_frames = []
        stderr_frames = []
        for frame_type, frame_data in frames_iter(sock, tty):
            if frame_type == STDOUT:
                stdout_frames.append(frame_data)
            elif frame_type == STDERR:
                stderr_frames.append(frame_data)
            else:
                raise ContainerException("Invalid frame type when reading from socket")
        return b"".join(stdout_frames), b"".join(stderr_frames)
1✔
112

113
    def _container_path_info(self, container: Container, container_path: str):
        """
        Look up whether a path exists in the given container and whether it is a directory.

        Issues a HEAD request against the container's ``/archive`` endpoint and decodes the
        base64-encoded JSON in the ``X-Docker-Container-Path-Stat`` response header.

        :param container: Container to be inspected
        :param container_path: Path in container
        :return: Tuple (path_exists, path_is_directory)
        """
        # Docker CLI copy uses go FileMode to determine if target is a directory or not
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
        # The isDir Bit is the most significant bit in the 32bit struct:
        # https://golang.org/src/os/types.go?s=2650:2683
        api_client = self.client().api

        container_ref = quote(container.id, safe="/:")
        archive_url = api_client.base_url + f"/containers/{container_ref}/archive"
        request_kwargs = api_client._set_request_timeout({"params": {"path": container_path}})
        response = api_client.head(archive_url, **request_kwargs)

        encoded_stat = response.headers.get("X-Docker-Container-Path-Stat")
        path_exists = response.ok
        if not path_exists:
            return path_exists, path_exists

        path_stat = json.loads(base64.b64decode(encoded_stat).decode("utf-8"))
        return path_exists, bool(path_stat["mode"] & SDK_ISDIR)
1✔
140

141
    def get_system_info(self) -> dict:
1✔
142
        return self.client().info()
1✔
143

144
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
        """
        Map the SDK status string of a container to a DockerContainerStatus.

        :param container_name: name of the container to look up
        :return: UP for "running", PAUSED for "paused", DOWN for any other status, and
            NON_EXISTENT if the SDK reports the container as not found
        :raises ContainerException: on any other API error
        """
        # deliberately no debug logging here - it would be too verbose
        try:
            status = self.client().containers.get(container_name).status
        except NotFound:
            return DockerContainerStatus.NON_EXISTENT
        except APIError as e:
            raise ContainerException() from e

        if status == "running":
            return DockerContainerStatus.UP
        if status == "paused":
            return DockerContainerStatus.PAUSED
        return DockerContainerStatus.DOWN
×
158

159
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
        """
        Take a single (non-streaming) stats sample of a container and condense it.

        :param container_name: name of the container to sample
        :return: DockerContainerStats with block and network I/O totals, CPU and memory
            percentages, the PID count, and the raw SDK stats under ``SDKStats``
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other API error
        """
        try:
            container = self.client().containers.get(container_name)
            sdk_stats = container.stats(stream=False)

            # BlockIO: (Read, Write) bytes
            read_bytes = 0
            write_bytes = 0
            blkio_stats = sdk_stats.get("blkio_stats") or {}
            for entry in blkio_stats.get("io_service_bytes_recursive") or []:
                # the casing of the operation name is not uniform (e.g. "Read" vs. "read"),
                # hence compare case-insensitively
                operation = str(entry.get("op") or "").lower()
                if operation == "read":
                    read_bytes += entry.get("value", 0)
                elif operation == "write":
                    write_bytes += entry.get("value", 0)

            # CPU percentage
            cpu_stats = sdk_stats.get("cpu_stats") or {}
            precpu_stats = sdk_stats.get("precpu_stats") or {}
            cpu_usage = cpu_stats.get("cpu_usage") or {}
            precpu_usage = precpu_stats.get("cpu_usage") or {}

            cpu_delta = cpu_usage.get("total_usage", 0) - precpu_usage.get("total_usage", 0)
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
                "system_cpu_usage", 0
            )

            # if the engine does not report `online_cpus`, fall back to the number of per-CPU
            # usage entries, and finally to a single CPU
            online_cpus = (
                cpu_stats.get("online_cpus") or len(cpu_usage.get("percpu_usage") or []) or 1
            )
            cpu_percent = (
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
            )

            # Memory (usage, limit) bytes
            memory_stats = sdk_stats.get("memory_stats") or {}
            mem_usage = memory_stats.get("usage", 0)
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
            mem_inactive = (memory_stats.get("stats") or {}).get("inactive_file", 0)
            used_memory = max(0, mem_usage - mem_inactive)
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0

            # Network IO
            net_rx = 0
            net_tx = 0
            for iface in (sdk_stats.get("networks") or {}).values():
                net_rx += iface.get("rx_bytes", 0)
                net_tx += iface.get("tx_bytes", 0)

            # Container ID
            container_id = sdk_stats.get("id", "")[:12]
            name = sdk_stats.get("name", "").lstrip("/")

            return DockerContainerStats(
                Container=container_id,
                ID=container_id,
                Name=name,
                BlockIO=(read_bytes, write_bytes),
                CPUPerc=round(cpu_percent, 2),
                MemPerc=round(mem_percent, 2),
                MemUsage=(used_memory, mem_limit),
                NetIO=(net_rx, net_tx),
                PIDs=(sdk_stats.get("pids_stats") or {}).get("current", 0),
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
            )
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
227

228
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Stop the container with the given name.

        :param container_name: name of the container to stop
        :param timeout: passed through to the SDK's ``stop`` call
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other API error
        """
        LOG.debug("Stopping container: %s", container_name)
        try:
            self.client().containers.get(container_name).stop(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
237

238
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Restart the container with the given name.

        :param container_name: name of the container to restart
        :param timeout: passed through to the SDK's ``restart`` call
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other API error
        """
        LOG.debug("Restarting container: %s", container_name)
        try:
            self.client().containers.get(container_name).restart(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
247

248
    def pause_container(self, container_name: str) -> None:
        """
        Pause the container with the given name.

        :param container_name: name of the container to pause
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other API error
        """
        LOG.debug("Pausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).pause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
257

258
    def unpause_container(self, container_name: str) -> None:
        """
        Unpause the container with the given name.

        :param container_name: name of the container to unpause
        :raises NoSuchContainer: if the SDK reports the container as not found
        :raises ContainerException: on any other API error
        """
        LOG.debug("Unpausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).unpause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
267

268
    def remove_container(
        self, container_name: str, force=True, check_existence=False, volumes=False
    ) -> None:
        """
        Remove the container with the given name.

        :param container_name: name of the container to remove
        :param force: passed to the SDK's ``remove``; if truthy, a missing container is
            silently ignored
        :param check_existence: if truthy, return early when the name is not among
            ``get_all_container_names()``
        :param volumes: passed to the SDK's ``remove`` as ``v``
        :raises NoSuchContainer: if the container is not found and ``force`` is falsy
        :raises ContainerException: on any other API error
        """
        LOG.debug("Removing container: %s, with volumes: %s", container_name, volumes)
        if check_existence:
            known_names = self.get_all_container_names()
            if container_name not in known_names:
                LOG.debug("Aborting removing due to check_existence check")
                return
        try:
            self.client().containers.get(container_name).remove(force=force, v=volumes)
        except NotFound:
            if force:
                return
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
283

284
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
        """
        List containers as plain dictionaries.

        :param filter: one ``key=value`` string or a list of them; converted into the filter
            dictionary handed to the SDK
        :param all: passed through to the SDK's ``list`` call
        :return: one dict per container with the keys id, image, name, status and labels;
            containers whose attributes cannot be read are logged and left out
        :raises ContainerException: if listing the containers fails with an API error
        """
        filters = filter
        if filter:
            filter_entries = [filter] if isinstance(filter, str) else filter
            filters = dict(entry.split("=", 1) for entry in filter_entries)
        LOG.debug("Listing containers with filters: %s", filters)
        try:
            containers = self.client().containers.list(filters=filters, all=all)
        except APIError as e:
            raise ContainerException() from e

        result = []
        for container in containers:
            try:
                container_info = {
                    "id": container.id,
                    "image": container.image,
                    "name": container.name,
                    "status": container.status,
                    "labels": container.labels,
                }
                result.append(container_info)
            except Exception as e:
                LOG.error("Error checking container %s: %s", container, e)
        return result
1✔
308

309
    def copy_into_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:
        """
        Copy a local path into a container via a tar archive.

        If ``container_path`` is an existing directory in the container, the archive is put
        there; otherwise it is put into the parent directory of ``container_path``.

        :param container_name: name of the target container
        :param local_path: local path to copy
        :param container_path: destination path inside the container
        :raises NoSuchContainer: if the SDK reports a not-found error
        :raises ContainerException: on any other API error
        """
        # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
        try:
            container = self.client().containers.get(container_name)
            _, target_isdir = self._container_path_info(container, container_path)
            if target_isdir:
                target_path = container_path
            else:
                target_path = os.path.dirname(container_path)
            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:
                container.put_archive(target_path, tar)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
323

324
    def copy_from_container(
        self,
        container_name: str,
        local_path: str,
        container_path: str,
    ) -> None:
        """
        Copy a path out of a container by fetching it as an archive and unpacking it locally.

        :param container_name: name of the source container
        :param local_path: local destination handed to ``Util.untar_to_path``
        :param container_path: path inside the container to fetch
        :raises NoSuchContainer: if the SDK reports a not-found error
        :raises ContainerException: on any other API error
        """
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
        try:
            source = self.client().containers.get(container_name)
            archive_stream, _ = source.get_archive(container_path)
            Util.untar_to_path(archive_stream, local_path)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
339

340
    def pull_image(
        self,
        docker_image: str,
        platform: DockerPlatform | None = None,
        log_handler: Callable[[str], None] | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> None:
        """
        Pull an image, optionally streaming the pull output to a handler.

        :param docker_image: image reference; resolved through ``registry_resolver_strategy``
            before pulling
        :param platform: passed through to the SDK's pull call
        :param log_handler: if given, called with each line of the streamed pull output
        :param auth_config: if given, passed through to the SDK's pull call
        :raises NoSuchImage: if the SDK reports the image as not found
        :raises ContainerException: on any other API error
        """
        LOG.debug("Pulling Docker image: %s", docker_image)
        # some path in the docker image string indicates a custom repository
        docker_image = self.registry_resolver_strategy.resolve(docker_image)

        pull_kwargs: dict[str, str | bool | dict[str, str]] = {"platform": platform}
        if auth_config:
            pull_kwargs["auth_config"] = auth_config
        try:
            if not log_handler:
                self.client().images.pull(docker_image, **pull_kwargs)
                return
            # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
            pull_kwargs["stream"] = True
            for line in self.client().api.pull(docker_image, **pull_kwargs):
                log_handler(to_str(line))
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as e:
            raise ContainerException() from e
1✔
367

368
    def push_image(self, docker_image: str, auth_config: dict[str, str] | None = None) -> None:
        """
        Push an image, translating error output into the matching exception.

        :param docker_image: image reference to push
        :param auth_config: if given, passed through to the SDK's push call
        :raises NoSuchImage: if the image does not exist locally
        :raises AccessDenied: if the push output indicates an authorization problem
        :raises RegistryConnectionError: if the push output indicates a connection problem
        :raises ContainerException: on any other error output or API error
        """
        LOG.debug("Pushing Docker image: %s", docker_image)
        push_kwargs: dict[str, dict[str, str]] = {}
        if auth_config:
            push_kwargs["auth_config"] = auth_config

        access_denied_markers = (
            "is denied",
            "requesting higher privileges than access token allows",
            "access token has insufficient scopes",
            "authorization failed: no basic auth credentials",
            "401 Unauthorized",
            "no basic auth credentials",
            "unauthorized: authentication required",
        )
        connection_error_markers = (
            "connection refused",
            "failed to do request:",
        )
        try:
            result = self.client().images.push(docker_image, **push_kwargs)
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
            if isinstance(result, (str, bytes)):
                output = to_str(result)
                if '"errorDetail"' in output:
                    if "image does not exist locally" in output:
                        raise NoSuchImage(docker_image)
                    if any(marker in output for marker in access_denied_markers):
                        raise AccessDenied(docker_image)
                    if any(marker in output for marker in connection_error_markers):
                        raise RegistryConnectionError(result)
                    raise ContainerException(result)
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as e:
            # note: error message 'image not known' raised by Podman API
            if "image not known" in str(e):
                raise NoSuchImage(docker_image)
            raise ContainerException() from e
×
405

406
    def build_image(
        self,
        dockerfile_path: str,
        image_name: str,
        context_path: str | None = None,
        platform: DockerPlatform | None = None,
    ):
        """
        Build an image from a Dockerfile and return the collected build output.

        :param dockerfile_path: path handed to ``Util.resolve_dockerfile_path``
        :param image_name: tag for the built image
        :param context_path: build context; defaults to the directory of the resolved Dockerfile
        :param platform: passed through to the SDK's build call
        :return: concatenation of all "stream" (or, failing that, "error") log messages
        :raises ContainerException: if the build fails with an API error
        """
        try:
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
            context_path = context_path or os.path.dirname(dockerfile_path)
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
            _, logs_iterator = self.client().images.build(
                path=context_path,
                dockerfile=dockerfile_path,
                tag=image_name,
                rm=True,
                platform=platform,
            )
            # logs_iterator is a stream of dicts. Example content:
            # {'stream': 'Step 1/4 : FROM alpine'}
            # ... other build steps
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
            output_parts = []
            for log in logs_iterator:
                if not isinstance(log, dict):
                    continue
                # use .get() for both keys: an entry with an empty "stream" and no "error" key
                # must not raise a KeyError
                message = log.get("stream") or log.get("error")
                if message:
                    output_parts.append(message)
            return "".join(output_parts)
        except APIError as e:
            raise ContainerException("Unable to build Docker image") from e
×
436

437
    def tag_image(self, source_ref: str, target_name: str) -> None:
        """
        Tag an existing image with an additional name.

        :param source_ref: reference of the image to tag
        :param target_name: name passed to the SDK's ``tag`` call
        :raises NoSuchImage: if the API responds with status code 404
        :raises ContainerException: on any other API error
        """
        LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
        try:
            self.client().images.get(source_ref).tag(target_name)
        except APIError as e:
            if e.status_code == 404:
                raise NoSuchImage(source_ref)
            raise ContainerException("Unable to tag Docker image") from e
×
446

447
    def get_docker_image_names(
1✔
448
        self,
449
        strip_latest: bool = True,
450
        include_tags: bool = True,
451
        strip_wellknown_repo_prefixes: bool = True,
452
    ):
453
        try:
1✔
454
            images = self.client().images.list()
1✔
455
            image_names = [tag for image in images for tag in image.tags if image.tags]
1✔
456
            if not include_tags:
1✔
457
                image_names = [image_name.rpartition(":")[0] for image_name in image_names]
1✔
458
            if strip_wellknown_repo_prefixes:
1✔
459
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
1✔
460
            if strip_latest:
1✔
461
                Util.append_without_latest(image_names)
1✔
462
            return image_names
1✔
463
        except APIError as e:
×
464
            raise ContainerException() from e
×
465

466
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
        """
        Return the logs of a container as a string.

        :param container_name_or_id: container to read the logs from
        :param safe: if set, return an empty string instead of raising on errors
        :return: the container logs converted with ``to_str``
        :raises NoSuchContainer: if the container is not found and ``safe`` is unset
        :raises ContainerException: on any other API error if ``safe`` is unset
        """
        try:
            raw_logs = self.client().containers.get(container_name_or_id).logs()
            return to_str(raw_logs)
        except NotFound:
            if not safe:
                raise NoSuchContainer(container_name_or_id)
            return ""
        except APIError as e:
            if not safe:
                raise ContainerException() from e
            return ""
1✔
478

479
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
480
        try:
1✔
481
            container = self.client().containers.get(container_name_or_id)
1✔
482
            return container.logs(stream=True, follow=True)
1✔
483
        except NotFound:
1✔
484
            raise NoSuchContainer(container_name_or_id)
1✔
485
        except APIError as e:
×
486
            raise ContainerException() from e
×
487

488
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
489
        try:
1✔
490
            return self.client().containers.get(container_name_or_id).attrs
1✔
491
        except NotFound:
1✔
492
            raise NoSuchContainer(container_name_or_id)
1✔
493
        except APIError as e:
×
494
            raise ContainerException() from e
×
495

496
    def inspect_image(
        self,
        image_name: str,
        pull: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ) -> dict[str, dict | list | str]:
        """
        Return the ``attrs`` of an image, optionally pulling it first if it is not found.

        :param image_name: image reference; resolved through ``registry_resolver_strategy``
        :param pull: if set and the image is not found, pull it and inspect it once more
        :param strip_wellknown_repo_prefixes: if set, "RepoDigests" and "RepoTags" are passed
            through ``Util.strip_wellknown_repo_prefixes``
        :raises NoSuchImage: if the image is not found and ``pull`` is unset
        :raises ContainerException: on any other API error
        """
        image_name = self.registry_resolver_strategy.resolve(image_name)
        try:
            result = self.client().images.get(image_name).attrs
            if strip_wellknown_repo_prefixes:
                if result.get("RepoDigests"):
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
                        result["RepoDigests"]
                    )
                if result.get("RepoTags"):
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
            return result
        except NotFound:
            if pull:
                self.pull_image(image_name)
                # forward the caller's prefix-stripping choice, which would otherwise
                # silently revert to the default for freshly pulled images
                return self.inspect_image(
                    image_name,
                    pull=False,
                    strip_wellknown_repo_prefixes=strip_wellknown_repo_prefixes,
                )
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
×
520

521
    def create_network(self, network_name: str) -> None:
1✔
522
        try:
1✔
523
            return self.client().networks.create(name=network_name).id
1✔
524
        except APIError as e:
×
525
            raise ContainerException() from e
×
526

527
    def delete_network(self, network_name: str) -> None:
1✔
528
        try:
1✔
529
            return self.client().networks.get(network_name).remove()
1✔
UNCOV
530
        except NotFound:
×
531
            raise NoSuchNetwork(network_name)
×
UNCOV
532
        except APIError as e:
×
UNCOV
533
            raise ContainerException() from e
×
534

535
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
536
        try:
1✔
537
            return self.client().networks.get(network_name).attrs
1✔
538
        except NotFound:
1✔
539
            raise NoSuchNetwork(network_name)
1✔
540
        except APIError as e:
×
541
            raise ContainerException() from e
×
542

543
    def connect_container_to_network(
        self,
        network_name: str,
        container_name_or_id: str,
        aliases: list | None = None,
        link_local_ips: list[str] | None = None,
    ) -> None:
        """
        Connect a container to a network.

        :param network_name: name of the network to connect to
        :param container_name_or_id: container to connect
        :param aliases: passed through to the SDK's ``connect`` call
        :param link_local_ips: passed through to the SDK's ``connect`` call
        :raises NoSuchNetwork: if the network lookup reports a not-found error
        :raises NoSuchContainer: if the connect call reports a not-found error
        :raises ContainerException: on any other API error, during lookup or connect
        """
        LOG.debug(
            "Connecting container '%s' to network '%s' with aliases '%s'",
            container_name_or_id,
            network_name,
            aliases,
        )
        try:
            network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as e:
            # wrap lookup failures as well, as done when disconnecting a container
            raise ContainerException() from e
        try:
            network.connect(
                container=container_name_or_id,
                aliases=aliases,
                link_local_ips=link_local_ips,
            )
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
×
570

571
    def disconnect_container_from_network(
        self, network_name: str, container_name_or_id: str
    ) -> None:
        """
        Disconnect a container from a network.

        :param network_name: name of the network to disconnect from
        :param container_name_or_id: container to disconnect
        :raises NoSuchNetwork: if the network lookup reports a not-found error
        :raises NoSuchContainer: if the disconnect call reports a not-found error
        :raises ContainerException: on any other API error
        """
        LOG.debug(
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
        )
        # step 1: look up the network
        try:
            network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as e:
            raise ContainerException() from e

        # step 2: detach the container from it
        try:
            network.disconnect(container_name_or_id)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
×
588

589
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
590
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
591
        network_names = list(networks)
1✔
592
        if len(network_names) > 1:
1✔
593
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
594
        return networks[network_names[0]]["IPAddress"]
1✔
595

596
    @cache
1✔
597
    def has_docker(self) -> bool:
1✔
598
        try:
1✔
599
            if not self.docker_client:
1✔
600
                return False
×
601
            self.client().ping()
1✔
602
            return True
1✔
603
        except APIError:
×
604
            return False
×
605

606
    def remove_image(self, image: str, force: bool = True):
        """
        Remove an image.

        :param image: reference of the image to remove
        :param force: passed to the SDK's ``remove``; if set, an ImageNotFound error is
            silently ignored
        :raises NoSuchImage: if the image is not found and ``force`` is unset, or if the API
            error message contains "image not known"
        :raises ContainerException: on any other API error
        """
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
        try:
            self.client().images.remove(image=image, force=force)
        except ImageNotFound:
            if force:
                return
            raise NoSuchImage(image)
        except APIError as e:
            image_unknown = "image not known" in str(e)
            if image_unknown:
                raise NoSuchImage(image)
            raise ContainerException() from e
×
617

618
    def commit(
        self,
        container_name_or_id: str,
        image_name: str,
        image_tag: str,
    ):
        """
        Create an image from a container.

        :param container_name_or_id: container to commit
        :param image_name: passed to the SDK's ``commit`` as ``repository``
        :param image_tag: passed to the SDK's ``commit`` as ``tag``
        :raises NoSuchContainer: if the SDK reports a not-found error
        :raises ContainerException: on any other API error
        """
        LOG.debug(
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
        )
        try:
            source = self.client().containers.get(container_name_or_id)
            source.commit(repository=image_name, tag=image_tag)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
×
634

635
    def start_container(
        self,
        container_name_or_id: str,
        stdin=None,
        interactive: bool = False,
        attach: bool = False,
        flags: str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Start a container, optionally attaching to its I/O streams.

        Without ``interactive``/``attach`` the container is simply started. Otherwise a socket
        is attached before the start, ``stdin`` (if any) is sent, the output is read until the
        stream ends, and the exit code is collected by a worker thread.

        :param container_name_or_id: container to start
        :param stdin: data to send to the container once it is started (attached mode only)
        :param interactive: attach to the container, including its stdin
        :param attach: attach to the container's stdout/stderr
        :param flags: not used by this implementation
        :return: Tuple (stdout, stderr). When not attached, stdout is the container name/id as
            bytes and stderr is empty
        :raises NoSuchContainer: if the SDK reports a not-found error
        :raises ContainerException: on any other API error, or if the attached container
            returns a non-zero exit code
        """
        LOG.debug("Starting container %s", container_name_or_id)
        try:
            container = self.client().containers.get(container_name_or_id)
            stdout = to_bytes(container_name_or_id)
            stderr = b""
            if interactive or attach:
                params = {"stdout": 1, "stderr": 1, "stream": 1}
                if interactive:
                    params["stdin"] = 1
                sock = container.attach_socket(params=params)
                sock = sock._sock if hasattr(sock, "_sock") else sock
                result_queue = queue.Queue()
                thread_started = threading.Event()
                start_waiting = threading.Event()

                # Note: We need to be careful about potential race conditions here - .wait() should happen right
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
                def wait_for_result(*_):
                    _exit_code = -1
                    try:
                        thread_started.set()
                        start_waiting.wait()
                        _exit_code = container.wait()["StatusCode"]
                    except APIError as e:
                        _exit_code = 1
                        raise ContainerException(str(e))
                    finally:
                        result_queue.put(_exit_code)

                # start listener thread
                start_worker_thread(wait_for_result)
                thread_started.wait()
                try:
                    # start container
                    container.start()
                except BaseException:
                    # the I/O handling below (which closes the socket) is never reached if the
                    # start fails, hence close the attached socket here to avoid leaking it
                    sock.close()
                    raise
                finally:
                    # start awaiting container result
                    start_waiting.set()

                # handle container input/output
                # under windows, the socket has no __enter__ / cannot be used as context manager
                # therefore try/finally instead of with here
                try:
                    if stdin:
                        sock.sendall(to_bytes(stdin))
                        sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                except TimeoutError:
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
                        container_name_or_id,
                    )
                finally:
                    sock.close()

                # get container exit code
                exit_code = result_queue.get()
                if exit_code:
                    raise ContainerException(
                        f"Docker container returned with exit code {exit_code}",
                        stdout=stdout,
                        stderr=stderr,
                    )
            else:
                container.start()
            return stdout, stderr
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
713

714
    def attach_to_container(self, container_name_or_id: str):
        """
        Look up the given container and call ``attach()`` on it.

        The value returned by ``attach()`` is not used.

        :param container_name_or_id: name or ID of the container to attach to
        """
        container: Container = self.client().containers.get(container_name_or_id)
        container.attach()
×
718

719
    def create_container(
        self,
        image_name: str,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        privileged: bool | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> str:
        """
        Create (but do not start) a container and return its ID.

        If the Docker SDK reports the image as missing, the image is pulled once and the creation is
        retried.

        :param image_name: image to use; resolved through ``self.registry_resolver_strategy`` first
        :param remove: passed to the SDK as ``auto_remove``
        :param interactive: passed to the SDK as ``stdin_open``
        :param exposed_ports: ports to expose without a host binding; merged with ``ports``
        :param additional_flags: extra flags parsed with ``Util.parse_additional_flags``. The parsed
            env vars, extra hosts, volumes, network, platform, privileged flag, ports, ulimits, user
            and dns replace the corresponding arguments; parsed labels are merged into ``labels``.
        :param labels: container labels; kept when ``additional_flags`` is set (labels parsed from the
            flags win on key conflicts)
        :param auth_config: forwarded to ``pull_image`` if the image has to be pulled; masked in the
            debug log because it may hold credentials
        :return: the ID of the created container
        :raises NoSuchImage: if the image is still reported as missing after the pull attempt
        :raises ContainerException: on any other Docker API error
        """
        # Mask auth_config before logging: it is handed to the registry pull and may carry credentials.
        log_attributes = dict(locals())
        if log_attributes.get("auth_config"):
            log_attributes["auth_config"] = "<redacted>"
        LOG.debug("Creating container with attributes: %s", log_attributes)
        extra_hosts = None
        if additional_flags:
            parsed_flags = Util.parse_additional_flags(
                additional_flags,
                env_vars=env_vars,
                volumes=volumes,
                network=network,
                platform=platform,
                privileged=privileged,
                ports=ports,
                ulimits=ulimits,
                user=user,
                dns=dns,
            )
            env_vars = parsed_flags.env_vars
            extra_hosts = parsed_flags.extra_hosts
            volumes = parsed_flags.volumes
            # The explicit labels are not handed to the flag parser above, so merge instead of
            # overwriting them; otherwise they would be lost whenever additional_flags is set.
            if parsed_flags.labels:
                labels = {**(labels or {}), **parsed_flags.labels}
            network = parsed_flags.network
            platform = parsed_flags.platform
            privileged = parsed_flags.privileged
            ports = parsed_flags.ports
            ulimits = parsed_flags.ulimits
            user = parsed_flags.user
            dns = parsed_flags.dns

        try:
            # only pass optional settings to the SDK when they are actually set
            kwargs = {}
            if cap_add:
                kwargs["cap_add"] = cap_add
            if cap_drop:
                kwargs["cap_drop"] = cap_drop
            if security_opt:
                kwargs["security_opt"] = security_opt
            if dns:
                kwargs["dns"] = ensure_list(dns)
            if exposed_ports:
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
                # but the behavior should be identical
                kwargs["ports"] = {port: [] for port in exposed_ports}
            if ports:
                kwargs.setdefault("ports", {})
                kwargs["ports"].update(ports.to_dict())
            if workdir:
                kwargs["working_dir"] = workdir
            if privileged:
                kwargs["privileged"] = True
            if init:
                kwargs["init"] = True
            if labels:
                kwargs["labels"] = labels
            if log_config:
                kwargs["log_config"] = DockerLogConfig(
                    type=log_config.type, config=log_config.config
                )
            if ulimits:
                kwargs["ulimits"] = [
                    docker.types.Ulimit(
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
                    )
                    for ulimit in ulimits
                ]
            if cpu_shares:
                kwargs["cpu_shares"] = cpu_shares
            if mem_limit:
                kwargs["mem_limit"] = mem_limit
            mounts = None
            if volumes:
                mounts = Util.convert_mount_list_to_dict(volumes)

            image_name = self.registry_resolver_strategy.resolve(image_name)

            def create_container():
                return self.client().containers.create(
                    image=image_name,
                    command=command,
                    auto_remove=remove,
                    name=name,
                    stdin_open=interactive,
                    tty=tty,
                    entrypoint=entrypoint,
                    environment=env_vars,
                    detach=detach,
                    user=user,
                    network=network,
                    volumes=mounts,
                    extra_hosts=extra_hosts,
                    platform=platform,
                    **kwargs,
                )

            try:
                container = create_container()
            except ImageNotFound:
                # image is not available locally: pull it once, then retry the creation
                LOG.debug("Image not found. Pulling image %s", image_name)
                self.pull_image(image_name, platform, auth_config=auth_config)
                container = create_container()
            return container.id
        except ImageNotFound:
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
×
855

856
    def run_container(
        self,
        image_name: str,
        stdin=None,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        privileged: bool | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Create a container via ``create_container`` and start it via ``start_container``.

        All container settings are forwarded unchanged to ``create_container``; ``entrypoint`` and
        ``dns`` therefore accept the same types as there.

        :param image_name: image to run
        :param stdin: data forwarded to ``start_container`` as the container's stdin
        :param remove: remove the container afterwards. For detached containers this is delegated to
            ``create_container`` (``remove and detach``); otherwise the container is removed here once
            ``start_container`` has returned or raised.
        :param detach: if set, the container is started without attaching (``attach=not detach``)
        :return: the ``(stdout, stderr)`` tuple returned by ``start_container``
        """
        LOG.debug("Running container with image: %s", image_name)
        container = None
        try:
            container = self.create_container(
                image_name,
                name=name,
                entrypoint=entrypoint,
                interactive=interactive,
                tty=tty,
                detach=detach,
                remove=remove and detach,
                command=command,
                volumes=volumes,
                ports=ports,
                exposed_ports=exposed_ports,
                env_vars=env_vars,
                user=user,
                cap_add=cap_add,
                cap_drop=cap_drop,
                security_opt=security_opt,
                network=network,
                dns=dns,
                additional_flags=additional_flags,
                workdir=workdir,
                privileged=privileged,
                platform=platform,
                init=init,
                labels=labels,
                ulimits=ulimits,
                log_config=log_config,
                cpu_shares=cpu_shares,
                mem_limit=mem_limit,
                auth_config=auth_config,
            )
            result = self.start_container(
                container_name_or_id=container,
                stdin=stdin,
                interactive=interactive,
                attach=not detach,
            )
        finally:
            # attached runs are cleaned up here; `container` is still None if the creation failed
            if remove and container and not detach:
                self.remove_container(container)
        return result
1✔
934

935
    def exec_in_container(
        self,
        container_name_or_id: str,
        command: list[str] | str,
        interactive=False,
        detach=False,
        env_vars: dict[str, str | None] | None = None,
        stdin: bytes | None = None,
        user: str | None = None,
        workdir: str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Execute a command inside the given container.

        :param container_name_or_id: name or ID of the target container
        :param command: command to execute
        :param interactive: only has an effect together with ``stdin``: the exec is then run over a
            socket, ``stdin`` is sent to it and the output is read back from the socket. The exit code
            of the command is not checked in this mode.
        :param detach: if set (and no socket is used), return ``(b"", b"")`` without evaluating the result
        :param env_vars: environment variables for the command
        :param stdin: data to send to the command; requires ``interactive``
        :param user: user to run the command as
        :param workdir: working directory for the command
        :return: ``(stdout, stderr)``; ``(b"", b"")`` if reading from the socket timed out
        :raises NoSuchContainer: if the container cannot be found
        :raises ContainerException: if the command exits with a non-zero code, or on other Docker API errors
        """
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
        use_socket = bool(interactive and stdin)
        try:
            container: Container = self.client().containers.get(container_name_or_id)
            result = container.exec_run(
                cmd=command,
                environment=env_vars,
                user=user,
                detach=detach,
                stdin=use_socket,
                socket=use_socket,
                stdout=True,
                stderr=True,
                demux=True,
                workdir=workdir,
            )
            if use_socket:  # result[1] is a socket
                sock = result[1]
                sock = sock._sock if hasattr(sock, "_sock") else sock
                # under windows, the socket has no __enter__ / cannot be used as context manager
                # therefore try/finally instead of with here
                try:
                    sock.sendall(to_bytes(stdin))
                    sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                    return stdout, stderr
                except TimeoutError:
                    # keep the declared return type instead of implicitly returning None
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
                        container_name_or_id,
                    )
                    return b"", b""
                finally:
                    sock.close()

            if detach:
                return b"", b""
            return_code = result[0]
            output = result[1]
            if isinstance(output, bytes):
                stdout, stderr = output, b""
            else:
                stdout, stderr = output
            if return_code != 0:
                raise ContainerException(
                    f"Exec command returned with exit code {return_code}", stdout, stderr
                )
            return stdout, stderr
        except (ContainerError, NotFound):
            # NotFound is an APIError, so it has to be mapped before the generic handler below
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
991

992
    def login(self, username: str, password: str, registry: str | None = None) -> None:
        """
        Log in through the Docker client, passing ``reauth=True``.

        :param username: user name to log in with (also written to the debug log)
        :param password: password for the user
        :param registry: optional registry to log in to
        :raises ContainerException: if the Docker API reports an error
        """
        LOG.debug("Docker login for %s", username)
        try:
            self.client().login(
                username,
                password=password,
                registry=registry,
                reauth=True,
            )
        except APIError as api_error:
            raise ContainerException() from api_error
×
998

999

1000
# apply patches required for podman API compatibility
1001

1002

1003
@property
def _container_image(self):
    """
    Return the image object of a container, tolerating podman-style image references.

    The image reference is read from the ``ImageID`` attribute, falling back to ``Image`` only when
    ``ImageID`` is absent. Returns ``None`` if the reference is ``None``; otherwise the result of
    ``self.client.images.get`` for the reference.
    """
    attrs = self.attrs
    # Look up "Image" lazily: a response that carries only "ImageID" must not fail with a KeyError.
    image_id = attrs["ImageID"] if "ImageID" in attrs else attrs["Image"]
    if image_id is None:
        return None
    image_ref = image_id
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
    # then lead to "no such image" errors.
    if re.match("sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
        image_ref = image_id.split(":")[1]
    return self.client.images.get(image_ref)
1✔
1016

1017

1018
# Patch the docker SDK `Container` class so that `Container.image` uses `_container_image`
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc