• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20942662173

12 Jan 2026 04:45PM UTC coverage: 86.905% (-0.03%) from 86.936%
20942662173

push

github

web-flow
Allow authenticated pull and push of docker images (#13569)

34 of 51 new or added lines in 4 files covered. (66.67%)

247 existing lines in 15 files now uncovered.

70218 of 80799 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.79
/localstack-core/localstack/utils/container_utils/container_client.py
1
import dataclasses
1✔
2
import io
1✔
3
import ipaddress
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import tarfile
1✔
9
import tempfile
1✔
10
from abc import ABCMeta, abstractmethod
1✔
11
from collections.abc import Callable
1✔
12
from enum import Enum, unique
1✔
13
from pathlib import Path
1✔
14
from typing import (
1✔
15
    Literal,
16
    NamedTuple,
17
    Protocol,
18
    TypeAlias,
19
    TypedDict,
20
)
21

22
import dotenv
1✔
23

24
from localstack import config
1✔
25
from localstack.constants import DEFAULT_VOLUME_DIR
1✔
26
from localstack.utils.collections import HashableList, ensure_list
1✔
27
from localstack.utils.files import TMP_FILES, chmod_r, rm_rf, save_file
1✔
28
from localstack.utils.no_exit_argument_parser import NoExitArgumentParser
1✔
29
from localstack.utils.strings import short_uid
1✔
30

31
LOG = logging.getLogger(__name__)
1✔
32

33
# list of well-known image repo prefixes that should be stripped off to canonicalize image names
34
WELL_KNOWN_IMAGE_REPO_PREFIXES = ("localhost/", "docker.io/library/")
1✔
35

36

37
def get_registry_from_image_name(image_name: str) -> str:
1✔
38
    parts = image_name.split("/", maxsplit=1)
1✔
39

40
    if prefix := config.DOCKER_GLOBAL_IMAGE_PREFIX:
1✔
41
        return prefix
1✔
42

43
    if len(parts) == 1:
1✔
44
        # If no slash is present at all, it's an image name
45
        return "docker.io"
1✔
46

47
    potential_registry = parts[0]
1✔
48

49
    registry_indicators = (".", ":", "localhost")
1✔
50
    if any(indicator in potential_registry for indicator in registry_indicators):
1✔
51
        # This indicates a registry domain or a local registry
52
        return potential_registry
1✔
53

54
    # No explicit registry, assume Docker Hub
55
    return "docker.io"
1✔
56

57

58
@unique
1✔
59
class DockerContainerStatus(Enum):
1✔
60
    DOWN = -1
1✔
61
    NON_EXISTENT = 0
1✔
62
    UP = 1
1✔
63
    PAUSED = 2
1✔
64

65

66
class DockerContainerStats(TypedDict):
1✔
67
    """Container usage statistics"""
68

69
    Container: str
1✔
70
    ID: str
1✔
71
    Name: str
1✔
72
    BlockIO: tuple[int, int]
1✔
73
    CPUPerc: float
1✔
74
    MemPerc: float
1✔
75
    MemUsage: tuple[int, int]
1✔
76
    NetIO: tuple[int, int]
1✔
77
    PIDs: int
1✔
78
    SDKStats: dict | None
1✔
79

80

81
class ContainerException(Exception):
1✔
82
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
83
        self.message = message or "Error during the communication with the docker daemon"
1✔
84
        self.stdout = stdout
1✔
85
        self.stderr = stderr
1✔
86

87

88
class NoSuchObject(ContainerException):
1✔
89
    def __init__(self, object_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
90
        message = message or f"Docker object {object_id} not found"
1✔
91
        super().__init__(message, stdout, stderr)
1✔
92
        self.object_id = object_id
1✔
93

94

95
class NoSuchContainer(ContainerException):
1✔
96
    def __init__(self, container_name_or_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
97
        message = message or f"Docker container {container_name_or_id} not found"
1✔
98
        super().__init__(message, stdout, stderr)
1✔
99
        self.container_name_or_id = container_name_or_id
1✔
100

101

102
class NoSuchImage(ContainerException):
1✔
103
    def __init__(self, image_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
104
        message = message or f"Docker image {image_name} not found"
1✔
105
        super().__init__(message, stdout, stderr)
1✔
106
        self.image_name = image_name
1✔
107

108

109
class NoSuchNetwork(ContainerException):
1✔
110
    def __init__(self, network_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
111
        message = message or f"Docker network {network_name} not found"
1✔
112
        super().__init__(message, stdout, stderr)
1✔
113
        self.network_name = network_name
1✔
114

115

116
class RegistryConnectionError(ContainerException):
1✔
117
    def __init__(self, details: str, message=None, stdout=None, stderr=None) -> None:
1✔
118
        message = message or f"Connection error: {details}"
1✔
119
        super().__init__(message, stdout, stderr)
1✔
120
        self.details = details
1✔
121

122

123
class DockerNotAvailable(ContainerException):
1✔
124
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
125
        message = message or "Docker not available"
1✔
126
        super().__init__(message, stdout, stderr)
1✔
127

128

129
class AccessDenied(ContainerException):
1✔
130
    def __init__(self, object_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
131
        message = message or f"Access denied to {object_name}"
1✔
132
        super().__init__(message, stdout, stderr)
1✔
133
        self.object_name = object_name
1✔
134

135

136
class CancellableStream(Protocol):
1✔
137
    """Describes a generator that can be closed. Borrowed from ``docker.types.daemon``."""
138

139
    def __iter__(self):
1✔
140
        raise NotImplementedError
141

142
    def __next__(self):
1✔
143
        raise NotImplementedError
144

145
    def close(self):
1✔
146
        raise NotImplementedError
147

148

149
# TODO: Migrate to StrEnum once the CLI does not need to support Python 3.10 (EOL Oct'26) anymore
150
class DockerPlatform(str):
1✔
151
    """Platform in the format ``os[/arch[/variant]]``"""
152

153
    linux_amd64 = "linux/amd64"
1✔
154
    linux_arm64 = "linux/arm64"
1✔
155

156

157
@dataclasses.dataclass
1✔
158
class Ulimit:
1✔
159
    """The ``ulimit`` settings for the container.
160
    See https://www.tutorialspoint.com/setting-ulimit-values-on-docker-containers
161
    """
162

163
    name: str
1✔
164
    soft_limit: int
1✔
165
    hard_limit: int | None = None
1✔
166

167
    def __repr__(self):
168
        """Format: <type>=<soft limit>[:<hard limit>]"""
169
        ulimit_string = f"{self.name}={self.soft_limit}"
170
        if self.hard_limit:
171
            ulimit_string += f":{self.hard_limit}"
172
        return ulimit_string
173

174

175
# defines the type for port mappings (source->target port range)
176
PortRange = list | HashableList
1✔
177
# defines the protocol for a port range ("tcp" or "udp")
178
PortProtocol = str
1✔
179

180

181
class PortMappings:
1✔
182
    """Maps source to target port ranges for Docker port mappings."""
183

184
    # bind host to be used for defining port mappings
185
    bind_host: str
1✔
186
    # maps `from` port range to `to` port range for port mappings
187
    mappings: dict[tuple[PortRange, PortProtocol], list]
1✔
188

189
    def __init__(self, bind_host: str = None):
1✔
190
        self.bind_host = bind_host if bind_host else ""
1✔
191
        self.mappings = {}
1✔
192

193
    def add(
1✔
194
        self,
195
        port: int | PortRange,
196
        mapped: int | PortRange = None,
197
        protocol: PortProtocol = "tcp",
198
    ):
199
        mapped = mapped or port
1✔
200
        if isinstance(port, PortRange):
1✔
201
            for i in range(port[1] - port[0] + 1):
1✔
202
                if isinstance(mapped, PortRange):
1✔
203
                    self.add(port[0] + i, mapped[0] + i, protocol)
1✔
204
                else:
205
                    self.add(port[0] + i, mapped, protocol)
1✔
206
            return
1✔
207
        if port is None or int(port) < 0:
1✔
UNCOV
208
            raise Exception(f"Unable to add mapping for invalid port: {port}")
×
209
        if self.contains(port, protocol):
1✔
210
            return
1✔
211
        bisected_host_port = None
1✔
212
        for (from_range, from_protocol), to_range in self.mappings.items():
1✔
213
            if not from_protocol == protocol:
1✔
214
                continue
1✔
215
            if not self.in_expanded_range(port, from_range):
1✔
216
                continue
1✔
217
            if not self.in_expanded_range(mapped, to_range):
1✔
218
                continue
1✔
219
            from_range_len = from_range[1] - from_range[0]
1✔
220
            to_range_len = to_range[1] - to_range[0]
1✔
221
            is_uniform = from_range_len == to_range_len
1✔
222
            if is_uniform:
1✔
223
                self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
224
                self.expand_range(mapped, to_range, protocol=protocol)
1✔
225
            else:
226
                if not self.in_range(mapped, to_range):
1✔
227
                    continue
1✔
228
                # extending a 1 to 1 mapping to be many to 1
229
                elif from_range_len == 1:
1✔
230
                    self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
231
                # splitting a uniform mapping
232
                else:
233
                    bisected_port_index = mapped - to_range[0]
1✔
234
                    bisected_host_port = from_range[0] + bisected_port_index
1✔
235
                    self.bisect_range(mapped, to_range, protocol=protocol)
1✔
236
                    self.bisect_range(bisected_host_port, from_range, protocol=protocol, remap=True)
1✔
237
                    break
1✔
238
            return
1✔
239
        if bisected_host_port is None:
1✔
240
            port_range = [port, port]
1✔
241
        elif bisected_host_port < port:
1✔
242
            port_range = [bisected_host_port, port]
1✔
243
        else:
UNCOV
244
            port_range = [port, bisected_host_port]
×
245
        protocol = str(protocol or "tcp").lower()
1✔
246
        self.mappings[(HashableList(port_range), protocol)] = [mapped, mapped]
1✔
247

248
    def to_str(self) -> str:
1✔
249
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
250

251
        def entry(k, v):
1✔
252
            from_range, protocol = k
1✔
253
            to_range = v
1✔
254
            # use /<protocol> suffix if the protocol is not"tcp"
255
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
256
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
257
                return f"-p {bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"
1✔
258
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
259
                return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}{protocol_suffix}"
1✔
260
            return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}"
1✔
261

262
        return " ".join([entry(k, v) for k, v in self.mappings.items()])
1✔
263

264
    def to_list(self) -> list[str]:  # TODO test
1✔
265
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
266

267
        def entry(k, v):
1✔
268
            from_range, protocol = k
1✔
269
            to_range = v
1✔
270
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
271
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
272
                return ["-p", f"{bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"]
1✔
273
            return [
1✔
274
                "-p",
275
                f"{bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}",
276
            ]
277

278
        return [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
279

280
    def to_dict(self) -> dict[str, tuple[str, int | list[int]] | int]:
1✔
281
        bind_address = self.bind_host or ""
1✔
282

283
        def bind_port(bind_address, host_port):
1✔
284
            if host_port == 0:
1✔
285
                return None
1✔
286
            elif bind_address:
1✔
287
                return (bind_address, host_port)
1✔
288
            else:
289
                return host_port
1✔
290

291
        def entry(k, v):
1✔
292
            from_range, protocol = k
1✔
293
            to_range = v
1✔
294
            protocol_suffix = f"/{protocol}"
1✔
295
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
296
                container_port = to_range[0]
1✔
297
                host_ports = list(range(from_range[0], from_range[1] + 1))
1✔
298
                return [
1✔
299
                    (
300
                        f"{container_port}{protocol_suffix}",
301
                        (bind_address, host_ports) if bind_address else host_ports,
302
                    )
303
                ]
304
            return [
1✔
305
                (
306
                    f"{container_port}{protocol_suffix}",
307
                    bind_port(bind_address, host_port),
308
                )
309
                for container_port, host_port in zip(
310
                    range(to_range[0], to_range[1] + 1),
311
                    range(from_range[0], from_range[1] + 1),
312
                    strict=False,
313
                )
314
            ]
315

316
        items = [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
317
        return dict(items)
1✔
318

319
    def contains(self, port: int, protocol: PortProtocol = "tcp") -> bool:
1✔
320
        for from_range_w_protocol, to_range in self.mappings.items():
1✔
321
            from_protocol = from_range_w_protocol[1]
1✔
322
            if from_protocol == protocol:
1✔
323
                from_range = from_range_w_protocol[0]
1✔
324
                if self.in_range(port, from_range):
1✔
325
                    return True
1✔
326

327
    def in_range(self, port: int, range: PortRange) -> bool:
1✔
328
        return port >= range[0] and port <= range[1]
1✔
329

330
    def in_expanded_range(self, port: int, range: PortRange):
1✔
331
        return port >= range[0] - 1 and port <= range[1] + 1
1✔
332

333
    def expand_range(
1✔
334
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
335
    ):
336
        """
337
        Expand the given port range by the given port. If remap==True, put the updated range into self.mappings
338
        """
339
        if self.in_range(port, range):
1✔
340
            return
1✔
341
        new_range = list(range) if remap else range
1✔
342
        if port == range[0] - 1:
1✔
UNCOV
343
            new_range[0] = port
×
344
        elif port == range[1] + 1:
1✔
345
            new_range[1] = port
1✔
346
        else:
UNCOV
347
            raise Exception(f"Unable to add port {port} to existing range {range}")
×
348
        if remap:
1✔
349
            self._remap_range(range, new_range, protocol=protocol)
1✔
350

351
    def bisect_range(
1✔
352
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
353
    ):
354
        """
355
        Bisect a port range, at the provided port. This is needed in some cases when adding a
356
        non-uniform host to port mapping adjacent to an existing port range.
357
        If remap==True, put the updated range into self.mappings
358
        """
359
        if not self.in_range(port, range):
1✔
UNCOV
360
            return
×
361
        new_range = list(range) if remap else range
1✔
362
        if port == range[0]:
1✔
UNCOV
363
            new_range[0] = port + 1
×
364
        else:
365
            new_range[1] = port - 1
1✔
366
        if remap:
1✔
367
            self._remap_range(range, new_range, protocol)
1✔
368

369
    def _remap_range(self, old_key: PortRange, new_key: PortRange, protocol: PortProtocol):
1✔
370
        self.mappings[(HashableList(new_key), protocol)] = self.mappings.pop(
1✔
371
            (HashableList(old_key), protocol)
372
        )
373

374
    def __repr__(self):
375
        return f"<PortMappings: {self.to_dict()}>"
376

377

378
SimpleVolumeBind = tuple[str, str]
1✔
379
"""Type alias for a simple version of VolumeBind"""
1✔
380

381

382
@dataclasses.dataclass
1✔
383
class Mount:
1✔
384
    def to_str(self) -> str:
1✔
UNCOV
385
        return str(self)
×
386

387

388
@dataclasses.dataclass
1✔
389
class BindMount(Mount):
1✔
390
    """Represents a --volume argument run/create command. When using VolumeBind to bind-mount a file or directory
391
    that does not yet exist on the Docker host, -v creates the endpoint for you. It is always created as a directory.
392
    """
393

394
    host_dir: str
1✔
395
    container_dir: str
1✔
396
    read_only: bool = False
1✔
397

398
    def to_str(self) -> str:
1✔
399
        args = []
1✔
400

401
        if self.host_dir:
1✔
402
            args.append(self.host_dir)
1✔
403

404
        if not self.container_dir:
1✔
UNCOV
405
            raise ValueError("no container dir specified")
×
406

407
        args.append(self.container_dir)
1✔
408

409
        if self.read_only:
1✔
UNCOV
410
            args.append("ro")
×
411

412
        return ":".join(args)
1✔
413

414
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
415
        return str(self.host_dir), {
1✔
416
            "bind": self.container_dir,
417
            "mode": "ro" if self.read_only else "rw",
418
        }
419

420
    @classmethod
1✔
421
    def parse(cls, param: str) -> "BindMount":
1✔
422
        parts = param.split(":")
1✔
423
        if 1 > len(parts) > 3:
1✔
UNCOV
424
            raise ValueError(f"Cannot parse volume bind {param}")
×
425

426
        volume = cls(parts[0], parts[1])
1✔
427
        if len(parts) == 3:
1✔
428
            if "ro" in parts[2].split(","):
1✔
429
                volume.read_only = True
1✔
430
        return volume
1✔
431

432

433
@dataclasses.dataclass
1✔
434
class VolumeDirMount(Mount):
1✔
435
    volume_path: str
1✔
436
    """
1✔
437
    Absolute path inside /var/lib/localstack to mount into the container
438
    """
439
    container_path: str
1✔
440
    """
1✔
441
    Target path inside the started container
442
    """
443
    read_only: bool = False
1✔
444

445
    def to_str(self) -> str:
1✔
446
        self._validate()
1✔
447
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
448

449
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
450
        return f"{host_dir}:{self.container_path}{':ro' if self.read_only else ''}"
1✔
451

452
    def _validate(self):
1✔
453
        if not self.volume_path:
1✔
UNCOV
454
            raise ValueError("no volume dir specified")
×
455
        if config.is_in_docker and not self.volume_path.startswith(DEFAULT_VOLUME_DIR):
1✔
UNCOV
456
            raise ValueError(f"volume dir not starting with {DEFAULT_VOLUME_DIR}")
×
457
        if not self.container_path:
1✔
UNCOV
458
            raise ValueError("no container dir specified")
×
459

460
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
461
        self._validate()
1✔
462
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
463

464
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
465
        return host_dir, {
1✔
466
            "bind": self.container_path,
467
            "mode": "ro" if self.read_only else "rw",
468
        }
469

470

471
VolumeMappingSpecification: TypeAlias = SimpleVolumeBind | Mount
1✔
472

473

474
class VolumeMappings:
1✔
475
    mappings: list[VolumeMappingSpecification]
1✔
476

477
    def __init__(
1✔
478
        self,
479
        mappings: list[VolumeMappingSpecification] = None,
480
    ):
481
        self.mappings = mappings if mappings is not None else []
1✔
482

483
    def add(self, mapping: VolumeMappingSpecification):
1✔
484
        self.append(mapping)
1✔
485

486
    def append(self, mapping: VolumeMappingSpecification):
1✔
487
        self.mappings.append(mapping)
1✔
488

489
    def find_target_mapping(self, container_dir: str) -> VolumeMappingSpecification | None:
1✔
490
        """
491
        Looks through the volumes and returns the one where the container dir matches ``container_dir``.
492
        Returns None if there is no volume mapping to the given container directory.
493

494
        :param container_dir: the target of the volume mapping, i.e., the path in the container
495
        :return: the volume mapping or None
496
        """
497
        for volume in self.mappings:
1✔
498
            target_dir = volume[1] if isinstance(volume, tuple) else volume.container_dir
1✔
499
            if container_dir == target_dir:
1✔
UNCOV
500
                return volume
×
501
        return None
1✔
502

503
    def __iter__(self):
1✔
504
        return self.mappings.__iter__()
1✔
505

506
    def __repr__(self):
507
        return self.mappings.__repr__()
508

509
    def __len__(self):
1✔
510
        return len(self.mappings)
1✔
511

512
    def __getitem__(self, item: int):
1✔
UNCOV
513
        return self.mappings[item]
×
514

515

516
VolumeType = Literal["bind", "volume"]
1✔
517

518

519
class VolumeInfo(NamedTuple):
1✔
520
    """Container volume information."""
521

522
    type: VolumeType
1✔
523
    source: str
1✔
524
    destination: str
1✔
525
    mode: str
1✔
526
    rw: bool
1✔
527
    propagation: str
1✔
528
    name: str | None = None
1✔
529
    driver: str | None = None
1✔
530

531

532
@dataclasses.dataclass
1✔
533
class LogConfig:
1✔
534
    type: Literal["json-file", "syslog", "journald", "gelf", "fluentd", "none", "awslogs", "splunk"]
1✔
535
    config: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
536

537

538
@dataclasses.dataclass
1✔
539
class ContainerConfiguration:
1✔
540
    image_name: str
1✔
541
    name: str | None = None
1✔
542
    volumes: VolumeMappings = dataclasses.field(default_factory=VolumeMappings)
1✔
543
    ports: PortMappings = dataclasses.field(default_factory=PortMappings)
1✔
544
    exposed_ports: list[str] = dataclasses.field(default_factory=list)
1✔
545
    entrypoint: list[str] | str | None = None
1✔
546
    additional_flags: str | None = None
1✔
547
    command: list[str] | None = None
1✔
548
    env_vars: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
549

550
    privileged: bool = False
1✔
551
    remove: bool = False
1✔
552
    interactive: bool = False
1✔
553
    tty: bool = False
1✔
554
    detach: bool = False
1✔
555

556
    stdin: str | None = None
1✔
557
    user: str | None = None
1✔
558
    cap_add: list[str] | None = None
1✔
559
    cap_drop: list[str] | None = None
1✔
560
    security_opt: list[str] | None = None
1✔
561
    network: str | None = None
1✔
562
    dns: str | None = None
1✔
563
    workdir: str | None = None
1✔
564
    platform: str | None = None
1✔
565
    ulimits: list[Ulimit] | None = None
1✔
566
    labels: dict[str, str] | None = None
1✔
567
    init: bool | None = None
1✔
568
    log_config: LogConfig | None = None
1✔
569
    cpu_shares: int | None = None
1✔
570
    mem_limit: int | str | None = None
1✔
571
    auth_config: dict[str, str] | None = None
1✔
572

573

574
class ContainerConfigurator(Protocol):
1✔
575
    """Protocol for functional configurators. A ContainerConfigurator modifies, when called,
576
    a ContainerConfiguration in place."""
577

578
    def __call__(self, configuration: ContainerConfiguration):
1✔
579
        """
580
        Modify the given container configuration.
581

582
        :param configuration: the configuration to modify
583
        """
UNCOV
584
        ...
×
585

586

587
@dataclasses.dataclass
1✔
588
class DockerRunFlags:
1✔
589
    """Class to capture Docker run/create flags for a container.
590
    run: https://docs.docker.com/engine/reference/commandline/run/
591
    create: https://docs.docker.com/engine/reference/commandline/create/
592
    """
593

594
    env_vars: dict[str, str] | None
1✔
595
    extra_hosts: dict[str, str] | None
1✔
596
    labels: dict[str, str] | None
1✔
597
    volumes: list[SimpleVolumeBind] | None
1✔
598
    network: str | None
1✔
599
    platform: DockerPlatform | None
1✔
600
    privileged: bool | None
1✔
601
    ports: PortMappings | None
1✔
602
    ulimits: list[Ulimit] | None
1✔
603
    user: str | None
1✔
604
    dns: list[str] | None
1✔
605

606

607
class RegistryResolverStrategy(Protocol):
1✔
608
    def resolve(self, image_name: str) -> str: ...
1✔
609

610

611
class HardCodedResolver:
1✔
612
    def resolve(self, image_name: str) -> str:  # noqa
1✔
613
        return image_name
1✔
614

615

616
# TODO: remove Docker/Podman compatibility switches (in particular strip_wellknown_repo_prefixes=...)
617
#  from the container client base interface and introduce derived Podman client implementations instead!
618
class ContainerClient(metaclass=ABCMeta):
1✔
619
    registry_resolver_strategy: RegistryResolverStrategy = HardCodedResolver()
1✔
620

621
    @abstractmethod
1✔
622
    def get_system_info(self) -> dict:
1✔
623
        """Returns the docker system-wide information as dictionary (``docker info``)."""
624

625
    def get_system_id(self) -> str:
1✔
626
        """Returns the unique and stable ID of the docker daemon."""
627
        return self.get_system_info()["ID"]
1✔
628

629
    @abstractmethod
1✔
630
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
631
        """Returns the status of the container with the given name"""
UNCOV
632
        pass
×
633

634
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
635
        """Returns the usage statistics of the container with the given name"""
UNCOV
636
        pass
×
637

638
    def get_networks(self, container_name: str) -> list[str]:
1✔
639
        LOG.debug("Getting networks for container: %s", container_name)
1✔
640
        container_attrs = self.inspect_container(container_name_or_id=container_name)
1✔
641
        return list(container_attrs["NetworkSettings"].get("Networks", {}).keys())
1✔
642

643
    def get_container_ipv4_for_network(
1✔
644
        self, container_name_or_id: str, container_network: str
645
    ) -> str:
646
        """
647
        Returns the IPv4 address for the container on the interface connected to the given network
648
        :param container_name_or_id: Container to inspect
649
        :param container_network: Network the IP address will belong to
650
        :return: IP address of the given container on the interface connected to the given network
651
        """
652
        LOG.debug(
1✔
653
            "Getting ipv4 address for container %s in network %s.",
654
            container_name_or_id,
655
            container_network,
656
        )
657
        # we always need the ID for this
658
        container_id = self.get_container_id(container_name=container_name_or_id)
1✔
659
        network_attrs = self.inspect_network(container_network)
1✔
660
        containers = network_attrs.get("Containers") or {}
1✔
661
        if container_id not in containers:
1✔
662
            LOG.debug("Network attributes: %s", network_attrs)
1✔
663
            try:
1✔
664
                inspection = self.inspect_container(container_name_or_id=container_name_or_id)
1✔
665
                LOG.debug("Container %s Attributes: %s", container_name_or_id, inspection)
1✔
666
                logs = self.get_container_logs(container_name_or_id=container_name_or_id)
1✔
667
                LOG.debug("Container %s Logs: %s", container_name_or_id, logs)
1✔
668
            except ContainerException as e:
×
UNCOV
669
                LOG.debug("Cannot inspect container %s: %s", container_name_or_id, e)
×
670
            raise ContainerException(
1✔
671
                "Container %s is not connected to target network %s",
672
                container_name_or_id,
673
                container_network,
674
            )
675
        try:
1✔
676
            ip = str(ipaddress.IPv4Interface(containers[container_id]["IPv4Address"]).ip)
1✔
677
        except Exception as e:
×
UNCOV
678
            raise ContainerException(
×
679
                f"Unable to detect IP address for container {container_name_or_id} in network {container_network}: {e}"
680
            )
681
        return ip
1✔
682

683
    @abstractmethod
1✔
684
    def stop_container(self, container_name: str, timeout: int = 10):
1✔
685
        """Stops container with given name
686
        :param container_name: Container identifier (name or id) of the container to be stopped
687
        :param timeout: Timeout after which SIGKILL is sent to the container.
688
        """
689

690
    @abstractmethod
1✔
691
    def restart_container(self, container_name: str, timeout: int = 10):
1✔
692
        """Restarts a container with the given name.
693
        :param container_name: Container identifier
694
        :param timeout: Seconds to wait for stop before killing the container
695
        """
696

697
    @abstractmethod
1✔
698
    def pause_container(self, container_name: str):
1✔
699
        """Pauses a container with the given name."""
700

701
    @abstractmethod
1✔
702
    def unpause_container(self, container_name: str):
1✔
703
        """Unpauses a container with the given name."""
704

705
    @abstractmethod
1✔
706
    def remove_container(
1✔
707
        self, container_name: str, force=True, check_existence=False, volumes=False
708
    ) -> None:
709
        """Removes container
710

711
        :param container_name: Name of the container
712
        :param force: Force the removal of a running container (uses SIGKILL)
713
        :param check_existence: Return if container doesn't exist
714
        :param volumes: Remove anonymous volumes associated with the container
715
        """
716

717
    @abstractmethod
1✔
718
    def remove_image(self, image: str, force: bool = True) -> None:
1✔
719
        """Removes an image with given name
720

721
        :param image: Image name and tag
722
        :param force: Force removal
723
        """
724

725
    @abstractmethod
1✔
726
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
1✔
727
        """List all containers matching the given filters
728

729
        :return: A list of dicts with keys id, image, name, labels, status
730
        """
731

732
    def get_running_container_names(self) -> list[str]:
1✔
733
        """Returns a list of the names of all running containers"""
734
        return self.__get_container_names(return_all=False)
1✔
735

736
    def get_all_container_names(self) -> list[str]:
1✔
737
        """Returns a list of the names of all containers including stopped ones"""
738
        return self.__get_container_names(return_all=True)
1✔
739

740
    def is_container_running(self, container_name: str) -> bool:
1✔
741
        """Checks whether a container with a given name is currently running"""
742
        return container_name in self.get_running_container_names()
1✔
743

744
    def create_file_in_container(
1✔
745
        self,
746
        container_name,
747
        file_contents: bytes,
748
        container_path: str,
749
        chmod_mode: int | None = None,
750
    ) -> None:
751
        """
752
        Create a file in container with the provided content. Provide the 'chmod_mode' argument if you want the file to have specific permissions.
753
        """
754
        with tempfile.NamedTemporaryFile() as tmp:
1✔
755
            tmp.write(file_contents)
1✔
756
            tmp.flush()
1✔
757
            if chmod_mode is not None:
1✔
UNCOV
758
                chmod_r(tmp.name, chmod_mode)
×
759
            self.copy_into_container(
1✔
760
                container_name=container_name,
761
                local_path=tmp.name,
762
                container_path=container_path,
763
            )
764

765
    @abstractmethod
1✔
766
    def copy_into_container(
1✔
767
        self, container_name: str, local_path: str, container_path: str
768
    ) -> None:
769
        """Copy contents of the given local path into the container"""
770

771
    @abstractmethod
1✔
772
    def copy_from_container(
1✔
773
        self, container_name: str, local_path: str, container_path: str
774
    ) -> None:
775
        """Copy contents of the given container to the host"""
776

777
    @abstractmethod
1✔
778
    def pull_image(
1✔
779
        self,
780
        docker_image: str,
781
        platform: DockerPlatform | None = None,
782
        log_handler: Callable[[str], None] | None = None,
783
        auth_config: dict[str, str] | None = None,
784
    ) -> None:
785
        """
786
        Pulls an image with a given name from a Docker registry
787

788
        :log_handler: Optional parameter that can be used to process the logs. Logs will be streamed if possible, but this is not guaranteed.
789
        :auth_config: Optional authentication configuration for private registries. Dict with keys: username, password, registry
790
        """
791

792
    @abstractmethod
1✔
793
    def push_image(self, docker_image: str, auth_config: dict[str, str] | None = None) -> None:
1✔
794
        """
795
        Pushes an image with a given name to a Docker registry
796

797
        :param docker_image: Image name and tag to push
798
        :param auth_config: Optional authentication configuration for private registries. Dict with keys: username, password, registry
799
        """
800

801
    @abstractmethod
1✔
802
    def build_image(
1✔
803
        self,
804
        dockerfile_path: str,
805
        image_name: str,
806
        context_path: str = None,
807
        platform: DockerPlatform | None = None,
808
    ) -> str:
809
        """Builds an image from the given Dockerfile
810

811
        :param dockerfile_path: Path to Dockerfile, or a directory that contains a Dockerfile
812
        :param image_name: Name of the image to be built
813
        :param context_path: Path for build context (defaults to dirname of Dockerfile)
814
        :param platform: Target platform for build (defaults to platform of Docker host)
815
        :return: Build logs as a string.
816
        """
817

818
    @abstractmethod
1✔
819
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
820
        """Tags an image with a new name
821

822
        :param source_ref: Name or ID of the image to be tagged
823
        :param target_name: New name (tag) of the tagged image
824
        """
825

826
    @abstractmethod
1✔
827
    def get_docker_image_names(
1✔
828
        self,
829
        strip_latest: bool = True,
830
        include_tags: bool = True,
831
        strip_wellknown_repo_prefixes: bool = True,
832
    ) -> list[str]:
833
        """
834
        Get all names of docker images available to the container engine
835
        :param strip_latest: return images both with and without :latest tag
836
        :param include_tags: include tags of the images in the names
837
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
838
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
839
        :return: List of image names
840
        """
841

842
    @abstractmethod
1✔
843
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
844
        """Get all logs of a given container"""
845

846
    @abstractmethod
1✔
847
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
848
        """Returns a blocking generator you can iterate over to retrieve log output as it happens."""
849

850
    @abstractmethod
1✔
851
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
852
        """Get detailed attributes of a container.
853

854
        :return: Dict containing docker attributes as returned by the daemon
855
        """
856

857
    def inspect_container_volumes(self, container_name_or_id) -> list[VolumeInfo]:
1✔
858
        """Return information about the volumes mounted into the given container.
859

860
        :param container_name_or_id: the container name or id
861
        :return: a list of volumes
862
        """
863
        volumes = []
1✔
864
        for doc in self.inspect_container(container_name_or_id)["Mounts"]:
1✔
865
            volumes.append(VolumeInfo(**{k.lower(): v for k, v in doc.items()}))
1✔
866

867
        return volumes
1✔
868

869
    @abstractmethod
1✔
870
    def inspect_image(
1✔
871
        self, image_name: str, pull: bool = True, strip_wellknown_repo_prefixes: bool = True
872
    ) -> dict[str, dict | list | str]:
873
        """Get detailed attributes of an image.
874

875
        :param image_name: Image name to inspect
876
        :param pull: Whether to pull image if not existent
877
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
878
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
879
        :return: Dict containing docker attributes as returned by the daemon
880
        """
881

882
    @abstractmethod
1✔
883
    def create_network(self, network_name: str) -> str:
1✔
884
        """
885
        Creates a network with the given name
886
        :param network_name: Name of the network
887
        :return Network ID
888
        """
889

890
    @abstractmethod
1✔
891
    def delete_network(self, network_name: str) -> None:
1✔
892
        """
893
        Delete a network with the given name
894
        :param network_name: Name of the network
895
        """
896

897
    @abstractmethod
1✔
898
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
899
        """Get detailed attributes of an network.
900

901
        :return: Dict containing docker attributes as returned by the daemon
902
        """
903

904
    @abstractmethod
1✔
905
    def connect_container_to_network(
1✔
906
        self,
907
        network_name: str,
908
        container_name_or_id: str,
909
        aliases: list | None = None,
910
        link_local_ips: list[str] = None,
911
    ) -> None:
912
        """
913
        Connects a container to a given network
914
        :param network_name: Network to connect the container to
915
        :param container_name_or_id: Container to connect to the network
916
        :param aliases: List of dns names the container should be available under in the network
917
        :param link_local_ips: List of link-local (IPv4 or IPv6) addresses
918
        """
919

920
    @abstractmethod
1✔
921
    def disconnect_container_from_network(
1✔
922
        self, network_name: str, container_name_or_id: str
923
    ) -> None:
924
        """
925
        Disconnects a container from a given network
926
        :param network_name: Network to disconnect the container from
927
        :param container_name_or_id: Container to disconnect from the network
928
        """
929

930
    def get_container_name(self, container_id: str) -> str:
1✔
931
        """Get the name of a container by a given identifier"""
932
        return self.inspect_container(container_id)["Name"].lstrip("/")
1✔
933

934
    def get_container_id(self, container_name: str) -> str:
1✔
935
        """Get the id of a container by a given name"""
936
        return self.inspect_container(container_name)["Id"]
1✔
937

938
    @abstractmethod
1✔
939
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
940
        """Get the IP address of a given container
941

942
        If container has multiple networks, it will return the IP of the first
943
        """
944

945
    def get_image_cmd(self, docker_image: str, pull: bool = True) -> list[str]:
1✔
946
        """Get the command for the given image
947
        :param docker_image: Docker image to inspect
948
        :param pull: Whether to pull if image is not present
949
        :return: Image command in its array form
950
        """
951
        cmd_list = self.inspect_image(docker_image, pull)["Config"]["Cmd"] or []
1✔
952
        return cmd_list
1✔
953

954
    def get_image_entrypoint(self, docker_image: str, pull: bool = True) -> str:
1✔
955
        """Get the entry point for the given image
956
        :param docker_image: Docker image to inspect
957
        :param pull: Whether to pull if image is not present
958
        :return: Image entrypoint
959
        """
960
        LOG.debug("Getting the entrypoint for image: %s", docker_image)
1✔
961
        entrypoint_list = self.inspect_image(docker_image, pull)["Config"].get("Entrypoint") or []
1✔
962
        return shlex.join(entrypoint_list)
1✔
963

964
    @abstractmethod
1✔
965
    def has_docker(self) -> bool:
1✔
966
        """Check if system has docker available"""
967

968
    @abstractmethod
1✔
969
    def commit(
1✔
970
        self,
971
        container_name_or_id: str,
972
        image_name: str,
973
        image_tag: str,
974
    ):
975
        """Create an image from a running container.
976

977
        :param container_name_or_id: Source container
978
        :param image_name: Destination image name
979
        :param image_tag: Destination image tag
980
        """
981

982
    def create_container_from_config(self, container_config: ContainerConfiguration) -> str:
1✔
983
        """
984
        Similar to create_container, but allows passing the whole ContainerConfiguration
985
        :param container_config: ContainerConfiguration how to start the container
986
        :return: Container ID
987
        """
988
        return self.create_container(
1✔
989
            image_name=container_config.image_name,
990
            name=container_config.name,
991
            entrypoint=container_config.entrypoint,
992
            remove=container_config.remove,
993
            interactive=container_config.interactive,
994
            tty=container_config.tty,
995
            command=container_config.command,
996
            volumes=container_config.volumes,
997
            ports=container_config.ports,
998
            exposed_ports=container_config.exposed_ports,
999
            env_vars=container_config.env_vars,
1000
            user=container_config.user,
1001
            cap_add=container_config.cap_add,
1002
            cap_drop=container_config.cap_drop,
1003
            security_opt=container_config.security_opt,
1004
            network=container_config.network,
1005
            dns=container_config.dns,
1006
            additional_flags=container_config.additional_flags,
1007
            workdir=container_config.workdir,
1008
            privileged=container_config.privileged,
1009
            platform=container_config.platform,
1010
            labels=container_config.labels,
1011
            ulimits=container_config.ulimits,
1012
            init=container_config.init,
1013
            log_config=container_config.log_config,
1014
            cpu_shares=container_config.cpu_shares,
1015
            mem_limit=container_config.mem_limit,
1016
            auth_config=container_config.auth_config,
1017
        )
1018

1019
    @abstractmethod
1✔
1020
    def create_container(
1✔
1021
        self,
1022
        image_name: str,
1023
        *,
1024
        name: str | None = None,
1025
        entrypoint: list[str] | str | None = None,
1026
        remove: bool = False,
1027
        interactive: bool = False,
1028
        tty: bool = False,
1029
        detach: bool = False,
1030
        command: list[str] | str | None = None,
1031
        volumes: VolumeMappings | list[SimpleVolumeBind] | None = None,
1032
        ports: PortMappings | None = None,
1033
        exposed_ports: list[str] | None = None,
1034
        env_vars: dict[str, str] | None = None,
1035
        user: str | None = None,
1036
        cap_add: list[str] | None = None,
1037
        cap_drop: list[str] | None = None,
1038
        security_opt: list[str] | None = None,
1039
        network: str | None = None,
1040
        dns: str | list[str] | None = None,
1041
        additional_flags: str | None = None,
1042
        workdir: str | None = None,
1043
        privileged: bool | None = None,
1044
        labels: dict[str, str] | None = None,
1045
        platform: DockerPlatform | None = None,
1046
        ulimits: list[Ulimit] | None = None,
1047
        init: bool | None = None,
1048
        log_config: LogConfig | None = None,
1049
        cpu_shares: int | None = None,
1050
        mem_limit: int | str | None = None,
1051
        auth_config: dict[str, str] | None = None,
1052
    ) -> str:
1053
        """Creates a container with the given image
1054

1055
        :return: Container ID
1056
        """
1057

1058
    @abstractmethod
1✔
1059
    def run_container(
1✔
1060
        self,
1061
        image_name: str,
1062
        stdin: bytes = None,
1063
        *,
1064
        name: str | None = None,
1065
        entrypoint: str | None = None,
1066
        remove: bool = False,
1067
        interactive: bool = False,
1068
        tty: bool = False,
1069
        detach: bool = False,
1070
        command: list[str] | str | None = None,
1071
        volumes: VolumeMappings | list[SimpleVolumeBind] | None = None,
1072
        ports: PortMappings | None = None,
1073
        exposed_ports: list[str] | None = None,
1074
        env_vars: dict[str, str] | None = None,
1075
        user: str | None = None,
1076
        cap_add: list[str] | None = None,
1077
        cap_drop: list[str] | None = None,
1078
        security_opt: list[str] | None = None,
1079
        network: str | None = None,
1080
        dns: str | None = None,
1081
        additional_flags: str | None = None,
1082
        workdir: str | None = None,
1083
        labels: dict[str, str] | None = None,
1084
        platform: DockerPlatform | None = None,
1085
        privileged: bool | None = None,
1086
        ulimits: list[Ulimit] | None = None,
1087
        init: bool | None = None,
1088
        log_config: LogConfig | None = None,
1089
        cpu_shares: int | None = None,
1090
        mem_limit: int | str | None = None,
1091
        auth_config: dict[str, str] | None = None,
1092
    ) -> tuple[bytes, bytes]:
1093
        """Creates and runs a given docker container
1094

1095
        :return: A tuple (stdout, stderr)
1096
        """
1097

1098
    def run_container_from_config(
1✔
1099
        self, container_config: ContainerConfiguration
1100
    ) -> tuple[bytes, bytes]:
1101
        """Like ``run_container`` but uses the parameters from the configuration."""
1102

1103
        return self.run_container(
1✔
1104
            image_name=container_config.image_name,
1105
            stdin=container_config.stdin,
1106
            name=container_config.name,
1107
            entrypoint=container_config.entrypoint,
1108
            remove=container_config.remove,
1109
            interactive=container_config.interactive,
1110
            tty=container_config.tty,
1111
            detach=container_config.detach,
1112
            command=container_config.command,
1113
            volumes=container_config.volumes,
1114
            ports=container_config.ports,
1115
            exposed_ports=container_config.exposed_ports,
1116
            env_vars=container_config.env_vars,
1117
            user=container_config.user,
1118
            cap_add=container_config.cap_add,
1119
            cap_drop=container_config.cap_drop,
1120
            security_opt=container_config.security_opt,
1121
            network=container_config.network,
1122
            dns=container_config.dns,
1123
            additional_flags=container_config.additional_flags,
1124
            workdir=container_config.workdir,
1125
            platform=container_config.platform,
1126
            privileged=container_config.privileged,
1127
            ulimits=container_config.ulimits,
1128
            init=container_config.init,
1129
            log_config=container_config.log_config,
1130
            cpu_shares=container_config.cpu_shares,
1131
            mem_limit=container_config.mem_limit,
1132
            auth_config=container_config.auth_config,
1133
        )
1134

1135
    @abstractmethod
1✔
1136
    def exec_in_container(
1✔
1137
        self,
1138
        container_name_or_id: str,
1139
        command: list[str] | str,
1140
        interactive: bool = False,
1141
        detach: bool = False,
1142
        env_vars: dict[str, str | None] | None = None,
1143
        stdin: bytes | None = None,
1144
        user: str | None = None,
1145
        workdir: str | None = None,
1146
    ) -> tuple[bytes, bytes]:
1147
        """Execute a given command in a container
1148

1149
        :return: A tuple (stdout, stderr)
1150
        """
1151

1152
    @abstractmethod
1✔
1153
    def start_container(
1✔
1154
        self,
1155
        container_name_or_id: str,
1156
        stdin: bytes = None,
1157
        interactive: bool = False,
1158
        attach: bool = False,
1159
        flags: str | None = None,
1160
    ) -> tuple[bytes, bytes]:
1161
        """Start a given, already created container
1162

1163
        :return: A tuple (stdout, stderr) if attach or interactive is set, otherwise a tuple (b"container_name_or_id", b"")
1164
        """
1165

1166
    @abstractmethod
1✔
1167
    def attach_to_container(self, container_name_or_id: str):
1✔
1168
        """
1169
        Attach local standard input, output, and error streams to a running container
1170
        """
1171

1172
    @abstractmethod
1✔
1173
    def login(self, username: str, password: str, registry: str | None = None) -> None:
1✔
1174
        """
1175
        Login into an OCI registry
1176

1177
        :param username: Username for the registry
1178
        :param password: Password / token for the registry
1179
        :param registry: Registry url
1180
        """
1181

1182
    def __get_container_names(self, return_all: bool) -> list[str]:
1✔
1183
        result = self.list_containers(all=return_all)
1✔
1184
        result = [container["name"] for container in result]
1✔
1185
        return result
1✔
1186

1187

1188
class Util:
1✔
1189
    MAX_ENV_ARGS_LENGTH = 20000
1✔
1190

1191
    @staticmethod
1✔
1192
    def format_env_vars(key: str, value: str | None):
1✔
1193
        if value is None:
1✔
UNCOV
1194
            return key
×
1195
        return f"{key}={value}"
1✔
1196

1197
    @classmethod
1✔
1198
    def create_env_vars_file_flag(cls, env_vars: dict) -> tuple[list[str], str | None]:
1✔
1199
        if not env_vars:
1✔
UNCOV
1200
            return [], None
×
1201
        result = []
1✔
1202
        env_vars = dict(env_vars)
1✔
1203
        env_file = None
1✔
1204
        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:
1✔
1205
            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...
1206
            env_file = cls.mountable_tmp_file()
×
1207
            env_content = ""
×
1208
            for name, value in dict(env_vars).items():
×
UNCOV
1209
                if len(value) > cls.MAX_ENV_ARGS_LENGTH:
×
1210
                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")
1211
                    continue
×
1212
                env_vars.pop(name)
×
1213
                value = value.replace("\n", "\\")
×
1214
                env_content += f"{cls.format_env_vars(name, value)}\n"
×
1215
            save_file(env_file, env_content)
×
UNCOV
1216
            result += ["--env-file", env_file]
×
1217

1218
        env_vars_res = [
1✔
1219
            item for k, v in env_vars.items() for item in ["-e", cls.format_env_vars(k, v)]
1220
        ]
1221
        result += env_vars_res
1✔
1222
        return result, env_file
1✔
1223

1224
    @staticmethod
1✔
1225
    def rm_env_vars_file(env_vars_file) -> None:
1✔
1226
        if env_vars_file:
1✔
UNCOV
1227
            return rm_rf(env_vars_file)
×
1228

1229
    @staticmethod
1✔
1230
    def mountable_tmp_file():
1✔
1231
        f = os.path.join(config.dirs.mounted_tmp, short_uid())
×
1232
        TMP_FILES.append(f)
×
UNCOV
1233
        return f
×
1234

1235
    @staticmethod
1✔
1236
    def append_without_latest(image_names: list[str]):
1✔
1237
        suffix = ":latest"
1✔
1238
        for image in list(image_names):
1✔
1239
            if image.endswith(suffix):
1✔
1240
                image_names.append(image[: -len(suffix)])
1✔
1241

1242
    @staticmethod
1✔
1243
    def strip_wellknown_repo_prefixes(image_names: list[str]) -> list[str]:
1✔
1244
        """
1245
        Remove well-known repo prefixes like `localhost/` or `docker.io/library/` from the list of given
1246
        image names. This is mostly to ensure compatibility of our Docker client with Podman API responses.
1247
        :return: a copy of the list of image names, with well-known repo prefixes removed
1248
        """
1249
        result = []
1✔
1250
        for image in image_names:
1✔
1251
            for prefix in WELL_KNOWN_IMAGE_REPO_PREFIXES:
1✔
1252
                if image.startswith(prefix):
1✔
UNCOV
1253
                    image = image.removeprefix(prefix)
×
1254
                    # strip only one of the matching prefixes (avoid multi-stripping)
UNCOV
1255
                    break
×
1256
            result.append(image)
1✔
1257
        return result
1✔
1258

1259
    @staticmethod
1✔
1260
    def tar_path(path: str, target_path: str, is_dir: bool):
1✔
1261
        f = tempfile.NamedTemporaryFile()
1✔
1262
        with tarfile.open(mode="w", fileobj=f) as t:
1✔
1263
            abs_path = os.path.abspath(path)
1✔
1264
            arcname = (
1✔
1265
                os.path.basename(path)
1266
                if is_dir
1267
                else (os.path.basename(target_path) or os.path.basename(path))
1268
            )
1269
            t.add(abs_path, arcname=arcname)
1✔
1270

1271
        f.seek(0)
1✔
1272
        return f
1✔
1273

1274
    @staticmethod
1✔
1275
    def untar_to_path(tardata, target_path):
1✔
1276
        target_path = Path(target_path)
1✔
1277
        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:
1✔
1278
            if target_path.is_dir():
1✔
1279
                t.extractall(path=target_path)
1✔
1280
            else:
1281
                member = t.next()
1✔
1282
                if member:
1✔
1283
                    member.name = target_path.name
1✔
1284
                    t.extract(member, target_path.parent)
1✔
1285
                else:
UNCOV
1286
                    LOG.debug("File to copy empty, ignoring...")
×
1287

1288
    @staticmethod
1✔
1289
    def _read_docker_cli_env_file(env_file: str) -> dict[str, str]:
1✔
1290
        """
1291
        Read an environment file in docker CLI format, specified here:
1292
        https://docs.docker.com/reference/cli/docker/container/run/#env
1293
        :param env_file: Path to the environment file
1294
        :return: Read environment variables
1295
        """
1296
        env_vars = {}
1✔
1297
        try:
1✔
1298
            with open(env_file) as f:
1✔
1299
                env_file_lines = f.readlines()
1✔
1300
        except FileNotFoundError as e:
×
UNCOV
1301
            LOG.error(
×
1302
                "Specified env file '%s' not found. Please make sure the file is properly mounted into the LocalStack container. Error: %s",
1303
                env_file,
1304
                e,
1305
            )
1306
            raise
×
1307
        except OSError as e:
×
UNCOV
1308
            LOG.error(
×
1309
                "Could not read env file '%s'. Please make sure the LocalStack container has the permissions to read it. Error: %s",
1310
                env_file,
1311
                e,
1312
            )
UNCOV
1313
            raise
×
1314
        for idx, line in enumerate(env_file_lines):
1✔
1315
            line = line.strip()
1✔
1316
            if not line or line.startswith("#"):
1✔
1317
                # skip comments or empty lines
1318
                continue
1✔
1319
            lhs, separator, rhs = line.partition("=")
1✔
1320
            if rhs or separator:
1✔
1321
                env_vars[lhs] = rhs
1✔
1322
            else:
1323
                # No "=" in the line, only the name => lookup in local env
1324
                if env_value := os.environ.get(lhs):
1✔
1325
                    env_vars[lhs] = env_value
1✔
1326
        return env_vars
1✔
1327

1328
    @staticmethod
1✔
1329
    def parse_additional_flags(
1✔
1330
        additional_flags: str,
1331
        env_vars: dict[str, str] | None = None,
1332
        labels: dict[str, str] | None = None,
1333
        volumes: list[SimpleVolumeBind] | None = None,
1334
        network: str | None = None,
1335
        platform: DockerPlatform | None = None,
1336
        ports: PortMappings | None = None,
1337
        privileged: bool | None = None,
1338
        user: str | None = None,
1339
        ulimits: list[Ulimit] | None = None,
1340
        dns: str | list[str] | None = None,
1341
    ) -> DockerRunFlags:
1342
        """Parses additional CLI-formatted Docker flags, which could overwrite provided defaults.
1343
        :param additional_flags: String which contains the flag definitions inspired by the Docker CLI reference:
1344
                                 https://docs.docker.com/engine/reference/commandline/run/
1345
        :param env_vars: Dict with env vars. Will be modified in place.
1346
        :param labels: Dict with labels. Will be modified in place.
1347
        :param volumes: List of mount tuples (host_path, container_path). Will be modified in place.
1348
        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.
1349
        :param platform: Platform to execute container. Warning will be printed if platform is overwritten in flags.
1350
        :param ports: PortMapping object. Will be modified in place.
1351
        :param privileged: Run the container in privileged mode. Warning will be printed if overwritten in flags.
1352
        :param ulimits: ulimit options in the format <type>=<soft limit>[:<hard limit>]
1353
        :param user: User to run first process. Warning will be printed if user is overwritten in flags.
1354
        :param dns: List of DNS servers to configure the container with.
1355
        :return: A DockerRunFlags object that will return new objects if respective parameters were None and
1356
                additional flags contained a flag for that object or the same which are passed otherwise.
1357
        """
1358
        # Argparse refactoring opportunity: custom argparse actions can be used to modularize parsing (e.g., key=value)
1359
        # https://docs.python.org/3/library/argparse.html#action
1360

1361
        # Configure parser
1362
        parser = NoExitArgumentParser(description="Docker run flags parser")
1✔
1363
        parser.add_argument(
1✔
1364
            "--add-host",
1365
            help="Add a custom host-to-IP mapping (host:ip)",
1366
            dest="add_hosts",
1367
            action="append",
1368
        )
1369
        parser.add_argument(
1✔
1370
            "--env", "-e", help="Set environment variables", dest="envs", action="append"
1371
        )
1372
        parser.add_argument(
1✔
1373
            "--env-file",
1374
            help="Set environment variables via a file",
1375
            dest="env_files",
1376
            action="append",
1377
        )
1378
        parser.add_argument(
1✔
1379
            "--compose-env-file",
1380
            help="Set environment variables via a file, with a docker-compose supported feature set.",
1381
            dest="compose_env_files",
1382
            action="append",
1383
        )
1384
        parser.add_argument(
1✔
1385
            "--label", "-l", help="Add container meta data", dest="labels", action="append"
1386
        )
1387
        parser.add_argument("--network", help="Connect a container to a network")
1✔
1388
        parser.add_argument(
1✔
1389
            "--platform",
1390
            type=DockerPlatform,
1391
            help="Docker platform (e.g., linux/amd64 or linux/arm64)",
1392
        )
1393
        parser.add_argument(
1✔
1394
            "--privileged",
1395
            help="Give extended privileges to this container",
1396
            action="store_true",
1397
        )
1398
        parser.add_argument(
1✔
1399
            "--publish",
1400
            "-p",
1401
            help="Publish container port(s) to the host",
1402
            dest="publish_ports",
1403
            action="append",
1404
        )
1405
        parser.add_argument(
1✔
1406
            "--ulimit", help="Container ulimit settings", dest="ulimits", action="append"
1407
        )
1408
        parser.add_argument("--user", "-u", help="Username or UID to execute first process")
1✔
1409
        parser.add_argument(
1✔
1410
            "--volume", "-v", help="Bind mount a volume", dest="volumes", action="append"
1411
        )
1412
        parser.add_argument("--dns", help="Set custom DNS servers", dest="dns", action="append")
1✔
1413

1414
        # Parse
1415
        flags = shlex.split(additional_flags)
1✔
1416
        args = parser.parse_args(flags)
1✔
1417

1418
        # Post-process parsed flags
1419
        extra_hosts = None
1✔
1420
        if args.add_hosts:
1✔
1421
            for add_host in args.add_hosts:
1✔
1422
                extra_hosts = extra_hosts if extra_hosts is not None else {}
1✔
1423
                hosts_split = add_host.split(":")
1✔
1424
                extra_hosts[hosts_split[0]] = hosts_split[1]
1✔
1425

1426
        # set env file values before env values, as the latter override the earlier
1427
        if args.env_files:
1✔
1428
            env_vars = env_vars if env_vars is not None else {}
1✔
1429
            for env_file in args.env_files:
1✔
1430
                env_vars.update(Util._read_docker_cli_env_file(env_file))
1✔
1431

1432
        if args.compose_env_files:
1✔
1433
            env_vars = env_vars if env_vars is not None else {}
1✔
1434
            for env_file in args.compose_env_files:
1✔
1435
                env_vars.update(dotenv.dotenv_values(env_file))
1✔
1436

1437
        if args.envs:
1✔
1438
            env_vars = env_vars if env_vars is not None else {}
1✔
1439
            for env in args.envs:
1✔
1440
                lhs, _, rhs = env.partition("=")
1✔
1441
                env_vars[lhs] = rhs
1✔
1442

1443
        if args.labels:
1✔
1444
            labels = labels if labels is not None else {}
1✔
1445
            for label in args.labels:
1✔
1446
                key, _, value = label.partition("=")
1✔
1447
                # Only consider non-empty labels
1448
                if key:
1✔
1449
                    labels[key] = value
1✔
1450

1451
        if args.network:
1✔
1452
            LOG.warning(
1✔
1453
                "Overwriting Docker container network '%s' with new value '%s'",
1454
                network,
1455
                args.network,
1456
            )
1457
            network = args.network
1✔
1458

1459
        if args.platform:
1✔
1460
            LOG.warning(
1✔
1461
                "Overwriting Docker platform '%s' with new value '%s'",
1462
                platform,
1463
                args.platform,
1464
            )
1465
            platform = args.platform
1✔
1466

1467
        if args.privileged:
1✔
1468
            LOG.warning(
1✔
1469
                "Overwriting Docker container privileged flag %s with new value %s",
1470
                privileged,
1471
                args.privileged,
1472
            )
1473
            privileged = args.privileged
1✔
1474

1475
        if args.publish_ports:
1✔
1476
            for port_mapping in args.publish_ports:
1✔
1477
                port_split = port_mapping.split(":")
1✔
1478
                protocol = "tcp"
1✔
1479
                if len(port_split) == 2:
1✔
1480
                    host_port, container_port = port_split
1✔
1481
                elif len(port_split) == 3:
1✔
1482
                    LOG.warning(
1✔
1483
                        "Host part of port mappings are ignored currently in additional flags"
1484
                    )
1485
                    _, host_port, container_port = port_split
1✔
1486
                else:
1487
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
1✔
1488
                host_port_split = host_port.split("-")
1✔
1489
                if len(host_port_split) == 2:
1✔
1490
                    host_port = [int(host_port_split[0]), int(host_port_split[1])]
1✔
1491
                elif len(host_port_split) == 1:
1✔
1492
                    host_port = int(host_port)
1✔
1493
                else:
UNCOV
1494
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
×
1495
                if "/" in container_port:
1✔
1496
                    container_port, protocol = container_port.split("/")
1✔
1497
                ports = ports if ports is not None else PortMappings()
1✔
1498
                ports.add(host_port, int(container_port), protocol)
1✔
1499

1500
        if args.ulimits:
1✔
1501
            ulimits = ulimits if ulimits is not None else []
1✔
1502
            ulimits_dict = {ul.name: ul for ul in ulimits}
1✔
1503
            for ulimit in args.ulimits:
1✔
1504
                name, _, rhs = ulimit.partition("=")
1✔
1505
                soft, _, hard = rhs.partition(":")
1✔
1506
                hard_limit = int(hard) if hard else int(soft)
1✔
1507
                new_ulimit = Ulimit(name=name, soft_limit=int(soft), hard_limit=hard_limit)
1✔
1508
                if ulimits_dict.get(name):
1✔
1509
                    LOG.warning("Overwriting Docker ulimit %s", new_ulimit)
1✔
1510
                ulimits_dict[name] = new_ulimit
1✔
1511
            ulimits = list(ulimits_dict.values())
1✔
1512

1513
        if args.user:
1✔
1514
            LOG.warning(
1✔
1515
                "Overwriting Docker user '%s' with new value '%s'",
1516
                user,
1517
                args.user,
1518
            )
1519
            user = args.user
1✔
1520

1521
        if args.volumes:
1✔
1522
            volumes = volumes if volumes is not None else []
1✔
1523
            for volume in args.volumes:
1✔
1524
                match = re.match(
1✔
1525
                    r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",
1526
                    volume,
1527
                )
1528
                if not match:
1✔
1529
                    LOG.warning("Unable to parse volume mount Docker flags: %s", volume)
×
UNCOV
1530
                    continue
×
1531
                host_path = match.group("host")
1✔
1532
                container_path = match.group("container")
1✔
1533
                rw_args = match.group("arg")
1✔
1534
                if rw_args:
1✔
1535
                    LOG.info("Volume options like :ro or :rw are currently ignored.")
1✔
1536
                volumes.append((host_path, container_path))
1✔
1537

1538
        dns = ensure_list(dns or [])
1✔
1539
        if args.dns:
1✔
1540
            LOG.info(
1✔
1541
                "Extending Docker container DNS servers %s with additional values %s", dns, args.dns
1542
            )
1543
            dns.extend(args.dns)
1✔
1544

1545
        return DockerRunFlags(
1✔
1546
            env_vars=env_vars,
1547
            extra_hosts=extra_hosts,
1548
            labels=labels,
1549
            volumes=volumes,
1550
            ports=ports,
1551
            network=network,
1552
            platform=platform,
1553
            privileged=privileged,
1554
            ulimits=ulimits,
1555
            user=user,
1556
            dns=dns,
1557
        )
1558

1559
    @staticmethod
1✔
1560
    def convert_mount_list_to_dict(
1✔
1561
        volumes: list[SimpleVolumeBind] | VolumeMappings,
1562
    ) -> dict[str, dict[str, str]]:
1563
        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""
1564

1565
        def _map_to_dict(paths: VolumeMappingSpecification):
1✔
1566
            # TODO: move this logic to the `Mount` base class
1567
            if isinstance(paths, (BindMount, VolumeDirMount)):
1✔
1568
                return paths.to_docker_sdk_parameters()
1✔
1569
            else:
UNCOV
1570
                return str(paths[0]), {"bind": paths[1], "mode": "rw"}
×
1571

1572
        return dict(
1✔
1573
            map(
1574
                _map_to_dict,
1575
                volumes,
1576
            )
1577
        )
1578

1579
    @staticmethod
1✔
1580
    def resolve_dockerfile_path(dockerfile_path: str) -> str:
1✔
1581
        """If the given path is a directory that contains a Dockerfile, then return the file path to it."""
1582
        rel_path = os.path.join(dockerfile_path, "Dockerfile")
1✔
1583
        if os.path.isdir(dockerfile_path) and os.path.exists(rel_path):
1✔
1584
            return rel_path
1✔
1585
        return dockerfile_path
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc