• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17391568346

01 Sep 2025 02:21PM UTC coverage: 86.858% (-0.007%) from 86.865%
17391568346

push

github

web-flow
Fix typing for the tagging service (#13077)

6 of 6 new or added lines in 1 file covered. (100.0%)

9 existing lines in 3 files now uncovered.

67082 of 77232 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.82
/localstack-core/localstack/utils/container_utils/docker_cmd_client.py
1
import functools
1✔
2
import itertools
1✔
3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import subprocess
1✔
9
from typing import Callable, Optional, Union
1✔
10

11
from localstack import config
1✔
12
from localstack.utils.collections import ensure_list
1✔
13
from localstack.utils.container_utils.container_client import (
1✔
14
    AccessDenied,
15
    BindMount,
16
    CancellableStream,
17
    ContainerClient,
18
    ContainerException,
19
    DockerContainerStats,
20
    DockerContainerStatus,
21
    DockerNotAvailable,
22
    DockerPlatform,
23
    LogConfig,
24
    NoSuchContainer,
25
    NoSuchImage,
26
    NoSuchNetwork,
27
    NoSuchObject,
28
    PortMappings,
29
    RegistryConnectionError,
30
    SimpleVolumeBind,
31
    Ulimit,
32
    Util,
33
    VolumeDirMount,
34
)
35
from localstack.utils.run import run
1✔
36
from localstack.utils.strings import first_char_to_upper, to_str
1✔
37

38
LOG = logging.getLogger(__name__)
1✔
39

40

41
class CancellableProcessStream(CancellableStream):
1✔
42
    process: subprocess.Popen
1✔
43

44
    def __init__(self, process: subprocess.Popen) -> None:
1✔
45
        super().__init__()
1✔
46
        self.process = process
1✔
47

48
    def __iter__(self):
1✔
49
        return self
1✔
50

51
    def __next__(self):
1✔
52
        line = self.process.stdout.readline()
1✔
53
        if not line:
1✔
54
            raise StopIteration
1✔
55
        return line
1✔
56

57
    def close(self):
1✔
58
        return self.process.terminate()
1✔
59

60

61
def parse_size_string(size_str: str) -> int:
1✔
62
    """Parse human-readable size strings from Docker CLI into bytes"""
63
    size_str = size_str.strip().replace(" ", "").upper()
×
64
    if size_str == "0B":
×
65
        return 0
×
66

67
    # Match value and unit using regex
68
    match = re.match(r"^([\d.]+)([A-Za-z]+)$", size_str)
×
69
    if not match:
×
70
        return 0
×
71

72
    value = float(match.group(1))
×
73
    unit = match.group(2)
×
74

75
    unit_factors = {
×
76
        "B": 1,
77
        "KB": 10**3,
78
        "MB": 10**6,
79
        "GB": 10**9,
80
        "TB": 10**12,
81
        "KIB": 2**10,
82
        "MIB": 2**20,
83
        "GIB": 2**30,
84
        "TIB": 2**40,
85
    }
86

87
    return int(value * unit_factors.get(unit, 1))
×
88

89

90
class CmdDockerClient(ContainerClient):
1✔
91
    """
92
    Class for managing Docker (or Podman) containers using the command line executable.
93

94
    The client also supports targeting Podman engines, as Podman is almost a drop-in replacement
95
    for Docker these days. The majority of compatibility switches in this class is to handle slightly
96
    different response payloads or error messages returned by the `docker` vs `podman` commands.
97
    """
98

99
    default_run_outfile: Optional[str] = None
1✔
100

101
    def _docker_cmd(self) -> list[str]:
1✔
102
        """
103
        Get the configured, tested Docker CMD.
104
        :return: string to be used for running Docker commands
105
        :raises: DockerNotAvailable exception if the Docker command or the socker is not available
106
        """
107
        if not self.has_docker():
1✔
108
            raise DockerNotAvailable()
1✔
109
        return shlex.split(config.DOCKER_CMD)
1✔
110

111
    def get_system_info(self) -> dict:
1✔
112
        cmd = [
1✔
113
            *self._docker_cmd(),
114
            "info",
115
            "--format",
116
            "{{json .}}",
117
        ]
118
        cmd_result = run(cmd)
1✔
119

120
        return json.loads(cmd_result)
1✔
121

122
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
123
        cmd = self._docker_cmd()
1✔
124
        cmd += [
1✔
125
            "ps",
126
            "-a",
127
            "--filter",
128
            f"name={container_name}",
129
            "--format",
130
            "{{ .Status }} - {{ .Names }}",
131
        ]
132
        cmd_result = run(cmd)
1✔
133

134
        # filter empty / invalid lines from docker ps output
135
        cmd_result = next((line for line in cmd_result.splitlines() if container_name in line), "")
1✔
136
        container_status = cmd_result.strip().lower()
1✔
137
        if len(container_status) == 0:
1✔
138
            return DockerContainerStatus.NON_EXISTENT
1✔
139
        elif "(paused)" in container_status:
1✔
140
            return DockerContainerStatus.PAUSED
×
141
        elif container_status.startswith("up "):
1✔
142
            return DockerContainerStatus.UP
1✔
143
        else:
144
            return DockerContainerStatus.DOWN
1✔
145

146
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
147
        cmd = self._docker_cmd()
×
148
        cmd += ["stats", "--no-stream", "--format", "{{json .}}", container_name]
×
149
        cmd_result = run(cmd)
×
150
        raw_stats = json.loads(cmd_result)
×
151

152
        # BlockIO (read, write)
153
        block_io_parts = raw_stats["BlockIO"].split("/")
×
154
        block_read = parse_size_string(block_io_parts[0])
×
155
        block_write = parse_size_string(block_io_parts[1])
×
156

157
        # CPU percentage
158
        cpu_percentage = float(raw_stats["CPUPerc"].strip("%"))
×
159

160
        # Memory (usage, limit)
161
        mem_parts = raw_stats["MemUsage"].split("/")
×
162
        mem_used = parse_size_string(mem_parts[0])
×
163
        mem_limit = parse_size_string(mem_parts[1])
×
164
        mem_percentage = float(raw_stats["MemPerc"].strip("%"))
×
165

166
        # Network (rx, tx)
167
        net_parts = raw_stats["NetIO"].split("/")
×
168
        net_rx = parse_size_string(net_parts[0])
×
169
        net_tx = parse_size_string(net_parts[1])
×
170

171
        return DockerContainerStats(
×
172
            Container=raw_stats["ID"],
173
            ID=raw_stats["ID"],
174
            Name=raw_stats["Name"],
175
            BlockIO=(block_read, block_write),
176
            CPUPerc=round(cpu_percentage, 2),
177
            MemPerc=round(mem_percentage, 2),
178
            MemUsage=(mem_used, mem_limit),
179
            NetIO=(net_rx, net_tx),
180
            PIDs=int(raw_stats["PIDs"]),
181
            SDKStats=None,
182
        )
183

184
    def stop_container(self, container_name: str, timeout: int = 10) -> None:
1✔
185
        cmd = self._docker_cmd()
1✔
186
        cmd += ["stop", "--time", str(timeout), container_name]
1✔
187
        LOG.debug("Stopping container with cmd %s", cmd)
1✔
188
        try:
1✔
189
            run(cmd)
1✔
190
        except subprocess.CalledProcessError as e:
1✔
191
            self._check_and_raise_no_such_container_error(container_name, error=e)
1✔
192
            raise ContainerException(
×
193
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
194
            ) from e
195

196
    def restart_container(self, container_name: str, timeout: int = 10) -> None:
1✔
197
        cmd = self._docker_cmd()
×
198
        cmd += ["restart", "--time", str(timeout), container_name]
×
199
        LOG.debug("Restarting container with cmd %s", cmd)
×
200
        try:
×
201
            run(cmd)
×
202
        except subprocess.CalledProcessError as e:
×
203
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
204
            raise ContainerException(
×
205
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
206
            ) from e
207

208
    def pause_container(self, container_name: str) -> None:
1✔
209
        cmd = self._docker_cmd()
×
210
        cmd += ["pause", container_name]
×
211
        LOG.debug("Pausing container with cmd %s", cmd)
×
212
        try:
×
213
            run(cmd)
×
214
        except subprocess.CalledProcessError as e:
×
215
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
216
            raise ContainerException(
×
217
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
218
            ) from e
219

220
    def unpause_container(self, container_name: str) -> None:
1✔
221
        cmd = self._docker_cmd()
×
222
        cmd += ["unpause", container_name]
×
223
        LOG.debug("Unpausing container with cmd %s", cmd)
×
224
        try:
×
225
            run(cmd)
×
226
        except subprocess.CalledProcessError as e:
×
227
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
228
            raise ContainerException(
×
229
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
230
            ) from e
231

232
    def remove_image(self, image: str, force: bool = True) -> None:
1✔
233
        cmd = self._docker_cmd()
×
234
        cmd += ["rmi", image]
×
235
        if force:
×
236
            cmd += ["--force"]
×
237
        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")
×
238
        try:
×
239
            run(cmd)
×
240
        except subprocess.CalledProcessError as e:
×
241
            # handle different error messages for Docker and podman
242
            error_messages = ["No such image", "image not known"]
×
243
            if any(msg in to_str(e.stdout) for msg in error_messages):
×
244
                raise NoSuchImage(image, stdout=e.stdout, stderr=e.stderr)
×
245
            raise ContainerException(
×
246
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
247
            ) from e
248

249
    def commit(
1✔
250
        self,
251
        container_name_or_id: str,
252
        image_name: str,
253
        image_tag: str,
254
    ):
255
        cmd = self._docker_cmd()
×
256
        cmd += ["commit", container_name_or_id, f"{image_name}:{image_tag}"]
×
257
        LOG.debug(
×
258
            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag
259
        )
260
        try:
×
261
            run(cmd)
×
262
        except subprocess.CalledProcessError as e:
×
263
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
264
            raise ContainerException(
×
265
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
266
            ) from e
267

268
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
1✔
269
        if check_existence and container_name not in self.get_all_container_names():
1✔
270
            return
×
271
        cmd = self._docker_cmd() + ["rm"]
1✔
272
        if force:
1✔
273
            cmd.append("-f")
1✔
274
        cmd.append(container_name)
1✔
275
        LOG.debug("Removing container with cmd %s", cmd)
1✔
276
        try:
1✔
277
            output = run(cmd)
1✔
278
            # When the container does not exist, the output could have the error message without any exception
279
            if isinstance(output, str) and not force:
1✔
280
                self._check_output_and_raise_no_such_container_error(container_name, output=output)
×
UNCOV
281
        except subprocess.CalledProcessError as e:
×
UNCOV
282
            if not force:
×
283
                self._check_and_raise_no_such_container_error(container_name, error=e)
×
UNCOV
284
            raise ContainerException(
×
285
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
286
            ) from e
287

288
    def list_containers(self, filter: Union[list[str], str, None] = None, all=True) -> list[dict]:
1✔
289
        filter = [filter] if isinstance(filter, str) else filter
1✔
290
        cmd = self._docker_cmd()
1✔
291
        cmd.append("ps")
1✔
292
        if all:
1✔
293
            cmd.append("-a")
1✔
294
        options = []
1✔
295
        if filter:
1✔
296
            options += [y for filter_item in filter for y in ["--filter", filter_item]]
×
297
        cmd += options
1✔
298
        cmd.append("--format")
1✔
299
        cmd.append("{{json . }}")
1✔
300
        try:
1✔
301
            cmd_result = run(cmd).strip()
1✔
302
        except subprocess.CalledProcessError as e:
×
303
            raise ContainerException(
×
304
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
305
            ) from e
306
        container_list = []
1✔
307
        if cmd_result:
1✔
308
            if cmd_result[0] == "[":
1✔
309
                container_list = json.loads(cmd_result)
×
310
            else:
311
                container_list = [json.loads(line) for line in cmd_result.splitlines()]
1✔
312
        result = []
1✔
313
        for container in container_list:
1✔
314
            labels = self._transform_container_labels(container["Labels"])
1✔
315
            result.append(
1✔
316
                {
317
                    # support both, Docker and podman API response formats (`ID` vs `Id`)
318
                    "id": container.get("ID") or container["Id"],
319
                    "image": container["Image"],
320
                    # Docker returns a single string for `Names`, whereas podman returns a list of names
321
                    "name": ensure_list(container["Names"])[0],
322
                    "status": container["State"],
323
                    "labels": labels,
324
                }
325
            )
326
        return result
1✔
327

328
    def copy_into_container(
1✔
329
        self, container_name: str, local_path: str, container_path: str
330
    ) -> None:
331
        cmd = self._docker_cmd()
1✔
332
        cmd += ["cp", local_path, f"{container_name}:{container_path}"]
1✔
333
        LOG.debug("Copying into container with cmd: %s", cmd)
1✔
334
        try:
1✔
335
            run(cmd)
1✔
336
        except subprocess.CalledProcessError as e:
×
337
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
338
            if "does not exist" in to_str(e.stdout):
×
339
                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)
×
340
            raise ContainerException(
×
341
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
342
            ) from e
343

344
    def copy_from_container(
1✔
345
        self, container_name: str, local_path: str, container_path: str
346
    ) -> None:
347
        cmd = self._docker_cmd()
×
348
        cmd += ["cp", f"{container_name}:{container_path}", local_path]
×
349
        LOG.debug("Copying from container with cmd: %s", cmd)
×
350
        try:
×
351
            run(cmd)
×
352
        except subprocess.CalledProcessError as e:
×
353
            self._check_and_raise_no_such_container_error(container_name, error=e)
×
354
            # additional check to support Podman CLI output
355
            if re.match(".*container .+ does not exist", to_str(e.stdout)):
×
356
                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)
×
357
            raise ContainerException(
×
358
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
359
            ) from e
360

361
    def pull_image(
1✔
362
        self,
363
        docker_image: str,
364
        platform: Optional[DockerPlatform] = None,
365
        log_handler: Optional[Callable[[str], None]] = None,
366
    ) -> None:
367
        cmd = self._docker_cmd()
1✔
368
        docker_image = self.registry_resolver_strategy.resolve(docker_image)
1✔
369
        cmd += ["pull", docker_image]
1✔
370
        if platform:
1✔
371
            cmd += ["--platform", platform]
1✔
372
        LOG.debug("Pulling image with cmd: %s", cmd)
1✔
373
        try:
1✔
374
            result = run(cmd)
1✔
375
            # note: we could stream the results, but we'll just process everything at the end for now
376
            if log_handler:
1✔
377
                for line in result.split("\n"):
×
378
                    log_handler(to_str(line))
×
379
        except subprocess.CalledProcessError as e:
×
380
            stdout_str = to_str(e.stdout)
×
381
            if "pull access denied" in stdout_str:
×
382
                raise NoSuchImage(docker_image, stdout=e.stdout, stderr=e.stderr)
×
383
            # note: error message 'access to the resource is denied' raised by Podman client
384
            if "Trying to pull" in stdout_str and "access to the resource is denied" in stdout_str:
×
385
                raise NoSuchImage(docker_image, stdout=e.stdout, stderr=e.stderr)
×
386
            raise ContainerException(
×
387
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
388
            ) from e
389

390
    def push_image(self, docker_image: str) -> None:
1✔
391
        cmd = self._docker_cmd()
×
392
        cmd += ["push", docker_image]
×
393
        LOG.debug("Pushing image with cmd: %s", cmd)
×
394
        try:
×
395
            run(cmd)
×
396
        except subprocess.CalledProcessError as e:
×
397
            if "is denied" in to_str(e.stdout):
×
398
                raise AccessDenied(docker_image)
×
399
            if "requesting higher privileges than access token allows" in to_str(e.stdout):
×
400
                raise AccessDenied(docker_image)
×
401
            if "access token has insufficient scopes" in to_str(e.stdout):
×
402
                raise AccessDenied(docker_image)
×
403
            if "does not exist" in to_str(e.stdout):
×
404
                raise NoSuchImage(docker_image)
×
405
            if "connection refused" in to_str(e.stdout):
×
406
                raise RegistryConnectionError(e.stdout)
×
407
            # note: error message 'image not known' raised by Podman client
408
            if "image not known" in to_str(e.stdout):
×
409
                raise NoSuchImage(docker_image)
×
410
            raise ContainerException(
×
411
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
412
            ) from e
413

414
    def build_image(
1✔
415
        self,
416
        dockerfile_path: str,
417
        image_name: str,
418
        context_path: str = None,
419
        platform: Optional[DockerPlatform] = None,
420
    ):
421
        cmd = self._docker_cmd()
×
422
        dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)
×
423
        context_path = context_path or os.path.dirname(dockerfile_path)
×
424
        cmd += ["build", "-t", image_name, "-f", dockerfile_path]
×
425
        if platform:
×
426
            cmd += ["--platform", platform]
×
427
        cmd += [context_path]
×
428
        LOG.debug("Building Docker image: %s", cmd)
×
429
        try:
×
430
            return run(cmd)
×
431
        except subprocess.CalledProcessError as e:
×
432
            raise ContainerException(
×
433
                f"Docker build process returned with error code {e.returncode}", e.stdout, e.stderr
434
            ) from e
435

436
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
437
        cmd = self._docker_cmd()
×
438
        cmd += ["tag", source_ref, target_name]
×
439
        LOG.debug("Tagging Docker image %s as %s", source_ref, target_name)
×
440
        try:
×
441
            run(cmd)
×
442
        except subprocess.CalledProcessError as e:
×
443
            # handle different error messages for Docker and podman
444
            error_messages = ["No such image", "image not known"]
×
445
            if any(msg in to_str(e.stdout) for msg in error_messages):
×
446
                raise NoSuchImage(source_ref)
×
447
            raise ContainerException(
×
448
                f"Docker process returned with error code {e.returncode}", e.stdout, e.stderr
449
            ) from e
450

451
    def get_docker_image_names(
1✔
452
        self, strip_latest=True, include_tags=True, strip_wellknown_repo_prefixes: bool = True
453
    ):
454
        format_string = "{{.Repository}}:{{.Tag}}" if include_tags else "{{.Repository}}"
×
455
        cmd = self._docker_cmd()
×
456
        cmd += ["images", "--format", format_string]
×
457
        try:
×
458
            output = run(cmd)
×
459

460
            image_names = output.splitlines()
×
461
            if strip_wellknown_repo_prefixes:
×
462
                image_names = Util.strip_wellknown_repo_prefixes(image_names)
×
463
            if strip_latest:
×
464
                Util.append_without_latest(image_names)
×
465

466
            return image_names
×
467
        except Exception as e:
×
468
            LOG.info('Unable to list Docker images via "%s": %s', cmd, e)
×
469
            return []
×
470

471
    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:
1✔
472
        cmd = self._docker_cmd()
1✔
473
        cmd += ["logs", container_name_or_id]
1✔
474
        try:
1✔
475
            return run(cmd)
1✔
476
        except subprocess.CalledProcessError as e:
×
477
            if safe:
×
478
                return ""
×
479
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
480
            raise ContainerException(
×
481
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
482
            ) from e
483

484
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
485
        self.inspect_container(container_name_or_id)  # guard to check whether container is there
1✔
486

487
        cmd = self._docker_cmd()
1✔
488
        cmd += ["logs", "--follow", container_name_or_id]
1✔
489

490
        process: subprocess.Popen = run(
1✔
491
            cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT
492
        )
493

494
        return CancellableProcessStream(process)
1✔
495

496
    def _inspect_object(self, object_name_or_id: str) -> dict[str, Union[dict, list, str]]:
1✔
497
        cmd = self._docker_cmd()
1✔
498
        cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]
1✔
499
        try:
1✔
500
            cmd_result = run(cmd, print_error=False)
1✔
501
        except subprocess.CalledProcessError as e:
1✔
502
            # note: case-insensitive comparison, to support Docker and Podman output formats
503
            if "no such object" in to_str(e.stdout).lower():
1✔
504
                raise NoSuchObject(object_name_or_id, stdout=e.stdout, stderr=e.stderr)
1✔
505
            raise ContainerException(
×
506
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
507
            ) from e
508
        object_data = json.loads(cmd_result.strip())
1✔
509
        if isinstance(object_data, list):
1✔
510
            # return first list item, for compatibility with Podman API
511
            if len(object_data) == 1:
×
512
                result = object_data[0]
×
513
                # convert first character to uppercase (e.g., `name` -> `Name`), for Podman/Docker compatibility
514
                result = {first_char_to_upper(k): v for k, v in result.items()}
×
515
                return result
×
516
            LOG.info(
×
517
                "Expected a single object for `inspect` on ID %s, got %s",
518
                object_name_or_id,
519
                len(object_data),
520
            )
521
        return object_data
1✔
522

523
    def inspect_container(self, container_name_or_id: str) -> dict[str, Union[dict, str]]:
1✔
524
        try:
1✔
525
            return self._inspect_object(container_name_or_id)
1✔
526
        except NoSuchObject as e:
1✔
527
            raise NoSuchContainer(container_name_or_id=e.object_id)
×
528

529
    def inspect_image(
1✔
530
        self,
531
        image_name: str,
532
        pull: bool = True,
533
        strip_wellknown_repo_prefixes: bool = True,
534
    ) -> dict[str, Union[dict, list, str]]:
535
        image_name = self.registry_resolver_strategy.resolve(image_name)
×
536
        try:
×
537
            result = self._inspect_object(image_name)
×
538
            if strip_wellknown_repo_prefixes:
×
539
                if result.get("RepoDigests"):
×
540
                    result["RepoDigests"] = Util.strip_wellknown_repo_prefixes(
×
541
                        result["RepoDigests"]
542
                    )
543
                if result.get("RepoTags"):
×
544
                    result["RepoTags"] = Util.strip_wellknown_repo_prefixes(result["RepoTags"])
×
545
            return result
×
546
        except NoSuchObject as e:
×
547
            if pull:
×
548
                self.pull_image(image_name)
×
549
                return self.inspect_image(image_name, pull=False)
×
550
            raise NoSuchImage(image_name=e.object_id)
×
551

552
    def create_network(self, network_name: str) -> str:
1✔
553
        cmd = self._docker_cmd()
1✔
554
        cmd += ["network", "create", network_name]
1✔
555
        try:
1✔
556
            return run(cmd).strip()
1✔
557
        except subprocess.CalledProcessError as e:
×
558
            raise ContainerException(
×
559
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
560
            ) from e
561

562
    def delete_network(self, network_name: str) -> None:
1✔
563
        cmd = self._docker_cmd()
1✔
564
        cmd += ["network", "rm", network_name]
1✔
565
        try:
1✔
566
            run(cmd)
1✔
567
        except subprocess.CalledProcessError as e:
×
568
            stdout_str = to_str(e.stdout)
×
569
            if re.match(r".*network (.*) not found.*", stdout_str):
×
570
                raise NoSuchNetwork(network_name=network_name)
×
571
            else:
572
                raise ContainerException(
×
573
                    f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
574
                ) from e
575

576
    def inspect_network(self, network_name: str) -> dict[str, Union[dict, str]]:
1✔
577
        try:
1✔
578
            return self._inspect_object(network_name)
1✔
579
        except NoSuchObject as e:
1✔
580
            raise NoSuchNetwork(network_name=e.object_id)
1✔
581

582
    def connect_container_to_network(
1✔
583
        self,
584
        network_name: str,
585
        container_name_or_id: str,
586
        aliases: Optional[list] = None,
587
        link_local_ips: list[str] = None,
588
    ) -> None:
589
        LOG.debug(
×
590
            "Connecting container '%s' to network '%s' with aliases '%s'",
591
            container_name_or_id,
592
            network_name,
593
            aliases,
594
        )
595
        cmd = self._docker_cmd()
×
596
        cmd += ["network", "connect"]
×
597
        if aliases:
×
598
            cmd += ["--alias", ",".join(aliases)]
×
599
        if link_local_ips:
×
600
            cmd += ["--link-local-ip", ",".join(link_local_ips)]
×
601
        cmd += [network_name, container_name_or_id]
×
602
        try:
×
603
            run(cmd)
×
604
        except subprocess.CalledProcessError as e:
×
605
            stdout_str = to_str(e.stdout)
×
606
            if re.match(r".*network (.*) not found.*", stdout_str):
×
607
                raise NoSuchNetwork(network_name=network_name)
×
608
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
609
            raise ContainerException(
×
610
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
611
            ) from e
612

613
    def disconnect_container_from_network(
1✔
614
        self, network_name: str, container_name_or_id: str
615
    ) -> None:
616
        LOG.debug(
1✔
617
            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name
618
        )
619
        cmd = self._docker_cmd() + ["network", "disconnect", network_name, container_name_or_id]
1✔
620
        try:
1✔
621
            run(cmd)
1✔
622
        except subprocess.CalledProcessError as e:
×
623
            stdout_str = to_str(e.stdout)
×
624
            if re.match(r".*network (.*) not found.*", stdout_str):
×
625
                raise NoSuchNetwork(network_name=network_name)
×
626
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
627
            raise ContainerException(
×
628
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
629
            ) from e
630

631
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
632
        cmd = self._docker_cmd()
1✔
633
        cmd += [
1✔
634
            "inspect",
635
            "--format",
636
            "{{range .NetworkSettings.Networks}}{{.IPAddress}} {{end}}",
637
            container_name_or_id,
638
        ]
639
        try:
1✔
640
            result = run(cmd).strip()
1✔
641
            return result.split(" ")[0] if result else ""
1✔
642
        except subprocess.CalledProcessError as e:
×
643
            self._check_and_raise_no_such_container_error(container_name_or_id, error=e)
×
644
            # consider different error messages for Podman
645
            if "no such object" in to_str(e.stdout).lower():
×
646
                raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)
×
647
            raise ContainerException(
×
648
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
649
            ) from e
650

651
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
652
        cmd = self._docker_cmd()
×
653
        # TODO specify password via stdin
654
        cmd += ["login", "-u", username, "-p", password]
×
655
        if registry:
×
656
            cmd.append(registry)
×
657
        try:
×
658
            run(cmd)
×
659
        except subprocess.CalledProcessError as e:
×
660
            raise ContainerException(
×
661
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
662
            ) from e
663

664
    @functools.cache
1✔
665
    def has_docker(self) -> bool:
1✔
666
        try:
1✔
667
            # do not use self._docker_cmd here (would result in a loop)
668
            run(shlex.split(config.DOCKER_CMD) + ["ps"])
1✔
669
            return True
1✔
670
        except (subprocess.CalledProcessError, FileNotFoundError):
1✔
671
            return False
1✔
672

673
    def create_container(self, image_name: str, **kwargs) -> str:
1✔
674
        image_name = self.registry_resolver_strategy.resolve(image_name)
1✔
675
        cmd, env_file = self._build_run_create_cmd("create", image_name, **kwargs)
1✔
676
        LOG.debug("Create container with cmd: %s", cmd)
1✔
677
        try:
1✔
678
            container_id = run(cmd)
1✔
679
            # Note: strip off Docker warning messages like "DNS setting (--dns=127.0.0.1) may fail in containers"
680
            container_id = container_id.strip().split("\n")[-1]
1✔
681
            return container_id.strip()
1✔
682
        except subprocess.CalledProcessError as e:
×
683
            error_messages = ["Unable to find image", "Trying to pull"]
×
684
            if any(msg in to_str(e.stdout) for msg in error_messages):
×
685
                raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr)
×
686
            raise ContainerException(
×
687
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
688
            ) from e
689
        finally:
690
            Util.rm_env_vars_file(env_file)
1✔
691

692
    def run_container(self, image_name: str, stdin=None, **kwargs) -> tuple[bytes, bytes]:
1✔
693
        image_name = self.registry_resolver_strategy.resolve(image_name)
×
694
        cmd, env_file = self._build_run_create_cmd("run", image_name, **kwargs)
×
695
        LOG.debug("Run container with cmd: %s", cmd)
×
696
        try:
×
697
            return self._run_async_cmd(cmd, stdin, kwargs.get("name") or "", image_name)
×
698
        except ContainerException as e:
×
699
            if "Trying to pull" in str(e) and "access to the resource is denied" in str(e):
×
700
                raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr) from e
×
701
            raise
×
702
        finally:
703
            Util.rm_env_vars_file(env_file)
×
704

705
    def exec_in_container(
1✔
706
        self,
707
        container_name_or_id: str,
708
        command: Union[list[str], str],
709
        interactive=False,
710
        detach=False,
711
        env_vars: Optional[dict[str, Optional[str]]] = None,
712
        stdin: Optional[bytes] = None,
713
        user: Optional[str] = None,
714
        workdir: Optional[str] = None,
715
    ) -> tuple[bytes, bytes]:
716
        env_file = None
1✔
717
        cmd = self._docker_cmd()
1✔
718
        cmd.append("exec")
1✔
719
        if interactive:
1✔
720
            cmd.append("--interactive")
×
721
        if detach:
1✔
722
            cmd.append("--detach")
×
723
        if user:
1✔
724
            cmd += ["--user", user]
×
725
        if workdir:
1✔
726
            cmd += ["--workdir", workdir]
×
727
        if env_vars:
1✔
728
            env_flag, env_file = Util.create_env_vars_file_flag(env_vars)
×
729
            cmd += env_flag
×
730
        cmd.append(container_name_or_id)
1✔
731
        cmd += command if isinstance(command, list) else [command]
1✔
732
        LOG.debug("Execute command in container: %s", cmd)
1✔
733
        try:
1✔
734
            return self._run_async_cmd(cmd, stdin, container_name_or_id)
1✔
735
        finally:
736
            Util.rm_env_vars_file(env_file)
1✔
737

738
    def start_container(
1✔
739
        self,
740
        container_name_or_id: str,
741
        stdin=None,
742
        interactive: bool = False,
743
        attach: bool = False,
744
        flags: Optional[str] = None,
745
    ) -> tuple[bytes, bytes]:
746
        cmd = self._docker_cmd() + ["start"]
1✔
747
        if flags:
1✔
748
            cmd.append(flags)
×
749
        if interactive:
1✔
750
            cmd.append("--interactive")
×
751
        if attach:
1✔
752
            cmd.append("--attach")
×
753
        cmd.append(container_name_or_id)
1✔
754
        LOG.debug("Start container with cmd: %s", cmd)
1✔
755
        return self._run_async_cmd(cmd, stdin, container_name_or_id)
1✔
756

757
    def attach_to_container(self, container_name_or_id: str):
1✔
758
        cmd = self._docker_cmd() + ["attach", container_name_or_id]
1✔
759
        LOG.debug("Attaching to container %s", container_name_or_id)
1✔
760
        return self._run_async_cmd(cmd, stdin=None, container_name=container_name_or_id)
1✔
761

762
    def _run_async_cmd(
1✔
763
        self, cmd: list[str], stdin: bytes, container_name: str, image_name=None
764
    ) -> tuple[bytes, bytes]:
765
        kwargs = {
1✔
766
            "inherit_env": True,
767
            "asynchronous": True,
768
            "stderr": subprocess.PIPE,
769
            "outfile": self.default_run_outfile or subprocess.PIPE,
770
        }
771
        if stdin:
1✔
772
            kwargs["stdin"] = True
×
773
        try:
1✔
774
            process = run(cmd, **kwargs)
1✔
775
            stdout, stderr = process.communicate(input=stdin)
1✔
776
            if process.returncode != 0:
1✔
777
                raise subprocess.CalledProcessError(
×
778
                    process.returncode,
779
                    cmd,
780
                    stdout,
781
                    stderr,
782
                )
783
            else:
784
                return stdout, stderr
1✔
785
        except subprocess.CalledProcessError as e:
×
786
            stderr_str = to_str(e.stderr)
×
787
            if "Unable to find image" in stderr_str:
×
788
                raise NoSuchImage(image_name or "", stdout=e.stdout, stderr=e.stderr)
×
789
            # consider different error messages for Docker/Podman
790
            error_messages = ("No such container", "no container with name or ID")
×
791
            if any(msg.lower() in to_str(e.stderr).lower() for msg in error_messages):
×
792
                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)
×
793
            raise ContainerException(
×
794
                f"Docker process returned with errorcode {e.returncode}", e.stdout, e.stderr
795
            ) from e
796

797
    def _build_run_create_cmd(
1✔
798
        self,
799
        action: str,
800
        image_name: str,
801
        *,
802
        name: Optional[str] = None,
803
        entrypoint: Optional[Union[list[str], str]] = None,
804
        remove: bool = False,
805
        interactive: bool = False,
806
        tty: bool = False,
807
        detach: bool = False,
808
        command: Optional[Union[list[str], str]] = None,
809
        volumes: Optional[list[SimpleVolumeBind]] = None,
810
        ports: Optional[PortMappings] = None,
811
        exposed_ports: Optional[list[str]] = None,
812
        env_vars: Optional[dict[str, str]] = None,
813
        user: Optional[str] = None,
814
        cap_add: Optional[list[str]] = None,
815
        cap_drop: Optional[list[str]] = None,
816
        security_opt: Optional[list[str]] = None,
817
        network: Optional[str] = None,
818
        dns: Optional[Union[str, list[str]]] = None,
819
        additional_flags: Optional[str] = None,
820
        workdir: Optional[str] = None,
821
        privileged: Optional[bool] = None,
822
        labels: Optional[dict[str, str]] = None,
823
        platform: Optional[DockerPlatform] = None,
824
        ulimits: Optional[list[Ulimit]] = None,
825
        init: Optional[bool] = None,
826
        log_config: Optional[LogConfig] = None,
827
    ) -> tuple[list[str], str]:
828
        env_file = None
1✔
829
        cmd = self._docker_cmd() + [action]
1✔
830
        if remove:
1✔
831
            cmd.append("--rm")
1✔
832
        if name:
1✔
833
            cmd += ["--name", name]
1✔
834
        if entrypoint is not None:  # empty string entrypoint can be intentional
1✔
835
            if isinstance(entrypoint, str):
1✔
836
                cmd += ["--entrypoint", entrypoint]
1✔
837
            else:
838
                cmd += ["--entrypoint", shlex.join(entrypoint)]
×
839
        if privileged:
1✔
840
            cmd += ["--privileged"]
×
841
        if volumes:
1✔
842
            cmd += [
1✔
843
                param for volume in volumes for param in ["-v", self._map_to_volume_param(volume)]
844
            ]
845
        if interactive:
1✔
846
            cmd.append("--interactive")
×
847
        if tty:
1✔
848
            cmd.append("--tty")
×
849
        if detach:
1✔
850
            cmd.append("--detach")
×
851
        if ports:
1✔
852
            cmd += ports.to_list()
1✔
853
        if exposed_ports:
1✔
854
            cmd += list(itertools.chain.from_iterable(["--expose", port] for port in exposed_ports))
×
855
        if env_vars:
1✔
856
            env_flags, env_file = Util.create_env_vars_file_flag(env_vars)
1✔
857
            cmd += env_flags
1✔
858
        if user:
1✔
859
            cmd += ["--user", user]
×
860
        if cap_add:
1✔
861
            cmd += list(itertools.chain.from_iterable(["--cap-add", cap] for cap in cap_add))
×
862
        if cap_drop:
1✔
863
            cmd += list(itertools.chain.from_iterable(["--cap-drop", cap] for cap in cap_drop))
×
864
        if security_opt:
1✔
865
            cmd += list(
×
866
                itertools.chain.from_iterable(["--security-opt", opt] for opt in security_opt)
867
            )
868
        if network:
1✔
869
            cmd += ["--network", network]
1✔
870
        if dns:
1✔
871
            for dns_server in ensure_list(dns):
×
872
                cmd += ["--dns", dns_server]
×
873
        if workdir:
1✔
874
            cmd += ["--workdir", workdir]
×
875
        if labels:
1✔
876
            for key, value in labels.items():
×
877
                cmd += ["--label", f"{key}={value}"]
×
878
        if platform:
1✔
879
            cmd += ["--platform", platform]
1✔
880
        if ulimits:
1✔
881
            cmd += list(
×
882
                itertools.chain.from_iterable(["--ulimit", str(ulimit)] for ulimit in ulimits)
883
            )
884
        if init:
1✔
885
            cmd += ["--init"]
×
886
        if log_config:
1✔
887
            cmd += ["--log-driver", log_config.type]
×
888
            for key, value in log_config.config.items():
×
889
                cmd += ["--log-opt", f"{key}={value}"]
×
890

891
        if additional_flags:
1✔
892
            cmd += shlex.split(additional_flags)
1✔
893
        cmd.append(image_name)
1✔
894
        if command:
1✔
895
            cmd += command if isinstance(command, list) else [command]
1✔
896
        return cmd, env_file
1✔
897

898
    @staticmethod
1✔
899
    def _map_to_volume_param(volume: Union[SimpleVolumeBind, BindMount, VolumeDirMount]) -> str:
1✔
900
        """
901
        Maps the mount volume, to a parameter for the -v docker cli argument.
902

903
        Examples:
904
        (host_path, container_path) -> host_path:container_path
905
        VolumeBind(host_dir=host_path, container_dir=container_path, read_only=True) -> host_path:container_path:ro
906

907
        :param volume: Either a SimpleVolumeBind, in essence a tuple (host_dir, container_dir), or a VolumeBind object
908
        :return: String which is passable as parameter to the docker cli -v option
909
        """
910
        if isinstance(volume, (BindMount, VolumeDirMount)):
1✔
911
            return volume.to_str()
1✔
912
        else:
913
            return f"{volume[0]}:{volume[1]}"
×
914

915
    def _check_and_raise_no_such_container_error(
1✔
916
        self, container_name_or_id: str, error: subprocess.CalledProcessError
917
    ):
918
        """
919
        Check the given client invocation error and raise a `NoSuchContainer` exception if it
920
        represents a `no such container` exception from Docker or Podman.
921
        """
922
        self._check_output_and_raise_no_such_container_error(
1✔
923
            container_name_or_id, str(error.stdout), error=str(error.stderr)
924
        )
925

926
    def _check_output_and_raise_no_such_container_error(
1✔
927
        self, container_name_or_id: str, output: str, error: Optional[str] = None
928
    ):
929
        """
930
        Check the given client invocation output and raise a `NoSuchContainer` exception if it
931
        represents a `no such container` exception from Docker or Podman.
932
        """
933
        possible_not_found_messages = ("No such container", "no container with name or ID")
1✔
934
        if any(msg.lower() in output.lower() for msg in possible_not_found_messages):
1✔
935
            raise NoSuchContainer(container_name_or_id, stdout=output, stderr=error)
1✔
936

937
    def _transform_container_labels(self, labels: Union[str, dict[str, str]]) -> dict[str, str]:
1✔
938
        """
939
        Transforms the container labels returned by the docker command from the key-value pair format to a dict
940
        :param labels: Input string, comma separated key value pairs. Example: key1=value1,key2=value2
941
        :return: Dict representation of the passed values, example: {"key1": "value1", "key2": "value2"}
942
        """
943
        if isinstance(labels, dict):
1✔
944
            return labels
×
945

946
        labels = labels.split(",")
1✔
947
        labels = [label.partition("=") for label in labels]
1✔
948
        return {label[0]: label[2] for label in labels}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc