• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20420150350

19 Dec 2025 10:27AM UTC coverage: 86.92% (+0.007%) from 86.913%
20420150350

push

github

web-flow
Fix Lambda CI log pollution issues (#13546)

2 of 4 new or added lines in 1 file covered. (50.0%)

75 existing lines in 6 files now uncovered.

70016 of 80552 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.3
/localstack-core/localstack/utils/container_utils/docker_sdk_client.py
1
import base64
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import queue
1✔
6
import re
1✔
7
import socket
1✔
8
import threading
1✔
9
from collections.abc import Callable
1✔
10
from functools import cache
1✔
11
from time import sleep
1✔
12
from typing import cast
1✔
13
from urllib.parse import quote
1✔
14

15
import docker
1✔
16
from docker import DockerClient
1✔
17
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound, NotFound
1✔
18
from docker.models.containers import Container
1✔
19
from docker.types import LogConfig as DockerLogConfig
1✔
20
from docker.utils.socket import STDERR, STDOUT, frames_iter
1✔
21

22
from localstack.config import LS_LOG
1✔
23
from localstack.constants import TRACE_LOG_LEVELS
1✔
24
from localstack.utils.collections import ensure_list
1✔
25
from localstack.utils.container_utils.container_client import (
1✔
26
    AccessDenied,
27
    CancellableStream,
28
    ContainerClient,
29
    ContainerException,
30
    DockerContainerStats,
31
    DockerContainerStatus,
32
    DockerNotAvailable,
33
    DockerPlatform,
34
    LogConfig,
35
    NoSuchContainer,
36
    NoSuchImage,
37
    NoSuchNetwork,
38
    PortMappings,
39
    RegistryConnectionError,
40
    SimpleVolumeBind,
41
    Ulimit,
42
    Util,
43
)
44
from localstack.utils.strings import to_bytes, to_str
1✔
45
from localstack.utils.threads import start_worker_thread
1✔
46

47
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
48
# Bit mask for the most significant bit of a 32-bit file mode; _container_path_info tests it
# against the "mode" field of the engine's path stat to detect directories.
SDK_ISDIR = 1 << 31
1✔
49

50

51
class SdkDockerClient(ContainerClient):
1✔
52
    """
53
    Class for managing Docker (or Podman) using the Python Docker SDK.
54

55
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
56
    for Docker these days (with ongoing efforts to further streamline the two), and the Docker SDK
57
    is doing some of the heavy lifting for us to support both target platforms.
58
    """
59

60
    docker_client: DockerClient | None
1✔
61

62
    def __init__(self):
        """Try to create the SDK client; leave ``docker_client`` unset if the engine is unavailable."""
        try:
            sdk_client = self._create_client()
        except DockerNotAvailable:
            # not fatal - client() attempts the initialization again on demand
            self.docker_client = None
        else:
            self.docker_client = sdk_client
            # pin the urllib3 logger to INFO once a client exists
            logging.getLogger("urllib3").setLevel(logging.INFO)
1✔
68

69
    def client(self):
        """
        Return the SDK client, creating it lazily if the initial creation failed.

        :return: the value stored in ``docker_client``
        """
        if not self.docker_client:
            # if the initialization failed before, try to initialize on-demand
            self.docker_client = self._create_client()
        return self.docker_client
1✔
75

76
    @staticmethod
    def _create_client():
        """
        Create a Docker SDK client from the environment, retrying on failure.

        Up to ``DOCKER_SDK_DEFAULT_RETRIES`` additional attempts are made, one second apart.

        :return: the client returned by ``docker.from_env``
        :raises DockerNotAvailable: if the last attempt fails as well
        """
        from localstack.config import DOCKER_SDK_DEFAULT_RETRIES, DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS

        for attempt in range(DOCKER_SDK_DEFAULT_RETRIES + 1):
            try:
                return docker.from_env(timeout=DOCKER_SDK_DEFAULT_TIMEOUT_SECONDS)
            except DockerException as docker_error:
                LOG.debug(
                    "Creating Docker SDK client failed: %s. "
                    "If you want to use Docker as container runtime, make sure to mount the socket at /var/run/docker.sock",
                    docker_error,
                    exc_info=LS_LOG in TRACE_LOG_LEVELS,
                )
                if attempt >= DOCKER_SDK_DEFAULT_RETRIES:
                    # we are out of attempts
                    raise DockerNotAvailable("Docker not available") from docker_error
                # wait for a second before retrying
                sleep(1)
1✔
96

97
    def _read_from_sock(self, sock: socket.socket, tty: bool):
        """Reads multiplexed messages from a socket returned by attach_socket.

        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach

        :param sock: socket to read the frames from
        :param tty: passed through to ``frames_iter``
        :return: tuple (stdout, stderr) with the concatenated payload of each stream
        :raises ContainerException: if a frame is neither a stdout nor a stderr frame
        """
        # collect the frames and join once at the end instead of re-allocating on every frame
        stdout_frames: list[bytes] = []
        stderr_frames: list[bytes] = []
        for frame_type, frame_data in frames_iter(sock, tty):
            if frame_type == STDOUT:
                stdout_frames.append(frame_data)
            elif frame_type == STDERR:
                stderr_frames.append(frame_data)
            else:
                raise ContainerException("Invalid frame type when reading from socket")
        return b"".join(stdout_frames), b"".join(stderr_frames)
1✔
112

113
    def _container_path_info(self, container: Container, container_path: str):
        """
        Check whether a path exists inside a container and whether it is a directory.

        :param container: Container to be inspected
        :param container_path: Path in container
        :return: Tuple (path_exists, path_is_directory)
        """
        # The Docker CLI copy command uses the Go FileMode to determine if the target is a directory or not
        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260
        # The isDir bit is the most significant bit in the 32bit struct:
        # https://golang.org/src/os/types.go?s=2650:2683
        api_client = self.client().api

        # issue a HEAD request against the archive endpoint through the low-level API client
        url = f"{api_client.base_url}/containers/{quote(container.id, safe='/:')}/archive"
        request_kwargs = api_client._set_request_timeout({"params": {"path": container_path}})
        response = api_client.head(url, **request_kwargs)

        path_exists = response.ok
        if not path_exists:
            return path_exists, path_exists

        # the stat header carries base64-encoded JSON
        encoded_stat = response.headers.get("X-Docker-Container-Path-Stat")
        path_stat = json.loads(base64.b64decode(encoded_stat).decode("utf-8"))
        return path_exists, bool(path_stat["mode"] & SDK_ISDIR)
1✔
140

141
    def get_system_info(self) -> dict:
        """Return whatever the SDK client's ``info()`` call reports."""
        sdk_client = self.client()
        return sdk_client.info()
1✔
143

144
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
        """
        Map the SDK status string of a container onto a ``DockerContainerStatus``.

        :param container_name: name of the container to look up
        :return: UP for "running", PAUSED for "paused", NON_EXISTENT if the container is
            not found, DOWN for any other status
        :raises ContainerException: on any other API error
        """
        # intentionally no debug log here - it would be too verbose
        try:
            state = self.client().containers.get(container_name).status
        except NotFound:
            return DockerContainerStatus.NON_EXISTENT
        except APIError as api_error:
            raise ContainerException() from api_error

        if state == "running":
            return DockerContainerStatus.UP
        if state == "paused":
            return DockerContainerStatus.PAUSED
        return DockerContainerStatus.DOWN
×
158

159
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
        """
        Take one (non-streaming) stats sample of a container and condense it.

        :param container_name: name of the container to sample
        :return: ``DockerContainerStats`` with block I/O, CPU and memory percentages, memory usage,
            network I/O and PID count; the raw SDK payload is kept under ``SDKStats``
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        try:
            container = self.client().containers.get(container_name)
            sdk_stats = container.stats(stream=False)

            # BlockIO: (Read, Write) bytes
            read_bytes = 0
            write_bytes = 0
            blkio_entries = (
                sdk_stats.get("blkio_stats", {}).get("io_service_bytes_recursive", []) or []
            )
            for entry in blkio_entries:
                # compare case-insensitively: the op name may be reported as "Read"/"Write"
                # as well as "read"/"write" (the Docker CLI accepts both spellings)
                op = str(entry.get("op") or "").lower()
                if op == "read":
                    read_bytes += entry.get("value", 0)
                elif op == "write":
                    write_bytes += entry.get("value", 0)

            # CPU percentage
            cpu_stats = sdk_stats.get("cpu_stats", {})
            precpu_stats = sdk_stats.get("precpu_stats", {})

            cpu_delta = cpu_stats.get("cpu_usage", {}).get("total_usage", 0) - precpu_stats.get(
                "cpu_usage", {}
            ).get("total_usage", 0)

            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
                "system_cpu_usage", 0
            )

            # if online_cpus is missing or zero, fall back to the number of per-CPU usage entries
            # (as the Engine API documentation prescribes), and finally to a single CPU
            percpu_usage = cpu_stats.get("cpu_usage", {}).get("percpu_usage") or []
            online_cpus = cpu_stats.get("online_cpus") or len(percpu_usage) or 1
            cpu_percent = (
                (cpu_delta / system_delta * 100.0 * online_cpus) if system_delta > 0 else 0.0
            )

            # Memory (usage, limit) bytes
            memory_stats = sdk_stats.get("memory_stats", {})
            mem_usage = memory_stats.get("usage", 0)
            mem_limit = memory_stats.get("limit", 1)  # Prevent division by zero
            mem_inactive = memory_stats.get("stats", {}).get("inactive_file", 0)
            # inactive file cache is not counted as used memory
            used_memory = max(0, mem_usage - mem_inactive)
            mem_percent = (used_memory / mem_limit * 100.0) if mem_limit else 0.0

            # Network IO, summed over all interfaces
            net_rx = 0
            net_tx = 0
            for iface in sdk_stats.get("networks", {}).values():
                net_rx += iface.get("rx_bytes", 0)
                net_tx += iface.get("tx_bytes", 0)

            # Container ID (first 12 characters) and name without the leading slash
            container_id = sdk_stats.get("id", "")[:12]
            name = sdk_stats.get("name", "").lstrip("/")

            return DockerContainerStats(
                Container=container_id,
                ID=container_id,
                Name=name,
                BlockIO=(read_bytes, write_bytes),
                CPUPerc=round(cpu_percent, 2),
                MemPerc=round(mem_percent, 2),
                MemUsage=(used_memory, mem_limit),
                NetIO=(net_rx, net_tx),
                PIDs=sdk_stats.get("pids_stats", {}).get("current", 0),
                SDKStats=sdk_stats,  # keep the raw stats for more detailed information
            )
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as e:
            raise ContainerException() from e
×
227

228
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Stop a container.

        :param container_name: name of the container to stop
        :param timeout: forwarded to the SDK's ``stop`` call
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug("Stopping container: %s", container_name)
        try:
            self.client().containers.get(container_name).stop(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
237

238
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
        """
        Restart a container.

        :param container_name: name of the container to restart
        :param timeout: forwarded to the SDK's ``restart`` call
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug("Restarting container: %s", container_name)
        try:
            self.client().containers.get(container_name).restart(timeout=timeout)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
247

248
    def pause_container(self, container_name: str) -> None:
        """
        Pause a container.

        :param container_name: name of the container to pause
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug("Pausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).pause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
257

258
    def unpause_container(self, container_name: str) -> None:
        """
        Resume a paused container.

        :param container_name: name of the container to unpause
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug("Unpausing container: %s", container_name)
        try:
            self.client().containers.get(container_name).unpause()
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
267

268
    def remove_container(
        self, container_name: str, force=True, check_existence=False, volumes=False
    ) -> None:
        """
        Remove a container.

        :param container_name: name of the container to remove
        :param force: forwarded to the SDK; also suppresses the error for a missing container
        :param check_existence: return early if the name is not among the known container names
        :param volumes: forwarded to the SDK as ``v``
        :raises NoSuchContainer: if the container does not exist and ``force`` is false
        :raises ContainerException: on any other API error
        """
        LOG.debug("Removing container: %s, with volumes: %s", container_name, volumes)
        if check_existence:
            known_names = self.get_all_container_names()
            if container_name not in known_names:
                LOG.debug("Aborting removing due to check_existence check")
                return
        try:
            self.client().containers.get(container_name).remove(force=force, v=volumes)
        except NotFound:
            if force:
                # a forced removal treats a missing container as already removed
                return
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
283

284
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
        """
        List containers as plain dicts.

        :param filter: one ``key=value`` string or a list of them; converted into a filter dict
        :param all: forwarded to the SDK's ``list`` call
        :return: one dict per container with the keys id, image, name, status and labels
        :raises ContainerException: if listing fails with an API error
        """
        if filter:
            filter_items = [filter] if isinstance(filter, str) else filter
            # split on the first "=" only, so values may contain further "=" characters
            filter = dict(item.split("=", 1) for item in filter_items)
        LOG.debug("Listing containers with filters: %s", filter)
        try:
            found = self.client().containers.list(filters=filter, all=all)
            summaries = []
            for container in found:
                try:
                    summary = {
                        "id": container.id,
                        "image": container.image,
                        "name": container.name,
                        "status": container.status,
                        "labels": container.labels,
                    }
                    summaries.append(summary)
                except Exception as e:
                    # a container that cannot be read is logged and left out of the result
                    LOG.error("Error checking container %s: %s", container, e)
            return summaries
        except APIError as api_error:
            raise ContainerException() from api_error
1✔
308

309
    def copy_into_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/
        """
        Copy a local path into a container.

        If ``container_path`` is an existing directory the archive is extracted into it,
        otherwise into its parent directory.

        :param container_name: name of the target container
        :param local_path: path on the local file system
        :param container_path: target path inside the container
        :raises NoSuchContainer: if the SDK reports "not found"
        :raises ContainerException: on any other API error
        """
        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)
        try:
            container = self.client().containers.get(container_name)
            _, is_dir = self._container_path_info(container, container_path)
            if is_dir:
                extract_to = container_path
            else:
                extract_to = os.path.dirname(container_path)
            with Util.tar_path(local_path, container_path, is_dir=is_dir) as archive:
                container.put_archive(extract_to, archive)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
323

324
    def copy_from_container(
        self,
        container_name: str,
        local_path: str,
        container_path: str,
    ) -> None:
        """
        Copy a path out of a container onto the local file system.

        :param container_name: name of the source container
        :param local_path: target path on the local file system
        :param container_path: path inside the container
        :raises NoSuchContainer: if the SDK reports "not found"
        :raises ContainerException: on any other API error
        """
        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)
        try:
            source = self.client().containers.get(container_name)
            # get_archive returns (stream, stat) - only the stream is needed
            archive_stream = source.get_archive(container_path)[0]
            Util.untar_to_path(archive_stream, local_path)
        except NotFound:
            raise NoSuchContainer(container_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
339

340
    def pull_image(
        self,
        docker_image: str,
        platform: DockerPlatform | None = None,
        log_handler: Callable[[str], None] | None = None,
    ) -> None:
        """
        Pull an image, optionally streaming the pull output to a callback.

        :param docker_image: image reference; resolved through the registry resolver first
        :param platform: forwarded to the SDK's pull call
        :param log_handler: if given, called with every line of the pull output
        :raises NoSuchImage: if the image cannot be found
        :raises ContainerException: on any other API error
        """
        LOG.debug("Pulling Docker image: %s", docker_image)
        # some path in the docker image string indicates a custom repository

        docker_image = self.registry_resolver_strategy.resolve(docker_image)
        pull_kwargs: dict[str, str | bool] = {"platform": platform}
        try:
            if not log_handler:
                self.client().images.pull(docker_image, **pull_kwargs)
            else:
                # Use a lower-level API, as the 'stream' argument is not available in the higher-level `pull`-API
                pull_kwargs["stream"] = True
                for line in self.client().api.pull(docker_image, **pull_kwargs):
                    log_handler(to_str(line))
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as api_error:
            raise ContainerException() from api_error
×
364

365
    def push_image(self, docker_image: str) -> None:
        """
        Push an image to its registry.

        :param docker_image: image reference to push
        :raises NoSuchImage: if the image does not exist locally
        :raises AccessDenied: if the registry rejects the push for permission reasons
        :raises RegistryConnectionError: if the registry connection is refused
        :raises ContainerException: on any other error
        """
        LOG.debug("Pushing Docker image: %s", docker_image)
        try:
            result = self.client().images.push(docker_image)
            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising
            if isinstance(result, (str, bytes)):
                message = to_str(result)
                if '"errorDetail"' in message:
                    if "image does not exist locally" in message:
                        raise NoSuchImage(docker_image)
                    access_denied_markers = (
                        "is denied",
                        "requesting higher privileges than access token allows",
                        "access token has insufficient scopes",
                    )
                    if any(marker in message for marker in access_denied_markers):
                        raise AccessDenied(docker_image)
                    if "connection refused" in message:
                        raise RegistryConnectionError(result)
                    raise ContainerException(result)
        except ImageNotFound:
            raise NoSuchImage(docker_image)
        except APIError as api_error:
            # note: error message 'image not known' raised by Podman API
            if "image not known" in str(api_error):
                raise NoSuchImage(docker_image)
            raise ContainerException() from api_error
×
389

390
    def build_image(
        self,
        dockerfile_path: str,
        image_name: str,
        context_path: str | None = None,
        platform: DockerPlatform | None = None,
    ):
        """
        Build an image from a Dockerfile and return the textual build output.

        :param dockerfile_path: path passed through ``Util.resolve_dockerfile_path``
        :param image_name: tag for the resulting image
        :param context_path: build context; defaults to the directory of the resolved Dockerfile
        :param platform: forwarded to the SDK's build call
        :return: the concatenated "stream" (or "error") messages of the build log
        :raises ContainerException: if the build fails with an API error
        """
        try:
            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
            context_path = context_path or os.path.dirname(dockerfile_path)
            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)
            _, logs_iterator = self.client().images.build(
                path=context_path,
                dockerfile=dockerfile_path,
                tag=image_name,
                rm=True,
                platform=platform,
            )
            # logs_iterator is a stream of dicts. Example content:
            # {'stream': 'Step 1/4 : FROM alpine'}
            # ... other build steps
            # {'aux': {'ID': 'sha256:4dcf90e87fb963e898f9c7a0451a40e36f8e7137454c65ae4561277081747825'}}
            # {'stream': 'Successfully tagged img-5201f3e1:latest\n'}
            output_parts = []
            for log in logs_iterator:
                if not isinstance(log, dict):
                    continue
                # use .get for both keys: an entry with an empty "stream" and no "error"
                # must be skipped rather than raise a KeyError
                message = log.get("stream") or log.get("error")
                if message:
                    output_parts.append(message)
            return "".join(output_parts)
        except APIError as e:
            raise ContainerException("Unable to build Docker image") from e
×
420

421
    def tag_image(self, source_ref: str, target_name: str) -> None:
        """
        Add a tag to an existing image.

        :param source_ref: reference of the image to tag
        :param target_name: new name for the image
        :raises NoSuchImage: if the API answers with status 404
        :raises ContainerException: on any other API error
        """
        LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)
        try:
            self.client().images.get(source_ref).tag(target_name)
        except APIError as api_error:
            if api_error.status_code != 404:
                raise ContainerException("Unable to tag Docker image") from api_error
            raise NoSuchImage(source_ref)
×
430

431
    def get_docker_image_names(
        self,
        strip_latest: bool = True,
        include_tags: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ):
        """
        Collect the tags of all locally available images.

        :param strip_latest: pass the list through ``Util.append_without_latest``
            (called for its effect on the list; presumably adds names without ":latest" - verify)
        :param include_tags: if false, cut everything from the last ":" on
        :param strip_wellknown_repo_prefixes: pass the names through
            ``Util.strip_wellknown_repo_prefixes``
        :return: list of image names
        :raises ContainerException: if listing fails with an API error
        """
        try:
            image_names = []
            for image in self.client().images.list():
                # images without tags simply contribute nothing
                image_names.extend(image.tags)
            if not include_tags:
                image_names = [name.rpartition(":")[0] for name in image_names]
            if strip_wellknown_repo_prefixes:
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
            if strip_latest:
                Util.append_without_latest(image_names)
            return image_names
        except APIError as api_error:
            raise ContainerException() from api_error
×
449

450
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
        """
        Return the logs of a container as a string.

        :param container_name_or_id: container to read the logs from
        :param safe: if true, return an empty string instead of raising
        :return: the container logs
        :raises NoSuchContainer: if the container does not exist (and ``safe`` is false)
        :raises ContainerException: on any other API error (and ``safe`` is false)
        """
        try:
            raw_logs = self.client().containers.get(container_name_or_id).logs()
            return to_str(raw_logs)
        except NotFound:
            if not safe:
                raise NoSuchContainer(container_name_or_id)
            return ""
        except APIError as api_error:
            if not safe:
                raise ContainerException() from api_error
            return ""
1✔
462

463
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
        """
        Open a following log stream for a container.

        :param container_name_or_id: container to stream the logs from
        :return: the stream returned by the SDK's ``logs(stream=True, follow=True)``
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        try:
            target = self.client().containers.get(container_name_or_id)
            log_stream = target.logs(stream=True, follow=True)
            return log_stream
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
471

472
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
473
        try:
1✔
474
            return self.client().containers.get(container_name_or_id).attrs
1✔
475
        except NotFound:
1✔
476
            raise NoSuchContainer(container_name_or_id)
1✔
477
        except APIError as e:
×
478
            raise ContainerException() from e
×
479

480
    def inspect_image(
1✔
481
        self,
482
        image_name: str,
483
        pull: bool = True,
484
        strip_wellknown_repo_prefixes: bool = True,
485
    ) -> dict[str, dict | list | str]:
486
        image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
487
        try:
1✔
488
            result = self.client().images.get(image_name).attrs
1✔
489
            if strip_wellknown_repo_prefixes:
1✔
490
                if result.get("RepoDigests"):
1✔
491
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
1✔
492
                        result["RepoDigests"]
493
                    )
494
                if result.get("RepoTags"):
1✔
495
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
1✔
496
            return result
1✔
497
        except NotFound:
1✔
498
            if pull:
1✔
499
                self.pull_image(image_name)
1✔
500
                return self.inspect_image(image_name, pull=False)
1✔
501
            raise NoSuchImage(image_name)
1✔
502
        except APIError as e:
×
503
            raise ContainerException() from e
×
504

505
    def create_network(self, network_name: str) -> str:
        """
        Create a network.

        :param network_name: name of the network to create
        :return: the ID of the created network
        :raises ContainerException: if the creation fails with an API error
        """
        try:
            return self.client().networks.create(name=network_name).id
        except APIError as e:
            raise ContainerException() from e
×
510

511
    def delete_network(self, network_name: str) -> None:
        """
        Remove a network.

        :param network_name: name of the network to remove
        :raises NoSuchNetwork: if the network does not exist
        :raises ContainerException: on any other API error
        """
        try:
            network = self.client().networks.get(network_name)
            return network.remove()
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as api_error:
            raise ContainerException() from api_error
×
518

519
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
520
        try:
1✔
521
            return self.client().networks.get(network_name).attrs
1✔
522
        except NotFound:
1✔
523
            raise NoSuchNetwork(network_name)
1✔
524
        except APIError as e:
×
525
            raise ContainerException() from e
×
526

527
    def connect_container_to_network(
        self,
        network_name: str,
        container_name_or_id: str,
        aliases: list | None = None,
        link_local_ips: list[str] | None = None,
    ) -> None:
        """
        Connect a container to a network.

        :param network_name: name of the network
        :param container_name_or_id: container to connect
        :param aliases: forwarded to the SDK's ``connect`` call
        :param link_local_ips: forwarded to the SDK's ``connect`` call
        :raises NoSuchNetwork: if the network does not exist
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug(
            "Connecting container '%s' to network '%s' with aliases '%s'",
            container_name_or_id,
            network_name,
            aliases,
        )
        try:
            network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as e:
            # wrap lookup failures too, matching disconnect_container_from_network
            raise ContainerException() from e
        try:
            network.connect(
                container=container_name_or_id,
                aliases=aliases,
                link_local_ips=link_local_ips,
            )
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
×
554

555
    def disconnect_container_from_network(
        self, network_name: str, container_name_or_id: str
    ) -> None:
        """
        Disconnect a container from a network.

        :param network_name: name of the network
        :param container_name_or_id: container to disconnect
        :raises NoSuchNetwork: if the network does not exist
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug(
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
        )
        # step 1: look up the network
        try:
            network = self.client().networks.get(network_name)
        except NotFound:
            raise NoSuchNetwork(network_name)
        except APIError as api_error:
            raise ContainerException() from api_error

        # step 2: detach the container
        try:
            network.disconnect(container_name_or_id)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
572

573
    def get_container_ip(self, container_name_or_id: str) -> str:
        """
        Return the IP address of a container in its first network.

        :param container_name_or_id: container to look up
        :return: the ``IPAddress`` of the first entry under ``NetworkSettings.Networks``
        """
        container_attrs = self.inspect_container(container_name_or_id)
        networks = container_attrs["NetworkSettings"]["Networks"]
        network_names = [*networks]
        if len(network_names) > 1:
            LOG.info("Container has more than one assigned network. Picking the first one...")
        first_network = network_names[0]
        return networks[first_network]["IPAddress"]
1✔
579

580
    @cache
    def has_docker(self) -> bool:
        """
        Check whether the container engine is reachable.

        :return: False if no client was created or the ping fails with an API error, else True
        """
        # NOTE(review): @cache memoizes the answer per instance, so an initial False is never
        # re-evaluated even if client() later succeeds in creating a client - confirm intended.
        if not self.docker_client:
            return False
        try:
            self.client().ping()
        except APIError:
            return False
        return True
×
589

590
    def remove_image(self, image: str, force: bool = True):
        """
        Remove an image.

        :param image: reference of the image to remove
        :param force: forwarded to the SDK; also suppresses the error for a missing image
        :raises NoSuchImage: if the image does not exist (and ``force`` is false), or the API
            error message contains "image not known"
        :raises ContainerException: on any other API error
        """
        forced_suffix = "(forced)" if force else ""
        LOG.debug("Removing image %s %s", image, forced_suffix)
        try:
            self.client().images.remove(image=image, force=force)
        except ImageNotFound:
            if force:
                # a forced removal treats a missing image as already removed
                return
            raise NoSuchImage(image)
        except APIError as api_error:
            if "image not known" not in str(api_error):
                raise ContainerException() from api_error
            raise NoSuchImage(image)
×
601

602
    def commit(
        self,
        container_name_or_id: str,
        image_name: str,
        image_tag: str,
    ):
        """
        Create an image from a container.

        :param container_name_or_id: container to commit
        :param image_name: repository name of the new image
        :param image_tag: tag of the new image
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on any other API error
        """
        LOG.debug(
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
        )
        try:
            source = self.client().containers.get(container_name_or_id)
            source.commit(repository=image_name, tag=image_tag)
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as api_error:
            raise ContainerException() from api_error
×
618

619
    def start_container(
        self,
        container_name_or_id: str,
        stdin=None,
        interactive: bool = False,
        attach: bool = False,
        flags: str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Start an existing container, optionally attaching to its I/O streams.

        Without ``interactive``/``attach`` the container is started and the call returns right
        away. Otherwise the call writes ``stdin`` (if any) to the attached socket, reads the
        output until the stream ends, and then waits for the container's exit code.

        :param container_name_or_id: container to start
        :param stdin: data to write to the attached socket
        :param interactive: attach stdin in addition to stdout/stderr
        :param attach: attach to stdout/stderr
        :param flags: not used by this implementation
        :return: tuple (stdout, stderr); when not attached, stdout is the container name/ID
        :raises NoSuchContainer: if the container does not exist
        :raises ContainerException: on a non-zero exit code or any other API error
        """
        LOG.debug("Starting container %s", container_name_or_id)
        try:
            container = self.client().containers.get(container_name_or_id)
            stdout = to_bytes(container_name_or_id)
            stderr = b""
            if not (interactive or attach):
                container.start()
                return stdout, stderr

            params = {"stdout": 1, "stderr": 1, "stream": 1}
            if interactive:
                params["stdin"] = 1
            sock = container.attach_socket(params=params)
            sock = sock._sock if hasattr(sock, "_sock") else sock
            result_queue = queue.Queue()
            thread_started = threading.Event()
            start_waiting = threading.Event()

            # Note: We need to be careful about potential race conditions here - .wait() should happen right
            #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code
            def wait_for_result(*_):
                _exit_code = -1
                try:
                    thread_started.set()
                    start_waiting.wait()
                    _exit_code = container.wait()["StatusCode"]
                except APIError as e:
                    _exit_code = 1
                    raise ContainerException(str(e))
                finally:
                    # always publish an exit code so the caller's queue.get() cannot block forever
                    result_queue.put(_exit_code)

            # under windows, the socket has no __enter__ / cannot be used as context manager
            # therefore try/finally instead of with here; the block spans the container start
            # as well, so the socket is also closed when starting the container fails
            try:
                # start listener thread
                start_worker_thread(wait_for_result)
                thread_started.wait()
                try:
                    # start container
                    container.start()
                finally:
                    # start awaiting container result
                    start_waiting.set()

                # handle container input/output
                try:
                    if stdin:
                        sock.sendall(to_bytes(stdin))
                        sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                except TimeoutError:
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of Docker container '%s'",
                        container_name_or_id,
                    )
            finally:
                sock.close()

            # get container exit code
            exit_code = result_queue.get()
            if exit_code:
                raise ContainerException(
                    f"Docker container returned with exit code {exit_code}",
                    stdout=stdout,
                    stderr=stderr,
                )
            return stdout, stderr
        except NotFound:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
697

698
    def attach_to_container(self, container_name_or_id: str):
        """
        Look up the given container and call the SDK's ``attach()`` on it.

        The value returned by ``attach()`` is discarded; this method returns ``None``.

        :param container_name_or_id: name or ID of the container to attach to
        """
        docker_client: DockerClient = self.client()
        target = cast(Container, docker_client.containers.get(container_name_or_id))
        target.attach()
×
702

703
    def create_container(
        self,
        image_name: str,
        *,
        name: str | None = None,
        entrypoint: list[str] | str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | list[str] | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        privileged: bool | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
    ) -> str:
        """
        Create (but do not start) a container through the Docker SDK and return its ID.

        If the SDK reports the image as missing, the image is pulled once and the creation
        is retried.

        :param image_name: image reference; resolved through ``registry_resolver_strategy``
        :param remove: forwarded to the SDK as ``auto_remove``
        :param interactive: forwarded to the SDK as ``stdin_open``
        :param exposed_ports: ports registered with an empty list of host bindings
        :param additional_flags: extra flags parsed by ``Util.parse_additional_flags``. The
            parsed values replace ``env_vars``, ``volumes``, ``network``, ``platform``,
            ``privileged``, ``ports``, ``ulimits``, ``user`` and ``dns``; parsed labels are
            merged into ``labels``
        :param labels: labels to set on the container; labels coming from
            ``additional_flags`` win on key collisions
        :return: the ID of the created container
        :raises NoSuchImage: if the image is still not found after pulling
        :raises ContainerException: on any other Docker API error
        """
        # NOTE: must stay the first statement, locals() should only contain the call arguments
        LOG.debug("Creating container with attributes: %s", locals())
        extra_hosts = None
        if additional_flags:
            parsed_flags = Util.parse_additional_flags(
                additional_flags,
                env_vars=env_vars,
                volumes=volumes,
                network=network,
                platform=platform,
                privileged=privileged,
                ports=ports,
                ulimits=ulimits,
                user=user,
                dns=dns,
            )
            env_vars = parsed_flags.env_vars
            extra_hosts = parsed_flags.extra_hosts
            volumes = parsed_flags.volumes
            # The caller's labels are not handed to the flag parser above, so simply taking
            # parsed_flags.labels would silently drop them. Merge instead (flags take precedence).
            if parsed_flags.labels:
                labels = {**(labels or {}), **parsed_flags.labels}
            network = parsed_flags.network
            platform = parsed_flags.platform
            privileged = parsed_flags.privileged
            ports = parsed_flags.ports
            ulimits = parsed_flags.ulimits
            user = parsed_flags.user
            dns = parsed_flags.dns

        try:
            # optional settings are only passed to the SDK if they are actually set
            kwargs = {}
            if cap_add:
                kwargs["cap_add"] = cap_add
            if cap_drop:
                kwargs["cap_drop"] = cap_drop
            if security_opt:
                kwargs["security_opt"] = security_opt
            if dns:
                kwargs["dns"] = ensure_list(dns)
            if exposed_ports:
                # This is not exactly identical to --expose, as they are listed in the "HostConfig" on docker inspect
                # but the behavior should be identical
                kwargs["ports"] = {port: [] for port in exposed_ports}
            if ports:
                # keep the exposed ports registered above, port mappings are added on top
                kwargs.setdefault("ports", {})
                kwargs["ports"].update(ports.to_dict())
            if workdir:
                kwargs["working_dir"] = workdir
            if privileged:
                kwargs["privileged"] = True
            if init:
                kwargs["init"] = True
            if labels:
                kwargs["labels"] = labels
            if log_config:
                kwargs["log_config"] = DockerLogConfig(
                    type=log_config.type, config=log_config.config
                )
            if ulimits:
                kwargs["ulimits"] = [
                    docker.types.Ulimit(
                        name=ulimit.name, soft=ulimit.soft_limit, hard=ulimit.hard_limit
                    )
                    for ulimit in ulimits
                ]
            if cpu_shares:
                kwargs["cpu_shares"] = cpu_shares
            if mem_limit:
                kwargs["mem_limit"] = mem_limit
            mounts = None
            if volumes:
                mounts = Util.convert_mount_list_to_dict(volumes)

            image_name = self.registry_resolver_strategy.resolve(image_name)

            def create_container():
                return self.client().containers.create(
                    image=image_name,
                    command=command,
                    auto_remove=remove,
                    name=name,
                    stdin_open=interactive,
                    tty=tty,
                    entrypoint=entrypoint,
                    environment=env_vars,
                    detach=detach,
                    user=user,
                    network=network,
                    volumes=mounts,
                    extra_hosts=extra_hosts,
                    platform=platform,
                    **kwargs,
                )

            try:
                container = create_container()
            except ImageNotFound:
                # pull the missing image and retry exactly once
                LOG.debug("Image not found. Pulling image %s", image_name)
                self.pull_image(image_name, platform)
                container = create_container()
            return container.id
        except ImageNotFound:
            raise NoSuchImage(image_name)
        except APIError as e:
            raise ContainerException() from e
×
838

839
    def run_container(
        self,
        image_name: str,
        stdin=None,
        *,
        name: str | None = None,
        entrypoint: str | None = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: list[str] | str | None = None,
        volumes: list[SimpleVolumeBind] | None = None,
        ports: PortMappings | None = None,
        exposed_ports: list[str] | None = None,
        env_vars: dict[str, str] | None = None,
        user: str | None = None,
        cap_add: list[str] | None = None,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        network: str | None = None,
        dns: str | None = None,
        additional_flags: str | None = None,
        workdir: str | None = None,
        labels: dict[str, str] | None = None,
        platform: DockerPlatform | None = None,
        privileged: bool | None = None,
        ulimits: list[Ulimit] | None = None,
        init: bool | None = None,
        log_config: LogConfig | None = None,
        cpu_shares: int | None = None,
        mem_limit: int | str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Create a container from the given image and start it right away.

        The container is created via ``create_container`` and started via ``start_container``,
        attaching to it unless ``detach`` is set.

        Removal is split in two: a detached container is created with the removal flag
        (``remove and detach``), while an attached one is removed explicitly by this method
        once ``start_container`` has returned or raised.

        :param image_name: image to create the container from
        :param stdin: input handed to ``start_container``
        :return: whatever ``start_container`` returns for the started container
        """
        LOG.debug("Running container with image: %s", image_name)
        attached = not detach
        container_id = None
        try:
            container_id = self.create_container(
                image_name,
                name=name,
                entrypoint=entrypoint,
                interactive=interactive,
                tty=tty,
                detach=detach,
                remove=remove and detach,
                command=command,
                volumes=volumes,
                ports=ports,
                exposed_ports=exposed_ports,
                env_vars=env_vars,
                user=user,
                cap_add=cap_add,
                cap_drop=cap_drop,
                security_opt=security_opt,
                network=network,
                dns=dns,
                additional_flags=additional_flags,
                workdir=workdir,
                privileged=privileged,
                platform=platform,
                init=init,
                labels=labels,
                ulimits=ulimits,
                log_config=log_config,
                cpu_shares=cpu_shares,
                mem_limit=mem_limit,
            )
            output = self.start_container(
                container_name_or_id=container_id,
                stdin=stdin,
                interactive=interactive,
                attach=attached,
            )
        finally:
            # explicit cleanup for attached runs; also executed if creation succeeded but start failed
            if remove and container_id and attached:
                self.remove_container(container_id)
        return output
1✔
915

916
    def exec_in_container(
        self,
        container_name_or_id: str,
        command: list[str] | str,
        interactive=False,
        detach=False,
        env_vars: dict[str, str | None] | None = None,
        stdin: bytes | None = None,
        user: str | None = None,
        workdir: str | None = None,
    ) -> tuple[bytes, bytes]:
        """
        Execute a command inside a container and return its ``(stdout, stderr)``.

        If ``interactive`` is set and ``stdin`` is non-empty, the exec is performed over a raw
        socket: ``stdin`` is written to it, the write side is shut down and the output streams
        are read back. The exit code of the command is not evaluated in this mode.

        :param container_name_or_id: name or ID of the target container
        :param command: command to execute
        :param interactive: whether ``stdin`` should be forwarded to the command
        :param detach: run the command detached; returns ``(b"", b"")`` immediately
        :param env_vars: environment variables for the command
        :param stdin: bytes to send to the command (only used if ``interactive`` is set)
        :param user: user to run the command as
        :param workdir: working directory for the command
        :return: tuple of stdout and stderr of the command; ``(b"", b"")`` if the socket timed
            out while talking to the command's I/O streams
        :raises ContainerException: if the command exits with a non-zero code (carrying stdout
            and stderr), or if the Docker API reports an error
        :raises NoSuchContainer: if the SDK raises ``ContainerError``
        """
        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)
        # stdin is only forwarded (through a raw socket) if there is something to send
        use_socket = interactive and bool(stdin)
        try:
            container: Container = self.client().containers.get(container_name_or_id)
            result = container.exec_run(
                cmd=command,
                environment=env_vars,
                user=user,
                detach=detach,
                stdin=use_socket,
                socket=use_socket,
                stdout=True,
                stderr=True,
                demux=True,
                workdir=workdir,
            )
            if use_socket:  # result is a socket
                sock = result[1]
                sock = sock._sock if hasattr(sock, "_sock") else sock
                # under windows, the socket has no __enter__ / cannot be used as context manager
                # therefore try/finally instead of with here
                try:
                    sock.sendall(stdin)
                    sock.shutdown(socket.SHUT_WR)
                    stdout, stderr = self._read_from_sock(sock, False)
                    return stdout, stderr
                except TimeoutError:
                    # do not fall through and implicitly return None - callers unpack the result
                    LOG.debug(
                        "Socket timeout when talking to the I/O streams of an exec command in Docker container '%s'",
                        container_name_or_id,
                    )
                    return b"", b""
                finally:
                    sock.close()

            if detach:
                return b"", b""
            return_code = result[0]
            if isinstance(result[1], bytes):
                # non-demultiplexed output: everything is treated as stdout
                stdout = result[1]
                stderr = b""
            else:
                stdout, stderr = result[1]
            if return_code != 0:
                raise ContainerException(
                    f"Exec command returned with exit code {return_code}", stdout, stderr
                )
            return stdout, stderr
        # NOTE(review): a missing container presumably surfaces as NotFound (handled by the
        #  APIError branch below) rather than ContainerError - confirm whether NotFound should
        #  be mapped to NoSuchContainer here, as done in start_container.
        except ContainerError:
            raise NoSuchContainer(container_name_or_id)
        except APIError as e:
            raise ContainerException() from e
1✔
972

973
    def login(self, username: str, password: str, registry: str | None = None) -> None:
        """
        Log the Docker client in with the given credentials, always forcing re-authentication.

        :param username: user name to log in with
        :param password: password of the user
        :param registry: registry to log in to, passed through to the SDK unchanged
        :raises ContainerException: if the Docker API reports an error
        """
        LOG.debug("Docker login for %s", username)
        try:
            docker_client = self.client()
            docker_client.login(username, password=password, registry=registry, reauth=True)
        except APIError as api_error:
            raise ContainerException() from api_error
×
979

980

981
# apply patches required for podman API compatibility
982

983

984
@property
def _container_image(self):
    """
    Return the image object of this container, or ``None`` if no image ID is recorded.

    The image ID is taken from the ``ImageID`` attribute, falling back to ``Image``. A
    ``sha256:<digest>`` value is reduced to the bare digest before the lookup; any other
    value is looked up unchanged.
    """
    attrs = self.attrs
    # the fallback is looked up unconditionally, i.e. "Image" is required even if "ImageID" exists
    fallback = attrs["Image"]
    reference = attrs.get("ImageID", fallback)
    if reference is None:
        return None
    # Fix for podman API response: Docker returns "sha:..." for `Image`, podman returns "<image-name>:<tag>".
    # See https://github.com/containers/podman/issues/8329 . Without this check, the Docker client would
    # blindly strip off the suffix after the colon `:` (which is the `<tag>` in podman's case) which would
    # then lead to "no such image" errors.
    is_digest = re.match("sha256:[0-9a-f]{64}", reference, flags=re.IGNORECASE) is not None
    if is_digest:
        reference = reference.split(":")[1]
    return self.client.images.get(reference)
1✔
997

998

999
# Replace the `image` attribute of the Docker SDK's Container model with the property defined above,
# so that every Container instance resolves its image through the podman-tolerant lookup.
Container.image = _container_image
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc