• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20571191684

29 Dec 2025 10:56AM UTC coverage: 84.082%. First build
20571191684

Pull #13569

github

web-flow
Merge 3e3c76e15 into 225bb3465
Pull Request #13569: Allow authenticated pull of docker images

18 of 26 new or added lines in 4 files covered. (69.23%)

67170 of 79886 relevant lines covered (84.08%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.19
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from collections.abc import Callable
1✔
10
from functools import cache
1✔
11
from time import sleep
1✔
12
from typing import cast
1✔
13
from urllib.parse import quote
1✔
14

15
import docker
1✔
16
from docker import DockerClient
1✔
17
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
18
from docker.models.containers import Container
1✔
19
from docker.types import LogConfig as DockerLogConfig
1✔
20
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
21

22
from localstack.config import LS_LOG
1✔
23
from localstack.constants import TRACE_LOG_LEVELS
1✔
24
from localstack.utils.collections import ensure_list
1✔
25
from localstack.utils.container_utils.container_client import (
1✔
26
    AccessDenied,
27
    CancellableStream,
28
    ContainerClient,
29
    ContainerException,
30
    DockerContainerStats,
31
    DockerContainerStatus,
32
    DockerNotAvailable,
33
    DockerPlatform,
34
    LogConfig,
35
    NoSuchContainer,
36
    NoSuchImage,
37
    NoSuchNetwork,
38
    PortMappings,
39
    RegistryConnectionError,
40
    SimpleVolumeBind,
41
    Ulimit,
42
    Util,
43
)
44
from localstack.utils.strings import to_bytes, to_str
1✔
45
from localstack.utils.threads import start_worker_thread
1✔
46

47
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Directory flag of the file mode reported by the Docker archive API: the most significant bit of
# the 32-bit mode value (equal to 1 << 31).
SDK_ISDIR = 0x80000000
49

50

51
class SdkDockerClient(ContainerClient):
1✔
52
    """
53
    Class for managing Docker (or Podman) using the Python Docker SDK.
54

55
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
56
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
57
    is doing some of the heavy lifting for us to support both target platforms.
58
    """
59

60
    docker_client: DockerClient | None
1✔
61

62
    def __init__(self):
        """
        Set up the SDK client, tolerating an unavailable Docker daemon.

        If the client cannot be created, ``docker_client`` is left as ``None``; ``client()`` retries
        the creation on demand.
        """
        try:
            sdk_client = self._create_client()
        except DockerNotAvailable:
            self.docker_client = None
        else:
            self.docker_client = sdk_client
            # raise the urllib3 logger threshold to INFO once a client exists
            logging.getLogger("urllib3").setLevel(logging.INFO)
68

69
    def client(self):
        """
        Return the Docker SDK client, creating it on demand if the initial setup failed.

        :return: the ``DockerClient`` stored on this instance
        :raises DockerNotAvailable: if the client (still) cannot be created
        """
        if not self.docker_client:
            # the initialization failed before, so try to initialize on-demand
            self.docker_client = self._create_client()
        return self.docker_client
75

76
    @staticmethod
    def _create_client():
        """
        Create a Docker SDK client from the environment, retrying on failure.

        One initial attempt plus up to ``DOCKER_SDK_DEFAULT_RETRIES`` retries are made, with a
        one-second pause between attempts.

        :return: the client returned by ``docker.from_env``
        :raises DockerNotAvailable: if the final attempt fails as well
        """
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS

        total_attempts = DOCKER_SDK_DEFAULT_RETRIES + 1
        for attempt_no in range(total_attempts):
            try:
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
            except DockerException as e:
                LOG.debug(
                    "Creating Docker SDK client failed: %s. "
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
                    e,
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
                )
                if attempt_no >= DOCKER_SDK_DEFAULT_RETRIES:
                    # we are out of attempts
                    raise DockerNotAvailable("Docker not available") from e
                # wait for a second before retrying
                sleep(1)
96

97
    def _read_from_sock(self, sock: socket.socket, tty: bool):
        """Reads multiplexed messages from a socket returned by attach_socket.

        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach

        :param sock: socket to read the frames from
        :param tty: passed through to ``frames_iter``
        :return: Tuple (stdout, stderr) with the concatenated payload of each stream
        :raises ContainerException: if a frame is neither of type STDOUT nor STDERR
        """
        # collect the frames and join once at the end, instead of repeatedly re-allocating bytes
        stdout_frames: list[bytes] = []
        stderr_frames: list[bytes] = []
        for frame_type, frame_data in frames_iter(sock, tty):
            if frame_type == STDOUT:
                stdout_frames.append(frame_data)
            elif frame_type == STDERR:
                stderr_frames.append(frame_data)
            else:
                raise ContainerException("Invalid frame type when reading from socket")
        return b"".join(stdout_frames), b"".join(stderr_frames)
112

113
    def _container_path_info(self, container: Container, container_path: str):
        """
        Determine whether a path exists in the given container and whether it is a directory.

        :param container: Container to be inspected
        :param container_path: Path in container
        :return: Tuple (path_exists, path_is_directory)
        """
        # The Docker CLI copy command uses the Go FileMode to determine whether the target is a directory:
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
        # The isDir bit is the most significant bit in the 32bit struct:
        # https://golang.org/src/os/types.go?s=2650:2683
        api = self.client().api
        container_ref = quote(container.id, safe="/:")

        # HEAD request against the archive endpoint; the path stat is delivered in a response header
        archive_url = api.base_url + f"/containers/{container_ref}/archive"
        response = api.head(
            archive_url, **api._set_request_timeout({"params": {"path": container_path}})
        )
        encoded_stat = response.headers.get("X-Docker-Container-Path-Stat")

        if not response.ok:
            return False, False

        # the header value is base64-encoded JSON
        path_stat = json.loads(base64.b64decode(encoded_stat).decode("utf-8"))
        return True, bool(path_stat["mode"] & SDK_ISDIR)
140

141
    def get_system_info(self) -> dict:
        """Return the result of the SDK client's ``info()`` call."""
        sdk_client = self.client()
        return sdk_client.info()
143

144
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
        """
        Map the SDK container status onto a ``DockerContainerStatus`` value.

        :param container_name: name of the container to look up
        :return: UP for "running", PAUSED for "paused", NON_EXISTENT if the container is not found,
            DOWN for any other status
        :raises ContainerException: on any other Docker API error
        """
        # note: no debug log here, as it would be too verbose
        known_states = {
            "running": DockerContainerStatus.UP,
            "paused": DockerContainerStatus.PAUSED,
        }
        try:
            sdk_status = self.client().containers.get(container_name).status
            return known_states.get(sdk_status, DockerContainerStatus.DOWN)
        except NotFound:
            return DockerContainerStatus.NON_EXISTENT
        except APIError as e:
            raise ContainerException() from e
158

159
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
        """
        Collect a one-off resource usage snapshot for the given container.

        :param container_name: name of the container to look up
        :return: aggregated block I/O, CPU, memory, network and PID figures, plus the raw SDK stats
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            raw = self.client().containers.get(container_name).stats(stream=False)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e

        # BlockIO: (Read, Write) bytes
        blkio_entries = raw.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
        bytes_read = sum(
            item.get("value", 0) for item in blkio_entries if item.get("op") == "read"
        )
        bytes_written = sum(
            item.get("value", 0) for item in blkio_entries if item.get("op") == "write"
        )

        # CPU percentage, derived from the delta between the current and the previous sample
        cpu_now = raw.get("cpu_stats", {})
        cpu_before = raw.get("precpu_stats", {})
        usage_now = cpu_now.get("cpu_usage", {}).get("total_usage", 0)
        usage_before = cpu_before.get("cpu_usage", {}).get("total_usage", 0)
        cpu_delta = usage_now - usage_before
        system_delta = cpu_now.get("system_cpu_usage", 0) - cpu_before.get("system_cpu_usage", 0)
        online_cpus = cpu_now.get("online_cpus", 1)
        if system_delta > 0:
            cpu_percent = cpu_delta / system_delta * 100.0 * online_cpus
        else:
            cpu_percent = 0.0

        # Memory (usage, limit) bytes; inactive file memory is subtracted from the usage
        memory = raw.get("memory_stats", {})
        usage = memory.get("usage", 0)
        limit = memory.get("limit", 1)  # Prevent division by zero
        inactive_file = memory.get("stats", {}).get("inactive_file", 0)
        used_memory = max(0, usage - inactive_file)
        if limit:
            mem_percent = used_memory / limit * 100.0
        else:
            mem_percent = 0.0

        # Network IO, summed over all interfaces
        interfaces = raw.get("networks", {})
        received = sum(iface.get("rx_bytes", 0) for iface in interfaces.values())
        transmitted = sum(iface.get("tx_bytes", 0) for iface in interfaces.values())

        # Container ID (first 12 characters) and name without leading slashes
        short_id = raw.get("id", "")[:12]
        display_name = raw.get("name", "").lstrip("/")

        return DockerContainerStats(
            Container=short_id,
            ID=short_id,
            Name=display_name,
            BlockIO=(bytes_read, bytes_written),
            CPUPerc=round(cpu_percent, 2),
            MemPerc=round(mem_percent, 2),
            MemUsage=(used_memory, limit),
            NetIO=(received, transmitted),
            PIDs=raw.get("pids_stats", {}).get("current", 0),
            SDKStats=raw,  # keep the raw stats for more detailed information
        )
227

228
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Stop the given container.

        :param container_name: name of the container to stop
        :param timeout: timeout passed to the SDK ``stop`` call
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Stopping container: %s", container_name)
        try:
            self.client().containers.get(container_name).stop(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
237

238
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Restart the given container.

        :param container_name: name of the container to restart
        :param timeout: timeout passed to the SDK ``restart`` call
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Restarting container: %s", container_name)
        try:
            self.client().containers.get(container_name).restart(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
247

248
    def pause_container(self, container_name: str) -> None:
        """
        Pause the given container.

        :param container_name: name of the container to pause
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Pausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).pause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
257

258
    def unpause_container(self, container_name: str) -> None:
        """
        Unpause the given container.

        :param container_name: name of the container to unpause
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Unpausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).unpause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
267

268
    def remove_container(
        self, container_name: str, force=True, check_existence=False, volumes=False
    ) -> None:
        """
        Remove the given container.

        :param container_name: name of the container to remove
        :param force: passed to the SDK ``remove`` call; if set, a missing container is ignored
        :param check_existence: if set, return early when the name is not among the known containers
        :param volumes: passed to the SDK ``remove`` call as ``v``
        :raises NoSuchContainer: if the container is not found and ``force`` is not set
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Removing container: %s, with volumes: %s", container_name, volumes)
        if check_existence:
            if container_name not in self.get_all_container_names():
                LOG.debug("Aborting removing due to check_existence check")
                return
        try:
            self.client().containers.get(container_name).remove(force=force, v=volumes)
        except NotFound:
            if force:
                # a missing container is not an error for a forced removal
                return
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
283

284
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
        """
        List containers, optionally restricted by ``key=value`` filters.

        :param filter: a single ``key=value`` string or a list of them
        :param all: passed to the SDK ``list`` call
        :return: one dict per container with the keys id, image, name, status and labels
        :raises ContainerException: on a Docker API error
        """
        sdk_filters = filter
        if filter:
            filter_items = [filter] if isinstance(filter, str) else filter
            # split each entry at the first "=" only, so that values may contain "=" themselves
            sdk_filters = dict(item.split("=", 1) for item in filter_items)
        LOG.debug("Listing containers with filters: %s", sdk_filters)
        try:
            summaries = []
            for container in self.client().containers.list(filters=sdk_filters, all=all):
                try:
                    summary = {
                        "id": container.id,
                        "image": container.image,
                        "name": container.name,
                        "status": container.status,
                        "labels": container.labels,
                    }
                    summaries.append(summary)
                except Exception as e:
                    # a failure for a single container is logged, and the container is left out
                    LOG.error("Error checking container %s: %s", container, e)
            return summaries
        except APIError as e:
            raise ContainerException() from e
308

309
    def copy_into_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
        """
        Copy a local path into the given container.

        :param container_name: name of the target container
        :param local_path: path on the local file system
        :param container_path: target path inside the container
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
        try:
            container = self.client().containers.get(container_name)
            _, is_directory = self._container_path_info(container, container_path)
            # the archive is extracted into the target itself if it is a directory, else into its parent
            if is_directory:
                extract_dir = container_path
            else:
                extract_dir = os.path.dirname(container_path)
            with Util.tar_path(local_path, container_path, is_dir=is_directory) as archive:
                container.put_archive(extract_dir, archive)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
323

324
    def copy_from_container(
        self,
        container_name: str,
        local_path: str,
        container_path: str,
    ) -> None:
        """
        Copy a path from the given container to the local file system.

        :param container_name: name of the source container
        :param local_path: target path on the local file system
        :param container_path: source path inside the container
        :raises NoSuchContainer: if the SDK reports the container (or path) as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
        try:
            source = self.client().containers.get(container_name)
            # get_archive returns a pair; only the archive data (first element) is used
            archive_data = source.get_archive(container_path)[0]
            Util.untar_to_path(archive_data, local_path)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
339

340
    def pull_image(
        self,
        docker_image: str,
        platform: DockerPlatform | None = None,
        log_handler: Callable[[str], None] | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> None:
        """
        Pull a Docker image, optionally authenticated and with streamed progress output.

        :param docker_image: image to pull; resolved via the registry resolver strategy first
        :param platform: platform passed to the SDK pull call
        :param log_handler: if given, called with every line of the pull output stream
        :param auth_config: if given, passed to the SDK pull call as ``auth_config``
        :raises NoSuchImage: if the SDK reports the image as not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Pulling Docker image: %s", docker_image)

        docker_image = self.registry_resolver_strategy.resolve(docker_image)
        pull_kwargs: dict[str, str | bool | dict[str, str]] = {"platform": platform}
        if auth_config:
            pull_kwargs["auth_config"] = auth_config
        try:
            if not log_handler:
                self.client().images.pull(docker_image, **pull_kwargs)
                return
            # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
            pull_kwargs["stream"] = True
            for chunk in self.client().api.pull(docker_image, **pull_kwargs):
                log_handler(to_str(chunk))
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as e:
            raise ContainerException() from e
367

368
    def push_image(self, docker_image: str) -> None:
        """
        Push a Docker image to its registry.

        :param docker_image: image to push
        :raises NoSuchImage: if the image does not exist locally
        :raises AccessDenied: if the registry denies the push
        :raises RegistryConnectionError: if the connection to the registry is refused
        :raises ContainerException: on any other reported error
        """
        LOG.debug("Pushing Docker image: %s", docker_image)
        access_denied_markers = (
            "is denied",
            "requesting higher privileges than access token allows",
            "access token has insufficient scopes",
        )
        try:
            result = self.client().images.push(docker_image)
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
            if not isinstance(result, (str, bytes)):
                return
            message = to_str(result)
            if '"errorDetail"' not in message:
                return
            if "image does not exist locally" in message:
                raise NoSuchImage(docker_image)
            if any(marker in message for marker in access_denied_markers):
                raise AccessDenied(docker_image)
            if "connection refused" in message:
                raise RegistryConnectionError(result)
            raise ContainerException(result)
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as e:
            # note: error message 'image not known' raised by Podman API
            if "image not known" in str(e):
                raise NoSuchImage(docker_image)
            raise ContainerException() from e
392

393
    def build_image(
        self,
        dockerfile_path: str,
        image_name: str,
        context_path: str | None = None,
        platform: DockerPlatform | None = None,
    ):
        """
        Build a Docker image and return the collected build output.

        :param dockerfile_path: path of the Dockerfile; resolved via ``Util.resolve_dockerfile_path``
        :param image_name: tag to assign to the built image
        :param context_path: build context; defaults to the directory of the resolved Dockerfile
        :param platform: platform passed to the SDK build call
        :return: concatenation of all "stream" (or, failing that, "error") messages of the build log
        :raises ContainerException: on a Docker API error
        """
        try:
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
            context_path = context_path or os.path.dirname(dockerfile_path)
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
            _, logs_iterator = self.client().images.build(
                path=context_path,
                dockerfile=dockerfile_path,
                tag=image_name,
                rm=True,
                platform=platform,
            )
            # logs_iterator is a stream of dicts. Example content:
            # {'stream': 'Step 1/4 : FROM alpine'}
            # ... other build steps
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
            messages = []
            for log in logs_iterator:
                if not isinstance(log, dict):
                    continue
                # use .get() for both keys: an entry with an empty "stream" and no "error" key
                # must not fail with a KeyError
                message = log.get("stream") or log.get("error")
                if message:
                    messages.append(message)
            return "".join(messages)
        except APIError as e:
            raise ContainerException("Unable to build Docker image") from e
423

424
    def tag_image(self, source_ref: str, target_name: str) -> None:
        """
        Tag an existing image with an additional name.

        :param source_ref: reference of the image to tag
        :param target_name: name to assign to the image
        :raises NoSuchImage: if the Docker API answers with status code 404
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
        try:
            self.client().images.get(source_ref).tag(target_name)
        except APIError as e:
            if e.status_code != 404:
                raise ContainerException("Unable to tag Docker image") from e
            raise NoSuchImage(source_ref)
433

434
    def get_docker_image_names(
        self,
        strip_latest: bool = True,
        include_tags: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ):
        """
        Return the names of all images known to the container engine.

        :param strip_latest: if set, the list is post-processed by ``Util.append_without_latest``
        :param include_tags: if not set, everything from the last ":" on is cut off each name
        :param strip_wellknown_repo_prefixes: if set, the names are post-processed by
            ``Util.strip_wellknown_repo_prefixes``
        :return: list of image names
        :raises ContainerException: on a Docker API error
        """
        try:
            image_names = []
            for image in self.client().images.list():
                image_names.extend(image.tags)
            if not include_tags:
                image_names = [name.rpartition(":")[0] for name in image_names]
            if strip_wellknown_repo_prefixes:
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
            if strip_latest:
                # return value is ignored here — presumably the list is extended in place
                Util.append_without_latest(image_names)
            return image_names
        except APIError as e:
            raise ContainerException() from e
452

453
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
        """
        Return the logs of the given container as a string.

        :param container_name_or_id: name or ID of the container
        :param safe: if set, return an empty string instead of raising on errors
        :return: the container logs, or "" on errors in safe mode
        :raises NoSuchContainer: if the container is not found (unless ``safe``)
        :raises ContainerException: on any other Docker API error (unless ``safe``)
        """
        try:
            raw_logs = self.client().containers.get(container_name_or_id).logs()
            return to_str(raw_logs)
        except NotFound:
            if not safe:
                raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            if not safe:
                raise ContainerException() from e
        # only reached in safe mode, after an error was swallowed above
        return ""
465

466
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
        """
        Return a stream that follows the logs of the given container.

        :param container_name_or_id: name or ID of the container
        :return: the result of the SDK ``logs`` call with ``stream`` and ``follow`` enabled
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            target = self.client().containers.get(container_name_or_id)
            log_stream = target.logs(stream=True, follow=True)
            return log_stream
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
474

475
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
        """
        Return the attributes of the given container.

        :param container_name_or_id: name or ID of the container
        :return: the ``attrs`` of the SDK container object
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            found = self.client().containers.get(container_name_or_id)
            return found.attrs
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
482

483
    def inspect_image(
        self,
        image_name: str,
        pull: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ) -> dict[str, dict | list | str]:
        """
        Return the attributes of the given image, pulling it first if necessary.

        :param image_name: image to inspect; resolved via the registry resolver strategy first
        :param pull: if set, pull the image when it is not found, then inspect it again
        :param strip_wellknown_repo_prefixes: if set, "RepoDigests" and "RepoTags" are post-processed
            by ``Util.strip_wellknown_repo_prefixes``
        :return: the ``attrs`` of the SDK image object
        :raises NoSuchImage: if the image is not found and ``pull`` is not set
        :raises ContainerException: on any other Docker API error
        """
        image_name = self.registry_resolver_strategy.resolve(image_name)
        try:
            result = self.client().images.get(image_name).attrs
            if strip_wellknown_repo_prefixes:
                if result.get("RepoDigests"):
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
                        result["RepoDigests"]
                    )
                if result.get("RepoTags"):
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
            return result
        except NotFound:
            if pull:
                self.pull_image(image_name)
                # forward the caller's choice, so that it also applies to a freshly pulled image
                return self.inspect_image(
                    image_name,
                    pull=False,
                    strip_wellknown_repo_prefixes=strip_wellknown_repo_prefixes,
                )
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
507

508
    def create_network(self, network_name: str) -> str:
        """
        Create a network with the given name.

        :param network_name: name of the network to create
        :return: the ``id`` of the created network
        :raises ContainerException: on a Docker API error
        """
        try:
            return self.client().networks.create(name=network_name).id
        except APIError as e:
            raise ContainerException() from e
513

514
    def delete_network(self, network_name: str) -> None:
        """
        Remove the network with the given name.

        :param network_name: name of the network to remove
        :raises NoSuchNetwork: if the network is not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            doomed_network = self.client().networks.get(network_name)
            return doomed_network.remove()
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as e:
            raise ContainerException() from e
521

522
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
        """
        Return the attributes of the given network.

        :param network_name: name of the network
        :return: the ``attrs`` of the SDK network object
        :raises NoSuchNetwork: if the network is not found
        :raises ContainerException: on any other Docker API error
        """
        try:
            found = self.client().networks.get(network_name)
            return found.attrs
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as e:
            raise ContainerException() from e
529

530
    def connect_container_to_network(
        self,
        network_name: str,
        container_name_or_id: str,
        aliases: list | None = None,
        link_local_ips: list[str] | None = None,
    ) -> None:
        """
        Connect a container to a network.

        :param network_name: name of the network
        :param container_name_or_id: name or ID of the container to connect
        :param aliases: passed to the SDK ``connect`` call
        :param link_local_ips: passed to the SDK ``connect`` call
        :raises NoSuchNetwork: if the network is not found
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug(
            "Connecting container '%s' to network '%s' with aliases '%s'",
            container_name_or_id,
            network_name,
            aliases,
        )
        # the outer handler also covers API errors of the network lookup, in line with
        # disconnect_container_from_network
        try:
            try:
                network = self.client().networks.get(network_name)
            except NotFound:
                raise NoSuchNetwork(network_name)
            try:
                network.connect(
                    container=container_name_or_id,
                    aliases=aliases,
                    link_local_ips=link_local_ips,
                )
            except NotFound:
                raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
557

558
    def disconnect_container_from_network(
        self, network_name: str, container_name_or_id: str
    ) -> None:
        """
        Disconnect a container from a network.

        :param network_name: name of the network
        :param container_name_or_id: name or ID of the container to disconnect
        :raises NoSuchNetwork: if the network is not found
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug(
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
        )
        # step 1: look up the network
        try:
            network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as e:
            raise ContainerException() from e

        # step 2: detach the container from it
        try:
            network.disconnect(container_name_or_id)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
575

576
    def get_container_ip(self, container_name_or_id: str) -> str:
        """
        Return the IP address of the container in its first listed network.

        :param container_name_or_id: name or ID of the container
        :return: the "IPAddress" entry of the first network in the container's network settings
        """
        container_info = self.inspect_container(container_name_or_id)
        networks = container_info["NetworkSettings"]["Networks"]
        candidates = list(networks)
        if len(candidates) > 1:
            LOG.info("Container has more than one assigned network. Picking the first one...")
        first_network = candidates[0]
        return networks[first_network]["IPAddress"]
582

583
    @cache
    def has_docker(self) -> bool:
        """
        Check whether the Docker daemon can be reached.

        :return: False if no client was created or the ping fails with an API error, True otherwise
        """
        # NOTE(review): with @cache on a method, the result is memoized per instance and never
        #   refreshed, and the cache keeps a reference to the instance — confirm this is intended
        if not self.docker_client:
            return False
        try:
            self.client().ping()
        except APIError:
            return False
        return True
592

593
    def remove_image(self, image: str, force: bool = True):
        """
        Remove the given image.

        :param image: image to remove
        :param force: passed to the SDK ``remove`` call; if set, a missing image is ignored
        :raises NoSuchImage: if the image is not found and ``force`` is not set, or if the API error
            message contains "image not known"
        :raises ContainerException: on any other Docker API error
        """
        forced_suffix = "(forced)" if force else ""
        LOG.debug("Removing image %s %s", image, forced_suffix)
        try:
            self.client().images.remove(image=image, force=force)
        except ImageNotFound:
            if force:
                # a missing image is not an error for a forced removal
                return
            raise NoSuchImage(image)
        except APIError as e:
            if "image not known" not in str(e):
                raise ContainerException() from e
            raise NoSuchImage(image)
604

605
    def commit(
        self,
        container_name_or_id: str,
        image_name: str,
        image_tag: str,
    ):
        """
        Create an image from the given container.

        :param container_name_or_id: name or ID of the container to commit
        :param image_name: passed to the SDK ``commit`` call as repository
        :param image_tag: passed to the SDK ``commit`` call as tag
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on any other Docker API error
        """
        LOG.debug(
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
        )
        try:
            source = self.client().containers.get(container_name_or_id)
            source.commit(repository=image_name, tag=image_tag)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
621

622
    def start_container(
        self,
        container_name_or_id: str,
        stdin=None,
        interactive: bool = False,
        attach: bool = False,
        flags: str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Start the given container, optionally attaching to its I/O streams.

        Without ``interactive``/``attach`` the container is merely started. Otherwise the method
        attaches to the container, optionally sends ``stdin``, reads the output and waits for the
        container's exit code.

        :param container_name_or_id: name or ID of the container to start
        :param stdin: data sent to the attached socket, if given
        :param interactive: attach to the container, including stdin
        :param attach: attach to the container
        :param flags: not used by this implementation
        :return: Tuple (stdout, stderr); when not attached, stdout is the container name/ID as bytes
        :raises NoSuchContainer: if the container is not found
        :raises ContainerException: on a non-zero exit code, or on any other Docker API error
        """
        LOG.debug("Starting container %s", container_name_or_id)
        try:
            container = self.client().containers.get(container_name_or_id)
            stdout, stderr = to_bytes(container_name_or_id), b""

            if not (interactive or attach):
                container.start()
                return stdout, stderr

            attach_params = {"stdout": 1, "stderr": 1, "stream": 1}
            if interactive:
                attach_params["stdin"] = 1
            sock = container.attach_socket(params=attach_params)
            # use the wrapped "_sock" object, if the returned socket has one
            sock = getattr(sock, "_sock", sock)
            exit_codes = queue.Queue()
            waiter_ready = threading.Event()
            container_launched = threading.Event()

            # Note: We need to be careful about potential race conditions here - .wait() should happen right
            #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
            def wait_for_result(*_):
                _exit_code = -1
                try:
                    waiter_ready.set()
                    container_launched.wait()
                    _exit_code = container.wait()["StatusCode"]
                except APIError as e:
                    _exit_code = 1
                    raise ContainerException(str(e))
                finally:
                    # always publish a code, so that the consumer of the queue cannot block forever
                    exit_codes.put(_exit_code)

            # start listener thread, and make sure it is running before the container is started
            start_worker_thread(wait_for_result)
            waiter_ready.wait()
            try:
                container.start()
            finally:
                # release the listener thread, even if starting the container failed
                container_launched.set()

            # handle container input/output
            # under windows, the socket has no __enter__ / cannot be used as context manager
            # therefore try/finally instead of with here
            try:
                if stdin:
                    sock.sendall(to_bytes(stdin))
                    sock.shutdown(socket.SHUT_WR)
                stdout, stderr = self._read_from_sock(sock, False)
            except TimeoutError:
                LOG.debug(
                    "Socket timeout when talking to the I/O streams of Docker container '%s'",
                    container_name_or_id,
                )
            finally:
                sock.close()

            # get container exit code
            exit_code = exit_codes.get()
            if exit_code:
                raise ContainerException(
                    f"Docker container returned with exit code {exit_code}",
                    stdout=stdout,
                    stderr=stderr,
                )
            return stdout, stderr
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
700

701
    def attach_to_container(self, container_name_or_id: str):
        """Attach to the given container via the Docker SDK.

        The container is looked up by name or ID and ``Container.attach()`` is invoked on it; whatever
        the SDK call returns is discarded.

        :param container_name_or_id: name or ID of the container to attach to
        """
        sdk_client: DockerClient = self.client()
        target = sdk_client.containers.get(container_name_or_id)
        cast(Container, target).attach()
×
705

706
    def create_container(
        self,
        image_name: str,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        privileged: bool | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> str:
        """Create (but do not start) a container via the Docker SDK and return its ID.

        If ``additional_flags`` is given, it is parsed with ``Util.parse_additional_flags`` and the
        parsed values replace the corresponding arguments (labels are merged, see below). If the
        image is not found when creating the container, it is pulled via ``pull_image`` (passing
        ``auth_config``) and the creation is attempted once more.

        :param image_name: image to create the container from; resolved through the registry
            resolver strategy before use
        :param name: optional container name
        :param remove: passed to the SDK as ``auto_remove``
        :param interactive: passed to the SDK as ``stdin_open``
        :param exposed_ports: ports to expose without a host binding
        :param ports: port mappings; merged on top of ``exposed_ports``
        :param additional_flags: string of extra flags to be parsed into container settings
        :param auth_config: registry credentials, only used if the image has to be pulled
        :return: the ID of the created container
        :raises NoSuchImage: if the image is still not found after pulling it
        :raises ContainerException: if the Docker API reports any other error
        """
        if LOG.isEnabledFor(logging.DEBUG):
            attributes = dict(locals())
            # never write registry credentials to the log output
            if attributes.get("auth_config"):
                attributes["auth_config"] = "<redacted>"
            LOG.debug("Creating container with attributes: %s", attributes)

        extra_hosts = None
        if additional_flags:
            parsed_flags = Util.parse_additional_flags(
                additional_flags,
                env_vars=env_vars,
                volumes=volumes,
                network=network,
                platform=platform,
                privileged=privileged,
                ports=ports,
                ulimits=ulimits,
                user=user,
                dns=dns,
            )
            env_vars = parsed_flags.env_vars
            extra_hosts = parsed_flags.extra_hosts
            volumes = parsed_flags.volumes
            # `labels` is not handed to the flag parser above, so merge rather than overwrite to
            # keep explicitly passed labels; labels from the flags win on key conflicts.
            # NOTE(review): assumes `parsed_flags.labels` is a mapping (or empty/None) - confirm
            if parsed_flags.labels:
                labels = {**(labels or {}), **parsed_flags.labels}
            network = parsed_flags.network
            platform = parsed_flags.platform
            privileged = parsed_flags.privileged
            ports = parsed_flags.ports
            ulimits = parsed_flags.ulimits
            user = parsed_flags.user
            dns = parsed_flags.dns

        try:
            # only settings that are actually set are forwarded to the SDK
            kwargs = {}
            if cap_add:
                kwargs["cap_add"] = cap_add
            if cap_drop:
                kwargs["cap_drop"] = cap_drop
            if security_opt:
                kwargs["security_opt"] = security_opt
            if dns:
                kwargs["dns"] = ensure_list(dns)
            if exposed_ports:
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
                # but the behavior should be identical
                kwargs["ports"] = {port: [] for port in exposed_ports}
            if ports:
                kwargs.setdefault("ports", {})
                kwargs["ports"].update(ports.to_dict())
            if workdir:
                kwargs["working_dir"] = workdir
            if privileged:
                kwargs["privileged"] = True
            if init:
                kwargs["init"] = True
            if labels:
                kwargs["labels"] = labels
            if log_config:
                kwargs["log_config"] = DockerLogConfig(
                    type=log_config.type, config=log_config.config
                )
            if ulimits:
                kwargs["ulimits"] = [
                    docker.types.Ulimit(
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
                    )
                    for ulimit in ulimits
                ]
            if cpu_shares:
                kwargs["cpu_shares"] = cpu_shares
            if mem_limit:
                kwargs["mem_limit"] = mem_limit
            mounts = None
            if volumes:
                mounts = Util.convert_mount_list_to_dict(volumes)

            image_name = self.registry_resolver_strategy.resolve(image_name)

            def _create():
                return self.client().containers.create(
                    image=image_name,
                    command=command,
                    auto_remove=remove,
                    name=name,
                    stdin_open=interactive,
                    tty=tty,
                    entrypoint=entrypoint,
                    environment=env_vars,
                    detach=detach,
                    user=user,
                    network=network,
                    volumes=mounts,
                    extra_hosts=extra_hosts,
                    platform=platform,
                    **kwargs,
                )

            try:
                container = _create()
            except ImageNotFound:
                # image is not available locally: pull it and retry exactly once
                LOG.debug("Image not found. Pulling image %s", image_name)
                self.pull_image(image_name, platform, auth_config=auth_config)
                container = _create()
            return container.id
        except ImageNotFound:
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
×
842

843
    def run_container(
        self,
        image_name: str,
        stdin=None,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        privileged: bool | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
        auth_config: dict[str, str] | None = None,
    ) -> tuple[bytes, bytes]:
        """Create a container from the given image, start it, and return the start result.

        All container settings are handed through to ``create_container``; the created container is
        then started with ``start_container``, attaching to it unless ``detach`` is set.

        Removal semantics of ``remove``: for detached containers the flag is passed on to
        ``create_container``; for attached containers the container is removed explicitly once
        ``start_container`` has returned or raised.

        :param image_name: image to run
        :param stdin: optional input handed to ``start_container``
        :param remove: whether to remove the container afterwards (see above)
        :param detach: if set, do not attach to the container
        :param auth_config: registry credentials, forwarded to ``create_container`` where they are
            used if the image has to be pulled
        :return: the result of ``start_container``
        """
        LOG.debug("Running container with image: %s", image_name)
        container = None
        try:
            container = self.create_container(
                image_name,
                name=name,
                entrypoint=entrypoint,
                interactive=interactive,
                tty=tty,
                detach=detach,
                remove=remove and detach,
                command=command,
                volumes=volumes,
                ports=ports,
                exposed_ports=exposed_ports,
                env_vars=env_vars,
                user=user,
                cap_add=cap_add,
                cap_drop=cap_drop,
                security_opt=security_opt,
                network=network,
                dns=dns,
                additional_flags=additional_flags,
                workdir=workdir,
                privileged=privileged,
                platform=platform,
                init=init,
                labels=labels,
                ulimits=ulimits,
                log_config=log_config,
                cpu_shares=cpu_shares,
                mem_limit=mem_limit,
                auth_config=auth_config,
            )
            return self.start_container(
                container_name_or_id=container,
                stdin=stdin,
                interactive=interactive,
                attach=not detach,
            )
        finally:
            # attached runs are cleaned up here; `container` is still None if creation failed
            if remove and container and not detach:
                self.remove_container(container)
1✔
919

920
    def exec_in_container(
        self,
        container_name_or_id: str,
        command: list[str] | str,
        interactive=False,
        detach=False,
        env_vars: dict[str, str | None] | None = None,
        stdin: bytes | None = None,
        user: str | None = None,
        workdir: str | None = None,
    ) -> tuple[bytes, bytes]:
        """Execute a command inside a container and return its output.

        If ``interactive`` is set and ``stdin`` is non-empty, the exec is performed over a socket:
        ``stdin`` is written to the socket, the write side is shut down, and the output is read back
        from the socket. Otherwise the demultiplexed result of ``Container.exec_run`` is used.

        :param container_name_or_id: name or ID of the container to execute the command in
        :param command: the command to execute
        :param interactive: whether ``stdin`` should be sent to the command
        :param detach: if set (and no input is sent), return immediately with empty output
        :param env_vars: environment variables for the command
        :param stdin: input to send to the command (only used if ``interactive`` is set)
        :param user: user to run the command as
        :param workdir: working directory for the command
        :return: tuple of (stdout, stderr); both empty if detached or if the socket timed out
        :raises NoSuchContainer: if the container cannot be found
        :raises ContainerException: if the command exits with a non-zero code (non-socket mode), or
            on any other Docker API error
        """
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
        use_socket = interactive and bool(stdin)
        try:
            container: Container = self.client().containers.get(container_name_or_id)
            result = container.exec_run(
                cmd=command,
                environment=env_vars,
                user=user,
                detach=detach,
                stdin=use_socket,
                socket=use_socket,
                stdout=True,
                stderr=True,
                demux=True,
                workdir=workdir,
            )
            if use_socket:  # result is a socket
                sock = result[1]
                sock = sock._sock if hasattr(sock, "_sock") else sock
                # under windows, the socket has no __enter__ / cannot be used as context manager
                # therefore try/finally instead of with here
                try:
                    sock.sendall(to_bytes(stdin))
                    sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                    return stdout, stderr
                except TimeoutError:
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
                        container_name_or_id,
                    )
                    # honor the return contract instead of implicitly returning None
                    return b"", b""
                finally:
                    sock.close()

            if detach:
                return b"", b""
            return_code = result[0]
            if isinstance(result[1], bytes):
                stdout = result[1]
                stderr = b""
            else:
                stdout, stderr = result[1]
            if return_code != 0:
                raise ContainerException(
                    f"Exec command returned with exit code {return_code}", stdout, stderr
                )
            return stdout, stderr
        except (NotFound, ContainerError):
            # NotFound must be handled before the generic APIError handler below
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
976

977
    def login(self, username: str, password: str, registry: str | None = None) -> None:
        """Log the Docker client in to a registry, always forcing re-authentication.

        :param username: registry user name
        :param password: registry password
        :param registry: registry to log in to; passed to the SDK as-is (``None`` if omitted)
        :raises ContainerException: if the Docker API rejects the login
        """
        LOG.debug("Docker login for %s", username)
        login_options = {"password": password, "registry": registry, "reauth": True}
        try:
            self.client().login(username, **login_options)
        except APIError as api_error:
            raise ContainerException() from api_error
×
983

984

985
# apply patches required for podman API compatibility
986

987

988
@property
def _container_image(self):
    """Return the image of this container, or ``None`` if the container reports no image.

    The image reference is read from the ``ImageID`` attribute, falling back to ``Image``.
    """
    reported = self.attrs.get("ImageID", self.attrs["Image"])
    if reported is None:
        return None
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
    # then lead to "no such image" errors.
    looks_like_digest = re.match("sha256:[0-9a-f]{64}", reported, flags=re.IGNORECASE)
    # only digests are reduced to the part after the colon; anything else is looked up verbatim
    lookup_ref = reported.split(":")[1] if looks_like_digest else reported
    return self.client.images.get(lookup_ref)
1✔
1001

1002

1003
# Monkey-patch: replace the `image` attribute of the Docker SDK's `Container` class with the
# `_container_image` property defined above, so every Container instance uses the patched lookup.
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc