• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.74
/localstack-core/localstack/utils/container_utils/docker_cmd_client.py
1
import functools
1✔
2
import itertools
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import subprocess
1✔
9
from collections.abc import Callable
1✔
10

11
from localstack import config
1✔
12
from localstack.utils.collections import ensure_list
1✔
13
from localstack.utils.container_utils.container_client import (
1✔
14
    AccessDenied,
15
    CancellableStream,
16
    ContainerClient,
17
    ContainerException,
18
    DockerContainerStats,
19
    DockerContainerStatus,
20
    DockerNotAvailable,
21
    DockerPlatform,
22
    LogConfig,
23
    Mount,
24
    NoSuchContainer,
25
    NoSuchImage,
26
    NoSuchNetwork,
27
    NoSuchObject,
28
    PortMappings,
29
    RegistryConnectionError,
30
    Ulimit,
31
    Util,
32
    VolumeMappingSpecification,
33
)
34
from localstack.utils.run import run
1✔
35
from localstack.utils.strings import first_char_to_upper, to_str
1✔
36

37
LOG = logging.getLogger(__name__)
1✔
38

39

40
class CancellableProcessStream(CancellableStream):
1✔
41
    process: subprocess.Popen
1✔
42

43
    def __init__(self, process: subprocess.Popen) -> None:
1✔
44
        super().__init__()
1✔
45
        self.process = process
1✔
46

47
    def __iter__(self):
1✔
48
        return self
1✔
49

50
    def __next__(self):
1✔
51
        line = self.process.stdout.readline()
1✔
52
        if not line:
1✔
53
            raise StopIteration
1✔
54
        return line
1✔
55

56
    def close(self):
1✔
57
        return self.process.terminate()
1✔
58

59

60
def parse_size_string(size_str: str) -> int:
1✔
61
    """Parse human-readable size strings from Docker CLI into bytes"""
62
    size_str = size_str.strip().replace(" ", "").upper()
×
63
    if size_str == "0B":
×
64
        return 0
×
65

66
    # Match value and unit using regex
67
    match = re.match(r"^([\d.]+)([A-Za-z]+)$", size_str)
×
68
    if not match:
×
69
        return 0
×
70

71
    value = float(match.group(1))
×
72
    unit = match.group(2)
×
73

74
    unit_factors = {
×
75
        "B": 1,
76
        "KB": 10**3,
77
        "MB": 10**6,
78
        "GB": 10**9,
79
        "TB": 10**12,
80
        "KIB": 2**10,
81
        "MIB": 2**20,
82
        "GIB": 2**30,
83
        "TIB": 2**40,
84
    }
85

86
    return int(value * unit_factors.get(unit, 1))
×
87

88

89
class CmdDockerClient(ContainerClient):
1✔
90
    """
91
    Class for managing Docker (or Podman) containers using the command line executable.
92

93
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
94
    for Docker these days. The majority of compatibility switches in this class is to handle slightly
95
    different response payloads or error messages returned by the `docker` vs `podman` commands.
96
    """
97

98
    default_run_outfile: str | None = None
1✔
99

100
    def _docker_cmd(self) -> list[str]:
1✔
101
        """
102
        Get the configured, tested Docker CMD.
103
        :return: string to be used for running Docker commands
104
        :raises: DockerNotAvailable exception if the Docker command or the socker is not available
105
        """
106
        if not self.has_docker():
1✔
107
            raise DockerNotAvailable()
1✔
108
        return shlex.split(config.DOCKER_CMD)
1✔
109

110
    def get_system_info(self) -> dict:
1✔
111
        cmd = [
1✔
112
            *self._docker_cmd(),
113
            "info",
114
            "--format",
115
            "{{json .}}",
116
        ]
117
        cmd_result = run(cmd)
1✔
118

119
        return json.loads(cmd_result)
1✔
120

121
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
122
        cmd = self._docker_cmd()
1✔
123
        cmd += [
1✔
124
            "ps",
125
            "-a",
126
            "--filter",
127
            f"name={container_name}",
128
            "--format",
129
            "{{ .Status }} - {{ .Names }}",
130
        ]
131
        cmd_result = run(cmd)
1✔
132

133
        # filter empty / invalid lines from docker ps output
134
        cmd_result = next((line for line in cmd_result.splitlines() if container_name in line), "")
1✔
135
        container_status = cmd_result.strip().lower()
1✔
136
        if len(container_status) == 0:
1✔
137
            return DockerContainerStatus.NON_EXISTENT
1✔
138
        elif "(paused)" in container_status:
1✔
139
            return DockerContainerStatus.PAUSED
×
140
        elif container_status.startswith("up "):
1✔
141
            return DockerContainerStatus.UP
1✔
142
        else:
143
            return DockerContainerStatus.DOWN
1✔
144

145
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
146
        cmd = self._docker_cmd()
×
147
        cmd += ["stats", "--no-stream", "--format", "{{json .}}", container_name]
×
148
        cmd_result = run(cmd)
×
149
        raw_stats = json.loads(cmd_result)
×
150

151
        # BlockIO (read, write)
152
        block_io_parts = raw_stats["BlockIO"].split("/")
×
153
        block_read = parse_size_string(block_io_parts[0])
×
154
        block_write = parse_size_string(block_io_parts[1])
×
155

156
        # CPU percentage
157
        cpu_percentage = float(raw_stats["CPUPerc"].strip("%"))
×
158

159
        # Memory (usage, limit)
160
        mem_parts = raw_stats["MemUsage"].split("/")
×
161
        mem_used = parse_size_string(mem_parts[0])
×
162
        mem_limit = parse_size_string(mem_parts[1])
×
163
        mem_percentage = float(raw_stats["MemPerc"].strip("%"))
×
164

165
        # Network (rx, tx)
166
        net_parts = raw_stats["NetIO"].split("/")
×
167
        net_rx = parse_size_string(net_parts[0])
×
168
        net_tx = parse_size_string(net_parts[1])
×
169

170
        return DockerContainerStats(
×
171
            Container=raw_stats["ID"],
172
            ID=raw_stats["ID"],
173
            Name=raw_stats["Name"],
174
            BlockIO=(block_read, block_write),
175
            CPUPerc=round(cpu_percentage, 2),
176
            MemPerc=round(mem_percentage, 2),
177
            MemUsage=(mem_used, mem_limit),
178
            NetIO=(net_rx, net_tx),
179
            PIDs=int(raw_stats["PIDs"]),
180
            SDKStats=None,
181
        )
182

183
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
184
        cmd = self._docker_cmd()
1✔
185
        cmd += ["stop", "--time", str(timeout), container_name]
1✔
186
        LOG.debug("Stopping container with cmd %s", cmd)
1✔
187
        try:
1✔
188
            run(cmd)
1✔
189
        except subprocess.CalledProcessError as e:
1✔
190
            self._check_and_raise_no_such_container_error(container_name, error=e)
1✔
191
            raise ContainerException(
×
192
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
193
            ) from e
194

195
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
196
        cmd = self._docker_cmd()
×
197
        cmd += ["restart", "--time", str(timeout), container_name]
×
198
        LOG.debug("Restarting container with cmd %s", cmd)
×
199
        try:
×
200
            run(cmd)
×
201
        except subprocess.CalledProcessError as e:
×
202
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
203
            raise ContainerException(
×
204
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
205
            ) from e
206

207
    def pause_container(self, container_name: str) -> None:
1✔
208
        cmd = self._docker_cmd()
×
209
        cmd += ["pause", container_name]
×
210
        LOG.debug("Pausing container with cmd %s", cmd)
×
211
        try:
×
212
            run(cmd)
×
213
        except subprocess.CalledProcessError as e:
×
214
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
215
            raise ContainerException(
×
216
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
217
            ) from e
218

219
    def unpause_container(self, container_name: str) -> None:
1✔
220
        cmd = self._docker_cmd()
×
221
        cmd += ["unpause", container_name]
×
222
        LOG.debug("Unpausing container with cmd %s", cmd)
×
223
        try:
×
224
            run(cmd)
×
225
        except subprocess.CalledProcessError as e:
×
226
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
227
            raise ContainerException(
×
228
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
229
            ) from e
230

231
    def remove_image(self, image: str, force: bool = True) -> None:
1✔
232
        cmd = self._docker_cmd()
×
233
        cmd += ["rmi", image]
×
234
        if force:
×
235
            cmd += ["--force"]
×
236
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
×
237
        try:
×
238
            run(cmd)
×
239
        except subprocess.CalledProcessError as e:
×
240
            # handle different error messages for Docker and podman
241
            error_messages = ["No such image", "image not known"]
×
242
            if any(msg in to_str(e.stdout) for msg in error_messages):
×
243
                raise NoSuchImage(image, stdout=e.stdout, stderr=e.stderr)
×
244
            raise ContainerException(
×
245
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
246
            ) from e
247

248
    def commit(
1✔
249
        self,
250
        container_name_or_id: str,
251
        image_name: str,
252
        image_tag: str,
253
    ):
254
        cmd = self._docker_cmd()
×
255
        cmd += ["commit", container_name_or_id, f"{image_name}:{image_tag}"]
×
256
        LOG.debug(
×
257
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
258
        )
259
        try:
×
260
            run(cmd)
×
261
        except subprocess.CalledProcessError as e:
×
262
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
263
            raise ContainerException(
×
264
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
265
            ) from e
266

267
    def remove_container(
1✔
268
        self, container_name: str, force=True, check_existence=False, volumes=False
269
    ) -> None:
270
        if check_existence and container_name not in self.get_all_container_names():
1✔
271
            return
×
272
        cmd = self._docker_cmd() + ["rm"]
1✔
273
        if force:
1✔
274
            cmd.append("-f")
1✔
275
        if volumes:
1✔
276
            cmd.append("--volumes")
×
277
        cmd.append(container_name)
1✔
278
        LOG.debug("Removing container with cmd %s", cmd)
1✔
279
        try:
1✔
280
            output = run(cmd)
1✔
281
            # When the container does not exist, the output could have the error message without any exception
282
            if isinstance(output, str) and not force:
1✔
283
                self._check_output_and_raise_no_such_container_error(container_name, output=output)
×
284
        except subprocess.CalledProcessError as e:
1✔
285
            if not force:
1✔
286
                self._check_and_raise_no_such_container_error(container_name, error=e)
×
287
            raise ContainerException(
1✔
288
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
289
            ) from e
290

291
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
1✔
292
        filter = [filter] if isinstance(filter, str) else filter
1✔
293
        cmd = self._docker_cmd()
1✔
294
        cmd.append("ps")
1✔
295
        if all:
1✔
296
            cmd.append("-a")
1✔
297
        options = []
1✔
298
        if filter:
1✔
299
            options += [y for filter_item in filter for y in ["--filter", filter_item]]
×
300
        cmd += options
1✔
301
        cmd.append("--format")
1✔
302
        cmd.append("{{json . }}")
1✔
303
        try:
1✔
304
            cmd_result = run(cmd).strip()
1✔
305
        except subprocess.CalledProcessError as e:
×
306
            raise ContainerException(
×
307
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
308
            ) from e
309
        container_list = []
1✔
310
        if cmd_result:
1✔
311
            if cmd_result[0] == "[":
1✔
312
                container_list = json.loads(cmd_result)
×
313
            else:
314
                container_list = [json.loads(line) for line in cmd_result.splitlines()]
1✔
315
        result = []
1✔
316
        for container in container_list:
1✔
317
            labels = self._transform_container_labels(container["Labels"])
1✔
318
            result.append(
1✔
319
                {
320
                    # support both, Docker and podman API response formats (`ID` vs `Id`)
321
                    "id": container.get("ID") or container["Id"],
322
                    "image": container["Image"],
323
                    # Docker returns a single string for `Names`, whereas podman returns a list of names
324
                    "name": ensure_list(container["Names"])[0],
325
                    "status": container["State"],
326
                    "labels": labels,
327
                }
328
            )
329
        return result
1✔
330

331
    def copy_into_container(
1✔
332
        self, container_name: str, local_path: str, container_path: str
333
    ) -> None:
334
        cmd = self._docker_cmd()
×
335
        cmd += ["cp", local_path, f"{container_name}:{container_path}"]
×
336
        LOG.debug("Copying into container with cmd: %s", cmd)
×
337
        try:
×
338
            run(cmd)
×
339
        except subprocess.CalledProcessError as e:
×
340
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
341
            if "does not exist" in to_str(e.stdout):
×
342
                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)
×
343
            raise ContainerException(
×
344
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
345
            ) from e
346

347
    def copy_from_container(
1✔
348
        self, container_name: str, local_path: str, container_path: str
349
    ) -> None:
350
        cmd = self._docker_cmd()
×
351
        cmd += ["cp", f"{container_name}:{container_path}", local_path]
×
352
        LOG.debug("Copying from container with cmd: %s", cmd)
×
353
        try:
×
354
            run(cmd)
×
355
        except subprocess.CalledProcessError as e:
×
356
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
357
            # additional check to support Podman CLI output
358
            if re.match(".*container .+ does not exist", to_str(e.stdout)):
×
359
                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)
×
360
            raise ContainerException(
×
361
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
362
            ) from e
363

364
    def pull_image(
1✔
365
        self,
366
        docker_image: str,
367
        platform: DockerPlatform | None = None,
368
        log_handler: Callable[[str], None] | None = None,
369
    ) -> None:
370
        cmd = self._docker_cmd()
×
371
        docker_image = self.registry_resolver_strategy.resolve(docker_image)
×
372
        cmd += ["pull", docker_image]
×
373
        if platform:
×
374
            cmd += ["--platform", platform]
×
375
        LOG.debug("Pulling image with cmd: %s", cmd)
×
376
        try:
×
377
            result = run(cmd)
×
378
            # note: we could stream the results, but we'll just process everything at the end for now
379
            if log_handler:
×
380
                for line in result.split("\n"):
×
381
                    log_handler(to_str(line))
×
382
        except subprocess.CalledProcessError as e:
×
383
            stdout_str = to_str(e.stdout)
×
384
            if "pull access denied" in stdout_str:
×
385
                raise NoSuchImage(docker_image, stdout=e.stdout, stderr=e.stderr)
×
386
            # note: error message 'access to the resource is denied' raised by Podman client
387
            if "Trying to pull" in stdout_str and "access to the resource is denied" in stdout_str:
×
388
                raise NoSuchImage(docker_image, stdout=e.stdout, stderr=e.stderr)
×
389
            raise ContainerException(
×
390
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
391
            ) from e
392

393
    def push_image(self, docker_image: str) -> None:
1✔
394
        cmd = self._docker_cmd()
×
395
        cmd += ["push", docker_image]
×
396
        LOG.debug("Pushing image with cmd: %s", cmd)
×
397
        try:
×
398
            run(cmd)
×
399
        except subprocess.CalledProcessError as e:
×
400
            if "is denied" in to_str(e.stdout):
×
401
                raise AccessDenied(docker_image)
×
402
            if "requesting higher privileges than access token allows" in to_str(e.stdout):
×
403
                raise AccessDenied(docker_image)
×
404
            if "access token has insufficient scopes" in to_str(e.stdout):
×
405
                raise AccessDenied(docker_image)
×
406
            if "does not exist" in to_str(e.stdout):
×
407
                raise NoSuchImage(docker_image)
×
408
            if "connection refused" in to_str(e.stdout):
×
409
                raise RegistryConnectionError(e.stdout)
×
410
            # note: error message 'image not known' raised by Podman client
411
            if "image not known" in to_str(e.stdout):
×
412
                raise NoSuchImage(docker_image)
×
413
            raise ContainerException(
×
414
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
415
            ) from e
416

417
    def build_image(
1✔
418
        self,
419
        dockerfile_path: str,
420
        image_name: str,
421
        context_path: str = None,
422
        platform: DockerPlatform | None = None,
423
    ):
424
        cmd = self._docker_cmd()
×
425
        dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
×
426
        context_path = context_path or os.path.dirname(dockerfile_path)
×
427
        cmd += ["build", "-t", image_name, "-f", dockerfile_path]
×
428
        if platform:
×
429
            cmd += ["--platform", platform]
×
430
        cmd += [context_path]
×
431
        LOG.debug("Building Docker image: %s", cmd)
×
432
        try:
×
433
            return run(cmd)
×
434
        except subprocess.CalledProcessError as e:
×
435
            raise ContainerException(
×
436
                f"Docker build process returned with error code {e.returncode}", e.stdout, e.stderr
437
            ) from e
438

439
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
440
        cmd = self._docker_cmd()
×
441
        cmd += ["tag", source_ref, target_name]
×
442
        LOG.debug("Tagging Docker image %s as %s", source_ref, target_name)
×
443
        try:
×
444
            run(cmd)
×
445
        except subprocess.CalledProcessError as e:
×
446
            # handle different error messages for Docker and podman
447
            error_messages = ["No such image", "image not known"]
×
448
            if any(msg in to_str(e.stdout) for msg in error_messages):
×
449
                raise NoSuchImage(source_ref)
×
450
            raise ContainerException(
×
451
                f"Docker process returned with error code {e.returncode}", e.stdout, e.stderr
452
            ) from e
453

454
    def get_docker_image_names(
1✔
455
        self, strip_latest=True, include_tags=True, strip_wellknown_repo_prefixes: bool = True
456
    ):
457
        format_string = "{{.Repository}}:{{.Tag}}" if include_tags else "{{.Repository}}"
×
458
        cmd = self._docker_cmd()
×
459
        cmd += ["images", "--format", format_string]
×
460
        try:
×
461
            output = run(cmd)
×
462

463
            image_names = output.splitlines()
×
464
            if strip_wellknown_repo_prefixes:
×
465
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
×
466
            if strip_latest:
×
467
                Util.append_without_latest(image_names)
×
468

469
            return image_names
×
470
        except Exception as e:
×
471
            LOG.info('Unable to list Docker images via "%s": %s', cmd, e)
×
472
            return []
×
473

474
    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:
1✔
475
        cmd = self._docker_cmd()
1✔
476
        cmd += ["logs", container_name_or_id]
1✔
477
        try:
1✔
478
            return run(cmd)
1✔
479
        except subprocess.CalledProcessError as e:
×
480
            if safe:
×
481
                return ""
×
482
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
483
            raise ContainerException(
×
484
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
485
            ) from e
486

487
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
488
        self.inspect_container(container_name_or_id)  # guard to check whether container is there
1✔
489

490
        cmd = self._docker_cmd()
1✔
491
        cmd += ["logs", "--follow", container_name_or_id]
1✔
492

493
        process: subprocess.Popen = run(
1✔
494
            cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT
495
        )
496

497
        return CancellableProcessStream(process)
1✔
498

499
    def _inspect_object(self, object_name_or_id: str) -> dict[str, dict | list | str]:
1✔
500
        cmd = self._docker_cmd()
1✔
501
        cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]
1✔
502
        try:
1✔
503
            cmd_result = run(cmd, print_error=False)
1✔
504
        except subprocess.CalledProcessError as e:
1✔
505
            # note: case-insensitive comparison, to support Docker and Podman output formats
506
            if "no such object" in to_str(e.stdout).lower():
1✔
507
                raise NoSuchObject(object_name_or_id, stdout=e.stdout, stderr=e.stderr)
1✔
508
            raise ContainerException(
×
509
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
510
            ) from e
511
        object_data = json.loads(cmd_result.strip())
1✔
512
        if isinstance(object_data, list):
1✔
513
            # return first list item, for compatibility with Podman API
514
            if len(object_data) == 1:
×
515
                result = object_data[0]
×
516
                # convert first character to uppercase (e.g., `name` -> `Name`), for Podman/Docker compatibility
517
                result = {first_char_to_upper(k): v for k, v in result.items()}
×
518
                return result
×
519
            LOG.info(
×
520
                "Expected a single object for `inspect` on ID %s, got %s",
521
                object_name_or_id,
522
                len(object_data),
523
            )
524
        return object_data
1✔
525

526
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
527
        try:
1✔
528
            return self._inspect_object(container_name_or_id)
1✔
529
        except NoSuchObject as e:
1✔
530
            raise NoSuchContainer(container_name_or_id=e.object_id)
×
531

532
    def inspect_image(
1✔
533
        self,
534
        image_name: str,
535
        pull: bool = True,
536
        strip_wellknown_repo_prefixes: bool = True,
537
    ) -> dict[str, dict | list | str]:
538
        image_name = self.registry_resolver_strategy.resolve(image_name)
×
539
        try:
×
540
            result = self._inspect_object(image_name)
×
541
            if strip_wellknown_repo_prefixes:
×
542
                if result.get("RepoDigests"):
×
543
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
×
544
                        result["RepoDigests"]
545
                    )
546
                if result.get("RepoTags"):
×
547
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
×
548
            return result
×
549
        except NoSuchObject as e:
×
550
            if pull:
×
551
                self.pull_image(image_name)
×
552
                return self.inspect_image(image_name, pull=False)
×
553
            raise NoSuchImage(image_name=e.object_id)
×
554

555
    def create_network(self, network_name: str) -> str:
1✔
556
        cmd = self._docker_cmd()
1✔
557
        cmd += ["network", "create", network_name]
1✔
558
        try:
1✔
559
            return run(cmd).strip()
1✔
560
        except subprocess.CalledProcessError as e:
×
561
            raise ContainerException(
×
562
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
563
            ) from e
564

565
    def delete_network(self, network_name: str) -> None:
1✔
566
        cmd = self._docker_cmd()
1✔
567
        cmd += ["network", "rm", network_name]
1✔
568
        try:
1✔
569
            run(cmd)
1✔
570
        except subprocess.CalledProcessError as e:
×
571
            stdout_str = to_str(e.stdout)
×
572
            if re.match(r".*network (.*) not found.*", stdout_str):
×
573
                raise NoSuchNetwork(network_name=network_name)
×
574
            else:
575
                raise ContainerException(
×
576
                    f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
577
                ) from e
578

579
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
580
        try:
1✔
581
            return self._inspect_object(network_name)
1✔
582
        except NoSuchObject as e:
1✔
583
            raise NoSuchNetwork(network_name=e.object_id)
1✔
584

585
    def connect_container_to_network(
1✔
586
        self,
587
        network_name: str,
588
        container_name_or_id: str,
589
        aliases: list | None = None,
590
        link_local_ips: list[str] = None,
591
    ) -> None:
592
        LOG.debug(
×
593
            "Connecting container '%s' to network '%s' with aliases '%s'",
594
            container_name_or_id,
595
            network_name,
596
            aliases,
597
        )
598
        cmd = self._docker_cmd()
×
599
        cmd += ["network", "connect"]
×
600
        if aliases:
×
601
            cmd += ["--alias", ",".join(aliases)]
×
602
        if link_local_ips:
×
603
            cmd += ["--link-local-ip", ",".join(link_local_ips)]
×
604
        cmd += [network_name, container_name_or_id]
×
605
        try:
×
606
            run(cmd)
×
607
        except subprocess.CalledProcessError as e:
×
608
            stdout_str = to_str(e.stdout)
×
609
            if re.match(r".*network (.*) not found.*", stdout_str):
×
610
                raise NoSuchNetwork(network_name=network_name)
×
611
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
612
            raise ContainerException(
×
613
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
614
            ) from e
615

616
    def disconnect_container_from_network(
1✔
617
        self, network_name: str, container_name_or_id: str
618
    ) -> None:
619
        LOG.debug(
1✔
620
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
621
        )
622
        cmd = self._docker_cmd() + ["network", "disconnect", network_name, container_name_or_id]
1✔
623
        try:
1✔
624
            run(cmd)
1✔
625
        except subprocess.CalledProcessError as e:
×
626
            stdout_str = to_str(e.stdout)
×
627
            if re.match(r".*network (.*) not found.*", stdout_str):
×
628
                raise NoSuchNetwork(network_name=network_name)
×
629
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
630
            raise ContainerException(
×
631
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
632
            ) from e
633

634
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
635
        cmd = self._docker_cmd()
1✔
636
        cmd += [
1✔
637
            "inspect",
638
            "--format",
639
            "{{range .NetworkSettings.Networks}}{{.IPAddress}} {{end}}",
640
            container_name_or_id,
641
        ]
642
        try:
1✔
643
            result = run(cmd).strip()
1✔
644
            return result.split(" ")[0] if result else ""
1✔
645
        except subprocess.CalledProcessError as e:
×
646
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
647
            # consider different error messages for Podman
648
            if "no such object" in to_str(e.stdout).lower():
×
649
                raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)
×
650
            raise ContainerException(
×
651
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
652
            ) from e
653

654
    def login(self, username: str, password: str, registry: str | None = None) -> None:
1✔
655
        cmd = self._docker_cmd()
×
656
        # TODO specify password via stdin
657
        cmd += ["login", "-u", username, "-p", password]
×
658
        if registry:
×
659
            cmd.append(registry)
×
660
        try:
×
661
            run(cmd)
×
662
        except subprocess.CalledProcessError as e:
×
663
            raise ContainerException(
×
664
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
665
            ) from e
666

667
    @functools.cache
1✔
668
    def has_docker(self) -> bool:
1✔
669
        try:
1✔
670
            # do not use self._docker_cmd here (would result in a loop)
671
            run(shlex.split(config.DOCKER_CMD) + ["ps"])
1✔
672
            return True
1✔
673
        except (subprocess.CalledProcessError, FileNotFoundError):
1✔
674
            return False
1✔
675

676
    def create_container(self, image_name: str, **kwargs) -> str:
1✔
677
        image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
678
        cmd, env_file = self._build_run_create_cmd("create", image_name, **kwargs)
1✔
679
        LOG.debug("Create container with cmd: %s", cmd)
1✔
680
        try:
1✔
681
            container_id = run(cmd)
1✔
682
            # Note: strip off Docker warning messages like "DNS setting (--dns=127.0.0.1) may fail in containers"
683
            container_id = container_id.strip().split("\n")[-1]
1✔
684
            return container_id.strip()
1✔
685
        except subprocess.CalledProcessError as e:
×
686
            error_messages = ["Unable to find image", "Trying to pull"]
×
687
            if any(msg in to_str(e.stdout) for msg in error_messages):
×
688
                raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr)
×
689
            raise ContainerException(
×
690
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
691
            ) from e
692
        finally:
693
            Util.rm_env_vars_file(env_file)
1✔
694

695
    def run_container(self, image_name: str, stdin=None, **kwargs) -> tuple[bytes, bytes]:
1✔
696
        image_name = self.registry_resolver_strategy.resolve(image_name)
×
697
        cmd, env_file = self._build_run_create_cmd("run", image_name, **kwargs)
×
698
        LOG.debug("Run container with cmd: %s", cmd)
×
699
        try:
×
700
            return self._run_async_cmd(cmd, stdin, kwargs.get("name") or "", image_name)
×
701
        except ContainerException as e:
×
702
            if "Trying to pull" in str(e) and "access to the resource is denied" in str(e):
×
703
                raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr) from e
×
704
            raise
×
705
        finally:
706
            Util.rm_env_vars_file(env_file)
×
707

708
    def exec_in_container(
1✔
709
        self,
710
        container_name_or_id: str,
711
        command: list[str] | str,
712
        interactive=False,
713
        detach=False,
714
        env_vars: dict[str, str | None] | None = None,
715
        stdin: bytes | None = None,
716
        user: str | None = None,
717
        workdir: str | None = None,
718
    ) -> tuple[bytes, bytes]:
719
        env_file = None
1✔
720
        cmd = self._docker_cmd()
1✔
721
        cmd.append("exec")
1✔
722
        if interactive:
1✔
723
            cmd.append("--interactive")
×
724
        if detach:
1✔
725
            cmd.append("--detach")
×
726
        if user:
1✔
727
            cmd += ["--user", user]
×
728
        if workdir:
1✔
729
            cmd += ["--workdir", workdir]
×
730
        if env_vars:
1✔
731
            env_flag, env_file = Util.create_env_vars_file_flag(env_vars)
×
732
            cmd += env_flag
×
733
        cmd.append(container_name_or_id)
1✔
734
        cmd += command if isinstance(command, list) else [command]
1✔
735
        LOG.debug("Execute command in container: %s", cmd)
1✔
736
        try:
1✔
737
            return self._run_async_cmd(cmd, stdin, container_name_or_id)
1✔
738
        finally:
739
            Util.rm_env_vars_file(env_file)
1✔
740

741
    def start_container(
1✔
742
        self,
743
        container_name_or_id: str,
744
        stdin=None,
745
        interactive: bool = False,
746
        attach: bool = False,
747
        flags: str | None = None,
748
    ) -> tuple[bytes, bytes]:
749
        cmd = self._docker_cmd() + ["start"]
1✔
750
        if flags:
1✔
751
            cmd.append(flags)
×
752
        if interactive:
1✔
753
            cmd.append("--interactive")
×
754
        if attach:
1✔
755
            cmd.append("--attach")
×
756
        cmd.append(container_name_or_id)
1✔
757
        LOG.debug("Start container with cmd: %s", cmd)
1✔
758
        return self._run_async_cmd(cmd, stdin, container_name_or_id)
1✔
759

760
    def attach_to_container(self, container_name_or_id: str):
1✔
761
        cmd = self._docker_cmd() + ["attach", container_name_or_id]
1✔
762
        LOG.debug("Attaching to container %s", container_name_or_id)
1✔
763
        return self._run_async_cmd(cmd, stdin=None, container_name=container_name_or_id)
1✔
764

765
    def _run_async_cmd(
1✔
766
        self, cmd: list[str], stdin: bytes, container_name: str, image_name=None
767
    ) -> tuple[bytes, bytes]:
768
        kwargs = {
1✔
769
            "inherit_env": True,
770
            "asynchronous": True,
771
            "stderr": subprocess.PIPE,
772
            "outfile": self.default_run_outfile or subprocess.PIPE,
773
        }
774
        if stdin:
1✔
775
            kwargs["stdin"] = True
×
776
        try:
1✔
777
            process = run(cmd, **kwargs)
1✔
778
            stdout, stderr = process.communicate(input=stdin)
1✔
779
            if process.returncode != 0:
1✔
780
                raise subprocess.CalledProcessError(
×
781
                    process.returncode,
782
                    cmd,
783
                    stdout,
784
                    stderr,
785
                )
786
            else:
787
                return stdout, stderr
1✔
788
        except subprocess.CalledProcessError as e:
×
789
            stderr_str = to_str(e.stderr)
×
790
            if "Unable to find image" in stderr_str:
×
791
                raise NoSuchImage(image_name or "", stdout=e.stdout, stderr=e.stderr)
×
792
            # consider different error messages for Docker/Podman
793
            error_messages = ("No such container", "no container with name or ID")
×
794
            if any(msg.lower() in to_str(e.stderr).lower() for msg in error_messages):
×
795
                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)
×
796
            raise ContainerException(
×
797
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
798
            ) from e
799

800
    def _build_run_create_cmd(
1✔
801
        self,
802
        action: str,
803
        image_name: str,
804
        *,
805
        name: str | None = None,
806
        entrypoint: list[str] | str | None = None,
807
        remove: bool = False,
808
        interactive: bool = False,
809
        tty: bool = False,
810
        detach: bool = False,
811
        command: list[str] | str | None = None,
812
        volumes: list[VolumeMappingSpecification] | None = None,
813
        ports: PortMappings | None = None,
814
        exposed_ports: list[str] | None = None,
815
        env_vars: dict[str, str] | None = None,
816
        user: str | None = None,
817
        cap_add: list[str] | None = None,
818
        cap_drop: list[str] | None = None,
819
        security_opt: list[str] | None = None,
820
        network: str | None = None,
821
        dns: str | list[str] | None = None,
822
        additional_flags: str | None = None,
823
        workdir: str | None = None,
824
        privileged: bool | None = None,
825
        labels: dict[str, str] | None = None,
826
        platform: DockerPlatform | None = None,
827
        ulimits: list[Ulimit] | None = None,
828
        init: bool | None = None,
829
        log_config: LogConfig | None = None,
830
        cpu_shares: int | None = None,
831
        mem_limit: int | str | None = None,
832
    ) -> tuple[list[str], str]:
833
        env_file = None
1✔
834
        cmd = self._docker_cmd() + [action]
1✔
835
        if remove:
1✔
836
            cmd.append("--rm")
1✔
837
        if name:
1✔
838
            cmd += ["--name", name]
1✔
839
        if entrypoint is not None:  # empty string entrypoint can be intentional
1✔
840
            if isinstance(entrypoint, str):
1✔
841
                cmd += ["--entrypoint", entrypoint]
1✔
842
            else:
843
                cmd += ["--entrypoint", shlex.join(entrypoint)]
×
844
        if privileged:
1✔
845
            cmd += ["--privileged"]
×
846
        if volumes:
1✔
847
            cmd += [
1✔
848
                param for volume in volumes for param in ["-v", self._map_to_volume_param(volume)]
849
            ]
850
        if interactive:
1✔
851
            cmd.append("--interactive")
×
852
        if tty:
1✔
853
            cmd.append("--tty")
×
854
        if detach:
1✔
855
            cmd.append("--detach")
×
856
        if ports:
1✔
857
            cmd += ports.to_list()
1✔
858
        if exposed_ports:
1✔
859
            cmd += list(itertools.chain.from_iterable(["--expose", port] for port in exposed_ports))
×
860
        if env_vars:
1✔
861
            env_flags, env_file = Util.create_env_vars_file_flag(env_vars)
1✔
862
            cmd += env_flags
1✔
863
        if user:
1✔
864
            cmd += ["--user", user]
×
865
        if cap_add:
1✔
866
            cmd += list(itertools.chain.from_iterable(["--cap-add", cap] for cap in cap_add))
×
867
        if cap_drop:
1✔
868
            cmd += list(itertools.chain.from_iterable(["--cap-drop", cap] for cap in cap_drop))
×
869
        if security_opt:
1✔
870
            cmd += list(
×
871
                itertools.chain.from_iterable(["--security-opt", opt] for opt in security_opt)
872
            )
873
        if network:
1✔
874
            cmd += ["--network", network]
1✔
875
        if dns:
1✔
876
            for dns_server in ensure_list(dns):
×
877
                cmd += ["--dns", dns_server]
×
878
        if workdir:
1✔
879
            cmd += ["--workdir", workdir]
×
880
        if labels:
1✔
881
            for key, value in labels.items():
×
882
                cmd += ["--label", f"{key}={value}"]
×
883
        if platform:
1✔
884
            cmd += ["--platform", platform]
×
885
        if ulimits:
1✔
886
            cmd += list(
×
887
                itertools.chain.from_iterable(["--ulimit", str(ulimit)] for ulimit in ulimits)
888
            )
889
        if init:
1✔
890
            cmd += ["--init"]
×
891
        if log_config:
1✔
892
            cmd += ["--log-driver", log_config.type]
×
893
            for key, value in log_config.config.items():
×
894
                cmd += ["--log-opt", f"{key}={value}"]
×
895
        if cpu_shares:
1✔
896
            cmd += ["--cpu-shares", str(cpu_shares)]
×
897
        if mem_limit:
1✔
898
            cmd += ["--memory", str(mem_limit)]
×
899

900
        if additional_flags:
1✔
901
            cmd += shlex.split(additional_flags)
1✔
902
        cmd.append(image_name)
1✔
903
        if command:
1✔
904
            cmd += command if isinstance(command, list) else [command]
1✔
905
        return cmd, env_file
1✔
906

907
    @staticmethod
1✔
908
    def _map_to_volume_param(volume: VolumeMappingSpecification) -> str:
1✔
909
        """
910
        Maps the mount volume, to a parameter for the -v docker cli argument.
911

912
        Examples:
913
        (host_path, container_path) -> host_path:container_path
914
        VolumeBind(host_dir=host_path, container_dir=container_path, read_only=True) -> host_path:container_path:ro
915

916
        :param volume: Either a SimpleVolumeBind, in essence a tuple (host_dir, container_dir), or a VolumeBind object
917
        :return: String which is passable as parameter to the docker cli -v option
918
        """
919
        # TODO: move this logic to the VolumeMappingSpecification type
920
        if isinstance(volume, Mount):
1✔
921
            return volume.to_str()
1✔
922
        else:
923
            return f"{volume[0]}:{volume[1]}"
×
924

925
    def _check_and_raise_no_such_container_error(
1✔
926
        self, container_name_or_id: str, error: subprocess.CalledProcessError
927
    ):
928
        """
929
        Check the given client invocation error and raise a `NoSuchContainer` exception if it
930
        represents a `no such container` exception from Docker or Podman.
931
        """
932
        self._check_output_and_raise_no_such_container_error(
1✔
933
            container_name_or_id, str(error.stdout), error=str(error.stderr)
934
        )
935

936
    def _check_output_and_raise_no_such_container_error(
1✔
937
        self, container_name_or_id: str, output: str, error: str | None = None
938
    ):
939
        """
940
        Check the given client invocation output and raise a `NoSuchContainer` exception if it
941
        represents a `no such container` exception from Docker or Podman.
942
        """
943
        possible_not_found_messages = ("No such container", "no container with name or ID")
1✔
944
        if any(msg.lower() in output.lower() for msg in possible_not_found_messages):
1✔
945
            raise NoSuchContainer(container_name_or_id, stdout=output, stderr=error)
1✔
946

947
    def _transform_container_labels(self, labels: str | dict[str, str]) -> dict[str, str]:
1✔
948
        """
949
        Transforms the container labels returned by the docker command from the key-value pair format to a dict
950
        :param labels: Input string, comma separated key value pairs. Example: key1=value1,key2=value2
951
        :return: Dict representation of the passed values, example: {"key1": "value1", "key2": "value2"}
952
        """
953
        if isinstance(labels, dict):
1✔
954
            return labels
×
955

956
        labels = labels.split(",")
1✔
957
        labels = [label.partition("=") for label in labels]
1✔
958
        return {label[0]: label[2] for label in labels}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc