• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 65ebfa7b-38ab-49bb-b71e-44800576a11c

14 Mar 2025 12:37AM UTC coverage: 86.954% (+0.02%) from 86.93%
65ebfa7b-38ab-49bb-b71e-44800576a11c

push

circleci

web-flow
Fix apigw input path formatting (#12379)

43 of 49 new or added lines in 2 files covered. (87.76%)

103 existing lines in 16 files now uncovered.

62313 of 71662 relevant lines covered (86.95%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.91
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from functools import lru_cache
1✔
10
from time import sleep
1✔
11
from typing import Dict, List, Optional, Tuple, Union, cast
1✔
12
from urllib.parse import quote
1✔
13

14
import docker
1✔
15
from docker import DockerClient
1✔
16
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
17
from docker.models.containers import Container
1✔
18
from docker.types import LogConfig as DockerLogConfig
1✔
19
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
20

21
from localstack.config import LS_LOG
1✔
22
from localstack.constants import TRACE_LOG_LEVELS
1✔
23
from localstack.utils.collections import ensure_list
1✔
24
from localstack.utils.container_utils.container_client import (
1✔
25
    AccessDenied,
26
    CancellableStream,
27
    ContainerClient,
28
    ContainerException,
29
    DockerContainerStats,
30
    DockerContainerStatus,
31
    DockerNotAvailable,
32
    DockerPlatform,
33
    LogConfig,
34
    NoSuchContainer,
35
    NoSuchImage,
36
    NoSuchNetwork,
37
    PortMappings,
38
    RegistryConnectionError,
39
    SimpleVolumeBind,
40
    Ulimit,
41
    Util,
42
)
43
from localstack.utils.strings import to_bytes, to_str
1✔
44
from localstack.utils.threads import start_worker_thread
1✔
45

46
LOG = logging.getLogger(__name__)
1✔
47
SDK_ISDIR = 1 << 31
1✔
48

49

50
class SdkDockerClient(ContainerClient):
1✔
51
    """
52
    Class for managing Docker (or Podman) using the Python Docker SDK.
53

54
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
55
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
56
    is doing some of the heavy lifting for us to support both target platforms.
57
    """
58

59
    docker_client: Optional[DockerClient]
1✔
60

61
    def __init__(self):
1✔
62
        try:
1✔
63
            self.docker_client = self._create_client()
1✔
64
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
65
        except DockerNotAvailable:
1✔
66
            self.docker_client = None
1✔
67

68
    def client(self):
1✔
69
        if self.docker_client:
1✔
70
            return self.docker_client
1✔
71
        # if the initialization failed before, try to initialize on-demand
72
        self.docker_client = self._create_client()
1✔
73
        return self.docker_client
1✔
74

75
    @staticmethod
1✔
76
    def _create_client():
1✔
77
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS
1✔
78

79
        for attempt in range(0, DOCKER_SDK_DEFAULT_RETRIES + 1):
1✔
80
            try:
1✔
81
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
1✔
82
            except DockerException as e:
1✔
83
                LOG.debug(
1✔
84
                    "Creating Docker SDK client failed: %s. "
85
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
86
                    e,
87
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
88
                )
89
                if attempt < DOCKER_SDK_DEFAULT_RETRIES:
1✔
90
                    # wait for a second before retrying
91
                    sleep(1)
1✔
92
                else:
93
                    # we are out of attempts
94
                    raise DockerNotAvailable("Docker not available") from e
1✔
95

96
    def _read_from_sock(self, sock: socket, tty: bool):
1✔
97
        """Reads multiplexed messages from a socket returned by attach_socket.
98

99
        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
100
        """
101
        stdout = b""
1✔
102
        stderr = b""
1✔
103
        for frame_type, frame_data in frames_iter(sock, tty):
1✔
104
            if frame_type == STDOUT:
1✔
105
                stdout += frame_data
1✔
106
            elif frame_type == STDERR:
1✔
107
                stderr += frame_data
1✔
108
            else:
109
                raise ContainerException("Invalid frame type when reading from socket")
×
110
        return stdout, stderr
1✔
111

112
    def _container_path_info(self, container: Container, container_path: str):
1✔
113
        """
114
        Get information about a path in the given container
115
        :param container: Container to be inspected
116
        :param container_path: Path in container
117
        :return: Tuple (path_exists, path_is_directory)
118
        """
119
        # Docker CLI copy uses go FileMode to determine if target is a dict or not
120
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
121
        # The isDir Bit is the most significant bit in the 32bit struct:
122
        # https://golang.org/src/os/types.go?s=2650:2683
123
        api_client = self.client().api
1✔
124

125
        def _head(path_suffix, **kwargs):
1✔
126
            return api_client.head(
1✔
127
                api_client.base_url + path_suffix, **api_client._set_request_timeout(kwargs)
128
            )
129

130
        escaped_id = quote(container.id, safe="/:")
1✔
131
        result = _head(f"/containers/{escaped_id}/archive", params={"path": container_path})
1✔
132
        stats = result.headers.get("X-Docker-Container-Path-Stat")
1✔
133
        target_exists = result.ok
1✔
134

135
        if target_exists:
1✔
136
            stats = json.loads(base64.b64decode(stats).decode("utf-8"))
1✔
137
        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)
1✔
138
        return target_exists, target_is_dir
1✔
139

140
    def get_system_info(self) -> dict:
1✔
141
        return self.client().info()
1✔
142

143
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
144
        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose
145
        try:
1✔
146
            container = self.client().containers.get(container_name)
1✔
147
            if container.status == "running":
1✔
148
                return DockerContainerStatus.UP
1✔
149
            elif container.status == "paused":
1✔
150
                return DockerContainerStatus.PAUSED
1✔
151
            else:
152
                return DockerContainerStatus.DOWN
1✔
153
        except NotFound:
1✔
154
            return DockerContainerStatus.NON_EXISTENT
1✔
155
        except APIError as e:
×
156
            raise ContainerException() from e
×
157

158
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
159
        try:
1✔
160
            container = self.client().containers.get(container_name)
1✔
161
            sdk_stats = container.stats(stream=False)
1✔
162

163
            # BlockIO: (Read, Write) bytes
164
            read_bytes = 0
1✔
165
            write_bytes = 0
1✔
166
            for entry in (
1✔
167
                sdk_stats.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
168
            ):
169
                if entry.get("op") == "read":
×
170
                    read_bytes += entry.get("value", 0)
×
171
                elif entry.get("op") == "write":
×
172
                    write_bytes += entry.get("value", 0)
×
173

174
            # CPU percentage
175
            cpu_stats = sdk_stats.get("cpu_stats", {})
1✔
176
            precpu_stats = sdk_stats.get("precpu_stats", {})
1✔
177

178
            cpu_delta = cpu_stats.get("cpu_usage", {}).get("total_usage", 0) - precpu_stats.get(
1✔
179
                "cpu_usage", {}
180
            ).get("total_usage", 0)
181

182
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
1✔
183
                "system_cpu_usage", 0
184
            )
185

186
            online_cpus = cpu_stats.get("online_cpus", 1)
1✔
187
            cpu_percent = (
1✔
188
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
189
            )
190

191
            # Memory (usage, limit) bytes
192
            memory_stats = sdk_stats.get("memory_stats", {})
1✔
193
            mem_usage = memory_stats.get("usage", 0)
1✔
194
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
1✔
195
            mem_inactive = memory_stats.get("stats", {}).get("inactive_file", 0)
1✔
196
            used_memory = max(0, mem_usage - mem_inactive)
1✔
197
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0
1✔
198

199
            # Network IO
200
            net_rx = 0
1✔
201
            net_tx = 0
1✔
202
            for iface in sdk_stats.get("networks", {}).values():
1✔
203
                net_rx += iface.get("rx_bytes", 0)
1✔
204
                net_tx += iface.get("tx_bytes", 0)
1✔
205

206
            # Container ID
207
            container_id = sdk_stats.get("id", "")[:12]
1✔
208
            name = sdk_stats.get("name", "").lstrip("/")
1✔
209

210
            return DockerContainerStats(
1✔
211
                Container=container_id,
212
                ID=container_id,
213
                Name=name,
214
                BlockIO=(read_bytes, write_bytes),
215
                CPUPerc=round(cpu_percent, 2),
216
                MemPerc=round(mem_percent, 2),
217
                MemUsage=(used_memory, mem_limit),
218
                NetIO=(net_rx, net_tx),
219
                PIDs=sdk_stats.get("pids_stats", {}).get("current", 0),
220
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
221
            )
222
        except NotFound:
×
223
            raise NoSuchContainer(container_name)
×
224
        except APIError as e:
×
225
            raise ContainerException() from e
×
226

227
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
228
        LOG.debug("Stopping container: %s", container_name)
1✔
229
        try:
1✔
230
            container = self.client().containers.get(container_name)
1✔
231
            container.stop(timeout=timeout)
1✔
232
        except NotFound:
1✔
233
            raise NoSuchContainer(container_name)
1✔
234
        except APIError as e:
×
235
            raise ContainerException() from e
×
236

237
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
238
        LOG.debug("Restarting container: %s", container_name)
1✔
239
        try:
1✔
240
            container = self.client().containers.get(container_name)
1✔
241
            container.restart(timeout=timeout)
1✔
242
        except NotFound:
1✔
243
            raise NoSuchContainer(container_name)
1✔
244
        except APIError as e:
×
245
            raise ContainerException() from e
×
246

247
    def pause_container(self, container_name: str) -> None:
1✔
248
        LOG.debug("Pausing container: %s", container_name)
1✔
249
        try:
1✔
250
            container = self.client().containers.get(container_name)
1✔
251
            container.pause()
1✔
252
        except NotFound:
1✔
253
            raise NoSuchContainer(container_name)
1✔
254
        except APIError as e:
×
255
            raise ContainerException() from e
×
256

257
    def unpause_container(self, container_name: str) -> None:
1✔
258
        LOG.debug("Unpausing container: %s", container_name)
1✔
259
        try:
1✔
260
            container = self.client().containers.get(container_name)
1✔
261
            container.unpause()
1✔
262
        except NotFound:
×
263
            raise NoSuchContainer(container_name)
×
264
        except APIError as e:
×
265
            raise ContainerException() from e
×
266

267
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
1✔
268
        LOG.debug("Removing container: %s", container_name)
1✔
269
        if check_existence and container_name not in self.get_running_container_names():
1✔
270
            LOG.debug("Aborting removing due to check_existence check")
×
271
            return
×
272
        try:
1✔
273
            container = self.client().containers.get(container_name)
1✔
274
            container.remove(force=force)
1✔
275
        except NotFound:
1✔
276
            if not force:
1✔
277
                raise NoSuchContainer(container_name)
1✔
278
        except APIError as e:
×
279
            raise ContainerException() from e
×
280

281
    def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:
1✔
282
        if filter:
1✔
283
            filter = [filter] if isinstance(filter, str) else filter
1✔
284
            filter = dict([f.split("=", 1) for f in filter])
1✔
285
        LOG.debug("Listing containers with filters: %s", filter)
1✔
286
        try:
1✔
287
            container_list = self.client().containers.list(filters=filter, all=all)
1✔
288
            result = []
1✔
289
            for container in container_list:
1✔
290
                try:
1✔
291
                    result.append(
1✔
292
                        {
293
                            "id": container.id,
294
                            "image": container.image,
295
                            "name": container.name,
296
                            "status": container.status,
297
                            "labels": container.labels,
298
                        }
299
                    )
300
                except Exception as e:
×
301
                    LOG.error("Error checking container %s: %s", container, e)
×
302
            return result
1✔
303
        except APIError as e:
1✔
304
            raise ContainerException() from e
1✔
305

306
    def copy_into_container(
1✔
307
        self, container_name: str, local_path: str, container_path: str
308
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
309
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
1✔
310
        try:
1✔
311
            container = self.client().containers.get(container_name)
1✔
312
            target_exists, target_isdir = self._container_path_info(container, container_path)
1✔
313
            target_path = container_path if target_isdir else os.path.dirname(container_path)
1✔
314
            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:
1✔
315
                container.put_archive(target_path, tar)
1✔
316
        except NotFound:
1✔
317
            raise NoSuchContainer(container_name)
1✔
318
        except APIError as e:
1✔
319
            raise ContainerException() from e
×
320

321
    def copy_from_container(
1✔
322
        self,
323
        container_name: str,
324
        local_path: str,
325
        container_path: str,
326
    ) -> None:
327
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
1✔
328
        try:
1✔
329
            container = self.client().containers.get(container_name)
1✔
330
            bits, _ = container.get_archive(container_path)
1✔
331
            Util.untar_to_path(bits, local_path)
1✔
332
        except NotFound:
1✔
333
            raise NoSuchContainer(container_name)
1✔
334
        except APIError as e:
×
335
            raise ContainerException() from e
×
336

337
    def pull_image(self, docker_image: str, platform: Optional[DockerPlatform] = None) -> None:
1✔
338
        LOG.debug("Pulling Docker image: %s", docker_image)
1✔
339
        # some path in the docker image string indicates a custom repository
340
        try:
1✔
341
            self.client().images.pull(docker_image, platform=platform)
1✔
342
        except ImageNotFound:
1✔
343
            raise NoSuchImage(docker_image)
1✔
344
        except APIError as e:
×
345
            raise ContainerException() from e
×
346

347
    def push_image(self, docker_image: str) -> None:
1✔
348
        LOG.debug("Pushing Docker image: %s", docker_image)
1✔
349
        try:
1✔
350
            result = self.client().images.push(docker_image)
1✔
351
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
352
            if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):
1✔
353
                if "image does not exist locally" in to_str(result):
1✔
354
                    raise NoSuchImage(docker_image)
1✔
355
                if "is denied" in to_str(result):
1✔
356
                    raise AccessDenied(docker_image)
1✔
357
                if "requesting higher privileges than access token allows" in to_str(result):
1✔
358
                    raise AccessDenied(docker_image)
×
359
                if "access token has insufficient scopes" in to_str(result):
1✔
360
                    raise AccessDenied(docker_image)
×
361
                if "connection refused" in to_str(result):
1✔
362
                    raise RegistryConnectionError(result)
1✔
363
                raise ContainerException(result)
×
364
        except ImageNotFound:
1✔
365
            raise NoSuchImage(docker_image)
×
366
        except APIError as e:
1✔
367
            # note: error message 'image not known' raised by Podman API
368
            if "image not known" in str(e):
×
369
                raise NoSuchImage(docker_image)
×
370
            raise ContainerException() from e
×
371

372
    def build_image(
1✔
373
        self,
374
        dockerfile_path: str,
375
        image_name: str,
376
        context_path: str = None,
377
        platform: Optional[DockerPlatform] = None,
378
    ):
379
        try:
1✔
380
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
1✔
381
            context_path = context_path or os.path.dirname(dockerfile_path)
1✔
382
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
1✔
383
            self.client().images.build(
1✔
384
                path=context_path,
385
                dockerfile=dockerfile_path,
386
                tag=image_name,
387
                rm=True,
388
                platform=platform,
389
            )
390
        except APIError as e:
×
391
            raise ContainerException("Unable to build Docker image") from e
×
392

393
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
394
        try:
1✔
395
            LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
1✔
396
            image = self.client().images.get(source_ref)
1✔
397
            image.tag(target_name)
1✔
398
        except APIError as e:
1✔
399
            if e.status_code == 404:
1✔
400
                raise NoSuchImage(source_ref)
1✔
401
            raise ContainerException("Unable to tag Docker image") from e
×
402

403
    def get_docker_image_names(
1✔
404
        self,
405
        strip_latest: bool = True,
406
        include_tags: bool = True,
407
        strip_wellknown_repo_prefixes: bool = True,
408
    ):
409
        try:
1✔
410
            images = self.client().images.list()
1✔
411
            image_names = [tag for image in images for tag in image.tags if image.tags]
1✔
412
            if not include_tags:
1✔
413
                image_names = [image_name.rpartition(":")[0] for image_name in image_names]
1✔
414
            if strip_wellknown_repo_prefixes:
1✔
415
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
1✔
416
            if strip_latest:
1✔
417
                Util.append_without_latest(image_names)
1✔
418
            return image_names
1✔
419
        except APIError as e:
×
420
            raise ContainerException() from e
×
421

422
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
423
        try:
1✔
424
            container = self.client().containers.get(container_name_or_id)
1✔
425
            return to_str(container.logs())
1✔
426
        except NotFound:
1✔
427
            if safe:
1✔
428
                return ""
1✔
429
            raise NoSuchContainer(container_name_or_id)
1✔
430
        except APIError as e:
1✔
431
            if safe:
1✔
432
                return ""
×
433
            raise ContainerException() from e
1✔
434

435
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
436
        try:
1✔
437
            container = self.client().containers.get(container_name_or_id)
1✔
438
            return container.logs(stream=True, follow=True)
1✔
439
        except NotFound:
1✔
440
            raise NoSuchContainer(container_name_or_id)
1✔
441
        except APIError as e:
×
442
            raise ContainerException() from e
×
443

444
    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
1✔
445
        try:
1✔
446
            return self.client().containers.get(container_name_or_id).attrs
1✔
447
        except NotFound:
1✔
448
            raise NoSuchContainer(container_name_or_id)
1✔
449
        except APIError as e:
×
450
            raise ContainerException() from e
×
451

452
    def inspect_image(
1✔
453
        self,
454
        image_name: str,
455
        pull: bool = True,
456
        strip_wellknown_repo_prefixes: bool = True,
457
    ) -> Dict[str, Union[dict, list, str]]:
458
        try:
1✔
459
            result = self.client().images.get(image_name).attrs
1✔
460
            if strip_wellknown_repo_prefixes:
1✔
461
                if result.get("RepoDigests"):
1✔
462
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
1✔
463
                        result["RepoDigests"]
464
                    )
465
                if result.get("RepoTags"):
1✔
466
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
1✔
467
            return result
1✔
468
        except NotFound:
1✔
469
            if pull:
1✔
470
                self.pull_image(image_name)
1✔
471
                return self.inspect_image(image_name, pull=False)
1✔
472
            raise NoSuchImage(image_name)
1✔
473
        except APIError as e:
×
474
            raise ContainerException() from e
×
475

476
    def create_network(self, network_name: str) -> None:
1✔
477
        try:
1✔
478
            return self.client().networks.create(name=network_name).id
1✔
479
        except APIError as e:
×
480
            raise ContainerException() from e
×
481

482
    def delete_network(self, network_name: str) -> None:
1✔
483
        try:
1✔
484
            return self.client().networks.get(network_name).remove()
1✔
UNCOV
485
        except NotFound:
×
486
            raise NoSuchNetwork(network_name)
×
UNCOV
487
        except APIError as e:
×
UNCOV
488
            raise ContainerException() from e
×
489

490
    def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:
1✔
491
        try:
1✔
492
            return self.client().networks.get(network_name).attrs
1✔
493
        except NotFound:
1✔
494
            raise NoSuchNetwork(network_name)
1✔
495
        except APIError as e:
×
496
            raise ContainerException() from e
×
497

498
    def connect_container_to_network(
1✔
499
        self,
500
        network_name: str,
501
        container_name_or_id: str,
502
        aliases: Optional[List] = None,
503
        link_local_ips: List[str] = None,
504
    ) -> None:
505
        LOG.debug(
1✔
506
            "Connecting container '%s' to network '%s' with aliases '%s'",
507
            container_name_or_id,
508
            network_name,
509
            aliases,
510
        )
511
        try:
1✔
512
            network = self.client().networks.get(network_name)
1✔
513
        except NotFound:
1✔
514
            raise NoSuchNetwork(network_name)
1✔
515
        try:
1✔
516
            network.connect(
1✔
517
                container=container_name_or_id,
518
                aliases=aliases,
519
                link_local_ips=link_local_ips,
520
            )
521
        except NotFound:
1✔
522
            raise NoSuchContainer(container_name_or_id)
1✔
523
        except APIError as e:
×
524
            raise ContainerException() from e
×
525

526
    def disconnect_container_from_network(
1✔
527
        self, network_name: str, container_name_or_id: str
528
    ) -> None:
529
        LOG.debug(
1✔
530
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
531
        )
532
        try:
1✔
533
            try:
1✔
534
                network = self.client().networks.get(network_name)
1✔
535
            except NotFound:
1✔
536
                raise NoSuchNetwork(network_name)
1✔
537
            try:
1✔
538
                network.disconnect(container_name_or_id)
1✔
539
            except NotFound:
1✔
540
                raise NoSuchContainer(container_name_or_id)
1✔
541
        except APIError as e:
1✔
542
            raise ContainerException() from e
×
543

544
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
545
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
546
        network_names = list(networks)
1✔
547
        if len(network_names) > 1:
1✔
548
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
549
        return networks[network_names[0]]["IPAddress"]
1✔
550

551
    @lru_cache(maxsize=None)
1✔
552
    def has_docker(self) -> bool:
1✔
553
        try:
1✔
554
            if not self.docker_client:
1✔
555
                return False
×
556
            self.client().ping()
1✔
557
            return True
1✔
558
        except APIError:
×
559
            return False
×
560

561
    def remove_image(self, image: str, force: bool = True):
1✔
562
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
1✔
563
        try:
1✔
564
            self.client().images.remove(image=image, force=force)
1✔
565
        except ImageNotFound:
1✔
566
            if not force:
1✔
567
                raise NoSuchImage(image)
1✔
568
        except APIError as e:
×
569
            if "image not known" in str(e):
×
570
                raise NoSuchImage(image)
×
571
            raise ContainerException() from e
×
572

573
    def commit(
1✔
574
        self,
575
        container_name_or_id: str,
576
        image_name: str,
577
        image_tag: str,
578
    ):
579
        LOG.debug(
1✔
580
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
581
        )
582
        try:
1✔
583
            container = self.client().containers.get(container_name_or_id)
1✔
584
            container.commit(repository=image_name, tag=image_tag)
1✔
585
        except NotFound:
1✔
586
            raise NoSuchContainer(container_name_or_id)
1✔
587
        except APIError as e:
×
588
            raise ContainerException() from e
×
589

590
    def start_container(
1✔
591
        self,
592
        container_name_or_id: str,
593
        stdin=None,
594
        interactive: bool = False,
595
        attach: bool = False,
596
        flags: Optional[str] = None,
597
    ) -> Tuple[bytes, bytes]:
598
        LOG.debug("Starting container %s", container_name_or_id)
1✔
599
        try:
1✔
600
            container = self.client().containers.get(container_name_or_id)
1✔
601
            stdout = to_bytes(container_name_or_id)
1✔
602
            stderr = b""
1✔
603
            if interactive or attach:
1✔
604
                params = {"stdout": 1, "stderr": 1, "stream": 1}
1✔
605
                if interactive:
1✔
606
                    params["stdin"] = 1
1✔
607
                sock = container.attach_socket(params=params)
1✔
608
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
609
                result_queue = queue.Queue()
1✔
610
                thread_started = threading.Event()
1✔
611
                start_waiting = threading.Event()
1✔
612

613
                # Note: We need to be careful about potential race conditions here - .wait() should happen right
614
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
615
                def wait_for_result(*_):
1✔
616
                    _exit_code = -1
1✔
617
                    try:
1✔
618
                        thread_started.set()
1✔
619
                        start_waiting.wait()
1✔
620
                        _exit_code = container.wait()["StatusCode"]
1✔
621
                    except APIError as e:
×
622
                        _exit_code = 1
×
623
                        raise ContainerException(str(e))
×
624
                    finally:
625
                        result_queue.put(_exit_code)
1✔
626

627
                # start listener thread
628
                start_worker_thread(wait_for_result)
1✔
629
                thread_started.wait()
1✔
630
                try:
1✔
631
                    # start container
632
                    container.start()
1✔
633
                finally:
634
                    # start awaiting container result
635
                    start_waiting.set()
1✔
636

637
                # handle container input/output
638
                # under windows, the socket has no __enter__ / cannot be used as context manager
639
                # therefore try/finally instead of with here
640
                try:
1✔
641
                    if stdin:
1✔
642
                        sock.sendall(to_bytes(stdin))
1✔
643
                        sock.shutdown(socket.SHUT_WR)
1✔
644
                    stdout, stderr = self._read_from_sock(sock, False)
1✔
645
                except socket.timeout:
×
646
                    LOG.debug(
×
647
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
648
                        container_name_or_id,
649
                    )
650
                finally:
651
                    sock.close()
1✔
652

653
                # get container exit code
654
                exit_code = result_queue.get()
1✔
655
                if exit_code:
1✔
656
                    raise ContainerException(
1✔
657
                        f"Docker container returned with exit code {exit_code}",
658
                        stdout=stdout,
659
                        stderr=stderr,
660
                    )
661
            else:
662
                container.start()
1✔
663
            return stdout, stderr
1✔
664
        except NotFound:
1✔
665
            raise NoSuchContainer(container_name_or_id)
1✔
666
        except APIError as e:
1✔
667
            raise ContainerException() from e
1✔
668

669
    def attach_to_container(self, container_name_or_id: str):
1✔
670
        client: DockerClient = self.client()
×
671
        container = cast(Container, client.containers.get(container_name_or_id))
×
672
        container.attach()
×
673

674
    def create_container(
1✔
675
        self,
676
        image_name: str,
677
        *,
678
        name: Optional[str] = None,
679
        entrypoint: Optional[Union[List[str], str]] = None,
680
        remove: bool = False,
681
        interactive: bool = False,
682
        tty: bool = False,
683
        detach: bool = False,
684
        command: Optional[Union[List[str], str]] = None,
685
        volumes: Optional[List[SimpleVolumeBind]] = None,
686
        ports: Optional[PortMappings] = None,
687
        exposed_ports: Optional[List[str]] = None,
688
        env_vars: Optional[Dict[str, str]] = None,
689
        user: Optional[str] = None,
690
        cap_add: Optional[List[str]] = None,
691
        cap_drop: Optional[List[str]] = None,
692
        security_opt: Optional[List[str]] = None,
693
        network: Optional[str] = None,
694
        dns: Optional[Union[str, List[str]]] = None,
695
        additional_flags: Optional[str] = None,
696
        workdir: Optional[str] = None,
697
        privileged: Optional[bool] = None,
698
        labels: Optional[Dict[str, str]] = None,
699
        platform: Optional[DockerPlatform] = None,
700
        ulimits: Optional[List[Ulimit]] = None,
701
        init: Optional[bool] = None,
702
        log_config: Optional[LogConfig] = None,
703
    ) -> str:
704
        LOG.debug("Creating container with attributes: %s", locals())
1✔
705
        extra_hosts = None
1✔
706
        if additional_flags:
1✔
707
            parsed_flags = Util.parse_additional_flags(
1✔
708
                additional_flags,
709
                env_vars=env_vars,
710
                volumes=volumes,
711
                network=network,
712
                platform=platform,
713
                privileged=privileged,
714
                ports=ports,
715
                ulimits=ulimits,
716
                user=user,
717
                dns=dns,
718
            )
719
            env_vars = parsed_flags.env_vars
1✔
720
            extra_hosts = parsed_flags.extra_hosts
1✔
721
            volumes = parsed_flags.volumes
1✔
722
            labels = parsed_flags.labels
1✔
723
            network = parsed_flags.network
1✔
724
            platform = parsed_flags.platform
1✔
725
            privileged = parsed_flags.privileged
1✔
726
            ports = parsed_flags.ports
1✔
727
            ulimits = parsed_flags.ulimits
1✔
728
            user = parsed_flags.user
1✔
729
            dns = parsed_flags.dns
1✔
730

731
        try:
1✔
732
            kwargs = {}
1✔
733
            if cap_add:
1✔
734
                kwargs["cap_add"] = cap_add
1✔
735
            if cap_drop:
1✔
736
                kwargs["cap_drop"] = cap_drop
1✔
737
            if security_opt:
1✔
738
                kwargs["security_opt"] = security_opt
1✔
739
            if dns:
1✔
740
                kwargs["dns"] = ensure_list(dns)
1✔
741
            if exposed_ports:
1✔
742
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
743
                # but the behavior should be identical
744
                kwargs["ports"] = {port: [] for port in exposed_ports}
1✔
745
            if ports:
1✔
746
                kwargs.setdefault("ports", {})
1✔
747
                kwargs["ports"].update(ports.to_dict())
1✔
748
            if workdir:
1✔
749
                kwargs["working_dir"] = workdir
1✔
750
            if privileged:
1✔
751
                kwargs["privileged"] = True
×
752
            if init:
1✔
753
                kwargs["init"] = True
1✔
754
            if labels:
1✔
755
                kwargs["labels"] = labels
1✔
756
            if log_config:
1✔
757
                kwargs["log_config"] = DockerLogConfig(
1✔
758
                    type=log_config.type, config=log_config.config
759
                )
760
            if ulimits:
1✔
761
                kwargs["ulimits"] = [
1✔
762
                    docker.types.Ulimit(
763
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
764
                    )
765
                    for ulimit in ulimits
766
                ]
767
            mounts = None
1✔
768
            if volumes:
1✔
769
                mounts = Util.convert_mount_list_to_dict(volumes)
1✔
770

771
            def create_container():
1✔
772
                return self.client().containers.create(
1✔
773
                    image=image_name,
774
                    command=command,
775
                    auto_remove=remove,
776
                    name=name,
777
                    stdin_open=interactive,
778
                    tty=tty,
779
                    entrypoint=entrypoint,
780
                    environment=env_vars,
781
                    detach=detach,
782
                    user=user,
783
                    network=network,
784
                    volumes=mounts,
785
                    extra_hosts=extra_hosts,
786
                    platform=platform,
787
                    **kwargs,
788
                )
789

790
            try:
1✔
791
                container = create_container()
1✔
792
            except ImageNotFound:
1✔
793
                LOG.debug("Image not found. Pulling image %s", image_name)
1✔
794
                self.pull_image(image_name, platform)
1✔
795
                container = create_container()
1✔
796
            return container.id
1✔
797
        except ImageNotFound:
1✔
798
            raise NoSuchImage(image_name)
×
799
        except APIError as e:
1✔
800
            raise ContainerException() from e
×
801

802
    def run_container(
1✔
803
        self,
804
        image_name: str,
805
        stdin=None,
806
        *,
807
        name: Optional[str] = None,
808
        entrypoint: Optional[str] = None,
809
        remove: bool = False,
810
        interactive: bool = False,
811
        tty: bool = False,
812
        detach: bool = False,
813
        command: Optional[Union[List[str], str]] = None,
814
        volumes: Optional[List[SimpleVolumeBind]] = None,
815
        ports: Optional[PortMappings] = None,
816
        exposed_ports: Optional[List[str]] = None,
817
        env_vars: Optional[Dict[str, str]] = None,
818
        user: Optional[str] = None,
819
        cap_add: Optional[List[str]] = None,
820
        cap_drop: Optional[List[str]] = None,
821
        security_opt: Optional[List[str]] = None,
822
        network: Optional[str] = None,
823
        dns: Optional[str] = None,
824
        additional_flags: Optional[str] = None,
825
        workdir: Optional[str] = None,
826
        labels: Optional[Dict[str, str]] = None,
827
        platform: Optional[DockerPlatform] = None,
828
        privileged: Optional[bool] = None,
829
        ulimits: Optional[List[Ulimit]] = None,
830
        init: Optional[bool] = None,
831
        log_config: Optional[LogConfig] = None,
832
    ) -> Tuple[bytes, bytes]:
833
        LOG.debug("Running container with image: %s", image_name)
1✔
834
        container = None
1✔
835
        try:
1✔
836
            container = self.create_container(
1✔
837
                image_name,
838
                name=name,
839
                entrypoint=entrypoint,
840
                interactive=interactive,
841
                tty=tty,
842
                detach=detach,
843
                remove=remove and detach,
844
                command=command,
845
                volumes=volumes,
846
                ports=ports,
847
                exposed_ports=exposed_ports,
848
                env_vars=env_vars,
849
                user=user,
850
                cap_add=cap_add,
851
                cap_drop=cap_drop,
852
                security_opt=security_opt,
853
                network=network,
854
                dns=dns,
855
                additional_flags=additional_flags,
856
                workdir=workdir,
857
                privileged=privileged,
858
                platform=platform,
859
                init=init,
860
                labels=labels,
861
                ulimits=ulimits,
862
                log_config=log_config,
863
            )
864
            result = self.start_container(
1✔
865
                container_name_or_id=container,
866
                stdin=stdin,
867
                interactive=interactive,
868
                attach=not detach,
869
            )
870
        finally:
871
            if remove and container and not detach:
1✔
872
                self.remove_container(container)
1✔
873
        return result
1✔
874

875
    def exec_in_container(
1✔
876
        self,
877
        container_name_or_id: str,
878
        command: Union[List[str], str],
879
        interactive=False,
880
        detach=False,
881
        env_vars: Optional[Dict[str, Optional[str]]] = None,
882
        stdin: Optional[bytes] = None,
883
        user: Optional[str] = None,
884
        workdir: Optional[str] = None,
885
    ) -> Tuple[bytes, bytes]:
886
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
1✔
887
        try:
1✔
888
            container: Container = self.client().containers.get(container_name_or_id)
1✔
889
            result = container.exec_run(
1✔
890
                cmd=command,
891
                environment=env_vars,
892
                user=user,
893
                detach=detach,
894
                stdin=interactive and bool(stdin),
895
                socket=interactive and bool(stdin),
896
                stdout=True,
897
                stderr=True,
898
                demux=True,
899
                workdir=workdir,
900
            )
901
            tty = False
1✔
902
            if interactive and stdin:  # result is a socket
1✔
903
                sock = result[1]
1✔
904
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
905
                with sock:
1✔
906
                    try:
1✔
907
                        sock.sendall(stdin)
1✔
908
                        sock.shutdown(socket.SHUT_WR)
1✔
909
                        stdout, stderr = self._read_from_sock(sock, tty)
1✔
910
                        return stdout, stderr
1✔
911
                    except socket.timeout:
×
912
                        pass
×
913
            else:
914
                if detach:
1✔
915
                    return b"", b""
×
916
                return_code = result[0]
1✔
917
                if isinstance(result[1], bytes):
1✔
918
                    stdout = result[1]
×
919
                    stderr = b""
×
920
                else:
921
                    stdout, stderr = result[1]
1✔
922
                if return_code != 0:
1✔
923
                    raise ContainerException(
1✔
924
                        f"Exec command returned with exit code {return_code}", stdout, stderr
925
                    )
926
                return stdout, stderr
1✔
927
        except ContainerError:
1✔
928
            raise NoSuchContainer(container_name_or_id)
×
929
        except APIError as e:
1✔
930
            raise ContainerException() from e
1✔
931

932
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
933
        LOG.debug("Docker login for %s", username)
×
934
        try:
×
935
            self.client().login(username, password=password, registry=registry, reauth=True)
×
936
        except APIError as e:
×
937
            raise ContainerException() from e
×
938

939

940
# apply patches required for podman API compatibility
941

942

943
@property
1✔
944
def _container_image(self):
1✔
945
    image_id = self.attrs.get("ImageID", self.attrs["Image"])
1✔
946
    if image_id is None:
1✔
947
        return None
×
948
    image_ref = image_id
1✔
949
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
950
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
951
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
952
    # then lead to "no such image" errors.
953
    if re.match("sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
1✔
954
        image_ref = image_id.split(":")[1]
1✔
955
    return self.client.images.get(image_ref)
1✔
956

957

958
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc