• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18300225569

06 Oct 2025 04:03PM UTC coverage: 86.873% (-0.002%) from 86.875%
18300225569

push

github

web-flow
IaC: Update CatalogPlugin class and common catalog plugins logic (#13184)

Co-authored-by: Benjamin Simon <benjh.simon@gmail.com>

14 of 14 new or added lines in 2 files covered. (100.0%)

3 existing lines in 1 file now uncovered.

67818 of 78066 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.16
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from functools import cache
1✔
10
from time import sleep
1✔
11
from typing import Callable, Optional, Union, cast
1✔
12
from urllib.parse import quote
1✔
13

14
import docker
1✔
15
from docker import DockerClient
1✔
16
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
17
from docker.models.containers import Container
1✔
18
from docker.types import LogConfig as DockerLogConfig
1✔
19
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
20

21
from localstack.config import LS_LOG
1✔
22
from localstack.constants import TRACE_LOG_LEVELS
1✔
23
from localstack.utils.collections import ensure_list
1✔
24
from localstack.utils.container_utils.container_client import (
1✔
25
    AccessDenied,
26
    CancellableStream,
27
    ContainerClient,
28
    ContainerException,
29
    DockerContainerStats,
30
    DockerContainerStatus,
31
    DockerNotAvailable,
32
    DockerPlatform,
33
    LogConfig,
34
    NoSuchContainer,
35
    NoSuchImage,
36
    NoSuchNetwork,
37
    PortMappings,
38
    RegistryConnectionError,
39
    SimpleVolumeBind,
40
    Ulimit,
41
    Util,
42
)
43
from localstack.utils.strings import to_bytes, to_str
1✔
44
from localstack.utils.threads import start_worker_thread
1✔
45

46
LOG = logging.getLogger(__name__)
1✔
47
SDK_ISDIR = 1 << 31
1✔
48

49

50
class SdkDockerClient(ContainerClient):
1✔
51
    """
52
    Class for managing Docker (or Podman) using the Python Docker SDK.
53

54
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
55
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
56
    is doing some of the heavy lifting for us to support both target platforms.
57
    """
58

59
    docker_client: Optional[DockerClient]
1✔
60

61
    def __init__(self):
1✔
62
        try:
1✔
63
            self.docker_client = self._create_client()
1✔
64
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
65
        except DockerNotAvailable:
1✔
66
            self.docker_client = None
1✔
67

68
    def client(self):
1✔
69
        if self.docker_client:
1✔
70
            return self.docker_client
1✔
71
        # if the initialization failed before, try to initialize on-demand
72
        self.docker_client = self._create_client()
1✔
73
        return self.docker_client
1✔
74

75
    @staticmethod
1✔
76
    def _create_client():
1✔
77
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS
1✔
78

79
        for attempt in range(0, DOCKER_SDK_DEFAULT_RETRIES + 1):
1✔
80
            try:
1✔
81
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
1✔
82
            except DockerException as e:
1✔
83
                LOG.debug(
1✔
84
                    "Creating Docker SDK client failed: %s. "
85
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
86
                    e,
87
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
88
                )
89
                if attempt < DOCKER_SDK_DEFAULT_RETRIES:
1✔
90
                    # wait for a second before retrying
91
                    sleep(1)
1✔
92
                else:
93
                    # we are out of attempts
94
                    raise DockerNotAvailable("Docker not available") from e
1✔
95

96
    def _read_from_sock(self, sock: socket, tty: bool):
1✔
97
        """Reads multiplexed messages from a socket returned by attach_socket.
98

99
        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
100
        """
101
        stdout = b""
1✔
102
        stderr = b""
1✔
103
        for frame_type, frame_data in frames_iter(sock, tty):
1✔
104
            if frame_type == STDOUT:
1✔
105
                stdout += frame_data
1✔
106
            elif frame_type == STDERR:
1✔
107
                stderr += frame_data
1✔
108
            else:
109
                raise ContainerException("Invalid frame type when reading from socket")
×
110
        return stdout, stderr
1✔
111

112
    def _container_path_info(self, container: Container, container_path: str):
1✔
113
        """
114
        Get information about a path in the given container
115
        :param container: Container to be inspected
116
        :param container_path: Path in container
117
        :return: Tuple (path_exists, path_is_directory)
118
        """
119
        # Docker CLI copy uses go FileMode to determine if target is a dict or not
120
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
121
        # The isDir Bit is the most significant bit in the 32bit struct:
122
        # https://golang.org/src/os/types.go?s=2650:2683
123
        api_client = self.client().api
1✔
124

125
        def _head(path_suffix, **kwargs):
1✔
126
            return api_client.head(
1✔
127
                api_client.base_url + path_suffix, **api_client._set_request_timeout(kwargs)
128
            )
129

130
        escaped_id = quote(container.id, safe="/:")
1✔
131
        result = _head(f"/containers/{escaped_id}/archive", params={"path": container_path})
1✔
132
        stats = result.headers.get("X-Docker-Container-Path-Stat")
1✔
133
        target_exists = result.ok
1✔
134

135
        if target_exists:
1✔
136
            stats = json.loads(base64.b64decode(stats).decode("utf-8"))
1✔
137
        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)
1✔
138
        return target_exists, target_is_dir
1✔
139

140
    def get_system_info(self) -> dict:
1✔
141
        return self.client().info()
1✔
142

143
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
144
        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose
145
        try:
1✔
146
            container = self.client().containers.get(container_name)
1✔
147
            if container.status == "running":
1✔
148
                return DockerContainerStatus.UP
1✔
149
            elif container.status == "paused":
1✔
150
                return DockerContainerStatus.PAUSED
1✔
151
            else:
152
                return DockerContainerStatus.DOWN
1✔
153
        except NotFound:
1✔
154
            return DockerContainerStatus.NON_EXISTENT
1✔
155
        except APIError as e:
×
156
            raise ContainerException() from e
×
157

158
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
159
        try:
1✔
160
            container = self.client().containers.get(container_name)
1✔
161
            sdk_stats = container.stats(stream=False)
1✔
162

163
            # BlockIO: (Read, Write) bytes
164
            read_bytes = 0
1✔
165
            write_bytes = 0
1✔
166
            for entry in (
1✔
167
                sdk_stats.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
168
            ):
169
                if entry.get("op") == "read":
×
170
                    read_bytes += entry.get("value", 0)
×
171
                elif entry.get("op") == "write":
×
172
                    write_bytes += entry.get("value", 0)
×
173

174
            # CPU percentage
175
            cpu_stats = sdk_stats.get("cpu_stats", {})
1✔
176
            precpu_stats = sdk_stats.get("precpu_stats", {})
1✔
177

178
            cpu_delta = cpu_stats.get("cpu_usage", {}).get("total_usage", 0) - precpu_stats.get(
1✔
179
                "cpu_usage", {}
180
            ).get("total_usage", 0)
181

182
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
1✔
183
                "system_cpu_usage", 0
184
            )
185

186
            online_cpus = cpu_stats.get("online_cpus", 1)
1✔
187
            cpu_percent = (
1✔
188
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
189
            )
190

191
            # Memory (usage, limit) bytes
192
            memory_stats = sdk_stats.get("memory_stats", {})
1✔
193
            mem_usage = memory_stats.get("usage", 0)
1✔
194
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
1✔
195
            mem_inactive = memory_stats.get("stats", {}).get("inactive_file", 0)
1✔
196
            used_memory = max(0, mem_usage - mem_inactive)
1✔
197
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0
1✔
198

199
            # Network IO
200
            net_rx = 0
1✔
201
            net_tx = 0
1✔
202
            for iface in sdk_stats.get("networks", {}).values():
1✔
203
                net_rx += iface.get("rx_bytes", 0)
1✔
204
                net_tx += iface.get("tx_bytes", 0)
1✔
205

206
            # Container ID
207
            container_id = sdk_stats.get("id", "")[:12]
1✔
208
            name = sdk_stats.get("name", "").lstrip("/")
1✔
209

210
            return DockerContainerStats(
1✔
211
                Container=container_id,
212
                ID=container_id,
213
                Name=name,
214
                BlockIO=(read_bytes, write_bytes),
215
                CPUPerc=round(cpu_percent, 2),
216
                MemPerc=round(mem_percent, 2),
217
                MemUsage=(used_memory, mem_limit),
218
                NetIO=(net_rx, net_tx),
219
                PIDs=sdk_stats.get("pids_stats", {}).get("current", 0),
220
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
221
            )
222
        except NotFound:
×
223
            raise NoSuchContainer(container_name)
×
224
        except APIError as e:
×
225
            raise ContainerException() from e
×
226

227
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
228
        LOG.debug("Stopping container: %s", container_name)
1✔
229
        try:
1✔
230
            container = self.client().containers.get(container_name)
1✔
231
            container.stop(timeout=timeout)
1✔
232
        except NotFound:
1✔
233
            raise NoSuchContainer(container_name)
1✔
234
        except APIError as e:
×
235
            raise ContainerException() from e
×
236

237
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
238
        LOG.debug("Restarting container: %s", container_name)
1✔
239
        try:
1✔
240
            container = self.client().containers.get(container_name)
1✔
241
            container.restart(timeout=timeout)
1✔
242
        except NotFound:
1✔
243
            raise NoSuchContainer(container_name)
1✔
244
        except APIError as e:
×
245
            raise ContainerException() from e
×
246

247
    def pause_container(self, container_name: str) -> None:
1✔
248
        LOG.debug("Pausing container: %s", container_name)
1✔
249
        try:
1✔
250
            container = self.client().containers.get(container_name)
1✔
251
            container.pause()
1✔
252
        except NotFound:
1✔
253
            raise NoSuchContainer(container_name)
1✔
254
        except APIError as e:
×
255
            raise ContainerException() from e
×
256

257
    def unpause_container(self, container_name: str) -> None:
1✔
258
        LOG.debug("Unpausing container: %s", container_name)
1✔
259
        try:
1✔
260
            container = self.client().containers.get(container_name)
1✔
261
            container.unpause()
1✔
262
        except NotFound:
×
263
            raise NoSuchContainer(container_name)
×
264
        except APIError as e:
×
265
            raise ContainerException() from e
×
266

267
    def remove_container(
1✔
268
        self, container_name: str, force=True, check_existence=False, volumes=False
269
    ) -> None:
270
        LOG.debug("Removing container: %s, with volumes: %s", container_name, volumes)
1✔
271
        if check_existence and container_name not in self.get_all_container_names():
1✔
272
            LOG.debug("Aborting removing due to check_existence check")
×
273
            return
×
274
        try:
1✔
275
            container = self.client().containers.get(container_name)
1✔
276
            container.remove(force=force, v=volumes)
1✔
277
        except NotFound:
1✔
278
            if not force:
1✔
279
                raise NoSuchContainer(container_name)
1✔
280
        except APIError as e:
×
281
            raise ContainerException() from e
×
282

283
    def list_containers(self, filter: Union[list[str], str, None] = None, all=True) -> list[dict]:
1✔
284
        if filter:
1✔
285
            filter = [filter] if isinstance(filter, str) else filter
1✔
286
            filter = dict([f.split("=", 1) for f in filter])
1✔
287
        LOG.debug("Listing containers with filters: %s", filter)
1✔
288
        try:
1✔
289
            container_list = self.client().containers.list(filters=filter, all=all)
1✔
290
            result = []
1✔
291
            for container in container_list:
1✔
292
                try:
1✔
293
                    result.append(
1✔
294
                        {
295
                            "id": container.id,
296
                            "image": container.image,
297
                            "name": container.name,
298
                            "status": container.status,
299
                            "labels": container.labels,
300
                        }
301
                    )
302
                except Exception as e:
×
303
                    LOG.error("Error checking container %s: %s", container, e)
×
304
            return result
1✔
305
        except APIError as e:
1✔
306
            raise ContainerException() from e
1✔
307

308
    def copy_into_container(
1✔
309
        self, container_name: str, local_path: str, container_path: str
310
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
311
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
1✔
312
        try:
1✔
313
            container = self.client().containers.get(container_name)
1✔
314
            target_exists, target_isdir = self._container_path_info(container, container_path)
1✔
315
            target_path = container_path if target_isdir else os.path.dirname(container_path)
1✔
316
            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:
1✔
317
                container.put_archive(target_path, tar)
1✔
318
        except NotFound:
1✔
319
            raise NoSuchContainer(container_name)
1✔
320
        except APIError as e:
1✔
321
            raise ContainerException() from e
×
322

323
    def copy_from_container(
1✔
324
        self,
325
        container_name: str,
326
        local_path: str,
327
        container_path: str,
328
    ) -> None:
329
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
1✔
330
        try:
1✔
331
            container = self.client().containers.get(container_name)
1✔
332
            bits, _ = container.get_archive(container_path)
1✔
333
            Util.untar_to_path(bits, local_path)
1✔
334
        except NotFound:
1✔
335
            raise NoSuchContainer(container_name)
1✔
336
        except APIError as e:
×
337
            raise ContainerException() from e
×
338

339
    def pull_image(
1✔
340
        self,
341
        docker_image: str,
342
        platform: Optional[DockerPlatform] = None,
343
        log_handler: Optional[Callable[[str], None]] = None,
344
    ) -> None:
345
        LOG.debug("Pulling Docker image: %s", docker_image)
1✔
346
        # some path in the docker image string indicates a custom repository
347

348
        docker_image = self.registry_resolver_strategy.resolve(docker_image)
1✔
349
        kwargs: dict[str, Union[str, bool]] = {"platform": platform}
1✔
350
        try:
1✔
351
            if log_handler:
1✔
352
                # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
353
                kwargs["stream"] = True
1✔
354
                stream = self.client().api.pull(docker_image, **kwargs)
1✔
355
                for line in stream:
1✔
356
                    log_handler(to_str(line))
1✔
357
            else:
358
                self.client().images.pull(docker_image, **kwargs)
1✔
359
        except ImageNotFound:
1✔
360
            raise NoSuchImage(docker_image)
1✔
361
        except APIError as e:
×
362
            raise ContainerException() from e
×
363

364
    def push_image(self, docker_image: str) -> None:
1✔
365
        LOG.debug("Pushing Docker image: %s", docker_image)
1✔
366
        try:
1✔
367
            result = self.client().images.push(docker_image)
1✔
368
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
369
            if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):
1✔
370
                if "image does not exist locally" in to_str(result):
1✔
371
                    raise NoSuchImage(docker_image)
1✔
372
                if "is denied" in to_str(result):
1✔
373
                    raise AccessDenied(docker_image)
1✔
374
                if "requesting higher privileges than access token allows" in to_str(result):
1✔
375
                    raise AccessDenied(docker_image)
×
376
                if "access token has insufficient scopes" in to_str(result):
1✔
377
                    raise AccessDenied(docker_image)
×
378
                if "connection refused" in to_str(result):
1✔
379
                    raise RegistryConnectionError(result)
1✔
380
                raise ContainerException(result)
×
381
        except ImageNotFound:
1✔
382
            raise NoSuchImage(docker_image)
×
383
        except APIError as e:
1✔
384
            # note: error message 'image not known' raised by Podman API
385
            if "image not known" in str(e):
×
386
                raise NoSuchImage(docker_image)
×
387
            raise ContainerException() from e
×
388

389
    def build_image(
1✔
390
        self,
391
        dockerfile_path: str,
392
        image_name: str,
393
        context_path: str = None,
394
        platform: Optional[DockerPlatform] = None,
395
    ):
396
        try:
1✔
397
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
1✔
398
            context_path = context_path or os.path.dirname(dockerfile_path)
1✔
399
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
1✔
400
            _, logs_iterator = self.client().images.build(
1✔
401
                path=context_path,
402
                dockerfile=dockerfile_path,
403
                tag=image_name,
404
                rm=True,
405
                platform=platform,
406
            )
407
            # logs_iterator is a stream of dicts. Example content:
408
            # {'stream': 'Step 1/4 : FROM alpine'}
409
            # ... other build steps
410
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
411
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
412
            output = ""
1✔
413
            for log in logs_iterator:
1✔
414
                if isinstance(log, dict) and ("stream" in log or "error" in log):
1✔
415
                    output += log.get("stream") or log["error"]
1✔
416
            return output
1✔
417
        except APIError as e:
×
418
            raise ContainerException("Unable to build Docker image") from e
×
419

420
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
421
        try:
1✔
422
            LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
1✔
423
            image = self.client().images.get(source_ref)
1✔
424
            image.tag(target_name)
1✔
425
        except APIError as e:
1✔
426
            if e.status_code == 404:
1✔
427
                raise NoSuchImage(source_ref)
1✔
428
            raise ContainerException("Unable to tag Docker image") from e
×
429

430
    def get_docker_image_names(
1✔
431
        self,
432
        strip_latest: bool = True,
433
        include_tags: bool = True,
434
        strip_wellknown_repo_prefixes: bool = True,
435
    ):
436
        try:
1✔
437
            images = self.client().images.list()
1✔
438
            image_names = [tag for image in images for tag in image.tags if image.tags]
1✔
439
            if not include_tags:
1✔
440
                image_names = [image_name.rpartition(":")[0] for image_name in image_names]
1✔
441
            if strip_wellknown_repo_prefixes:
1✔
442
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
1✔
443
            if strip_latest:
1✔
444
                Util.append_without_latest(image_names)
1✔
445
            return image_names
1✔
446
        except APIError as e:
×
447
            raise ContainerException() from e
×
448

449
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
450
        try:
1✔
451
            container = self.client().containers.get(container_name_or_id)
1✔
452
            return to_str(container.logs())
1✔
453
        except NotFound:
1✔
454
            if safe:
1✔
455
                return ""
1✔
456
            raise NoSuchContainer(container_name_or_id)
1✔
457
        except APIError as e:
1✔
458
            if safe:
1✔
459
                return ""
×
460
            raise ContainerException() from e
1✔
461

462
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
463
        try:
1✔
464
            container = self.client().containers.get(container_name_or_id)
1✔
465
            return container.logs(stream=True, follow=True)
1✔
466
        except NotFound:
1✔
467
            raise NoSuchContainer(container_name_or_id)
1✔
468
        except APIError as e:
×
469
            raise ContainerException() from e
×
470

471
    def inspect_container(self, container_name_or_id: str) -> dict[str, Union[dict, str]]:
1✔
472
        try:
1✔
473
            return self.client().containers.get(container_name_or_id).attrs
1✔
474
        except NotFound:
1✔
475
            raise NoSuchContainer(container_name_or_id)
1✔
476
        except APIError as e:
×
477
            raise ContainerException() from e
×
478

479
    def inspect_image(
1✔
480
        self,
481
        image_name: str,
482
        pull: bool = True,
483
        strip_wellknown_repo_prefixes: bool = True,
484
    ) -> dict[str, Union[dict, list, str]]:
485
        image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
486
        try:
1✔
487
            result = self.client().images.get(image_name).attrs
1✔
488
            if strip_wellknown_repo_prefixes:
1✔
489
                if result.get("RepoDigests"):
1✔
490
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
1✔
491
                        result["RepoDigests"]
492
                    )
493
                if result.get("RepoTags"):
1✔
494
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
1✔
495
            return result
1✔
496
        except NotFound:
1✔
497
            if pull:
1✔
498
                self.pull_image(image_name)
1✔
499
                return self.inspect_image(image_name, pull=False)
1✔
500
            raise NoSuchImage(image_name)
1✔
501
        except APIError as e:
×
502
            raise ContainerException() from e
×
503

504
    def create_network(self, network_name: str) -> None:
1✔
505
        try:
1✔
506
            return self.client().networks.create(name=network_name).id
1✔
507
        except APIError as e:
×
508
            raise ContainerException() from e
×
509

510
    def delete_network(self, network_name: str) -> None:
1✔
511
        try:
1✔
512
            return self.client().networks.get(network_name).remove()
1✔
UNCOV
513
        except NotFound:
×
514
            raise NoSuchNetwork(network_name)
×
UNCOV
515
        except APIError as e:
×
UNCOV
516
            raise ContainerException() from e
×
517

518
    def inspect_network(self, network_name: str) -> dict[str, Union[dict, str]]:
1✔
519
        try:
1✔
520
            return self.client().networks.get(network_name).attrs
1✔
521
        except NotFound:
1✔
522
            raise NoSuchNetwork(network_name)
1✔
523
        except APIError as e:
×
524
            raise ContainerException() from e
×
525

526
    def connect_container_to_network(
1✔
527
        self,
528
        network_name: str,
529
        container_name_or_id: str,
530
        aliases: Optional[list] = None,
531
        link_local_ips: list[str] = None,
532
    ) -> None:
533
        LOG.debug(
1✔
534
            "Connecting container '%s' to network '%s' with aliases '%s'",
535
            container_name_or_id,
536
            network_name,
537
            aliases,
538
        )
539
        try:
1✔
540
            network = self.client().networks.get(network_name)
1✔
541
        except NotFound:
1✔
542
            raise NoSuchNetwork(network_name)
1✔
543
        try:
1✔
544
            network.connect(
1✔
545
                container=container_name_or_id,
546
                aliases=aliases,
547
                link_local_ips=link_local_ips,
548
            )
549
        except NotFound:
1✔
550
            raise NoSuchContainer(container_name_or_id)
1✔
551
        except APIError as e:
×
552
            raise ContainerException() from e
×
553

554
    def disconnect_container_from_network(
1✔
555
        self, network_name: str, container_name_or_id: str
556
    ) -> None:
557
        LOG.debug(
1✔
558
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
559
        )
560
        try:
1✔
561
            try:
1✔
562
                network = self.client().networks.get(network_name)
1✔
563
            except NotFound:
1✔
564
                raise NoSuchNetwork(network_name)
1✔
565
            try:
1✔
566
                network.disconnect(container_name_or_id)
1✔
567
            except NotFound:
1✔
568
                raise NoSuchContainer(container_name_or_id)
1✔
569
        except APIError as e:
1✔
570
            raise ContainerException() from e
×
571

572
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
573
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
574
        network_names = list(networks)
1✔
575
        if len(network_names) > 1:
1✔
576
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
577
        return networks[network_names[0]]["IPAddress"]
1✔
578

579
    @cache
1✔
580
    def has_docker(self) -> bool:
1✔
581
        try:
1✔
582
            if not self.docker_client:
1✔
583
                return False
×
584
            self.client().ping()
1✔
585
            return True
1✔
586
        except APIError:
×
587
            return False
×
588

589
    def remove_image(self, image: str, force: bool = True):
1✔
590
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
1✔
591
        try:
1✔
592
            self.client().images.remove(image=image, force=force)
1✔
593
        except ImageNotFound:
1✔
594
            if not force:
1✔
595
                raise NoSuchImage(image)
1✔
596
        except APIError as e:
×
597
            if "image not known" in str(e):
×
598
                raise NoSuchImage(image)
×
599
            raise ContainerException() from e
×
600

601
    def commit(
1✔
602
        self,
603
        container_name_or_id: str,
604
        image_name: str,
605
        image_tag: str,
606
    ):
607
        LOG.debug(
1✔
608
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
609
        )
610
        try:
1✔
611
            container = self.client().containers.get(container_name_or_id)
1✔
612
            container.commit(repository=image_name, tag=image_tag)
1✔
613
        except NotFound:
1✔
614
            raise NoSuchContainer(container_name_or_id)
1✔
615
        except APIError as e:
×
616
            raise ContainerException() from e
×
617

618
    def start_container(
1✔
619
        self,
620
        container_name_or_id: str,
621
        stdin=None,
622
        interactive: bool = False,
623
        attach: bool = False,
624
        flags: Optional[str] = None,
625
    ) -> tuple[bytes, bytes]:
626
        LOG.debug("Starting container %s", container_name_or_id)
1✔
627
        try:
1✔
628
            container = self.client().containers.get(container_name_or_id)
1✔
629
            stdout = to_bytes(container_name_or_id)
1✔
630
            stderr = b""
1✔
631
            if interactive or attach:
1✔
632
                params = {"stdout": 1, "stderr": 1, "stream": 1}
1✔
633
                if interactive:
1✔
634
                    params["stdin"] = 1
1✔
635
                sock = container.attach_socket(params=params)
1✔
636
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
637
                result_queue = queue.Queue()
1✔
638
                thread_started = threading.Event()
1✔
639
                start_waiting = threading.Event()
1✔
640

641
                # Note: We need to be careful about potential race conditions here - .wait() should happen right
642
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
643
                def wait_for_result(*_):
1✔
644
                    _exit_code = -1
1✔
645
                    try:
1✔
646
                        thread_started.set()
1✔
647
                        start_waiting.wait()
1✔
648
                        _exit_code = container.wait()["StatusCode"]
1✔
649
                    except APIError as e:
×
650
                        _exit_code = 1
×
651
                        raise ContainerException(str(e))
×
652
                    finally:
653
                        result_queue.put(_exit_code)
1✔
654

655
                # start listener thread
656
                start_worker_thread(wait_for_result)
1✔
657
                thread_started.wait()
1✔
658
                try:
1✔
659
                    # start container
660
                    container.start()
1✔
661
                finally:
662
                    # start awaiting container result
663
                    start_waiting.set()
1✔
664

665
                # handle container input/output
666
                # under windows, the socket has no __enter__ / cannot be used as context manager
667
                # therefore try/finally instead of with here
668
                try:
1✔
669
                    if stdin:
1✔
670
                        sock.sendall(to_bytes(stdin))
1✔
671
                        sock.shutdown(socket.SHUT_WR)
1✔
672
                    stdout, stderr = self._read_from_sock(sock, False)
1✔
673
                except socket.timeout:
×
674
                    LOG.debug(
×
675
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
676
                        container_name_or_id,
677
                    )
678
                finally:
679
                    sock.close()
1✔
680

681
                # get container exit code
682
                exit_code = result_queue.get()
1✔
683
                if exit_code:
1✔
684
                    raise ContainerException(
1✔
685
                        f"Docker container returned with exit code {exit_code}",
686
                        stdout=stdout,
687
                        stderr=stderr,
688
                    )
689
            else:
690
                container.start()
1✔
691
            return stdout, stderr
1✔
692
        except NotFound:
1✔
693
            raise NoSuchContainer(container_name_or_id)
1✔
694
        except APIError as e:
1✔
695
            raise ContainerException() from e
1✔
696

697
    def attach_to_container(self, container_name_or_id: str):
1✔
698
        client: DockerClient = self.client()
×
699
        container = cast(Container, client.containers.get(container_name_or_id))
×
700
        container.attach()
×
701

702
    def create_container(
1✔
703
        self,
704
        image_name: str,
705
        *,
706
        name: Optional[str] = None,
707
        entrypoint: Optional[Union[list[str], str]] = None,
708
        remove: bool = False,
709
        interactive: bool = False,
710
        tty: bool = False,
711
        detach: bool = False,
712
        command: Optional[Union[list[str], str]] = None,
713
        volumes: Optional[list[SimpleVolumeBind]] = None,
714
        ports: Optional[PortMappings] = None,
715
        exposed_ports: Optional[list[str]] = None,
716
        env_vars: Optional[dict[str, str]] = None,
717
        user: Optional[str] = None,
718
        cap_add: Optional[list[str]] = None,
719
        cap_drop: Optional[list[str]] = None,
720
        security_opt: Optional[list[str]] = None,
721
        network: Optional[str] = None,
722
        dns: Optional[Union[str, list[str]]] = None,
723
        additional_flags: Optional[str] = None,
724
        workdir: Optional[str] = None,
725
        privileged: Optional[bool] = None,
726
        labels: Optional[dict[str, str]] = None,
727
        platform: Optional[DockerPlatform] = None,
728
        ulimits: Optional[list[Ulimit]] = None,
729
        init: Optional[bool] = None,
730
        log_config: Optional[LogConfig] = None,
731
    ) -> str:
732
        LOG.debug("Creating container with attributes: %s", locals())
1✔
733
        extra_hosts = None
1✔
734
        if additional_flags:
1✔
735
            parsed_flags = Util.parse_additional_flags(
1✔
736
                additional_flags,
737
                env_vars=env_vars,
738
                volumes=volumes,
739
                network=network,
740
                platform=platform,
741
                privileged=privileged,
742
                ports=ports,
743
                ulimits=ulimits,
744
                user=user,
745
                dns=dns,
746
            )
747
            env_vars = parsed_flags.env_vars
1✔
748
            extra_hosts = parsed_flags.extra_hosts
1✔
749
            volumes = parsed_flags.volumes
1✔
750
            labels = parsed_flags.labels
1✔
751
            network = parsed_flags.network
1✔
752
            platform = parsed_flags.platform
1✔
753
            privileged = parsed_flags.privileged
1✔
754
            ports = parsed_flags.ports
1✔
755
            ulimits = parsed_flags.ulimits
1✔
756
            user = parsed_flags.user
1✔
757
            dns = parsed_flags.dns
1✔
758

759
        try:
1✔
760
            kwargs = {}
1✔
761
            if cap_add:
1✔
762
                kwargs["cap_add"] = cap_add
1✔
763
            if cap_drop:
1✔
764
                kwargs["cap_drop"] = cap_drop
1✔
765
            if security_opt:
1✔
766
                kwargs["security_opt"] = security_opt
1✔
767
            if dns:
1✔
768
                kwargs["dns"] = ensure_list(dns)
1✔
769
            if exposed_ports:
1✔
770
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
771
                # but the behavior should be identical
772
                kwargs["ports"] = {port: [] for port in exposed_ports}
×
773
            if ports:
1✔
774
                kwargs.setdefault("ports", {})
1✔
775
                kwargs["ports"].update(ports.to_dict())
1✔
776
            if workdir:
1✔
777
                kwargs["working_dir"] = workdir
1✔
778
            if privileged:
1✔
779
                kwargs["privileged"] = True
×
780
            if init:
1✔
781
                kwargs["init"] = True
1✔
782
            if labels:
1✔
783
                kwargs["labels"] = labels
1✔
784
            if log_config:
1✔
785
                kwargs["log_config"] = DockerLogConfig(
1✔
786
                    type=log_config.type, config=log_config.config
787
                )
788
            if ulimits:
1✔
789
                kwargs["ulimits"] = [
1✔
790
                    docker.types.Ulimit(
791
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
792
                    )
793
                    for ulimit in ulimits
794
                ]
795
            mounts = None
1✔
796
            if volumes:
1✔
797
                mounts = Util.convert_mount_list_to_dict(volumes)
1✔
798

799
            image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
800

801
            def create_container():
1✔
802
                return self.client().containers.create(
1✔
803
                    image=image_name,
804
                    command=command,
805
                    auto_remove=remove,
806
                    name=name,
807
                    stdin_open=interactive,
808
                    tty=tty,
809
                    entrypoint=entrypoint,
810
                    environment=env_vars,
811
                    detach=detach,
812
                    user=user,
813
                    network=network,
814
                    volumes=mounts,
815
                    extra_hosts=extra_hosts,
816
                    platform=platform,
817
                    **kwargs,
818
                )
819

820
            try:
1✔
821
                container = create_container()
1✔
822
            except ImageNotFound:
1✔
823
                LOG.debug("Image not found. Pulling image %s", image_name)
1✔
824
                self.pull_image(image_name, platform)
1✔
825
                container = create_container()
1✔
826
            return container.id
1✔
827
        except ImageNotFound:
1✔
828
            raise NoSuchImage(image_name)
×
829
        except APIError as e:
1✔
830
            raise ContainerException() from e
×
831

832
    def run_container(
1✔
833
        self,
834
        image_name: str,
835
        stdin=None,
836
        *,
837
        name: Optional[str] = None,
838
        entrypoint: Optional[str] = None,
839
        remove: bool = False,
840
        interactive: bool = False,
841
        tty: bool = False,
842
        detach: bool = False,
843
        command: Optional[Union[list[str], str]] = None,
844
        volumes: Optional[list[SimpleVolumeBind]] = None,
845
        ports: Optional[PortMappings] = None,
846
        exposed_ports: Optional[list[str]] = None,
847
        env_vars: Optional[dict[str, str]] = None,
848
        user: Optional[str] = None,
849
        cap_add: Optional[list[str]] = None,
850
        cap_drop: Optional[list[str]] = None,
851
        security_opt: Optional[list[str]] = None,
852
        network: Optional[str] = None,
853
        dns: Optional[str] = None,
854
        additional_flags: Optional[str] = None,
855
        workdir: Optional[str] = None,
856
        labels: Optional[dict[str, str]] = None,
857
        platform: Optional[DockerPlatform] = None,
858
        privileged: Optional[bool] = None,
859
        ulimits: Optional[list[Ulimit]] = None,
860
        init: Optional[bool] = None,
861
        log_config: Optional[LogConfig] = None,
862
    ) -> tuple[bytes, bytes]:
863
        LOG.debug("Running container with image: %s", image_name)
1✔
864
        container = None
1✔
865
        try:
1✔
866
            container = self.create_container(
1✔
867
                image_name,
868
                name=name,
869
                entrypoint=entrypoint,
870
                interactive=interactive,
871
                tty=tty,
872
                detach=detach,
873
                remove=remove and detach,
874
                command=command,
875
                volumes=volumes,
876
                ports=ports,
877
                exposed_ports=exposed_ports,
878
                env_vars=env_vars,
879
                user=user,
880
                cap_add=cap_add,
881
                cap_drop=cap_drop,
882
                security_opt=security_opt,
883
                network=network,
884
                dns=dns,
885
                additional_flags=additional_flags,
886
                workdir=workdir,
887
                privileged=privileged,
888
                platform=platform,
889
                init=init,
890
                labels=labels,
891
                ulimits=ulimits,
892
                log_config=log_config,
893
            )
894
            result = self.start_container(
1✔
895
                container_name_or_id=container,
896
                stdin=stdin,
897
                interactive=interactive,
898
                attach=not detach,
899
            )
900
        finally:
901
            if remove and container and not detach:
1✔
902
                self.remove_container(container)
1✔
903
        return result
1✔
904

905
    def exec_in_container(
1✔
906
        self,
907
        container_name_or_id: str,
908
        command: Union[list[str], str],
909
        interactive=False,
910
        detach=False,
911
        env_vars: Optional[dict[str, Optional[str]]] = None,
912
        stdin: Optional[bytes] = None,
913
        user: Optional[str] = None,
914
        workdir: Optional[str] = None,
915
    ) -> tuple[bytes, bytes]:
916
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
1✔
917
        try:
1✔
918
            container: Container = self.client().containers.get(container_name_or_id)
1✔
919
            result = container.exec_run(
1✔
920
                cmd=command,
921
                environment=env_vars,
922
                user=user,
923
                detach=detach,
924
                stdin=interactive and bool(stdin),
925
                socket=interactive and bool(stdin),
926
                stdout=True,
927
                stderr=True,
928
                demux=True,
929
                workdir=workdir,
930
            )
931
            tty = False
1✔
932
            if interactive and stdin:  # result is a socket
1✔
933
                sock = result[1]
1✔
934
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
935
                with sock:
1✔
936
                    try:
1✔
937
                        sock.sendall(stdin)
1✔
938
                        sock.shutdown(socket.SHUT_WR)
1✔
939
                        stdout, stderr = self._read_from_sock(sock, tty)
1✔
940
                        return stdout, stderr
1✔
941
                    except socket.timeout:
×
942
                        pass
×
943
            else:
944
                if detach:
1✔
945
                    return b"", b""
×
946
                return_code = result[0]
1✔
947
                if isinstance(result[1], bytes):
1✔
948
                    stdout = result[1]
×
949
                    stderr = b""
×
950
                else:
951
                    stdout, stderr = result[1]
1✔
952
                if return_code != 0:
1✔
953
                    raise ContainerException(
1✔
954
                        f"Exec command returned with exit code {return_code}", stdout, stderr
955
                    )
956
                return stdout, stderr
1✔
957
        except ContainerError:
1✔
958
            raise NoSuchContainer(container_name_or_id)
×
959
        except APIError as e:
1✔
960
            raise ContainerException() from e
1✔
961

962
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
963
        LOG.debug("Docker login for %s", username)
×
964
        try:
×
965
            self.client().login(username, password=password, registry=registry, reauth=True)
×
966
        except APIError as e:
×
967
            raise ContainerException() from e
×
968

969

970
# apply patches required for podman API compatibility
971

972

973
@property
1✔
974
def _container_image(self):
1✔
975
    image_id = self.attrs.get("ImageID", self.attrs["Image"])
1✔
976
    if image_id is None:
1✔
977
        return None
×
978
    image_ref = image_id
1✔
979
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
980
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
981
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
982
    # then lead to "no such image" errors.
983
    if re.match("sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
1✔
984
        image_ref = image_id.split(":")[1]
1✔
985
    return self.client.images.get(image_ref)
1✔
986

987

988
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc