• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / ec92b1b9-02f7-4f1d-af80-ca3b2f6ea8a3

19 May 2025 11:07AM UTC coverage: 86.661% (+0.03%) from 86.633%
ec92b1b9-02f7-4f1d-af80-ca3b2f6ea8a3

push

circleci

web-flow
CloudFormation V2 Engine: Support for Pseudo Parameter References (#12595)

40 of 44 new or added lines in 4 files covered. (90.91%)

14 existing lines in 6 files now uncovered.

64477 of 74401 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.42
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from functools import lru_cache
1✔
10
from time import sleep
1✔
11
from typing import Dict, List, Optional, Tuple, Union, cast
1✔
12
from urllib.parse import quote
1✔
13

14
import docker
1✔
15
from docker import DockerClient
1✔
16
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
17
from docker.models.containers import Container
1✔
18
from docker.types import LogConfig as DockerLogConfig
1✔
19
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
20

21
from localstack.config import LS_LOG
1✔
22
from localstack.constants import TRACE_LOG_LEVELS
1✔
23
from localstack.utils.collections import ensure_list
1✔
24
from localstack.utils.container_utils.container_client import (
1✔
25
    AccessDenied,
26
    CancellableStream,
27
    ContainerClient,
28
    ContainerException,
29
    DockerContainerStats,
30
    DockerContainerStatus,
31
    DockerNotAvailable,
32
    DockerPlatform,
33
    LogConfig,
34
    NoSuchContainer,
35
    NoSuchImage,
36
    NoSuchNetwork,
37
    PortMappings,
38
    RegistryConnectionError,
39
    SimpleVolumeBind,
40
    Ulimit,
41
    Util,
42
)
43
from localstack.utils.strings import to_bytes, to_str
1✔
44
from localstack.utils.threads import start_worker_thread
1✔
45

46
LOG = logging.getLogger(__name__)
1✔
47
SDK_ISDIR = 1 << 31
1✔
48

49

50
class SdkDockerClient(ContainerClient):
1✔
51
    """
52
    Class for managing Docker (or Podman) using the Python Docker SDK.
53

54
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
55
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
56
    is doing some of the heavy lifting for us to support both target platforms.
57
    """
58

59
    docker_client: Optional[DockerClient]
1✔
60

61
    def __init__(self):
1✔
62
        try:
1✔
63
            self.docker_client = self._create_client()
1✔
64
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
65
        except DockerNotAvailable:
1✔
66
            self.docker_client = None
1✔
67

68
    def client(self):
1✔
69
        if self.docker_client:
1✔
70
            return self.docker_client
1✔
71
        # if the initialization failed before, try to initialize on-demand
72
        self.docker_client = self._create_client()
1✔
73
        return self.docker_client
1✔
74

75
    @staticmethod
1✔
76
    def _create_client():
1✔
77
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS
1✔
78

79
        for attempt in range(0, DOCKER_SDK_DEFAULT_RETRIES + 1):
1✔
80
            try:
1✔
81
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
1✔
82
            except DockerException as e:
1✔
83
                LOG.debug(
1✔
84
                    "Creating Docker SDK client failed: %s. "
85
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
86
                    e,
87
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
88
                )
89
                if attempt < DOCKER_SDK_DEFAULT_RETRIES:
1✔
90
                    # wait for a second before retrying
91
                    sleep(1)
1✔
92
                else:
93
                    # we are out of attempts
94
                    raise DockerNotAvailable("Docker not available") from e
1✔
95

96
    def _read_from_sock(self, sock: socket, tty: bool):
1✔
97
        """Reads multiplexed messages from a socket returned by attach_socket.
98

99
        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
100
        """
101
        stdout = b""
1✔
102
        stderr = b""
1✔
103
        for frame_type, frame_data in frames_iter(sock, tty):
1✔
104
            if frame_type == STDOUT:
1✔
105
                stdout += frame_data
1✔
106
            elif frame_type == STDERR:
1✔
107
                stderr += frame_data
1✔
108
            else:
109
                raise ContainerException("Invalid frame type when reading from socket")
×
110
        return stdout, stderr
1✔
111

112
    def _container_path_info(self, container: Container, container_path: str):
1✔
113
        """
114
        Get information about a path in the given container
115
        :param container: Container to be inspected
116
        :param container_path: Path in container
117
        :return: Tuple (path_exists, path_is_directory)
118
        """
119
        # Docker CLI copy uses go FileMode to determine if target is a dict or not
120
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
121
        # The isDir Bit is the most significant bit in the 32bit struct:
122
        # https://golang.org/src/os/types.go?s=2650:2683
123
        api_client = self.client().api
1✔
124

125
        def _head(path_suffix, **kwargs):
1✔
126
            return api_client.head(
1✔
127
                api_client.base_url + path_suffix, **api_client._set_request_timeout(kwargs)
128
            )
129

130
        escaped_id = quote(container.id, safe="/:")
1✔
131
        result = _head(f"/containers/{escaped_id}/archive", params={"path": container_path})
1✔
132
        stats = result.headers.get("X-Docker-Container-Path-Stat")
1✔
133
        target_exists = result.ok
1✔
134

135
        if target_exists:
1✔
136
            stats = json.loads(base64.b64decode(stats).decode("utf-8"))
1✔
137
        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)
1✔
138
        return target_exists, target_is_dir
1✔
139

140
    def get_system_info(self) -> dict:
1✔
141
        return self.client().info()
1✔
142

143
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
144
        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose
145
        try:
1✔
146
            container = self.client().containers.get(container_name)
1✔
147
            if container.status == "running":
1✔
148
                return DockerContainerStatus.UP
1✔
149
            elif container.status == "paused":
1✔
150
                return DockerContainerStatus.PAUSED
1✔
151
            else:
152
                return DockerContainerStatus.DOWN
1✔
153
        except NotFound:
1✔
154
            return DockerContainerStatus.NON_EXISTENT
1✔
155
        except APIError as e:
×
156
            raise ContainerException() from e
×
157

158
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
159
        try:
1✔
160
            container = self.client().containers.get(container_name)
1✔
161
            sdk_stats = container.stats(stream=False)
1✔
162

163
            # BlockIO: (Read, Write) bytes
164
            read_bytes = 0
1✔
165
            write_bytes = 0
1✔
166
            for entry in (
1✔
167
                sdk_stats.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
168
            ):
169
                if entry.get("op") == "read":
×
170
                    read_bytes += entry.get("value", 0)
×
171
                elif entry.get("op") == "write":
×
172
                    write_bytes += entry.get("value", 0)
×
173

174
            # CPU percentage
175
            cpu_stats = sdk_stats.get("cpu_stats", {})
1✔
176
            precpu_stats = sdk_stats.get("precpu_stats", {})
1✔
177

178
            cpu_delta = cpu_stats.get("cpu_usage", {}).get("total_usage", 0) - precpu_stats.get(
1✔
179
                "cpu_usage", {}
180
            ).get("total_usage", 0)
181

182
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
1✔
183
                "system_cpu_usage", 0
184
            )
185

186
            online_cpus = cpu_stats.get("online_cpus", 1)
1✔
187
            cpu_percent = (
1✔
188
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
189
            )
190

191
            # Memory (usage, limit) bytes
192
            memory_stats = sdk_stats.get("memory_stats", {})
1✔
193
            mem_usage = memory_stats.get("usage", 0)
1✔
194
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
1✔
195
            mem_inactive = memory_stats.get("stats", {}).get("inactive_file", 0)
1✔
196
            used_memory = max(0, mem_usage - mem_inactive)
1✔
197
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0
1✔
198

199
            # Network IO
200
            net_rx = 0
1✔
201
            net_tx = 0
1✔
202
            for iface in sdk_stats.get("networks", {}).values():
1✔
203
                net_rx += iface.get("rx_bytes", 0)
1✔
204
                net_tx += iface.get("tx_bytes", 0)
1✔
205

206
            # Container ID
207
            container_id = sdk_stats.get("id", "")[:12]
1✔
208
            name = sdk_stats.get("name", "").lstrip("/")
1✔
209

210
            return DockerContainerStats(
1✔
211
                Container=container_id,
212
                ID=container_id,
213
                Name=name,
214
                BlockIO=(read_bytes, write_bytes),
215
                CPUPerc=round(cpu_percent, 2),
216
                MemPerc=round(mem_percent, 2),
217
                MemUsage=(used_memory, mem_limit),
218
                NetIO=(net_rx, net_tx),
219
                PIDs=sdk_stats.get("pids_stats", {}).get("current", 0),
220
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
221
            )
222
        except NotFound:
×
223
            raise NoSuchContainer(container_name)
×
224
        except APIError as e:
×
225
            raise ContainerException() from e
×
226

227
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
228
        LOG.debug("Stopping container: %s", container_name)
1✔
229
        try:
1✔
230
            container = self.client().containers.get(container_name)
1✔
231
            container.stop(timeout=timeout)
1✔
232
        except NotFound:
1✔
233
            raise NoSuchContainer(container_name)
1✔
234
        except APIError as e:
×
235
            raise ContainerException() from e
×
236

237
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
238
        LOG.debug("Restarting container: %s", container_name)
1✔
239
        try:
1✔
240
            container = self.client().containers.get(container_name)
1✔
241
            container.restart(timeout=timeout)
1✔
242
        except NotFound:
1✔
243
            raise NoSuchContainer(container_name)
1✔
244
        except APIError as e:
×
245
            raise ContainerException() from e
×
246

247
    def pause_container(self, container_name: str) -> None:
1✔
248
        LOG.debug("Pausing container: %s", container_name)
1✔
249
        try:
1✔
250
            container = self.client().containers.get(container_name)
1✔
251
            container.pause()
1✔
252
        except NotFound:
1✔
253
            raise NoSuchContainer(container_name)
1✔
254
        except APIError as e:
×
255
            raise ContainerException() from e
×
256

257
    def unpause_container(self, container_name: str) -> None:
1✔
258
        LOG.debug("Unpausing container: %s", container_name)
1✔
259
        try:
1✔
260
            container = self.client().containers.get(container_name)
1✔
261
            container.unpause()
1✔
262
        except NotFound:
×
263
            raise NoSuchContainer(container_name)
×
264
        except APIError as e:
×
265
            raise ContainerException() from e
×
266

267
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
1✔
268
        LOG.debug("Removing container: %s", container_name)
1✔
269
        if check_existence and container_name not in self.get_running_container_names():
1✔
270
            LOG.debug("Aborting removing due to check_existence check")
×
271
            return
×
272
        try:
1✔
273
            container = self.client().containers.get(container_name)
1✔
274
            container.remove(force=force)
1✔
275
        except NotFound:
1✔
276
            if not force:
1✔
277
                raise NoSuchContainer(container_name)
1✔
278
        except APIError as e:
×
279
            raise ContainerException() from e
×
280

281
    def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:
1✔
282
        if filter:
1✔
283
            filter = [filter] if isinstance(filter, str) else filter
1✔
284
            filter = dict([f.split("=", 1) for f in filter])
1✔
285
        LOG.debug("Listing containers with filters: %s", filter)
1✔
286
        try:
1✔
287
            container_list = self.client().containers.list(filters=filter, all=all)
1✔
288
            result = []
1✔
289
            for container in container_list:
1✔
290
                try:
1✔
291
                    result.append(
1✔
292
                        {
293
                            "id": container.id,
294
                            "image": container.image,
295
                            "name": container.name,
296
                            "status": container.status,
297
                            "labels": container.labels,
298
                        }
299
                    )
300
                except Exception as e:
×
301
                    LOG.error("Error checking container %s: %s", container, e)
×
302
            return result
1✔
303
        except APIError as e:
1✔
304
            raise ContainerException() from e
1✔
305

306
    def copy_into_container(
1✔
307
        self, container_name: str, local_path: str, container_path: str
308
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
309
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
1✔
310
        try:
1✔
311
            container = self.client().containers.get(container_name)
1✔
312
            target_exists, target_isdir = self._container_path_info(container, container_path)
1✔
313
            target_path = container_path if target_isdir else os.path.dirname(container_path)
1✔
314
            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:
1✔
315
                container.put_archive(target_path, tar)
1✔
316
        except NotFound:
1✔
317
            raise NoSuchContainer(container_name)
1✔
318
        except APIError as e:
1✔
319
            raise ContainerException() from e
×
320

321
    def copy_from_container(
1✔
322
        self,
323
        container_name: str,
324
        local_path: str,
325
        container_path: str,
326
    ) -> None:
327
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
1✔
328
        try:
1✔
329
            container = self.client().containers.get(container_name)
1✔
330
            bits, _ = container.get_archive(container_path)
1✔
331
            Util.untar_to_path(bits, local_path)
1✔
332
        except NotFound:
1✔
333
            raise NoSuchContainer(container_name)
1✔
334
        except APIError as e:
×
335
            raise ContainerException() from e
×
336

337
    def pull_image(self, docker_image: str, platform: Optional[DockerPlatform] = None) -> None:
1✔
338
        LOG.debug("Pulling Docker image: %s", docker_image)
1✔
339
        # some path in the docker image string indicates a custom repository
340
        try:
1✔
341
            self.client().images.pull(docker_image, platform=platform)
1✔
342
        except ImageNotFound:
1✔
343
            raise NoSuchImage(docker_image)
1✔
344
        except APIError as e:
×
345
            raise ContainerException() from e
×
346

347
    def push_image(self, docker_image: str) -> None:
1✔
348
        LOG.debug("Pushing Docker image: %s", docker_image)
1✔
349
        try:
1✔
350
            result = self.client().images.push(docker_image)
1✔
351
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
352
            if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):
1✔
353
                if "image does not exist locally" in to_str(result):
1✔
354
                    raise NoSuchImage(docker_image)
1✔
355
                if "is denied" in to_str(result):
1✔
356
                    raise AccessDenied(docker_image)
1✔
357
                if "requesting higher privileges than access token allows" in to_str(result):
1✔
358
                    raise AccessDenied(docker_image)
×
359
                if "access token has insufficient scopes" in to_str(result):
1✔
360
                    raise AccessDenied(docker_image)
×
361
                if "connection refused" in to_str(result):
1✔
362
                    raise RegistryConnectionError(result)
1✔
363
                raise ContainerException(result)
×
364
        except ImageNotFound:
1✔
365
            raise NoSuchImage(docker_image)
×
366
        except APIError as e:
1✔
367
            # note: error message 'image not known' raised by Podman API
368
            if "image not known" in str(e):
×
369
                raise NoSuchImage(docker_image)
×
370
            raise ContainerException() from e
×
371

372
    def build_image(
1✔
373
        self,
374
        dockerfile_path: str,
375
        image_name: str,
376
        context_path: str = None,
377
        platform: Optional[DockerPlatform] = None,
378
    ):
379
        try:
1✔
380
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
1✔
381
            context_path = context_path or os.path.dirname(dockerfile_path)
1✔
382
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
1✔
383
            _, logs_iterator = self.client().images.build(
1✔
384
                path=context_path,
385
                dockerfile=dockerfile_path,
386
                tag=image_name,
387
                rm=True,
388
                platform=platform,
389
            )
390
            # logs_iterator is a stream of dicts. Example content:
391
            # {'stream': 'Step 1/4 : FROM alpine'}
392
            # ... other build steps
393
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
394
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
395
            output = ""
1✔
396
            for log in logs_iterator:
1✔
397
                if isinstance(log, dict) and ("stream" in log or "error" in log):
1✔
398
                    output += log.get("stream") or log["error"]
1✔
399
            return output
1✔
400
        except APIError as e:
×
401
            raise ContainerException("Unable to build Docker image") from e
×
402

403
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
404
        try:
1✔
405
            LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
1✔
406
            image = self.client().images.get(source_ref)
1✔
407
            image.tag(target_name)
1✔
408
        except APIError as e:
1✔
409
            if e.status_code == 404:
1✔
410
                raise NoSuchImage(source_ref)
1✔
411
            raise ContainerException("Unable to tag Docker image") from e
×
412

413
    def get_docker_image_names(
1✔
414
        self,
415
        strip_latest: bool = True,
416
        include_tags: bool = True,
417
        strip_wellknown_repo_prefixes: bool = True,
418
    ):
419
        try:
1✔
420
            images = self.client().images.list()
1✔
421
            image_names = [tag for image in images for tag in image.tags if image.tags]
1✔
422
            if not include_tags:
1✔
423
                image_names = [image_name.rpartition(":")[0] for image_name in image_names]
1✔
424
            if strip_wellknown_repo_prefixes:
1✔
425
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
1✔
426
            if strip_latest:
1✔
427
                Util.append_without_latest(image_names)
1✔
428
            return image_names
1✔
429
        except APIError as e:
×
430
            raise ContainerException() from e
×
431

432
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
433
        try:
1✔
434
            container = self.client().containers.get(container_name_or_id)
1✔
435
            return to_str(container.logs())
1✔
436
        except NotFound:
1✔
437
            if safe:
1✔
438
                return ""
1✔
439
            raise NoSuchContainer(container_name_or_id)
1✔
440
        except APIError as e:
1✔
441
            if safe:
1✔
442
                return ""
×
443
            raise ContainerException() from e
1✔
444

445
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
446
        try:
1✔
447
            container = self.client().containers.get(container_name_or_id)
1✔
448
            return container.logs(stream=True, follow=True)
1✔
449
        except NotFound:
1✔
450
            raise NoSuchContainer(container_name_or_id)
1✔
451
        except APIError as e:
×
452
            raise ContainerException() from e
×
453

454
    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
1✔
455
        try:
1✔
456
            return self.client().containers.get(container_name_or_id).attrs
1✔
457
        except NotFound:
1✔
458
            raise NoSuchContainer(container_name_or_id)
1✔
459
        except APIError as e:
×
460
            raise ContainerException() from e
×
461

462
    def inspect_image(
1✔
463
        self,
464
        image_name: str,
465
        pull: bool = True,
466
        strip_wellknown_repo_prefixes: bool = True,
467
    ) -> Dict[str, Union[dict, list, str]]:
468
        try:
1✔
469
            result = self.client().images.get(image_name).attrs
1✔
470
            if strip_wellknown_repo_prefixes:
1✔
471
                if result.get("RepoDigests"):
1✔
472
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
1✔
473
                        result["RepoDigests"]
474
                    )
475
                if result.get("RepoTags"):
1✔
476
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
1✔
477
            return result
1✔
478
        except NotFound:
1✔
479
            if pull:
1✔
480
                self.pull_image(image_name)
1✔
481
                return self.inspect_image(image_name, pull=False)
1✔
482
            raise NoSuchImage(image_name)
1✔
483
        except APIError as e:
×
484
            raise ContainerException() from e
×
485

486
    def create_network(self, network_name: str) -> None:
1✔
487
        try:
1✔
488
            return self.client().networks.create(name=network_name).id
1✔
489
        except APIError as e:
×
490
            raise ContainerException() from e
×
491

492
    def delete_network(self, network_name: str) -> None:
1✔
493
        try:
1✔
494
            return self.client().networks.get(network_name).remove()
1✔
495
        except NotFound:
1✔
496
            raise NoSuchNetwork(network_name)
×
497
        except APIError as e:
1✔
498
            raise ContainerException() from e
1✔
499

500
    def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:
1✔
501
        try:
1✔
502
            return self.client().networks.get(network_name).attrs
1✔
503
        except NotFound:
1✔
504
            raise NoSuchNetwork(network_name)
1✔
505
        except APIError as e:
×
506
            raise ContainerException() from e
×
507

508
    def connect_container_to_network(
1✔
509
        self,
510
        network_name: str,
511
        container_name_or_id: str,
512
        aliases: Optional[List] = None,
513
        link_local_ips: List[str] = None,
514
    ) -> None:
515
        LOG.debug(
1✔
516
            "Connecting container '%s' to network '%s' with aliases '%s'",
517
            container_name_or_id,
518
            network_name,
519
            aliases,
520
        )
521
        try:
1✔
522
            network = self.client().networks.get(network_name)
1✔
523
        except NotFound:
1✔
524
            raise NoSuchNetwork(network_name)
1✔
525
        try:
1✔
526
            network.connect(
1✔
527
                container=container_name_or_id,
528
                aliases=aliases,
529
                link_local_ips=link_local_ips,
530
            )
531
        except NotFound:
1✔
532
            raise NoSuchContainer(container_name_or_id)
1✔
533
        except APIError as e:
×
534
            raise ContainerException() from e
×
535

536
    def disconnect_container_from_network(
1✔
537
        self, network_name: str, container_name_or_id: str
538
    ) -> None:
539
        LOG.debug(
1✔
540
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
541
        )
542
        try:
1✔
543
            try:
1✔
544
                network = self.client().networks.get(network_name)
1✔
545
            except NotFound:
1✔
546
                raise NoSuchNetwork(network_name)
1✔
547
            try:
1✔
548
                network.disconnect(container_name_or_id)
1✔
549
            except NotFound:
1✔
550
                raise NoSuchContainer(container_name_or_id)
1✔
551
        except APIError as e:
1✔
552
            raise ContainerException() from e
×
553

554
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
555
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
556
        network_names = list(networks)
1✔
557
        if len(network_names) > 1:
1✔
558
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
559
        return networks[network_names[0]]["IPAddress"]
1✔
560

561
    @lru_cache(maxsize=None)
1✔
562
    def has_docker(self) -> bool:
1✔
563
        try:
1✔
564
            if not self.docker_client:
1✔
565
                return False
×
566
            self.client().ping()
1✔
567
            return True
1✔
568
        except APIError:
×
569
            return False
×
570

571
    def remove_image(self, image: str, force: bool = True):
1✔
572
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
1✔
573
        try:
1✔
574
            self.client().images.remove(image=image, force=force)
1✔
575
        except ImageNotFound:
1✔
576
            if not force:
1✔
577
                raise NoSuchImage(image)
1✔
578
        except APIError as e:
×
579
            if "image not known" in str(e):
×
580
                raise NoSuchImage(image)
×
581
            raise ContainerException() from e
×
582

583
    def commit(
1✔
584
        self,
585
        container_name_or_id: str,
586
        image_name: str,
587
        image_tag: str,
588
    ):
589
        LOG.debug(
1✔
590
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
591
        )
592
        try:
1✔
593
            container = self.client().containers.get(container_name_or_id)
1✔
594
            container.commit(repository=image_name, tag=image_tag)
1✔
595
        except NotFound:
1✔
596
            raise NoSuchContainer(container_name_or_id)
1✔
597
        except APIError as e:
×
598
            raise ContainerException() from e
×
599

600
    def start_container(
1✔
601
        self,
602
        container_name_or_id: str,
603
        stdin=None,
604
        interactive: bool = False,
605
        attach: bool = False,
606
        flags: Optional[str] = None,
607
    ) -> Tuple[bytes, bytes]:
608
        LOG.debug("Starting container %s", container_name_or_id)
1✔
609
        try:
1✔
610
            container = self.client().containers.get(container_name_or_id)
1✔
611
            stdout = to_bytes(container_name_or_id)
1✔
612
            stderr = b""
1✔
613
            if interactive or attach:
1✔
614
                params = {"stdout": 1, "stderr": 1, "stream": 1}
1✔
615
                if interactive:
1✔
616
                    params["stdin"] = 1
1✔
617
                sock = container.attach_socket(params=params)
1✔
618
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
619
                result_queue = queue.Queue()
1✔
620
                thread_started = threading.Event()
1✔
621
                start_waiting = threading.Event()
1✔
622

623
                # Note: We need to be careful about potential race conditions here - .wait() should happen right
624
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
625
                def wait_for_result(*_):
1✔
626
                    _exit_code = -1
1✔
627
                    try:
1✔
628
                        thread_started.set()
1✔
629
                        start_waiting.wait()
1✔
630
                        _exit_code = container.wait()["StatusCode"]
1✔
631
                    except APIError as e:
×
632
                        _exit_code = 1
×
633
                        raise ContainerException(str(e))
×
634
                    finally:
635
                        result_queue.put(_exit_code)
1✔
636

637
                # start listener thread
638
                start_worker_thread(wait_for_result)
1✔
639
                thread_started.wait()
1✔
640
                try:
1✔
641
                    # start container
642
                    container.start()
1✔
643
                finally:
644
                    # start awaiting container result
645
                    start_waiting.set()
1✔
646

647
                # handle container input/output
648
                # under windows, the socket has no __enter__ / cannot be used as context manager
649
                # therefore try/finally instead of with here
650
                try:
1✔
651
                    if stdin:
1✔
652
                        sock.sendall(to_bytes(stdin))
1✔
653
                        sock.shutdown(socket.SHUT_WR)
1✔
654
                    stdout, stderr = self._read_from_sock(sock, False)
1✔
655
                except socket.timeout:
×
656
                    LOG.debug(
×
657
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
658
                        container_name_or_id,
659
                    )
660
                finally:
661
                    sock.close()
1✔
662

663
                # get container exit code
664
                exit_code = result_queue.get()
1✔
665
                if exit_code:
1✔
666
                    raise ContainerException(
1✔
667
                        f"Docker container returned with exit code {exit_code}",
668
                        stdout=stdout,
669
                        stderr=stderr,
670
                    )
671
            else:
672
                container.start()
1✔
673
            return stdout, stderr
1✔
674
        except NotFound:
1✔
675
            raise NoSuchContainer(container_name_or_id)
1✔
676
        except APIError as e:
1✔
677
            raise ContainerException() from e
1✔
678

679
    def attach_to_container(self, container_name_or_id: str):
1✔
680
        client: DockerClient = self.client()
×
681
        container = cast(Container, client.containers.get(container_name_or_id))
×
682
        container.attach()
×
683

684
    def create_container(
1✔
685
        self,
686
        image_name: str,
687
        *,
688
        name: Optional[str] = None,
689
        entrypoint: Optional[Union[List[str], str]] = None,
690
        remove: bool = False,
691
        interactive: bool = False,
692
        tty: bool = False,
693
        detach: bool = False,
694
        command: Optional[Union[List[str], str]] = None,
695
        volumes: Optional[List[SimpleVolumeBind]] = None,
696
        ports: Optional[PortMappings] = None,
697
        exposed_ports: Optional[List[str]] = None,
698
        env_vars: Optional[Dict[str, str]] = None,
699
        user: Optional[str] = None,
700
        cap_add: Optional[List[str]] = None,
701
        cap_drop: Optional[List[str]] = None,
702
        security_opt: Optional[List[str]] = None,
703
        network: Optional[str] = None,
704
        dns: Optional[Union[str, List[str]]] = None,
705
        additional_flags: Optional[str] = None,
706
        workdir: Optional[str] = None,
707
        privileged: Optional[bool] = None,
708
        labels: Optional[Dict[str, str]] = None,
709
        platform: Optional[DockerPlatform] = None,
710
        ulimits: Optional[List[Ulimit]] = None,
711
        init: Optional[bool] = None,
712
        log_config: Optional[LogConfig] = None,
713
    ) -> str:
714
        LOG.debug("Creating container with attributes: %s", locals())
1✔
715
        extra_hosts = None
1✔
716
        if additional_flags:
1✔
717
            parsed_flags = Util.parse_additional_flags(
1✔
718
                additional_flags,
719
                env_vars=env_vars,
720
                volumes=volumes,
721
                network=network,
722
                platform=platform,
723
                privileged=privileged,
724
                ports=ports,
725
                ulimits=ulimits,
726
                user=user,
727
                dns=dns,
728
            )
729
            env_vars = parsed_flags.env_vars
1✔
730
            extra_hosts = parsed_flags.extra_hosts
1✔
731
            volumes = parsed_flags.volumes
1✔
732
            labels = parsed_flags.labels
1✔
733
            network = parsed_flags.network
1✔
734
            platform = parsed_flags.platform
1✔
735
            privileged = parsed_flags.privileged
1✔
736
            ports = parsed_flags.ports
1✔
737
            ulimits = parsed_flags.ulimits
1✔
738
            user = parsed_flags.user
1✔
739
            dns = parsed_flags.dns
1✔
740

741
        try:
1✔
742
            kwargs = {}
1✔
743
            if cap_add:
1✔
744
                kwargs["cap_add"] = cap_add
1✔
745
            if cap_drop:
1✔
746
                kwargs["cap_drop"] = cap_drop
1✔
747
            if security_opt:
1✔
748
                kwargs["security_opt"] = security_opt
1✔
749
            if dns:
1✔
750
                kwargs["dns"] = ensure_list(dns)
1✔
751
            if exposed_ports:
1✔
752
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
753
                # but the behavior should be identical
UNCOV
754
                kwargs["ports"] = {port: [] for port in exposed_ports}
×
755
            if ports:
1✔
756
                kwargs.setdefault("ports", {})
1✔
757
                kwargs["ports"].update(ports.to_dict())
1✔
758
            if workdir:
1✔
759
                kwargs["working_dir"] = workdir
1✔
760
            if privileged:
1✔
761
                kwargs["privileged"] = True
×
762
            if init:
1✔
763
                kwargs["init"] = True
1✔
764
            if labels:
1✔
765
                kwargs["labels"] = labels
1✔
766
            if log_config:
1✔
767
                kwargs["log_config"] = DockerLogConfig(
1✔
768
                    type=log_config.type, config=log_config.config
769
                )
770
            if ulimits:
1✔
771
                kwargs["ulimits"] = [
1✔
772
                    docker.types.Ulimit(
773
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
774
                    )
775
                    for ulimit in ulimits
776
                ]
777
            mounts = None
1✔
778
            if volumes:
1✔
779
                mounts = Util.convert_mount_list_to_dict(volumes)
1✔
780

781
            def create_container():
1✔
782
                return self.client().containers.create(
1✔
783
                    image=image_name,
784
                    command=command,
785
                    auto_remove=remove,
786
                    name=name,
787
                    stdin_open=interactive,
788
                    tty=tty,
789
                    entrypoint=entrypoint,
790
                    environment=env_vars,
791
                    detach=detach,
792
                    user=user,
793
                    network=network,
794
                    volumes=mounts,
795
                    extra_hosts=extra_hosts,
796
                    platform=platform,
797
                    **kwargs,
798
                )
799

800
            try:
1✔
801
                container = create_container()
1✔
802
            except ImageNotFound:
1✔
803
                LOG.debug("Image not found. Pulling image %s", image_name)
1✔
804
                self.pull_image(image_name, platform)
1✔
805
                container = create_container()
1✔
806
            return container.id
1✔
807
        except ImageNotFound:
1✔
808
            raise NoSuchImage(image_name)
×
809
        except APIError as e:
1✔
810
            raise ContainerException() from e
×
811

812
    def run_container(
1✔
813
        self,
814
        image_name: str,
815
        stdin=None,
816
        *,
817
        name: Optional[str] = None,
818
        entrypoint: Optional[str] = None,
819
        remove: bool = False,
820
        interactive: bool = False,
821
        tty: bool = False,
822
        detach: bool = False,
823
        command: Optional[Union[List[str], str]] = None,
824
        volumes: Optional[List[SimpleVolumeBind]] = None,
825
        ports: Optional[PortMappings] = None,
826
        exposed_ports: Optional[List[str]] = None,
827
        env_vars: Optional[Dict[str, str]] = None,
828
        user: Optional[str] = None,
829
        cap_add: Optional[List[str]] = None,
830
        cap_drop: Optional[List[str]] = None,
831
        security_opt: Optional[List[str]] = None,
832
        network: Optional[str] = None,
833
        dns: Optional[str] = None,
834
        additional_flags: Optional[str] = None,
835
        workdir: Optional[str] = None,
836
        labels: Optional[Dict[str, str]] = None,
837
        platform: Optional[DockerPlatform] = None,
838
        privileged: Optional[bool] = None,
839
        ulimits: Optional[List[Ulimit]] = None,
840
        init: Optional[bool] = None,
841
        log_config: Optional[LogConfig] = None,
842
    ) -> Tuple[bytes, bytes]:
843
        LOG.debug("Running container with image: %s", image_name)
1✔
844
        container = None
1✔
845
        try:
1✔
846
            container = self.create_container(
1✔
847
                image_name,
848
                name=name,
849
                entrypoint=entrypoint,
850
                interactive=interactive,
851
                tty=tty,
852
                detach=detach,
853
                remove=remove and detach,
854
                command=command,
855
                volumes=volumes,
856
                ports=ports,
857
                exposed_ports=exposed_ports,
858
                env_vars=env_vars,
859
                user=user,
860
                cap_add=cap_add,
861
                cap_drop=cap_drop,
862
                security_opt=security_opt,
863
                network=network,
864
                dns=dns,
865
                additional_flags=additional_flags,
866
                workdir=workdir,
867
                privileged=privileged,
868
                platform=platform,
869
                init=init,
870
                labels=labels,
871
                ulimits=ulimits,
872
                log_config=log_config,
873
            )
874
            result = self.start_container(
1✔
875
                container_name_or_id=container,
876
                stdin=stdin,
877
                interactive=interactive,
878
                attach=not detach,
879
            )
880
        finally:
881
            if remove and container and not detach:
1✔
882
                self.remove_container(container)
1✔
883
        return result
1✔
884

885
    def exec_in_container(
1✔
886
        self,
887
        container_name_or_id: str,
888
        command: Union[List[str], str],
889
        interactive=False,
890
        detach=False,
891
        env_vars: Optional[Dict[str, Optional[str]]] = None,
892
        stdin: Optional[bytes] = None,
893
        user: Optional[str] = None,
894
        workdir: Optional[str] = None,
895
    ) -> Tuple[bytes, bytes]:
896
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
1✔
897
        try:
1✔
898
            container: Container = self.client().containers.get(container_name_or_id)
1✔
899
            result = container.exec_run(
1✔
900
                cmd=command,
901
                environment=env_vars,
902
                user=user,
903
                detach=detach,
904
                stdin=interactive and bool(stdin),
905
                socket=interactive and bool(stdin),
906
                stdout=True,
907
                stderr=True,
908
                demux=True,
909
                workdir=workdir,
910
            )
911
            tty = False
1✔
912
            if interactive and stdin:  # result is a socket
1✔
913
                sock = result[1]
1✔
914
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
915
                with sock:
1✔
916
                    try:
1✔
917
                        sock.sendall(stdin)
1✔
918
                        sock.shutdown(socket.SHUT_WR)
1✔
919
                        stdout, stderr = self._read_from_sock(sock, tty)
1✔
920
                        return stdout, stderr
1✔
921
                    except socket.timeout:
×
922
                        pass
×
923
            else:
924
                if detach:
1✔
925
                    return b"", b""
×
926
                return_code = result[0]
1✔
927
                if isinstance(result[1], bytes):
1✔
928
                    stdout = result[1]
×
929
                    stderr = b""
×
930
                else:
931
                    stdout, stderr = result[1]
1✔
932
                if return_code != 0:
1✔
933
                    raise ContainerException(
1✔
934
                        f"Exec command returned with exit code {return_code}", stdout, stderr
935
                    )
936
                return stdout, stderr
1✔
937
        except ContainerError:
1✔
938
            raise NoSuchContainer(container_name_or_id)
×
939
        except APIError as e:
1✔
940
            raise ContainerException() from e
1✔
941

942
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
943
        LOG.debug("Docker login for %s", username)
×
944
        try:
×
945
            self.client().login(username, password=password, registry=registry, reauth=True)
×
946
        except APIError as e:
×
947
            raise ContainerException() from e
×
948

949

950
# apply patches required for podman API compatibility
951

952

953
@property
1✔
954
def _container_image(self):
1✔
955
    image_id = self.attrs.get("ImageID", self.attrs["Image"])
1✔
956
    if image_id is None:
1✔
957
        return None
×
958
    image_ref = image_id
1✔
959
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
960
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
961
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
962
    # then lead to "no such image" errors.
963
    if re.match("sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
1✔
964
        image_ref = image_id.split(":")[1]
1✔
965
    return self.client.images.get(image_ref)
1✔
966

967

968
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc