• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20154439467

11 Dec 2025 04:58PM UTC coverage: 86.873% (+0.006%) from 86.867%
20154439467

push

github

web-flow
SQS: Improve update support for CloudFormation handlers. (#13477)

34 of 34 new or added lines in 4 files covered. (100.0%)

15 existing lines in 5 files now uncovered.

69932 of 80499 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.19
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from collections.abc import Callable
1✔
10
from functools import cache
1✔
11
from time import sleep
1✔
12
from typing import cast
1✔
13
from urllib.parse import quote
1✔
14

15
import docker
1✔
16
from docker import DockerClient
1✔
17
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
18
from docker.models.containers import Container
1✔
19
from docker.types import LogConfig as DockerLogConfig
1✔
20
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
21

22
from localstack.config import LS_LOG
1✔
23
from localstack.constants import TRACE_LOG_LEVELS
1✔
24
from localstack.utils.collections import ensure_list
1✔
25
from localstack.utils.container_utils.container_client import (
1✔
26
    AccessDenied,
27
    CancellableStream,
28
    ContainerClient,
29
    ContainerException,
30
    DockerContainerStats,
31
    DockerContainerStatus,
32
    DockerNotAvailable,
33
    DockerPlatform,
34
    LogConfig,
35
    NoSuchContainer,
36
    NoSuchImage,
37
    NoSuchNetwork,
38
    PortMappings,
39
    RegistryConnectionError,
40
    SimpleVolumeBind,
41
    Ulimit,
42
    Util,
43
)
44
from localstack.utils.strings import to_bytes, to_str
1✔
45
from localstack.utils.threads import start_worker_thread
1✔
46

47
LOG = logging.getLogger(__name__)
1✔
48
SDK_ISDIR = 1 << 31
1✔
49

50

51
class SdkDockerClient(ContainerClient):
1✔
52
    """
53
    Class for managing Docker (or Podman) using the Python Docker SDK.
54

55
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
56
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
57
    is doing some of the heavy lifting for us to support both target platforms.
58
    """
59

60
    docker_client: DockerClient | None
1✔
61

62
    def __init__(self):
1✔
63
        try:
1✔
64
            self.docker_client = self._create_client()
1✔
65
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
66
        except DockerNotAvailable:
1✔
67
            self.docker_client = None
1✔
68

69
    def client(self):
1✔
70
        if self.docker_client:
1✔
71
            return self.docker_client
1✔
72
        # if the initialization failed before, try to initialize on-demand
73
        self.docker_client = self._create_client()
1✔
74
        return self.docker_client
1✔
75

76
    @staticmethod
1✔
77
    def _create_client():
1✔
78
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS
1✔
79

80
        for attempt in range(0, DOCKER_SDK_DEFAULT_RETRIES + 1):
1✔
81
            try:
1✔
82
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
1✔
83
            except DockerException as e:
1✔
84
                LOG.debug(
1✔
85
                    "Creating Docker SDK client failed: %s. "
86
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
87
                    e,
88
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
89
                )
90
                if attempt < DOCKER_SDK_DEFAULT_RETRIES:
1✔
91
                    # wait for a second before retrying
92
                    sleep(1)
1✔
93
                else:
94
                    # we are out of attempts
95
                    raise DockerNotAvailable("Docker not available") from e
1✔
96

97
    def _read_from_sock(self, sock: socket, tty: bool):
1✔
98
        """Reads multiplexed messages from a socket returned by attach_socket.
99

100
        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
101
        """
102
        stdout = b""
1✔
103
        stderr = b""
1✔
104
        for frame_type, frame_data in frames_iter(sock, tty):
1✔
105
            if frame_type == STDOUT:
1✔
106
                stdout += frame_data
1✔
107
            elif frame_type == STDERR:
1✔
108
                stderr += frame_data
1✔
109
            else:
110
                raise ContainerException("Invalid frame type when reading from socket")
×
111
        return stdout, stderr
1✔
112

113
    def _container_path_info(self, container: Container, container_path: str):
1✔
114
        """
115
        Get information about a path in the given container
116
        :param container: Container to be inspected
117
        :param container_path: Path in container
118
        :return: Tuple (path_exists, path_is_directory)
119
        """
120
        # Docker CLI copy uses go FileMode to determine if target is a dict or not
121
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
122
        # The isDir Bit is the most significant bit in the 32bit struct:
123
        # https://golang.org/src/os/types.go?s=2650:2683
124
        api_client = self.client().api
1✔
125

126
        def _head(path_suffix, **kwargs):
1✔
127
            return api_client.head(
1✔
128
                api_client.base_url + path_suffix, **api_client._set_request_timeout(kwargs)
129
            )
130

131
        escaped_id = quote(container.id, safe="/:")
1✔
132
        result = _head(f"/containers/{escaped_id}/archive", params={"path": container_path})
1✔
133
        stats = result.headers.get("X-Docker-Container-Path-Stat")
1✔
134
        target_exists = result.ok
1✔
135

136
        if target_exists:
1✔
137
            stats = json.loads(base64.b64decode(stats).decode("utf-8"))
1✔
138
        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)
1✔
139
        return target_exists, target_is_dir
1✔
140

141
    def get_system_info(self) -> dict:
1✔
142
        return self.client().info()
1✔
143

144
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
145
        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose
146
        try:
1✔
147
            container = self.client().containers.get(container_name)
1✔
148
            if container.status == "running":
1✔
149
                return DockerContainerStatus.UP
1✔
150
            elif container.status == "paused":
1✔
151
                return DockerContainerStatus.PAUSED
1✔
152
            else:
153
                return DockerContainerStatus.DOWN
1✔
154
        except NotFound:
1✔
155
            return DockerContainerStatus.NON_EXISTENT
1✔
156
        except APIError as e:
×
157
            raise ContainerException() from e
×
158

159
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
160
        try:
1✔
161
            container = self.client().containers.get(container_name)
1✔
162
            sdk_stats = container.stats(stream=False)
1✔
163

164
            # BlockIO: (Read, Write) bytes
165
            read_bytes = 0
1✔
166
            write_bytes = 0
1✔
167
            for entry in (
1✔
168
                sdk_stats.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
169
            ):
170
                if entry.get("op") == "read":
×
171
                    read_bytes += entry.get("value", 0)
×
172
                elif entry.get("op") == "write":
×
173
                    write_bytes += entry.get("value", 0)
×
174

175
            # CPU percentage
176
            cpu_stats = sdk_stats.get("cpu_stats", {})
1✔
177
            precpu_stats = sdk_stats.get("precpu_stats", {})
1✔
178

179
            cpu_delta = cpu_stats.get("cpu_usage", {}).get("total_usage", 0) - precpu_stats.get(
1✔
180
                "cpu_usage", {}
181
            ).get("total_usage", 0)
182

183
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
1✔
184
                "system_cpu_usage", 0
185
            )
186

187
            online_cpus = cpu_stats.get("online_cpus", 1)
1✔
188
            cpu_percent = (
1✔
189
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
190
            )
191

192
            # Memory (usage, limit) bytes
193
            memory_stats = sdk_stats.get("memory_stats", {})
1✔
194
            mem_usage = memory_stats.get("usage", 0)
1✔
195
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
1✔
196
            mem_inactive = memory_stats.get("stats", {}).get("inactive_file", 0)
1✔
197
            used_memory = max(0, mem_usage - mem_inactive)
1✔
198
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0
1✔
199

200
            # Network IO
201
            net_rx = 0
1✔
202
            net_tx = 0
1✔
203
            for iface in sdk_stats.get("networks", {}).values():
1✔
204
                net_rx += iface.get("rx_bytes", 0)
1✔
205
                net_tx += iface.get("tx_bytes", 0)
1✔
206

207
            # Container ID
208
            container_id = sdk_stats.get("id", "")[:12]
1✔
209
            name = sdk_stats.get("name", "").lstrip("/")
1✔
210

211
            return DockerContainerStats(
1✔
212
                Container=container_id,
213
                ID=container_id,
214
                Name=name,
215
                BlockIO=(read_bytes, write_bytes),
216
                CPUPerc=round(cpu_percent, 2),
217
                MemPerc=round(mem_percent, 2),
218
                MemUsage=(used_memory, mem_limit),
219
                NetIO=(net_rx, net_tx),
220
                PIDs=sdk_stats.get("pids_stats", {}).get("current", 0),
221
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
222
            )
223
        except NotFound:
×
224
            raise NoSuchContainer(container_name)
×
225
        except APIError as e:
×
226
            raise ContainerException() from e
×
227

228
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
229
        LOG.debug("Stopping container: %s", container_name)
1✔
230
        try:
1✔
231
            container = self.client().containers.get(container_name)
1✔
232
            container.stop(timeout=timeout)
1✔
233
        except NotFound:
1✔
234
            raise NoSuchContainer(container_name)
1✔
235
        except APIError as e:
×
236
            raise ContainerException() from e
×
237

238
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
239
        LOG.debug("Restarting container: %s", container_name)
1✔
240
        try:
1✔
241
            container = self.client().containers.get(container_name)
1✔
242
            container.restart(timeout=timeout)
1✔
243
        except NotFound:
1✔
244
            raise NoSuchContainer(container_name)
1✔
245
        except APIError as e:
×
246
            raise ContainerException() from e
×
247

248
    def pause_container(self, container_name: str) -> None:
1✔
249
        LOG.debug("Pausing container: %s", container_name)
1✔
250
        try:
1✔
251
            container = self.client().containers.get(container_name)
1✔
252
            container.pause()
1✔
253
        except NotFound:
1✔
254
            raise NoSuchContainer(container_name)
1✔
255
        except APIError as e:
×
256
            raise ContainerException() from e
×
257

258
    def unpause_container(self, container_name: str) -> None:
1✔
259
        LOG.debug("Unpausing container: %s", container_name)
1✔
260
        try:
1✔
261
            container = self.client().containers.get(container_name)
1✔
262
            container.unpause()
1✔
263
        except NotFound:
×
264
            raise NoSuchContainer(container_name)
×
265
        except APIError as e:
×
266
            raise ContainerException() from e
×
267

268
    def remove_container(
1✔
269
        self, container_name: str, force=True, check_existence=False, volumes=False
270
    ) -> None:
271
        LOG.debug("Removing container: %s, with volumes: %s", container_name, volumes)
1✔
272
        if check_existence and container_name not in self.get_all_container_names():
1✔
273
            LOG.debug("Aborting removing due to check_existence check")
×
274
            return
×
275
        try:
1✔
276
            container = self.client().containers.get(container_name)
1✔
277
            container.remove(force=force, v=volumes)
1✔
278
        except NotFound:
1✔
279
            if not force:
1✔
280
                raise NoSuchContainer(container_name)
1✔
281
        except APIError as e:
×
282
            raise ContainerException() from e
×
283

284
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
1✔
285
        if filter:
1✔
286
            filter = [filter] if isinstance(filter, str) else filter
1✔
287
            filter = dict([f.split("=", 1) for f in filter])
1✔
288
        LOG.debug("Listing containers with filters: %s", filter)
1✔
289
        try:
1✔
290
            container_list = self.client().containers.list(filters=filter, all=all)
1✔
291
            result = []
1✔
292
            for container in container_list:
1✔
293
                try:
1✔
294
                    result.append(
1✔
295
                        {
296
                            "id": container.id,
297
                            "image": container.image,
298
                            "name": container.name,
299
                            "status": container.status,
300
                            "labels": container.labels,
301
                        }
302
                    )
303
                except Exception as e:
×
304
                    LOG.error("Error checking container %s: %s", container, e)
×
305
            return result
1✔
306
        except APIError as e:
1✔
307
            raise ContainerException() from e
1✔
308

309
    def copy_into_container(
1✔
310
        self, container_name: str, local_path: str, container_path: str
311
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
312
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
1✔
313
        try:
1✔
314
            container = self.client().containers.get(container_name)
1✔
315
            target_exists, target_isdir = self._container_path_info(container, container_path)
1✔
316
            target_path = container_path if target_isdir else os.path.dirname(container_path)
1✔
317
            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:
1✔
318
                container.put_archive(target_path, tar)
1✔
319
        except NotFound:
1✔
320
            raise NoSuchContainer(container_name)
1✔
321
        except APIError as e:
1✔
322
            raise ContainerException() from e
×
323

324
    def copy_from_container(
1✔
325
        self,
326
        container_name: str,
327
        local_path: str,
328
        container_path: str,
329
    ) -> None:
330
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
1✔
331
        try:
1✔
332
            container = self.client().containers.get(container_name)
1✔
333
            bits, _ = container.get_archive(container_path)
1✔
334
            Util.untar_to_path(bits, local_path)
1✔
335
        except NotFound:
1✔
336
            raise NoSuchContainer(container_name)
1✔
337
        except APIError as e:
×
338
            raise ContainerException() from e
×
339

340
    def pull_image(
1✔
341
        self,
342
        docker_image: str,
343
        platform: DockerPlatform | None = None,
344
        log_handler: Callable[[str], None] | None = None,
345
    ) -> None:
346
        LOG.debug("Pulling Docker image: %s", docker_image)
1✔
347
        # some path in the docker image string indicates a custom repository
348

349
        docker_image = self.registry_resolver_strategy.resolve(docker_image)
1✔
350
        kwargs: dict[str, str | bool] = {"platform": platform}
1✔
351
        try:
1✔
352
            if log_handler:
1✔
353
                # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
354
                kwargs["stream"] = True
1✔
355
                stream = self.client().api.pull(docker_image, **kwargs)
1✔
356
                for line in stream:
1✔
357
                    log_handler(to_str(line))
1✔
358
            else:
359
                self.client().images.pull(docker_image, **kwargs)
1✔
360
        except ImageNotFound:
1✔
361
            raise NoSuchImage(docker_image)
1✔
362
        except APIError as e:
×
363
            raise ContainerException() from e
×
364

365
    def push_image(self, docker_image: str) -> None:
1✔
366
        LOG.debug("Pushing Docker image: %s", docker_image)
1✔
367
        try:
1✔
368
            result = self.client().images.push(docker_image)
1✔
369
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
370
            if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):
1✔
371
                if "image does not exist locally" in to_str(result):
1✔
372
                    raise NoSuchImage(docker_image)
1✔
373
                if "is denied" in to_str(result):
1✔
374
                    raise AccessDenied(docker_image)
1✔
375
                if "requesting higher privileges than access token allows" in to_str(result):
1✔
376
                    raise AccessDenied(docker_image)
×
377
                if "access token has insufficient scopes" in to_str(result):
1✔
378
                    raise AccessDenied(docker_image)
×
379
                if "connection refused" in to_str(result):
1✔
380
                    raise RegistryConnectionError(result)
1✔
381
                raise ContainerException(result)
×
382
        except ImageNotFound:
1✔
383
            raise NoSuchImage(docker_image)
×
384
        except APIError as e:
1✔
385
            # note: error message 'image not known' raised by Podman API
386
            if "image not known" in str(e):
×
387
                raise NoSuchImage(docker_image)
×
388
            raise ContainerException() from e
×
389

390
    def build_image(
1✔
391
        self,
392
        dockerfile_path: str,
393
        image_name: str,
394
        context_path: str = None,
395
        platform: DockerPlatform | None = None,
396
    ):
397
        try:
1✔
398
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
1✔
399
            context_path = context_path or os.path.dirname(dockerfile_path)
1✔
400
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
1✔
401
            _, logs_iterator = self.client().images.build(
1✔
402
                path=context_path,
403
                dockerfile=dockerfile_path,
404
                tag=image_name,
405
                rm=True,
406
                platform=platform,
407
            )
408
            # logs_iterator is a stream of dicts. Example content:
409
            # {'stream': 'Step 1/4 : FROM alpine'}
410
            # ... other build steps
411
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
412
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
413
            output = ""
1✔
414
            for log in logs_iterator:
1✔
415
                if isinstance(log, dict) and ("stream" in log or "error" in log):
1✔
416
                    output += log.get("stream") or log["error"]
1✔
417
            return output
1✔
418
        except APIError as e:
×
419
            raise ContainerException("Unable to build Docker image") from e
×
420

421
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
422
        try:
1✔
423
            LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
1✔
424
            image = self.client().images.get(source_ref)
1✔
425
            image.tag(target_name)
1✔
426
        except APIError as e:
1✔
427
            if e.status_code == 404:
1✔
428
                raise NoSuchImage(source_ref)
1✔
429
            raise ContainerException("Unable to tag Docker image") from e
×
430

431
    def get_docker_image_names(
1✔
432
        self,
433
        strip_latest: bool = True,
434
        include_tags: bool = True,
435
        strip_wellknown_repo_prefixes: bool = True,
436
    ):
437
        try:
1✔
438
            images = self.client().images.list()
1✔
439
            image_names = [tag for image in images for tag in image.tags if image.tags]
1✔
440
            if not include_tags:
1✔
441
                image_names = [image_name.rpartition(":")[0] for image_name in image_names]
1✔
442
            if strip_wellknown_repo_prefixes:
1✔
443
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
1✔
444
            if strip_latest:
1✔
445
                Util.append_without_latest(image_names)
1✔
446
            return image_names
1✔
447
        except APIError as e:
×
448
            raise ContainerException() from e
×
449

450
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
451
        try:
1✔
452
            container = self.client().containers.get(container_name_or_id)
1✔
453
            return to_str(container.logs())
1✔
454
        except NotFound:
1✔
455
            if safe:
1✔
456
                return ""
1✔
457
            raise NoSuchContainer(container_name_or_id)
1✔
458
        except APIError as e:
1✔
459
            if safe:
1✔
460
                return ""
×
461
            raise ContainerException() from e
1✔
462

463
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
464
        try:
1✔
465
            container = self.client().containers.get(container_name_or_id)
1✔
466
            return container.logs(stream=True, follow=True)
1✔
467
        except NotFound:
1✔
468
            raise NoSuchContainer(container_name_or_id)
1✔
469
        except APIError as e:
×
470
            raise ContainerException() from e
×
471

472
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
473
        try:
1✔
474
            return self.client().containers.get(container_name_or_id).attrs
1✔
475
        except NotFound:
1✔
476
            raise NoSuchContainer(container_name_or_id)
1✔
477
        except APIError as e:
×
478
            raise ContainerException() from e
×
479

480
    def inspect_image(
1✔
481
        self,
482
        image_name: str,
483
        pull: bool = True,
484
        strip_wellknown_repo_prefixes: bool = True,
485
    ) -> dict[str, dict | list | str]:
486
        image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
487
        try:
1✔
488
            result = self.client().images.get(image_name).attrs
1✔
489
            if strip_wellknown_repo_prefixes:
1✔
490
                if result.get("RepoDigests"):
1✔
491
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
1✔
492
                        result["RepoDigests"]
493
                    )
494
                if result.get("RepoTags"):
1✔
495
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
1✔
496
            return result
1✔
497
        except NotFound:
1✔
498
            if pull:
1✔
499
                self.pull_image(image_name)
1✔
500
                return self.inspect_image(image_name, pull=False)
1✔
501
            raise NoSuchImage(image_name)
1✔
502
        except APIError as e:
×
503
            raise ContainerException() from e
×
504

505
    def create_network(self, network_name: str) -> None:
1✔
506
        try:
1✔
507
            return self.client().networks.create(name=network_name).id
1✔
508
        except APIError as e:
×
509
            raise ContainerException() from e
×
510

511
    def delete_network(self, network_name: str) -> None:
1✔
512
        try:
1✔
513
            return self.client().networks.get(network_name).remove()
1✔
UNCOV
514
        except NotFound:
×
515
            raise NoSuchNetwork(network_name)
×
UNCOV
516
        except APIError as e:
×
UNCOV
517
            raise ContainerException() from e
×
518

519
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
520
        try:
1✔
521
            return self.client().networks.get(network_name).attrs
1✔
522
        except NotFound:
1✔
523
            raise NoSuchNetwork(network_name)
1✔
524
        except APIError as e:
×
525
            raise ContainerException() from e
×
526

527
    def connect_container_to_network(
1✔
528
        self,
529
        network_name: str,
530
        container_name_or_id: str,
531
        aliases: list | None = None,
532
        link_local_ips: list[str] = None,
533
    ) -> None:
534
        LOG.debug(
1✔
535
            "Connecting container '%s' to network '%s' with aliases '%s'",
536
            container_name_or_id,
537
            network_name,
538
            aliases,
539
        )
540
        try:
1✔
541
            network = self.client().networks.get(network_name)
1✔
542
        except NotFound:
1✔
543
            raise NoSuchNetwork(network_name)
1✔
544
        try:
1✔
545
            network.connect(
1✔
546
                container=container_name_or_id,
547
                aliases=aliases,
548
                link_local_ips=link_local_ips,
549
            )
550
        except NotFound:
1✔
551
            raise NoSuchContainer(container_name_or_id)
1✔
552
        except APIError as e:
×
553
            raise ContainerException() from e
×
554

555
    def disconnect_container_from_network(
1✔
556
        self, network_name: str, container_name_or_id: str
557
    ) -> None:
558
        LOG.debug(
1✔
559
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
560
        )
561
        try:
1✔
562
            try:
1✔
563
                network = self.client().networks.get(network_name)
1✔
564
            except NotFound:
1✔
565
                raise NoSuchNetwork(network_name)
1✔
566
            try:
1✔
567
                network.disconnect(container_name_or_id)
1✔
568
            except NotFound:
1✔
569
                raise NoSuchContainer(container_name_or_id)
1✔
570
        except APIError as e:
1✔
571
            raise ContainerException() from e
×
572

573
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
574
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
575
        network_names = list(networks)
1✔
576
        if len(network_names) > 1:
1✔
577
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
578
        return networks[network_names[0]]["IPAddress"]
1✔
579

580
    @cache
1✔
581
    def has_docker(self) -> bool:
1✔
582
        try:
1✔
583
            if not self.docker_client:
1✔
584
                return False
×
585
            self.client().ping()
1✔
586
            return True
1✔
587
        except APIError:
×
588
            return False
×
589

590
    def remove_image(self, image: str, force: bool = True):
1✔
591
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
1✔
592
        try:
1✔
593
            self.client().images.remove(image=image, force=force)
1✔
594
        except ImageNotFound:
1✔
595
            if not force:
1✔
596
                raise NoSuchImage(image)
1✔
597
        except APIError as e:
×
598
            if "image not known" in str(e):
×
599
                raise NoSuchImage(image)
×
600
            raise ContainerException() from e
×
601

602
    def commit(
1✔
603
        self,
604
        container_name_or_id: str,
605
        image_name: str,
606
        image_tag: str,
607
    ):
608
        LOG.debug(
1✔
609
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
610
        )
611
        try:
1✔
612
            container = self.client().containers.get(container_name_or_id)
1✔
613
            container.commit(repository=image_name, tag=image_tag)
1✔
614
        except NotFound:
1✔
615
            raise NoSuchContainer(container_name_or_id)
1✔
616
        except APIError as e:
×
617
            raise ContainerException() from e
×
618

619
    def start_container(
1✔
620
        self,
621
        container_name_or_id: str,
622
        stdin=None,
623
        interactive: bool = False,
624
        attach: bool = False,
625
        flags: str | None = None,
626
    ) -> tuple[bytes, bytes]:
627
        LOG.debug("Starting container %s", container_name_or_id)
1✔
628
        try:
1✔
629
            container = self.client().containers.get(container_name_or_id)
1✔
630
            stdout = to_bytes(container_name_or_id)
1✔
631
            stderr = b""
1✔
632
            if interactive or attach:
1✔
633
                params = {"stdout": 1, "stderr": 1, "stream": 1}
1✔
634
                if interactive:
1✔
635
                    params["stdin"] = 1
1✔
636
                sock = container.attach_socket(params=params)
1✔
637
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
638
                result_queue = queue.Queue()
1✔
639
                thread_started = threading.Event()
1✔
640
                start_waiting = threading.Event()
1✔
641

642
                # Note: We need to be careful about potential race conditions here - .wait() should happen right
643
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
644
                def wait_for_result(*_):
1✔
645
                    _exit_code = -1
1✔
646
                    try:
1✔
647
                        thread_started.set()
1✔
648
                        start_waiting.wait()
1✔
649
                        _exit_code = container.wait()["StatusCode"]
1✔
650
                    except APIError as e:
×
651
                        _exit_code = 1
×
652
                        raise ContainerException(str(e))
×
653
                    finally:
654
                        result_queue.put(_exit_code)
1✔
655

656
                # start listener thread
657
                start_worker_thread(wait_for_result)
1✔
658
                thread_started.wait()
1✔
659
                try:
1✔
660
                    # start container
661
                    container.start()
1✔
662
                finally:
663
                    # start awaiting container result
664
                    start_waiting.set()
1✔
665

666
                # handle container input/output
667
                # under windows, the socket has no __enter__ / cannot be used as context manager
668
                # therefore try/finally instead of with here
669
                try:
1✔
670
                    if stdin:
1✔
671
                        sock.sendall(to_bytes(stdin))
1✔
672
                        sock.shutdown(socket.SHUT_WR)
1✔
673
                    stdout, stderr = self._read_from_sock(sock, False)
1✔
674
                except TimeoutError:
×
675
                    LOG.debug(
×
676
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
677
                        container_name_or_id,
678
                    )
679
                finally:
680
                    sock.close()
1✔
681

682
                # get container exit code
683
                exit_code = result_queue.get()
1✔
684
                if exit_code:
1✔
685
                    raise ContainerException(
1✔
686
                        f"Docker container returned with exit code {exit_code}",
687
                        stdout=stdout,
688
                        stderr=stderr,
689
                    )
690
            else:
691
                container.start()
1✔
692
            return stdout, stderr
1✔
693
        except NotFound:
1✔
694
            raise NoSuchContainer(container_name_or_id)
1✔
695
        except APIError as e:
1✔
696
            raise ContainerException() from e
1✔
697

698
    def attach_to_container(self, container_name_or_id: str):
1✔
699
        client: DockerClient = self.client()
×
700
        container = cast(Container, client.containers.get(container_name_or_id))
×
701
        container.attach()
×
702

703
    def create_container(
1✔
704
        self,
705
        image_name: str,
706
        *,
707
        name: str | None = None,
708
        entrypoint: list[str] | str | None = None,
709
        remove: bool = False,
710
        interactive: bool = False,
711
        tty: bool = False,
712
        detach: bool = False,
713
        command: list[str] | str | None = None,
714
        volumes: list[SimpleVolumeBind] | None = None,
715
        ports: PortMappings | None = None,
716
        exposed_ports: list[str] | None = None,
717
        env_vars: dict[str, str] | None = None,
718
        user: str | None = None,
719
        cap_add: list[str] | None = None,
720
        cap_drop: list[str] | None = None,
721
        security_opt: list[str] | None = None,
722
        network: str | None = None,
723
        dns: str | list[str] | None = None,
724
        additional_flags: str | None = None,
725
        workdir: str | None = None,
726
        privileged: bool | None = None,
727
        labels: dict[str, str] | None = None,
728
        platform: DockerPlatform | None = None,
729
        ulimits: list[Ulimit] | None = None,
730
        init: bool | None = None,
731
        log_config: LogConfig | None = None,
732
    ) -> str:
733
        LOG.debug("Creating container with attributes: %s", locals())
1✔
734
        extra_hosts = None
1✔
735
        if additional_flags:
1✔
736
            parsed_flags = Util.parse_additional_flags(
1✔
737
                additional_flags,
738
                env_vars=env_vars,
739
                volumes=volumes,
740
                network=network,
741
                platform=platform,
742
                privileged=privileged,
743
                ports=ports,
744
                ulimits=ulimits,
745
                user=user,
746
                dns=dns,
747
            )
748
            env_vars = parsed_flags.env_vars
1✔
749
            extra_hosts = parsed_flags.extra_hosts
1✔
750
            volumes = parsed_flags.volumes
1✔
751
            labels = parsed_flags.labels
1✔
752
            network = parsed_flags.network
1✔
753
            platform = parsed_flags.platform
1✔
754
            privileged = parsed_flags.privileged
1✔
755
            ports = parsed_flags.ports
1✔
756
            ulimits = parsed_flags.ulimits
1✔
757
            user = parsed_flags.user
1✔
758
            dns = parsed_flags.dns
1✔
759

760
        try:
1✔
761
            kwargs = {}
1✔
762
            if cap_add:
1✔
763
                kwargs["cap_add"] = cap_add
1✔
764
            if cap_drop:
1✔
765
                kwargs["cap_drop"] = cap_drop
1✔
766
            if security_opt:
1✔
767
                kwargs["security_opt"] = security_opt
1✔
768
            if dns:
1✔
769
                kwargs["dns"] = ensure_list(dns)
1✔
770
            if exposed_ports:
1✔
771
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
772
                # but the behavior should be identical
773
                kwargs["ports"] = {port: [] for port in exposed_ports}
×
774
            if ports:
1✔
775
                kwargs.setdefault("ports", {})
1✔
776
                kwargs["ports"].update(ports.to_dict())
1✔
777
            if workdir:
1✔
778
                kwargs["working_dir"] = workdir
1✔
779
            if privileged:
1✔
780
                kwargs["privileged"] = True
×
781
            if init:
1✔
782
                kwargs["init"] = True
1✔
783
            if labels:
1✔
784
                kwargs["labels"] = labels
1✔
785
            if log_config:
1✔
786
                kwargs["log_config"] = DockerLogConfig(
1✔
787
                    type=log_config.type, config=log_config.config
788
                )
789
            if ulimits:
1✔
790
                kwargs["ulimits"] = [
1✔
791
                    docker.types.Ulimit(
792
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
793
                    )
794
                    for ulimit in ulimits
795
                ]
796
            mounts = None
1✔
797
            if volumes:
1✔
798
                mounts = Util.convert_mount_list_to_dict(volumes)
1✔
799

800
            image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
801

802
            def create_container():
1✔
803
                return self.client().containers.create(
1✔
804
                    image=image_name,
805
                    command=command,
806
                    auto_remove=remove,
807
                    name=name,
808
                    stdin_open=interactive,
809
                    tty=tty,
810
                    entrypoint=entrypoint,
811
                    environment=env_vars,
812
                    detach=detach,
813
                    user=user,
814
                    network=network,
815
                    volumes=mounts,
816
                    extra_hosts=extra_hosts,
817
                    platform=platform,
818
                    **kwargs,
819
                )
820

821
            try:
1✔
822
                container = create_container()
1✔
823
            except ImageNotFound:
1✔
824
                LOG.debug("Image not found. Pulling image %s", image_name)
1✔
825
                self.pull_image(image_name, platform)
1✔
826
                container = create_container()
1✔
827
            return container.id
1✔
828
        except ImageNotFound:
1✔
829
            raise NoSuchImage(image_name)
×
830
        except APIError as e:
1✔
831
            raise ContainerException() from e
×
832

833
    def run_container(
1✔
834
        self,
835
        image_name: str,
836
        stdin=None,
837
        *,
838
        name: str | None = None,
839
        entrypoint: str | None = None,
840
        remove: bool = False,
841
        interactive: bool = False,
842
        tty: bool = False,
843
        detach: bool = False,
844
        command: list[str] | str | None = None,
845
        volumes: list[SimpleVolumeBind] | None = None,
846
        ports: PortMappings | None = None,
847
        exposed_ports: list[str] | None = None,
848
        env_vars: dict[str, str] | None = None,
849
        user: str | None = None,
850
        cap_add: list[str] | None = None,
851
        cap_drop: list[str] | None = None,
852
        security_opt: list[str] | None = None,
853
        network: str | None = None,
854
        dns: str | None = None,
855
        additional_flags: str | None = None,
856
        workdir: str | None = None,
857
        labels: dict[str, str] | None = None,
858
        platform: DockerPlatform | None = None,
859
        privileged: bool | None = None,
860
        ulimits: list[Ulimit] | None = None,
861
        init: bool | None = None,
862
        log_config: LogConfig | None = None,
863
    ) -> tuple[bytes, bytes]:
864
        LOG.debug("Running container with image: %s", image_name)
1✔
865
        container = None
1✔
866
        try:
1✔
867
            container = self.create_container(
1✔
868
                image_name,
869
                name=name,
870
                entrypoint=entrypoint,
871
                interactive=interactive,
872
                tty=tty,
873
                detach=detach,
874
                remove=remove and detach,
875
                command=command,
876
                volumes=volumes,
877
                ports=ports,
878
                exposed_ports=exposed_ports,
879
                env_vars=env_vars,
880
                user=user,
881
                cap_add=cap_add,
882
                cap_drop=cap_drop,
883
                security_opt=security_opt,
884
                network=network,
885
                dns=dns,
886
                additional_flags=additional_flags,
887
                workdir=workdir,
888
                privileged=privileged,
889
                platform=platform,
890
                init=init,
891
                labels=labels,
892
                ulimits=ulimits,
893
                log_config=log_config,
894
            )
895
            result = self.start_container(
1✔
896
                container_name_or_id=container,
897
                stdin=stdin,
898
                interactive=interactive,
899
                attach=not detach,
900
            )
901
        finally:
902
            if remove and container and not detach:
1✔
903
                self.remove_container(container)
1✔
904
        return result
1✔
905

906
    def exec_in_container(
1✔
907
        self,
908
        container_name_or_id: str,
909
        command: list[str] | str,
910
        interactive=False,
911
        detach=False,
912
        env_vars: dict[str, str | None] | None = None,
913
        stdin: bytes | None = None,
914
        user: str | None = None,
915
        workdir: str | None = None,
916
    ) -> tuple[bytes, bytes]:
917
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
1✔
918
        try:
1✔
919
            container: Container = self.client().containers.get(container_name_or_id)
1✔
920
            result = container.exec_run(
1✔
921
                cmd=command,
922
                environment=env_vars,
923
                user=user,
924
                detach=detach,
925
                stdin=interactive and bool(stdin),
926
                socket=interactive and bool(stdin),
927
                stdout=True,
928
                stderr=True,
929
                demux=True,
930
                workdir=workdir,
931
            )
932
            tty = False
1✔
933
            if interactive and stdin:  # result is a socket
1✔
934
                sock = result[1]
1✔
935
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
936
                with sock:
1✔
937
                    try:
1✔
938
                        sock.sendall(stdin)
1✔
939
                        sock.shutdown(socket.SHUT_WR)
1✔
940
                        stdout, stderr = self._read_from_sock(sock, tty)
1✔
941
                        return stdout, stderr
1✔
942
                    except TimeoutError:
×
943
                        pass
×
944
            else:
945
                if detach:
1✔
946
                    return b"", b""
×
947
                return_code = result[0]
1✔
948
                if isinstance(result[1], bytes):
1✔
949
                    stdout = result[1]
×
950
                    stderr = b""
×
951
                else:
952
                    stdout, stderr = result[1]
1✔
953
                if return_code != 0:
1✔
954
                    raise ContainerException(
1✔
955
                        f"Exec command returned with exit code {return_code}", stdout, stderr
956
                    )
957
                return stdout, stderr
1✔
958
        except ContainerError:
1✔
959
            raise NoSuchContainer(container_name_or_id)
×
960
        except APIError as e:
1✔
961
            raise ContainerException() from e
1✔
962

963
    def login(self, username: str, password: str, registry: str | None = None) -> None:
1✔
964
        LOG.debug("Docker login for %s", username)
×
965
        try:
×
966
            self.client().login(username, password=password, registry=registry, reauth=True)
×
967
        except APIError as e:
×
968
            raise ContainerException() from e
×
969

970

971
# apply patches required for podman API compatibility
972

973

974
@property
1✔
975
def _container_image(self):
1✔
976
    image_id = self.attrs.get("ImageID", self.attrs["Image"])
1✔
977
    if image_id is None:
1✔
978
        return None
×
979
    image_ref = image_id
1✔
980
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
981
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
982
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
983
    # then lead to "no such image" errors.
984
    if re.match("sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
1✔
985
        image_ref = image_id.split(":")[1]
1✔
986
    return self.client.images.get(image_ref)
1✔
987

988

989
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc