• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17391568346

01 Sep 2025 02:21PM UTC coverage: 86.858% (-0.007%) from 86.865%
17391568346

push

github

web-flow
Fix typing for the tagging service (#13077)

6 of 6 new or added lines in 1 file covered. (100.0%)

9 existing lines in 3 files now uncovered.

67082 of 77232 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.16
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from functools import cache
1✔
10
from time import sleep
1✔
11
from typing import Callable, Optional, Union, cast
1✔
12
from urllib.parse import quote
1✔
13

14
import docker
1✔
15
from docker import DockerClient
1✔
16
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
17
from docker.models.containers import Container
1✔
18
from docker.types import LogConfig as DockerLogConfig
1✔
19
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
20

21
from localstack.config import LS_LOG
1✔
22
from localstack.constants import TRACE_LOG_LEVELS
1✔
23
from localstack.utils.collections import ensure_list
1✔
24
from localstack.utils.container_utils.container_client import (
1✔
25
    AccessDenied,
26
    CancellableStream,
27
    ContainerClient,
28
    ContainerException,
29
    DockerContainerStats,
30
    DockerContainerStatus,
31
    DockerNotAvailable,
32
    DockerPlatform,
33
    LogConfig,
34
    NoSuchContainer,
35
    NoSuchImage,
36
    NoSuchNetwork,
37
    PortMappings,
38
    RegistryConnectionError,
39
    SimpleVolumeBind,
40
    Ulimit,
41
    Util,
42
)
43
from localstack.utils.strings import to_bytes, to_str
1✔
44
from localstack.utils.threads import start_worker_thread
1✔
45

46
LOG = logging.getLogger(__name__)
1✔
47
SDK_ISDIR = 1 << 31
1✔
48

49

50
class SdkDockerClient(ContainerClient):
1✔
51
    """
52
    Class for managing Docker (or Podman) using the Python Docker SDK.
53

54
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
55
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
56
    is doing some of the heavy lifting for us to support both target platforms.
57
    """
58

59
    docker_client: Optional[DockerClient]
1✔
60

61
    def __init__(self):
1✔
62
        try:
1✔
63
            self.docker_client = self._create_client()
1✔
64
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
65
        except DockerNotAvailable:
1✔
66
            self.docker_client = None
1✔
67

68
    def client(self):
1✔
69
        if self.docker_client:
1✔
70
            return self.docker_client
1✔
71
        # if the initialization failed before, try to initialize on-demand
72
        self.docker_client = self._create_client()
1✔
73
        return self.docker_client
1✔
74

75
    @staticmethod
1✔
76
    def _create_client():
1✔
77
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS
1✔
78

79
        for attempt in range(0, DOCKER_SDK_DEFAULT_RETRIES + 1):
1✔
80
            try:
1✔
81
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
1✔
82
            except DockerException as e:
1✔
83
                LOG.debug(
1✔
84
                    "Creating Docker SDK client failed: %s. "
85
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
86
                    e,
87
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
88
                )
89
                if attempt < DOCKER_SDK_DEFAULT_RETRIES:
1✔
90
                    # wait for a second before retrying
91
                    sleep(1)
1✔
92
                else:
93
                    # we are out of attempts
94
                    raise DockerNotAvailable("Docker not available") from e
1✔
95

96
    def _read_from_sock(self, sock: socket, tty: bool):
1✔
97
        """Reads multiplexed messages from a socket returned by attach_socket.
98

99
        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
100
        """
101
        stdout = b""
1✔
102
        stderr = b""
1✔
103
        for frame_type, frame_data in frames_iter(sock, tty):
1✔
104
            if frame_type == STDOUT:
1✔
105
                stdout += frame_data
1✔
106
            elif frame_type == STDERR:
1✔
107
                stderr += frame_data
1✔
108
            else:
109
                raise ContainerException("Invalid frame type when reading from socket")
×
110
        return stdout, stderr
1✔
111

112
    def _container_path_info(self, container: Container, container_path: str):
1✔
113
        """
114
        Get information about a path in the given container
115
        :param container: Container to be inspected
116
        :param container_path: Path in container
117
        :return: Tuple (path_exists, path_is_directory)
118
        """
119
        # Docker CLI copy uses go FileMode to determine if target is a dict or not
120
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
121
        # The isDir Bit is the most significant bit in the 32bit struct:
122
        # https://golang.org/src/os/types.go?s=2650:2683
123
        api_client = self.client().api
1✔
124

125
        def _head(path_suffix, **kwargs):
1✔
126
            return api_client.head(
1✔
127
                api_client.base_url + path_suffix, **api_client._set_request_timeout(kwargs)
128
            )
129

130
        escaped_id = quote(container.id, safe="/:")
1✔
131
        result = _head(f"/containers/{escaped_id}/archive", params={"path": container_path})
1✔
132
        stats = result.headers.get("X-Docker-Container-Path-Stat")
1✔
133
        target_exists = result.ok
1✔
134

135
        if target_exists:
1✔
136
            stats = json.loads(base64.b64decode(stats).decode("utf-8"))
1✔
137
        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)
1✔
138
        return target_exists, target_is_dir
1✔
139

140
    def get_system_info(self) -> dict:
1✔
141
        return self.client().info()
1✔
142

143
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
144
        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose
145
        try:
1✔
146
            container = self.client().containers.get(container_name)
1✔
147
            if container.status == "running":
1✔
148
                return DockerContainerStatus.UP
1✔
149
            elif container.status == "paused":
1✔
150
                return DockerContainerStatus.PAUSED
1✔
151
            else:
152
                return DockerContainerStatus.DOWN
1✔
153
        except NotFound:
1✔
154
            return DockerContainerStatus.NON_EXISTENT
1✔
155
        except APIError as e:
×
156
            raise ContainerException() from e
×
157

158
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
159
        try:
1✔
160
            container = self.client().containers.get(container_name)
1✔
161
            sdk_stats = container.stats(stream=False)
1✔
162

163
            # BlockIO: (Read, Write) bytes
164
            read_bytes = 0
1✔
165
            write_bytes = 0
1✔
166
            for entry in (
1✔
167
                sdk_stats.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
168
            ):
169
                if entry.get("op") == "read":
×
170
                    read_bytes += entry.get("value", 0)
×
171
                elif entry.get("op") == "write":
×
172
                    write_bytes += entry.get("value", 0)
×
173

174
            # CPU percentage
175
            cpu_stats = sdk_stats.get("cpu_stats", {})
1✔
176
            precpu_stats = sdk_stats.get("precpu_stats", {})
1✔
177

178
            cpu_delta = cpu_stats.get("cpu_usage", {}).get("total_usage", 0) - precpu_stats.get(
1✔
179
                "cpu_usage", {}
180
            ).get("total_usage", 0)
181

182
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
1✔
183
                "system_cpu_usage", 0
184
            )
185

186
            online_cpus = cpu_stats.get("online_cpus", 1)
1✔
187
            cpu_percent = (
1✔
188
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
189
            )
190

191
            # Memory (usage, limit) bytes
192
            memory_stats = sdk_stats.get("memory_stats", {})
1✔
193
            mem_usage = memory_stats.get("usage", 0)
1✔
194
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
1✔
195
            mem_inactive = memory_stats.get("stats", {}).get("inactive_file", 0)
1✔
196
            used_memory = max(0, mem_usage - mem_inactive)
1✔
197
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0
1✔
198

199
            # Network IO
200
            net_rx = 0
1✔
201
            net_tx = 0
1✔
202
            for iface in sdk_stats.get("networks", {}).values():
1✔
203
                net_rx += iface.get("rx_bytes", 0)
1✔
204
                net_tx += iface.get("tx_bytes", 0)
1✔
205

206
            # Container ID
207
            container_id = sdk_stats.get("id", "")[:12]
1✔
208
            name = sdk_stats.get("name", "").lstrip("/")
1✔
209

210
            return DockerContainerStats(
1✔
211
                Container=container_id,
212
                ID=container_id,
213
                Name=name,
214
                BlockIO=(read_bytes, write_bytes),
215
                CPUPerc=round(cpu_percent, 2),
216
                MemPerc=round(mem_percent, 2),
217
                MemUsage=(used_memory, mem_limit),
218
                NetIO=(net_rx, net_tx),
219
                PIDs=sdk_stats.get("pids_stats", {}).get("current", 0),
220
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
221
            )
222
        except NotFound:
×
223
            raise NoSuchContainer(container_name)
×
224
        except APIError as e:
×
225
            raise ContainerException() from e
×
226

227
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
228
        LOG.debug("Stopping container: %s", container_name)
1✔
229
        try:
1✔
230
            container = self.client().containers.get(container_name)
1✔
231
            container.stop(timeout=timeout)
1✔
232
        except NotFound:
1✔
233
            raise NoSuchContainer(container_name)
1✔
234
        except APIError as e:
×
235
            raise ContainerException() from e
×
236

237
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
238
        LOG.debug("Restarting container: %s", container_name)
1✔
239
        try:
1✔
240
            container = self.client().containers.get(container_name)
1✔
241
            container.restart(timeout=timeout)
1✔
242
        except NotFound:
1✔
243
            raise NoSuchContainer(container_name)
1✔
244
        except APIError as e:
×
245
            raise ContainerException() from e
×
246

247
    def pause_container(self, container_name: str) -> None:
1✔
248
        LOG.debug("Pausing container: %s", container_name)
1✔
249
        try:
1✔
250
            container = self.client().containers.get(container_name)
1✔
251
            container.pause()
1✔
252
        except NotFound:
1✔
253
            raise NoSuchContainer(container_name)
1✔
254
        except APIError as e:
×
255
            raise ContainerException() from e
×
256

257
    def unpause_container(self, container_name: str) -> None:
1✔
258
        LOG.debug("Unpausing container: %s", container_name)
1✔
259
        try:
1✔
260
            container = self.client().containers.get(container_name)
1✔
261
            container.unpause()
1✔
262
        except NotFound:
×
263
            raise NoSuchContainer(container_name)
×
264
        except APIError as e:
×
265
            raise ContainerException() from e
×
266

267
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
1✔
268
        LOG.debug("Removing container: %s", container_name)
1✔
269
        if check_existence and container_name not in self.get_all_container_names():
1✔
270
            LOG.debug("Aborting removing due to check_existence check")
×
271
            return
×
272
        try:
1✔
273
            container = self.client().containers.get(container_name)
1✔
274
            container.remove(force=force)
1✔
275
        except NotFound:
1✔
276
            if not force:
1✔
277
                raise NoSuchContainer(container_name)
1✔
278
        except APIError as e:
×
279
            raise ContainerException() from e
×
280

281
    def list_containers(self, filter: Union[list[str], str, None] = None, all=True) -> list[dict]:
1✔
282
        if filter:
1✔
283
            filter = [filter] if isinstance(filter, str) else filter
1✔
284
            filter = dict([f.split("=", 1) for f in filter])
1✔
285
        LOG.debug("Listing containers with filters: %s", filter)
1✔
286
        try:
1✔
287
            container_list = self.client().containers.list(filters=filter, all=all)
1✔
288
            result = []
1✔
289
            for container in container_list:
1✔
290
                try:
1✔
291
                    result.append(
1✔
292
                        {
293
                            "id": container.id,
294
                            "image": container.image,
295
                            "name": container.name,
296
                            "status": container.status,
297
                            "labels": container.labels,
298
                        }
299
                    )
300
                except Exception as e:
×
301
                    LOG.error("Error checking container %s: %s", container, e)
×
302
            return result
1✔
303
        except APIError as e:
1✔
304
            raise ContainerException() from e
1✔
305

306
    def copy_into_container(
1✔
307
        self, container_name: str, local_path: str, container_path: str
308
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
309
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
1✔
310
        try:
1✔
311
            container = self.client().containers.get(container_name)
1✔
312
            target_exists, target_isdir = self._container_path_info(container, container_path)
1✔
313
            target_path = container_path if target_isdir else os.path.dirname(container_path)
1✔
314
            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:
1✔
315
                container.put_archive(target_path, tar)
1✔
316
        except NotFound:
1✔
317
            raise NoSuchContainer(container_name)
1✔
318
        except APIError as e:
1✔
319
            raise ContainerException() from e
×
320

321
    def copy_from_container(
1✔
322
        self,
323
        container_name: str,
324
        local_path: str,
325
        container_path: str,
326
    ) -> None:
327
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
1✔
328
        try:
1✔
329
            container = self.client().containers.get(container_name)
1✔
330
            bits, _ = container.get_archive(container_path)
1✔
331
            Util.untar_to_path(bits, local_path)
1✔
332
        except NotFound:
1✔
333
            raise NoSuchContainer(container_name)
1✔
334
        except APIError as e:
×
335
            raise ContainerException() from e
×
336

337
    def pull_image(
1✔
338
        self,
339
        docker_image: str,
340
        platform: Optional[DockerPlatform] = None,
341
        log_handler: Optional[Callable[[str], None]] = None,
342
    ) -> None:
343
        LOG.debug("Pulling Docker image: %s", docker_image)
1✔
344
        # some path in the docker image string indicates a custom repository
345

346
        docker_image = self.registry_resolver_strategy.resolve(docker_image)
1✔
347
        kwargs: dict[str, Union[str, bool]] = {"platform": platform}
1✔
348
        try:
1✔
349
            if log_handler:
1✔
350
                # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
351
                kwargs["stream"] = True
1✔
352
                stream = self.client().api.pull(docker_image, **kwargs)
1✔
353
                for line in stream:
1✔
354
                    log_handler(to_str(line))
1✔
355
            else:
356
                self.client().images.pull(docker_image, **kwargs)
1✔
357
        except ImageNotFound:
1✔
358
            raise NoSuchImage(docker_image)
1✔
359
        except APIError as e:
×
360
            raise ContainerException() from e
×
361

362
    def push_image(self, docker_image: str) -> None:
1✔
363
        LOG.debug("Pushing Docker image: %s", docker_image)
1✔
364
        try:
1✔
365
            result = self.client().images.push(docker_image)
1✔
366
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
367
            if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):
1✔
368
                if "image does not exist locally" in to_str(result):
1✔
369
                    raise NoSuchImage(docker_image)
1✔
370
                if "is denied" in to_str(result):
1✔
371
                    raise AccessDenied(docker_image)
1✔
372
                if "requesting higher privileges than access token allows" in to_str(result):
1✔
373
                    raise AccessDenied(docker_image)
×
374
                if "access token has insufficient scopes" in to_str(result):
1✔
375
                    raise AccessDenied(docker_image)
×
376
                if "connection refused" in to_str(result):
1✔
377
                    raise RegistryConnectionError(result)
1✔
378
                raise ContainerException(result)
×
379
        except ImageNotFound:
1✔
380
            raise NoSuchImage(docker_image)
×
381
        except APIError as e:
1✔
382
            # note: error message 'image not known' raised by Podman API
383
            if "image not known" in str(e):
×
384
                raise NoSuchImage(docker_image)
×
385
            raise ContainerException() from e
×
386

387
    def build_image(
1✔
388
        self,
389
        dockerfile_path: str,
390
        image_name: str,
391
        context_path: str = None,
392
        platform: Optional[DockerPlatform] = None,
393
    ):
394
        try:
1✔
395
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
1✔
396
            context_path = context_path or os.path.dirname(dockerfile_path)
1✔
397
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
1✔
398
            _, logs_iterator = self.client().images.build(
1✔
399
                path=context_path,
400
                dockerfile=dockerfile_path,
401
                tag=image_name,
402
                rm=True,
403
                platform=platform,
404
            )
405
            # logs_iterator is a stream of dicts. Example content:
406
            # {'stream': 'Step 1/4 : FROM alpine'}
407
            # ... other build steps
408
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
409
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
410
            output = ""
1✔
411
            for log in logs_iterator:
1✔
412
                if isinstance(log, dict) and ("stream" in log or "error" in log):
1✔
413
                    output += log.get("stream") or log["error"]
1✔
414
            return output
1✔
415
        except APIError as e:
×
416
            raise ContainerException("Unable to build Docker image") from e
×
417

418
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
419
        try:
1✔
420
            LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
1✔
421
            image = self.client().images.get(source_ref)
1✔
422
            image.tag(target_name)
1✔
423
        except APIError as e:
1✔
424
            if e.status_code == 404:
1✔
425
                raise NoSuchImage(source_ref)
1✔
426
            raise ContainerException("Unable to tag Docker image") from e
×
427

428
    def get_docker_image_names(
1✔
429
        self,
430
        strip_latest: bool = True,
431
        include_tags: bool = True,
432
        strip_wellknown_repo_prefixes: bool = True,
433
    ):
434
        try:
1✔
435
            images = self.client().images.list()
1✔
436
            image_names = [tag for image in images for tag in image.tags if image.tags]
1✔
437
            if not include_tags:
1✔
438
                image_names = [image_name.rpartition(":")[0] for image_name in image_names]
1✔
439
            if strip_wellknown_repo_prefixes:
1✔
440
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
1✔
441
            if strip_latest:
1✔
442
                Util.append_without_latest(image_names)
1✔
443
            return image_names
1✔
444
        except APIError as e:
×
445
            raise ContainerException() from e
×
446

447
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
448
        try:
1✔
449
            container = self.client().containers.get(container_name_or_id)
1✔
450
            return to_str(container.logs())
1✔
451
        except NotFound:
1✔
452
            if safe:
1✔
453
                return ""
1✔
454
            raise NoSuchContainer(container_name_or_id)
1✔
455
        except APIError as e:
1✔
456
            if safe:
1✔
457
                return ""
×
458
            raise ContainerException() from e
1✔
459

460
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
461
        try:
1✔
462
            container = self.client().containers.get(container_name_or_id)
1✔
463
            return container.logs(stream=True, follow=True)
1✔
464
        except NotFound:
1✔
465
            raise NoSuchContainer(container_name_or_id)
1✔
466
        except APIError as e:
×
467
            raise ContainerException() from e
×
468

469
    def inspect_container(self, container_name_or_id: str) -> dict[str, Union[dict, str]]:
1✔
470
        try:
1✔
471
            return self.client().containers.get(container_name_or_id).attrs
1✔
472
        except NotFound:
1✔
473
            raise NoSuchContainer(container_name_or_id)
1✔
474
        except APIError as e:
×
475
            raise ContainerException() from e
×
476

477
    def inspect_image(
1✔
478
        self,
479
        image_name: str,
480
        pull: bool = True,
481
        strip_wellknown_repo_prefixes: bool = True,
482
    ) -> dict[str, Union[dict, list, str]]:
483
        image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
484
        try:
1✔
485
            result = self.client().images.get(image_name).attrs
1✔
486
            if strip_wellknown_repo_prefixes:
1✔
487
                if result.get("RepoDigests"):
1✔
488
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
1✔
489
                        result["RepoDigests"]
490
                    )
491
                if result.get("RepoTags"):
1✔
492
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
1✔
493
            return result
1✔
494
        except NotFound:
1✔
495
            if pull:
1✔
496
                self.pull_image(image_name)
1✔
497
                return self.inspect_image(image_name, pull=False)
1✔
498
            raise NoSuchImage(image_name)
1✔
499
        except APIError as e:
×
500
            raise ContainerException() from e
×
501

502
    def create_network(self, network_name: str) -> None:
1✔
503
        try:
1✔
504
            return self.client().networks.create(name=network_name).id
1✔
505
        except APIError as e:
×
506
            raise ContainerException() from e
×
507

508
    def delete_network(self, network_name: str) -> None:
1✔
509
        try:
1✔
510
            return self.client().networks.get(network_name).remove()
1✔
UNCOV
511
        except NotFound:
×
512
            raise NoSuchNetwork(network_name)
×
UNCOV
513
        except APIError as e:
×
UNCOV
514
            raise ContainerException() from e
×
515

516
    def inspect_network(self, network_name: str) -> dict[str, Union[dict, str]]:
1✔
517
        try:
1✔
518
            return self.client().networks.get(network_name).attrs
1✔
519
        except NotFound:
1✔
520
            raise NoSuchNetwork(network_name)
1✔
521
        except APIError as e:
×
522
            raise ContainerException() from e
×
523

524
    def connect_container_to_network(
1✔
525
        self,
526
        network_name: str,
527
        container_name_or_id: str,
528
        aliases: Optional[list] = None,
529
        link_local_ips: list[str] = None,
530
    ) -> None:
531
        LOG.debug(
1✔
532
            "Connecting container '%s' to network '%s' with aliases '%s'",
533
            container_name_or_id,
534
            network_name,
535
            aliases,
536
        )
537
        try:
1✔
538
            network = self.client().networks.get(network_name)
1✔
539
        except NotFound:
1✔
540
            raise NoSuchNetwork(network_name)
1✔
541
        try:
1✔
542
            network.connect(
1✔
543
                container=container_name_or_id,
544
                aliases=aliases,
545
                link_local_ips=link_local_ips,
546
            )
547
        except NotFound:
1✔
548
            raise NoSuchContainer(container_name_or_id)
1✔
549
        except APIError as e:
×
550
            raise ContainerException() from e
×
551

552
    def disconnect_container_from_network(
1✔
553
        self, network_name: str, container_name_or_id: str
554
    ) -> None:
555
        LOG.debug(
1✔
556
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
557
        )
558
        try:
1✔
559
            try:
1✔
560
                network = self.client().networks.get(network_name)
1✔
561
            except NotFound:
1✔
562
                raise NoSuchNetwork(network_name)
1✔
563
            try:
1✔
564
                network.disconnect(container_name_or_id)
1✔
565
            except NotFound:
1✔
566
                raise NoSuchContainer(container_name_or_id)
1✔
567
        except APIError as e:
1✔
568
            raise ContainerException() from e
×
569

570
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
571
        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]
1✔
572
        network_names = list(networks)
1✔
573
        if len(network_names) > 1:
1✔
574
            LOG.info("Container has more than one assigned network. Picking the first one...")
×
575
        return networks[network_names[0]]["IPAddress"]
1✔
576

577
    @cache
1✔
578
    def has_docker(self) -> bool:
1✔
579
        try:
1✔
580
            if not self.docker_client:
1✔
581
                return False
×
582
            self.client().ping()
1✔
583
            return True
1✔
584
        except APIError:
×
585
            return False
×
586

587
    def remove_image(self, image: str, force: bool = True):
1✔
588
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
1✔
589
        try:
1✔
590
            self.client().images.remove(image=image, force=force)
1✔
591
        except ImageNotFound:
1✔
592
            if not force:
1✔
593
                raise NoSuchImage(image)
1✔
594
        except APIError as e:
×
595
            if "image not known" in str(e):
×
596
                raise NoSuchImage(image)
×
597
            raise ContainerException() from e
×
598

599
    def commit(
1✔
600
        self,
601
        container_name_or_id: str,
602
        image_name: str,
603
        image_tag: str,
604
    ):
605
        LOG.debug(
1✔
606
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
607
        )
608
        try:
1✔
609
            container = self.client().containers.get(container_name_or_id)
1✔
610
            container.commit(repository=image_name, tag=image_tag)
1✔
611
        except NotFound:
1✔
612
            raise NoSuchContainer(container_name_or_id)
1✔
613
        except APIError as e:
×
614
            raise ContainerException() from e
×
615

616
    def start_container(
1✔
617
        self,
618
        container_name_or_id: str,
619
        stdin=None,
620
        interactive: bool = False,
621
        attach: bool = False,
622
        flags: Optional[str] = None,
623
    ) -> tuple[bytes, bytes]:
624
        LOG.debug("Starting container %s", container_name_or_id)
1✔
625
        try:
1✔
626
            container = self.client().containers.get(container_name_or_id)
1✔
627
            stdout = to_bytes(container_name_or_id)
1✔
628
            stderr = b""
1✔
629
            if interactive or attach:
1✔
630
                params = {"stdout": 1, "stderr": 1, "stream": 1}
1✔
631
                if interactive:
1✔
632
                    params["stdin"] = 1
1✔
633
                sock = container.attach_socket(params=params)
1✔
634
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
635
                result_queue = queue.Queue()
1✔
636
                thread_started = threading.Event()
1✔
637
                start_waiting = threading.Event()
1✔
638

639
                # Note: We need to be careful about potential race conditions here - .wait() should happen right
640
                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
641
                def wait_for_result(*_):
1✔
642
                    _exit_code = -1
1✔
643
                    try:
1✔
644
                        thread_started.set()
1✔
645
                        start_waiting.wait()
1✔
646
                        _exit_code = container.wait()["StatusCode"]
1✔
647
                    except APIError as e:
×
648
                        _exit_code = 1
×
649
                        raise ContainerException(str(e))
×
650
                    finally:
651
                        result_queue.put(_exit_code)
1✔
652

653
                # start listener thread
654
                start_worker_thread(wait_for_result)
1✔
655
                thread_started.wait()
1✔
656
                try:
1✔
657
                    # start container
658
                    container.start()
1✔
659
                finally:
660
                    # start awaiting container result
661
                    start_waiting.set()
1✔
662

663
                # handle container input/output
664
                # under windows, the socket has no __enter__ / cannot be used as context manager
665
                # therefore try/finally instead of with here
666
                try:
1✔
667
                    if stdin:
1✔
668
                        sock.sendall(to_bytes(stdin))
1✔
669
                        sock.shutdown(socket.SHUT_WR)
1✔
670
                    stdout, stderr = self._read_from_sock(sock, False)
1✔
671
                except socket.timeout:
×
672
                    LOG.debug(
×
673
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
674
                        container_name_or_id,
675
                    )
676
                finally:
677
                    sock.close()
1✔
678

679
                # get container exit code
680
                exit_code = result_queue.get()
1✔
681
                if exit_code:
1✔
682
                    raise ContainerException(
1✔
683
                        f"Docker container returned with exit code {exit_code}",
684
                        stdout=stdout,
685
                        stderr=stderr,
686
                    )
687
            else:
688
                container.start()
1✔
689
            return stdout, stderr
1✔
690
        except NotFound:
1✔
691
            raise NoSuchContainer(container_name_or_id)
1✔
692
        except APIError as e:
1✔
693
            raise ContainerException() from e
1✔
694

695
    def attach_to_container(self, container_name_or_id: str):
1✔
696
        client: DockerClient = self.client()
×
697
        container = cast(Container, client.containers.get(container_name_or_id))
×
698
        container.attach()
×
699

700
    def create_container(
1✔
701
        self,
702
        image_name: str,
703
        *,
704
        name: Optional[str] = None,
705
        entrypoint: Optional[Union[list[str], str]] = None,
706
        remove: bool = False,
707
        interactive: bool = False,
708
        tty: bool = False,
709
        detach: bool = False,
710
        command: Optional[Union[list[str], str]] = None,
711
        volumes: Optional[list[SimpleVolumeBind]] = None,
712
        ports: Optional[PortMappings] = None,
713
        exposed_ports: Optional[list[str]] = None,
714
        env_vars: Optional[dict[str, str]] = None,
715
        user: Optional[str] = None,
716
        cap_add: Optional[list[str]] = None,
717
        cap_drop: Optional[list[str]] = None,
718
        security_opt: Optional[list[str]] = None,
719
        network: Optional[str] = None,
720
        dns: Optional[Union[str, list[str]]] = None,
721
        additional_flags: Optional[str] = None,
722
        workdir: Optional[str] = None,
723
        privileged: Optional[bool] = None,
724
        labels: Optional[dict[str, str]] = None,
725
        platform: Optional[DockerPlatform] = None,
726
        ulimits: Optional[list[Ulimit]] = None,
727
        init: Optional[bool] = None,
728
        log_config: Optional[LogConfig] = None,
729
    ) -> str:
730
        LOG.debug("Creating container with attributes: %s", locals())
1✔
731
        extra_hosts = None
1✔
732
        if additional_flags:
1✔
733
            parsed_flags = Util.parse_additional_flags(
1✔
734
                additional_flags,
735
                env_vars=env_vars,
736
                volumes=volumes,
737
                network=network,
738
                platform=platform,
739
                privileged=privileged,
740
                ports=ports,
741
                ulimits=ulimits,
742
                user=user,
743
                dns=dns,
744
            )
745
            env_vars = parsed_flags.env_vars
1✔
746
            extra_hosts = parsed_flags.extra_hosts
1✔
747
            volumes = parsed_flags.volumes
1✔
748
            labels = parsed_flags.labels
1✔
749
            network = parsed_flags.network
1✔
750
            platform = parsed_flags.platform
1✔
751
            privileged = parsed_flags.privileged
1✔
752
            ports = parsed_flags.ports
1✔
753
            ulimits = parsed_flags.ulimits
1✔
754
            user = parsed_flags.user
1✔
755
            dns = parsed_flags.dns
1✔
756

757
        try:
1✔
758
            kwargs = {}
1✔
759
            if cap_add:
1✔
760
                kwargs["cap_add"] = cap_add
1✔
761
            if cap_drop:
1✔
762
                kwargs["cap_drop"] = cap_drop
1✔
763
            if security_opt:
1✔
764
                kwargs["security_opt"] = security_opt
1✔
765
            if dns:
1✔
766
                kwargs["dns"] = ensure_list(dns)
1✔
767
            if exposed_ports:
1✔
768
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
769
                # but the behavior should be identical
770
                kwargs["ports"] = {port: [] for port in exposed_ports}
×
771
            if ports:
1✔
772
                kwargs.setdefault("ports", {})
1✔
773
                kwargs["ports"].update(ports.to_dict())
1✔
774
            if workdir:
1✔
775
                kwargs["working_dir"] = workdir
1✔
776
            if privileged:
1✔
777
                kwargs["privileged"] = True
×
778
            if init:
1✔
779
                kwargs["init"] = True
1✔
780
            if labels:
1✔
781
                kwargs["labels"] = labels
1✔
782
            if log_config:
1✔
783
                kwargs["log_config"] = DockerLogConfig(
1✔
784
                    type=log_config.type, config=log_config.config
785
                )
786
            if ulimits:
1✔
787
                kwargs["ulimits"] = [
1✔
788
                    docker.types.Ulimit(
789
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
790
                    )
791
                    for ulimit in ulimits
792
                ]
793
            mounts = None
1✔
794
            if volumes:
1✔
795
                mounts = Util.convert_mount_list_to_dict(volumes)
1✔
796

797
            image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
798

799
            def create_container():
1✔
800
                return self.client().containers.create(
1✔
801
                    image=image_name,
802
                    command=command,
803
                    auto_remove=remove,
804
                    name=name,
805
                    stdin_open=interactive,
806
                    tty=tty,
807
                    entrypoint=entrypoint,
808
                    environment=env_vars,
809
                    detach=detach,
810
                    user=user,
811
                    network=network,
812
                    volumes=mounts,
813
                    extra_hosts=extra_hosts,
814
                    platform=platform,
815
                    **kwargs,
816
                )
817

818
            try:
1✔
819
                container = create_container()
1✔
820
            except ImageNotFound:
1✔
821
                LOG.debug("Image not found. Pulling image %s", image_name)
1✔
822
                self.pull_image(image_name, platform)
1✔
823
                container = create_container()
1✔
824
            return container.id
1✔
825
        except ImageNotFound:
1✔
826
            raise NoSuchImage(image_name)
×
827
        except APIError as e:
1✔
828
            raise ContainerException() from e
×
829

830
    def run_container(
1✔
831
        self,
832
        image_name: str,
833
        stdin=None,
834
        *,
835
        name: Optional[str] = None,
836
        entrypoint: Optional[str] = None,
837
        remove: bool = False,
838
        interactive: bool = False,
839
        tty: bool = False,
840
        detach: bool = False,
841
        command: Optional[Union[list[str], str]] = None,
842
        volumes: Optional[list[SimpleVolumeBind]] = None,
843
        ports: Optional[PortMappings] = None,
844
        exposed_ports: Optional[list[str]] = None,
845
        env_vars: Optional[dict[str, str]] = None,
846
        user: Optional[str] = None,
847
        cap_add: Optional[list[str]] = None,
848
        cap_drop: Optional[list[str]] = None,
849
        security_opt: Optional[list[str]] = None,
850
        network: Optional[str] = None,
851
        dns: Optional[str] = None,
852
        additional_flags: Optional[str] = None,
853
        workdir: Optional[str] = None,
854
        labels: Optional[dict[str, str]] = None,
855
        platform: Optional[DockerPlatform] = None,
856
        privileged: Optional[bool] = None,
857
        ulimits: Optional[list[Ulimit]] = None,
858
        init: Optional[bool] = None,
859
        log_config: Optional[LogConfig] = None,
860
    ) -> tuple[bytes, bytes]:
861
        LOG.debug("Running container with image: %s", image_name)
1✔
862
        container = None
1✔
863
        try:
1✔
864
            container = self.create_container(
1✔
865
                image_name,
866
                name=name,
867
                entrypoint=entrypoint,
868
                interactive=interactive,
869
                tty=tty,
870
                detach=detach,
871
                remove=remove and detach,
872
                command=command,
873
                volumes=volumes,
874
                ports=ports,
875
                exposed_ports=exposed_ports,
876
                env_vars=env_vars,
877
                user=user,
878
                cap_add=cap_add,
879
                cap_drop=cap_drop,
880
                security_opt=security_opt,
881
                network=network,
882
                dns=dns,
883
                additional_flags=additional_flags,
884
                workdir=workdir,
885
                privileged=privileged,
886
                platform=platform,
887
                init=init,
888
                labels=labels,
889
                ulimits=ulimits,
890
                log_config=log_config,
891
            )
892
            result = self.start_container(
1✔
893
                container_name_or_id=container,
894
                stdin=stdin,
895
                interactive=interactive,
896
                attach=not detach,
897
            )
898
        finally:
899
            if remove and container and not detach:
1✔
900
                self.remove_container(container)
1✔
901
        return result
1✔
902

903
    def exec_in_container(
1✔
904
        self,
905
        container_name_or_id: str,
906
        command: Union[list[str], str],
907
        interactive=False,
908
        detach=False,
909
        env_vars: Optional[dict[str, Optional[str]]] = None,
910
        stdin: Optional[bytes] = None,
911
        user: Optional[str] = None,
912
        workdir: Optional[str] = None,
913
    ) -> tuple[bytes, bytes]:
914
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
1✔
915
        try:
1✔
916
            container: Container = self.client().containers.get(container_name_or_id)
1✔
917
            result = container.exec_run(
1✔
918
                cmd=command,
919
                environment=env_vars,
920
                user=user,
921
                detach=detach,
922
                stdin=interactive and bool(stdin),
923
                socket=interactive and bool(stdin),
924
                stdout=True,
925
                stderr=True,
926
                demux=True,
927
                workdir=workdir,
928
            )
929
            tty = False
1✔
930
            if interactive and stdin:  # result is a socket
1✔
931
                sock = result[1]
1✔
932
                sock = sock._sock if hasattr(sock, "_sock") else sock
1✔
933
                with sock:
1✔
934
                    try:
1✔
935
                        sock.sendall(stdin)
1✔
936
                        sock.shutdown(socket.SHUT_WR)
1✔
937
                        stdout, stderr = self._read_from_sock(sock, tty)
1✔
938
                        return stdout, stderr
1✔
939
                    except socket.timeout:
×
940
                        pass
×
941
            else:
942
                if detach:
1✔
943
                    return b"", b""
×
944
                return_code = result[0]
1✔
945
                if isinstance(result[1], bytes):
1✔
946
                    stdout = result[1]
×
947
                    stderr = b""
×
948
                else:
949
                    stdout, stderr = result[1]
1✔
950
                if return_code != 0:
1✔
951
                    raise ContainerException(
1✔
952
                        f"Exec command returned with exit code {return_code}", stdout, stderr
953
                    )
954
                return stdout, stderr
1✔
955
        except ContainerError:
1✔
956
            raise NoSuchContainer(container_name_or_id)
×
957
        except APIError as e:
1✔
958
            raise ContainerException() from e
1✔
959

960
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
961
        LOG.debug("Docker login for %s", username)
×
962
        try:
×
963
            self.client().login(username, password=password, registry=registry, reauth=True)
×
964
        except APIError as e:
×
965
            raise ContainerException() from e
×
966

967

968
# apply patches required for podman API compatibility
969

970

971
@property
1✔
972
def _container_image(self):
1✔
973
    image_id = self.attrs.get("ImageID", self.attrs["Image"])
1✔
974
    if image_id is None:
1✔
975
        return None
×
976
    image_ref = image_id
1✔
977
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
978
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
979
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
980
    # then lead to "no such image" errors.
981
    if re.match("sha256:[0-9a-f]{64}", image_id, flags=re.IGNORECASE):
1✔
982
        image_ref = image_id.split(":")[1]
1✔
983
    return self.client.images.get(image_ref)
1✔
984

985

986
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc