• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/localstack-core/localstack/utils/container_utils/container_client.py
1
import dataclasses
1✔
2
import io
1✔
3
import ipaddress
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import sys
1✔
9
import tarfile
1✔
10
import tempfile
1✔
11
from abc import ABCMeta, abstractmethod
1✔
12
from enum import Enum, unique
1✔
13
from pathlib import Path
1✔
14
from typing import (
1✔
15
    Callable,
16
    Literal,
17
    NamedTuple,
18
    Optional,
19
    Protocol,
20
    TypedDict,
21
    Union,
22
    get_args,
23
)
24

25
import dotenv
1✔
26

27
from localstack import config
1✔
28
from localstack.constants import DEFAULT_VOLUME_DIR
1✔
29
from localstack.utils.collections import HashableList, ensure_list
1✔
30
from localstack.utils.files import TMP_FILES, chmod_r, rm_rf, save_file
1✔
31
from localstack.utils.no_exit_argument_parser import NoExitArgumentParser
1✔
32
from localstack.utils.strings import short_uid
1✔
33

34
LOG = logging.getLogger(__name__)
1✔
35

36
# list of well-known image repo prefixes that should be stripped off to canonicalize image names
37
WELL_KNOWN_IMAGE_REPO_PREFIXES = ("localhost/", "docker.io/library/")
1✔
38

39

40
@unique
1✔
41
class DockerContainerStatus(Enum):
1✔
42
    DOWN = -1
1✔
43
    NON_EXISTENT = 0
1✔
44
    UP = 1
1✔
45
    PAUSED = 2
1✔
46

47

48
class DockerContainerStats(TypedDict):
1✔
49
    """Container usage statistics"""
50

51
    Container: str
1✔
52
    ID: str
1✔
53
    Name: str
1✔
54
    BlockIO: tuple[int, int]
1✔
55
    CPUPerc: float
1✔
56
    MemPerc: float
1✔
57
    MemUsage: tuple[int, int]
1✔
58
    NetIO: tuple[int, int]
1✔
59
    PIDs: int
1✔
60
    SDKStats: Optional[dict]
1✔
61

62

63
class ContainerException(Exception):
1✔
64
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
65
        self.message = message or "Error during the communication with the docker daemon"
1✔
66
        self.stdout = stdout
1✔
67
        self.stderr = stderr
1✔
68

69

70
class NoSuchObject(ContainerException):
1✔
71
    def __init__(self, object_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
72
        message = message or f"Docker object {object_id} not found"
1✔
73
        super().__init__(message, stdout, stderr)
1✔
74
        self.object_id = object_id
1✔
75

76

77
class NoSuchContainer(ContainerException):
1✔
78
    def __init__(self, container_name_or_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
79
        message = message or f"Docker container {container_name_or_id} not found"
1✔
80
        super().__init__(message, stdout, stderr)
1✔
81
        self.container_name_or_id = container_name_or_id
1✔
82

83

84
class NoSuchImage(ContainerException):
1✔
85
    def __init__(self, image_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
86
        message = message or f"Docker image {image_name} not found"
1✔
87
        super().__init__(message, stdout, stderr)
1✔
88
        self.image_name = image_name
1✔
89

90

91
class NoSuchNetwork(ContainerException):
1✔
92
    def __init__(self, network_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
93
        message = message or f"Docker network {network_name} not found"
1✔
94
        super().__init__(message, stdout, stderr)
1✔
95
        self.network_name = network_name
1✔
96

97

98
class RegistryConnectionError(ContainerException):
1✔
99
    def __init__(self, details: str, message=None, stdout=None, stderr=None) -> None:
1✔
100
        message = message or f"Connection error: {details}"
1✔
101
        super().__init__(message, stdout, stderr)
1✔
102
        self.details = details
1✔
103

104

105
class DockerNotAvailable(ContainerException):
1✔
106
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
107
        message = message or "Docker not available"
1✔
108
        super().__init__(message, stdout, stderr)
1✔
109

110

111
class AccessDenied(ContainerException):
1✔
112
    def __init__(self, object_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
113
        message = message or f"Access denied to {object_name}"
1✔
114
        super().__init__(message, stdout, stderr)
1✔
115
        self.object_name = object_name
1✔
116

117

118
class CancellableStream(Protocol):
1✔
119
    """Describes a generator that can be closed. Borrowed from ``docker.types.daemon``."""
120

121
    def __iter__(self):
1✔
122
        raise NotImplementedError
123

124
    def __next__(self):
1✔
125
        raise NotImplementedError
126

127
    def close(self):
1✔
128
        raise NotImplementedError
129

130

131
class DockerPlatform(str):
1✔
132
    """Platform in the format ``os[/arch[/variant]]``"""
133

134
    linux_amd64 = "linux/amd64"
1✔
135
    linux_arm64 = "linux/arm64"
1✔
136

137

138
@dataclasses.dataclass
1✔
139
class Ulimit:
1✔
140
    """The ``ulimit`` settings for the container.
141
    See https://www.tutorialspoint.com/setting-ulimit-values-on-docker-containers
142
    """
143

144
    name: str
1✔
145
    soft_limit: int
1✔
146
    hard_limit: Optional[int] = None
1✔
147

148
    def __repr__(self):
149
        """Format: <type>=<soft limit>[:<hard limit>]"""
150
        ulimit_string = f"{self.name}={self.soft_limit}"
151
        if self.hard_limit:
152
            ulimit_string += f":{self.hard_limit}"
153
        return ulimit_string
154

155

156
# defines the type for port mappings (source->target port range)
157
PortRange = Union[list, HashableList]
1✔
158
# defines the protocol for a port range ("tcp" or "udp")
159
PortProtocol = str
1✔
160

161

162
def isinstance_union(obj, class_or_tuple):
1✔
163
    # that's some dirty hack
164
    if sys.version_info < (3, 10):
1✔
UNCOV
165
        return isinstance(obj, get_args(PortRange))
×
166
    else:
167
        return isinstance(obj, class_or_tuple)
1✔
168

169

170
class PortMappings:
1✔
171
    """Maps source to target port ranges for Docker port mappings."""
172

173
    # bind host to be used for defining port mappings
174
    bind_host: str
1✔
175
    # maps `from` port range to `to` port range for port mappings
176
    mappings: dict[tuple[PortRange, PortProtocol], list]
1✔
177

178
    def __init__(self, bind_host: str = None):
1✔
179
        self.bind_host = bind_host if bind_host else ""
1✔
180
        self.mappings = {}
1✔
181

182
    def add(
1✔
183
        self,
184
        port: Union[int, PortRange],
185
        mapped: Union[int, PortRange] = None,
186
        protocol: PortProtocol = "tcp",
187
    ):
188
        mapped = mapped or port
1✔
189
        if isinstance_union(port, PortRange):
1✔
190
            for i in range(port[1] - port[0] + 1):
1✔
191
                if isinstance_union(mapped, PortRange):
1✔
192
                    self.add(port[0] + i, mapped[0] + i, protocol)
1✔
193
                else:
194
                    self.add(port[0] + i, mapped, protocol)
1✔
195
            return
1✔
196
        if port is None or int(port) < 0:
1✔
UNCOV
197
            raise Exception(f"Unable to add mapping for invalid port: {port}")
×
198
        if self.contains(port, protocol):
1✔
199
            return
1✔
200
        bisected_host_port = None
1✔
201
        for (from_range, from_protocol), to_range in self.mappings.items():
1✔
202
            if not from_protocol == protocol:
1✔
203
                continue
1✔
204
            if not self.in_expanded_range(port, from_range):
1✔
205
                continue
1✔
206
            if not self.in_expanded_range(mapped, to_range):
1✔
207
                continue
1✔
208
            from_range_len = from_range[1] - from_range[0]
1✔
209
            to_range_len = to_range[1] - to_range[0]
1✔
210
            is_uniform = from_range_len == to_range_len
1✔
211
            if is_uniform:
1✔
212
                self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
213
                self.expand_range(mapped, to_range, protocol=protocol)
1✔
214
            else:
215
                if not self.in_range(mapped, to_range):
1✔
216
                    continue
1✔
217
                # extending a 1 to 1 mapping to be many to 1
218
                elif from_range_len == 1:
1✔
219
                    self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
220
                # splitting a uniform mapping
221
                else:
222
                    bisected_port_index = mapped - to_range[0]
1✔
223
                    bisected_host_port = from_range[0] + bisected_port_index
1✔
224
                    self.bisect_range(mapped, to_range, protocol=protocol)
1✔
225
                    self.bisect_range(bisected_host_port, from_range, protocol=protocol, remap=True)
1✔
226
                    break
1✔
227
            return
1✔
228
        if bisected_host_port is None:
1✔
229
            port_range = [port, port]
1✔
230
        elif bisected_host_port < port:
1✔
231
            port_range = [bisected_host_port, port]
1✔
232
        else:
UNCOV
233
            port_range = [port, bisected_host_port]
×
234
        protocol = str(protocol or "tcp").lower()
1✔
235
        self.mappings[(HashableList(port_range), protocol)] = [mapped, mapped]
1✔
236

237
    def to_str(self) -> str:
1✔
238
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
239

240
        def entry(k, v):
1✔
241
            from_range, protocol = k
1✔
242
            to_range = v
1✔
243
            # use /<protocol> suffix if the protocol is not"tcp"
244
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
245
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
246
                return f"-p {bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"
1✔
247
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
248
                return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}{protocol_suffix}"
1✔
249
            return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}"
1✔
250

251
        return " ".join([entry(k, v) for k, v in self.mappings.items()])
1✔
252

253
    def to_list(self) -> list[str]:  # TODO test
1✔
254
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
255

256
        def entry(k, v):
1✔
257
            from_range, protocol = k
1✔
258
            to_range = v
1✔
259
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
260
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
261
                return ["-p", f"{bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"]
1✔
262
            return [
1✔
263
                "-p",
264
                f"{bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}",
265
            ]
266

267
        return [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
268

269
    def to_dict(self) -> dict[str, Union[tuple[str, Union[int, list[int]]], int]]:
1✔
270
        bind_address = self.bind_host or ""
1✔
271

272
        def bind_port(bind_address, host_port):
1✔
273
            if host_port == 0:
1✔
274
                return None
1✔
275
            elif bind_address:
1✔
276
                return (bind_address, host_port)
1✔
277
            else:
278
                return host_port
1✔
279

280
        def entry(k, v):
1✔
281
            from_range, protocol = k
1✔
282
            to_range = v
1✔
283
            protocol_suffix = f"/{protocol}"
1✔
284
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
285
                container_port = to_range[0]
1✔
286
                host_ports = list(range(from_range[0], from_range[1] + 1))
1✔
287
                return [
1✔
288
                    (
289
                        f"{container_port}{protocol_suffix}",
290
                        (bind_address, host_ports) if bind_address else host_ports,
291
                    )
292
                ]
293
            return [
1✔
294
                (
295
                    f"{container_port}{protocol_suffix}",
296
                    bind_port(bind_address, host_port),
297
                )
298
                for container_port, host_port in zip(
299
                    range(to_range[0], to_range[1] + 1),
300
                    range(from_range[0], from_range[1] + 1),
301
                    strict=False,
302
                )
303
            ]
304

305
        items = [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
306
        return dict(items)
1✔
307

308
    def contains(self, port: int, protocol: PortProtocol = "tcp") -> bool:
1✔
309
        for from_range_w_protocol, to_range in self.mappings.items():
1✔
310
            from_protocol = from_range_w_protocol[1]
1✔
311
            if from_protocol == protocol:
1✔
312
                from_range = from_range_w_protocol[0]
1✔
313
                if self.in_range(port, from_range):
1✔
314
                    return True
1✔
315

316
    def in_range(self, port: int, range: PortRange) -> bool:
1✔
317
        return port >= range[0] and port <= range[1]
1✔
318

319
    def in_expanded_range(self, port: int, range: PortRange):
1✔
320
        return port >= range[0] - 1 and port <= range[1] + 1
1✔
321

322
    def expand_range(
1✔
323
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
324
    ):
325
        """
326
        Expand the given port range by the given port. If remap==True, put the updated range into self.mappings
327
        """
328
        if self.in_range(port, range):
1✔
329
            return
1✔
330
        new_range = list(range) if remap else range
1✔
331
        if port == range[0] - 1:
1✔
UNCOV
332
            new_range[0] = port
×
333
        elif port == range[1] + 1:
1✔
334
            new_range[1] = port
1✔
335
        else:
UNCOV
336
            raise Exception(f"Unable to add port {port} to existing range {range}")
×
337
        if remap:
1✔
338
            self._remap_range(range, new_range, protocol=protocol)
1✔
339

340
    def bisect_range(
1✔
341
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
342
    ):
343
        """
344
        Bisect a port range, at the provided port. This is needed in some cases when adding a
345
        non-uniform host to port mapping adjacent to an existing port range.
346
        If remap==True, put the updated range into self.mappings
347
        """
348
        if not self.in_range(port, range):
1✔
UNCOV
349
            return
×
350
        new_range = list(range) if remap else range
1✔
351
        if port == range[0]:
1✔
352
            new_range[0] = port + 1
×
353
        else:
354
            new_range[1] = port - 1
1✔
355
        if remap:
1✔
356
            self._remap_range(range, new_range, protocol)
1✔
357

358
    def _remap_range(self, old_key: PortRange, new_key: PortRange, protocol: PortProtocol):
1✔
359
        self.mappings[(HashableList(new_key), protocol)] = self.mappings.pop(
1✔
360
            (HashableList(old_key), protocol)
361
        )
362

363
    def __repr__(self):
364
        return f"<PortMappings: {self.to_dict()}>"
365

366

367
SimpleVolumeBind = tuple[str, str]
1✔
368
"""Type alias for a simple version of VolumeBind"""
1✔
369

370

371
@dataclasses.dataclass
1✔
372
class BindMount:
1✔
373
    """Represents a --volume argument run/create command. When using VolumeBind to bind-mount a file or directory
374
    that does not yet exist on the Docker host, -v creates the endpoint for you. It is always created as a directory.
375
    """
376

377
    host_dir: str
1✔
378
    container_dir: str
1✔
379
    read_only: bool = False
1✔
380

381
    def to_str(self) -> str:
1✔
382
        args = []
1✔
383

384
        if self.host_dir:
1✔
385
            args.append(self.host_dir)
1✔
386

387
        if not self.container_dir:
1✔
UNCOV
388
            raise ValueError("no container dir specified")
×
389

390
        args.append(self.container_dir)
1✔
391

392
        if self.read_only:
1✔
UNCOV
393
            args.append("ro")
×
394

395
        return ":".join(args)
1✔
396

397
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
398
        return str(self.host_dir), {
1✔
399
            "bind": self.container_dir,
400
            "mode": "ro" if self.read_only else "rw",
401
        }
402

403
    @classmethod
1✔
404
    def parse(cls, param: str) -> "BindMount":
1✔
405
        parts = param.split(":")
1✔
406
        if 1 > len(parts) > 3:
1✔
UNCOV
407
            raise ValueError(f"Cannot parse volume bind {param}")
×
408

409
        volume = cls(parts[0], parts[1])
1✔
410
        if len(parts) == 3:
1✔
411
            if "ro" in parts[2].split(","):
1✔
412
                volume.read_only = True
1✔
413
        return volume
1✔
414

415

416
@dataclasses.dataclass
1✔
417
class VolumeDirMount:
1✔
418
    volume_path: str
1✔
419
    """
1✔
420
    Absolute path inside /var/lib/localstack to mount into the container
421
    """
422
    container_path: str
1✔
423
    """
1✔
424
    Target path inside the started container
425
    """
426
    read_only: bool = False
1✔
427

428
    def to_str(self) -> str:
1✔
429
        self._validate()
1✔
430
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
431

432
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
433
        return f"{host_dir}:{self.container_path}{':ro' if self.read_only else ''}"
1✔
434

435
    def _validate(self):
1✔
436
        if not self.volume_path:
1✔
UNCOV
437
            raise ValueError("no volume dir specified")
×
438
        if config.is_in_docker and not self.volume_path.startswith(DEFAULT_VOLUME_DIR):
1✔
UNCOV
439
            raise ValueError(f"volume dir not starting with {DEFAULT_VOLUME_DIR}")
×
440
        if not self.container_path:
1✔
UNCOV
441
            raise ValueError("no container dir specified")
×
442

443
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
444
        self._validate()
1✔
445
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
446

447
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
448
        return host_dir, {
1✔
449
            "bind": self.container_path,
450
            "mode": "ro" if self.read_only else "rw",
451
        }
452

453

454
class VolumeMappings:
1✔
455
    mappings: list[Union[SimpleVolumeBind, BindMount]]
1✔
456

457
    def __init__(self, mappings: list[Union[SimpleVolumeBind, BindMount, VolumeDirMount]] = None):
1✔
458
        self.mappings = mappings if mappings is not None else []
1✔
459

460
    def add(self, mapping: Union[SimpleVolumeBind, BindMount, VolumeDirMount]):
1✔
461
        self.append(mapping)
1✔
462

463
    def append(
1✔
464
        self,
465
        mapping: Union[
466
            SimpleVolumeBind,
467
            BindMount,
468
            VolumeDirMount,
469
        ],
470
    ):
471
        self.mappings.append(mapping)
1✔
472

473
    def find_target_mapping(
1✔
474
        self, container_dir: str
475
    ) -> Optional[Union[SimpleVolumeBind, BindMount, VolumeDirMount]]:
476
        """
477
        Looks through the volumes and returns the one where the container dir matches ``container_dir``.
478
        Returns None if there is no volume mapping to the given container directory.
479

480
        :param container_dir: the target of the volume mapping, i.e., the path in the container
481
        :return: the volume mapping or None
482
        """
483
        for volume in self.mappings:
1✔
484
            target_dir = volume[1] if isinstance(volume, tuple) else volume.container_dir
1✔
485
            if container_dir == target_dir:
1✔
UNCOV
486
                return volume
×
487
        return None
1✔
488

489
    def __iter__(self):
1✔
490
        return self.mappings.__iter__()
1✔
491

492
    def __repr__(self):
493
        return self.mappings.__repr__()
494

495
    def __len__(self):
1✔
496
        return len(self.mappings)
1✔
497

498
    def __getitem__(self, item: int):
1✔
UNCOV
499
        return self.mappings[item]
×
500

501

502
VolumeType = Literal["bind", "volume"]
1✔
503

504

505
class VolumeInfo(NamedTuple):
1✔
506
    """Container volume information."""
507

508
    type: VolumeType
1✔
509
    source: str
1✔
510
    destination: str
1✔
511
    mode: str
1✔
512
    rw: bool
1✔
513
    propagation: str
1✔
514
    name: Optional[str] = None
1✔
515
    driver: Optional[str] = None
1✔
516

517

518
@dataclasses.dataclass
1✔
519
class LogConfig:
1✔
520
    type: Literal["json-file", "syslog", "journald", "gelf", "fluentd", "none", "awslogs", "splunk"]
1✔
521
    config: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
522

523

524
@dataclasses.dataclass
1✔
525
class ContainerConfiguration:
1✔
526
    image_name: str
1✔
527
    name: Optional[str] = None
1✔
528
    volumes: VolumeMappings = dataclasses.field(default_factory=VolumeMappings)
1✔
529
    ports: PortMappings = dataclasses.field(default_factory=PortMappings)
1✔
530
    exposed_ports: list[str] = dataclasses.field(default_factory=list)
1✔
531
    entrypoint: Optional[Union[list[str], str]] = None
1✔
532
    additional_flags: Optional[str] = None
1✔
533
    command: Optional[list[str]] = None
1✔
534
    env_vars: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
535

536
    privileged: bool = False
1✔
537
    remove: bool = False
1✔
538
    interactive: bool = False
1✔
539
    tty: bool = False
1✔
540
    detach: bool = False
1✔
541

542
    stdin: Optional[str] = None
1✔
543
    user: Optional[str] = None
1✔
544
    cap_add: Optional[list[str]] = None
1✔
545
    cap_drop: Optional[list[str]] = None
1✔
546
    security_opt: Optional[list[str]] = None
1✔
547
    network: Optional[str] = None
1✔
548
    dns: Optional[str] = None
1✔
549
    workdir: Optional[str] = None
1✔
550
    platform: Optional[str] = None
1✔
551
    ulimits: Optional[list[Ulimit]] = None
1✔
552
    labels: Optional[dict[str, str]] = None
1✔
553
    init: Optional[bool] = None
1✔
554
    log_config: Optional[LogConfig] = None
1✔
555

556

557
class ContainerConfigurator(Protocol):
1✔
558
    """Protocol for functional configurators. A ContainerConfigurator modifies, when called,
559
    a ContainerConfiguration in place."""
560

561
    def __call__(self, configuration: ContainerConfiguration):
1✔
562
        """
563
        Modify the given container configuration.
564

565
        :param configuration: the configuration to modify
566
        """
UNCOV
567
        ...
×
568

569

570
@dataclasses.dataclass
1✔
571
class DockerRunFlags:
1✔
572
    """Class to capture Docker run/create flags for a container.
573
    run: https://docs.docker.com/engine/reference/commandline/run/
574
    create: https://docs.docker.com/engine/reference/commandline/create/
575
    """
576

577
    env_vars: Optional[dict[str, str]]
1✔
578
    extra_hosts: Optional[dict[str, str]]
1✔
579
    labels: Optional[dict[str, str]]
1✔
580
    volumes: Optional[list[SimpleVolumeBind]]
1✔
581
    network: Optional[str]
1✔
582
    platform: Optional[DockerPlatform]
1✔
583
    privileged: Optional[bool]
1✔
584
    ports: Optional[PortMappings]
1✔
585
    ulimits: Optional[list[Ulimit]]
1✔
586
    user: Optional[str]
1✔
587
    dns: Optional[list[str]]
1✔
588

589

590
class RegistryResolverStrategy(Protocol):
1✔
591
    def resolve(self, image_name: str) -> str: ...
1✔
592

593

594
class HardCodedResolver:
1✔
595
    def resolve(self, image_name: str) -> str:  # noqa
1✔
596
        return image_name
1✔
597

598

599
# TODO: remove Docker/Podman compatibility switches (in particular strip_wellknown_repo_prefixes=...)
600
#  from the container client base interface and introduce derived Podman client implementations instead!
601
class ContainerClient(metaclass=ABCMeta):
1✔
602
    registry_resolver_strategy: RegistryResolverStrategy = HardCodedResolver()
1✔
603

604
    @abstractmethod
1✔
605
    def get_system_info(self) -> dict:
1✔
606
        """Returns the docker system-wide information as dictionary (``docker info``)."""
607

608
    def get_system_id(self) -> str:
1✔
609
        """Returns the unique and stable ID of the docker daemon."""
610
        return self.get_system_info()["ID"]
1✔
611

612
    @abstractmethod
1✔
613
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
614
        """Returns the status of the container with the given name"""
UNCOV
615
        pass
×
616

617
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
618
        """Returns the usage statistics of the container with the given name"""
UNCOV
619
        pass
×
620

621
    def get_networks(self, container_name: str) -> list[str]:
1✔
622
        LOG.debug("Getting networks for container: %s", container_name)
1✔
623
        container_attrs = self.inspect_container(container_name_or_id=container_name)
1✔
624
        return list(container_attrs["NetworkSettings"].get("Networks", {}).keys())
1✔
625

626
    def get_container_ipv4_for_network(
1✔
627
        self, container_name_or_id: str, container_network: str
628
    ) -> str:
629
        """
630
        Returns the IPv4 address for the container on the interface connected to the given network
631
        :param container_name_or_id: Container to inspect
632
        :param container_network: Network the IP address will belong to
633
        :return: IP address of the given container on the interface connected to the given network
634
        """
635
        LOG.debug(
1✔
636
            "Getting ipv4 address for container %s in network %s.",
637
            container_name_or_id,
638
            container_network,
639
        )
640
        # we always need the ID for this
641
        container_id = self.get_container_id(container_name=container_name_or_id)
1✔
642
        network_attrs = self.inspect_network(container_network)
1✔
643
        containers = network_attrs.get("Containers") or {}
1✔
644
        if container_id not in containers:
1✔
645
            LOG.debug("Network attributes: %s", network_attrs)
1✔
646
            try:
1✔
647
                inspection = self.inspect_container(container_name_or_id=container_name_or_id)
1✔
648
                LOG.debug("Container %s Attributes: %s", container_name_or_id, inspection)
1✔
649
                logs = self.get_container_logs(container_name_or_id=container_name_or_id)
1✔
650
                LOG.debug("Container %s Logs: %s", container_name_or_id, logs)
1✔
UNCOV
651
            except ContainerException as e:
×
UNCOV
652
                LOG.debug("Cannot inspect container %s: %s", container_name_or_id, e)
×
653
            raise ContainerException(
1✔
654
                "Container %s is not connected to target network %s",
655
                container_name_or_id,
656
                container_network,
657
            )
658
        try:
1✔
659
            ip = str(ipaddress.IPv4Interface(containers[container_id]["IPv4Address"]).ip)
1✔
UNCOV
660
        except Exception as e:
×
UNCOV
661
            raise ContainerException(
×
662
                f"Unable to detect IP address for container {container_name_or_id} in network {container_network}: {e}"
663
            )
664
        return ip
1✔
665

666
    @abstractmethod
1✔
667
    def stop_container(self, container_name: str, timeout: int = 10):
1✔
668
        """Stops container with given name
669
        :param container_name: Container identifier (name or id) of the container to be stopped
670
        :param timeout: Timeout after which SIGKILL is sent to the container.
671
        """
672

673
    @abstractmethod
1✔
674
    def restart_container(self, container_name: str, timeout: int = 10):
1✔
675
        """Restarts a container with the given name.
676
        :param container_name: Container identifier
677
        :param timeout: Seconds to wait for stop before killing the container
678
        """
679

680
    @abstractmethod
1✔
681
    def pause_container(self, container_name: str):
1✔
682
        """Pauses a container with the given name."""
683

684
    @abstractmethod
1✔
685
    def unpause_container(self, container_name: str):
1✔
686
        """Unpauses a container with the given name."""
687

688
    @abstractmethod
1✔
689
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
1✔
690
        """Removes container with given name"""
691

692
    @abstractmethod
1✔
693
    def remove_image(self, image: str, force: bool = True) -> None:
1✔
694
        """Removes an image with given name
695

696
        :param image: Image name and tag
697
        :param force: Force removal
698
        """
699

700
    @abstractmethod
1✔
701
    def list_containers(self, filter: Union[list[str], str, None] = None, all=True) -> list[dict]:
1✔
702
        """List all containers matching the given filters
703

704
        :return: A list of dicts with keys id, image, name, labels, status
705
        """
706

707
    def get_running_container_names(self) -> list[str]:
1✔
708
        """Returns a list of the names of all running containers"""
709
        result = self.list_containers(all=False)
1✔
710
        result = [container["name"] for container in result]
1✔
711
        return result
1✔
712

713
    def is_container_running(self, container_name: str) -> bool:
1✔
714
        """Checks whether a container with a given name is currently running"""
715
        return container_name in self.get_running_container_names()
1✔
716

717
    def create_file_in_container(
1✔
718
        self,
719
        container_name,
720
        file_contents: bytes,
721
        container_path: str,
722
        chmod_mode: Optional[int] = None,
723
    ) -> None:
724
        """
725
        Create a file in container with the provided content. Provide the 'chmod_mode' argument if you want the file to have specific permissions.
726
        """
727
        with tempfile.NamedTemporaryFile() as tmp:
1✔
728
            tmp.write(file_contents)
1✔
729
            tmp.flush()
1✔
730
            if chmod_mode is not None:
1✔
UNCOV
731
                chmod_r(tmp.name, chmod_mode)
×
732
            self.copy_into_container(
1✔
733
                container_name=container_name,
734
                local_path=tmp.name,
735
                container_path=container_path,
736
            )
737

738
    @abstractmethod
1✔
739
    def copy_into_container(
1✔
740
        self, container_name: str, local_path: str, container_path: str
741
    ) -> None:
742
        """Copy contents of the given local path into the container"""
743

744
    @abstractmethod
1✔
745
    def copy_from_container(
1✔
746
        self, container_name: str, local_path: str, container_path: str
747
    ) -> None:
748
        """Copy contents of the given container to the host"""
749

750
    @abstractmethod
1✔
751
    def pull_image(
1✔
752
        self,
753
        docker_image: str,
754
        platform: Optional[DockerPlatform] = None,
755
        log_handler: Optional[Callable[[str], None]] = None,
756
    ) -> None:
757
        """
758
        Pulls an image with a given name from a Docker registry
759

760
        :log_handler: Optional parameter that can be used to process the logs. Logs will be streamed if possible, but this is not guaranteed.
761
        """
762

763
    @abstractmethod
1✔
764
    def push_image(self, docker_image: str) -> None:
1✔
765
        """Pushes an image with a given name to a Docker registry"""
766

767
    @abstractmethod
1✔
768
    def build_image(
1✔
769
        self,
770
        dockerfile_path: str,
771
        image_name: str,
772
        context_path: str = None,
773
        platform: Optional[DockerPlatform] = None,
774
    ) -> str:
775
        """Builds an image from the given Dockerfile
776

777
        :param dockerfile_path: Path to Dockerfile, or a directory that contains a Dockerfile
778
        :param image_name: Name of the image to be built
779
        :param context_path: Path for build context (defaults to dirname of Dockerfile)
780
        :param platform: Target platform for build (defaults to platform of Docker host)
781
        :return: Build logs as a string.
782
        """
783

784
    @abstractmethod
1✔
785
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
786
        """Tags an image with a new name
787

788
        :param source_ref: Name or ID of the image to be tagged
789
        :param target_name: New name (tag) of the tagged image
790
        """
791

792
    @abstractmethod
1✔
793
    def get_docker_image_names(
1✔
794
        self,
795
        strip_latest: bool = True,
796
        include_tags: bool = True,
797
        strip_wellknown_repo_prefixes: bool = True,
798
    ) -> list[str]:
799
        """
800
        Get all names of docker images available to the container engine
801
        :param strip_latest: return images both with and without :latest tag
802
        :param include_tags: include tags of the images in the names
803
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
804
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
805
        :return: List of image names
806
        """
807

808
    @abstractmethod
1✔
809
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
810
        """Get all logs of a given container"""
811

812
    @abstractmethod
1✔
813
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
814
        """Returns a blocking generator you can iterate over to retrieve log output as it happens."""
815

816
    @abstractmethod
1✔
817
    def inspect_container(self, container_name_or_id: str) -> dict[str, Union[dict, str]]:
1✔
818
        """Get detailed attributes of a container.
819

820
        :return: Dict containing docker attributes as returned by the daemon
821
        """
822

823
    def inspect_container_volumes(self, container_name_or_id) -> list[VolumeInfo]:
1✔
824
        """Return information about the volumes mounted into the given container.
825

826
        :param container_name_or_id: the container name or id
827
        :return: a list of volumes
828
        """
829
        volumes = []
1✔
830
        for doc in self.inspect_container(container_name_or_id)["Mounts"]:
1✔
831
            volumes.append(VolumeInfo(**{k.lower(): v for k, v in doc.items()}))
1✔
832

833
        return volumes
1✔
834

835
    @abstractmethod
1✔
836
    def inspect_image(
1✔
837
        self, image_name: str, pull: bool = True, strip_wellknown_repo_prefixes: bool = True
838
    ) -> dict[str, Union[dict, list, str]]:
839
        """Get detailed attributes of an image.
840

841
        :param image_name: Image name to inspect
842
        :param pull: Whether to pull image if not existent
843
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
844
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
845
        :return: Dict containing docker attributes as returned by the daemon
846
        """
847

848
    @abstractmethod
1✔
849
    def create_network(self, network_name: str) -> str:
1✔
850
        """
851
        Creates a network with the given name
852
        :param network_name: Name of the network
853
        :return Network ID
854
        """
855

856
    @abstractmethod
1✔
857
    def delete_network(self, network_name: str) -> None:
1✔
858
        """
859
        Delete a network with the given name
860
        :param network_name: Name of the network
861
        """
862

863
    @abstractmethod
1✔
864
    def inspect_network(self, network_name: str) -> dict[str, Union[dict, str]]:
1✔
865
        """Get detailed attributes of an network.
866

867
        :return: Dict containing docker attributes as returned by the daemon
868
        """
869

870
    @abstractmethod
1✔
871
    def connect_container_to_network(
1✔
872
        self,
873
        network_name: str,
874
        container_name_or_id: str,
875
        aliases: Optional[list] = None,
876
        link_local_ips: list[str] = None,
877
    ) -> None:
878
        """
879
        Connects a container to a given network
880
        :param network_name: Network to connect the container to
881
        :param container_name_or_id: Container to connect to the network
882
        :param aliases: List of dns names the container should be available under in the network
883
        :param link_local_ips: List of link-local (IPv4 or IPv6) addresses
884
        """
885

886
    @abstractmethod
1✔
887
    def disconnect_container_from_network(
1✔
888
        self, network_name: str, container_name_or_id: str
889
    ) -> None:
890
        """
891
        Disconnects a container from a given network
892
        :param network_name: Network to disconnect the container from
893
        :param container_name_or_id: Container to disconnect from the network
894
        """
895

896
    def get_container_name(self, container_id: str) -> str:
1✔
897
        """Get the name of a container by a given identifier"""
898
        return self.inspect_container(container_id)["Name"].lstrip("/")
1✔
899

900
    def get_container_id(self, container_name: str) -> str:
1✔
901
        """Get the id of a container by a given name"""
902
        return self.inspect_container(container_name)["Id"]
1✔
903

904
    @abstractmethod
1✔
905
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
906
        """Get the IP address of a given container
907

908
        If container has multiple networks, it will return the IP of the first
909
        """
910

911
    def get_image_cmd(self, docker_image: str, pull: bool = True) -> list[str]:
1✔
912
        """Get the command for the given image
913
        :param docker_image: Docker image to inspect
914
        :param pull: Whether to pull if image is not present
915
        :return: Image command in its array form
916
        """
917
        cmd_list = self.inspect_image(docker_image, pull)["Config"]["Cmd"] or []
1✔
918
        return cmd_list
1✔
919

920
    def get_image_entrypoint(self, docker_image: str, pull: bool = True) -> str:
1✔
921
        """Get the entry point for the given image
922
        :param docker_image: Docker image to inspect
923
        :param pull: Whether to pull if image is not present
924
        :return: Image entrypoint
925
        """
926
        LOG.debug("Getting the entrypoint for image: %s", docker_image)
1✔
927
        entrypoint_list = self.inspect_image(docker_image, pull)["Config"].get("Entrypoint") or []
1✔
928
        return shlex.join(entrypoint_list)
1✔
929

930
    @abstractmethod
1✔
931
    def has_docker(self) -> bool:
1✔
932
        """Check if system has docker available"""
933

934
    @abstractmethod
1✔
935
    def commit(
1✔
936
        self,
937
        container_name_or_id: str,
938
        image_name: str,
939
        image_tag: str,
940
    ):
941
        """Create an image from a running container.
942

943
        :param container_name_or_id: Source container
944
        :param image_name: Destination image name
945
        :param image_tag: Destination image tag
946
        """
947

948
    def create_container_from_config(self, container_config: ContainerConfiguration) -> str:
1✔
949
        """
950
        Similar to create_container, but allows passing the whole ContainerConfiguration
951
        :param container_config: ContainerConfiguration how to start the container
952
        :return: Container ID
953
        """
954
        return self.create_container(
1✔
955
            image_name=container_config.image_name,
956
            name=container_config.name,
957
            entrypoint=container_config.entrypoint,
958
            remove=container_config.remove,
959
            interactive=container_config.interactive,
960
            tty=container_config.tty,
961
            command=container_config.command,
962
            volumes=container_config.volumes,
963
            ports=container_config.ports,
964
            exposed_ports=container_config.exposed_ports,
965
            env_vars=container_config.env_vars,
966
            user=container_config.user,
967
            cap_add=container_config.cap_add,
968
            cap_drop=container_config.cap_drop,
969
            security_opt=container_config.security_opt,
970
            network=container_config.network,
971
            dns=container_config.dns,
972
            additional_flags=container_config.additional_flags,
973
            workdir=container_config.workdir,
974
            privileged=container_config.privileged,
975
            platform=container_config.platform,
976
            labels=container_config.labels,
977
            ulimits=container_config.ulimits,
978
            init=container_config.init,
979
            log_config=container_config.log_config,
980
        )
981

982
    @abstractmethod
1✔
983
    def create_container(
1✔
984
        self,
985
        image_name: str,
986
        *,
987
        name: Optional[str] = None,
988
        entrypoint: Optional[Union[list[str], str]] = None,
989
        remove: bool = False,
990
        interactive: bool = False,
991
        tty: bool = False,
992
        detach: bool = False,
993
        command: Optional[Union[list[str], str]] = None,
994
        volumes: Optional[Union[VolumeMappings, list[SimpleVolumeBind]]] = None,
995
        ports: Optional[PortMappings] = None,
996
        exposed_ports: Optional[list[str]] = None,
997
        env_vars: Optional[dict[str, str]] = None,
998
        user: Optional[str] = None,
999
        cap_add: Optional[list[str]] = None,
1000
        cap_drop: Optional[list[str]] = None,
1001
        security_opt: Optional[list[str]] = None,
1002
        network: Optional[str] = None,
1003
        dns: Optional[Union[str, list[str]]] = None,
1004
        additional_flags: Optional[str] = None,
1005
        workdir: Optional[str] = None,
1006
        privileged: Optional[bool] = None,
1007
        labels: Optional[dict[str, str]] = None,
1008
        platform: Optional[DockerPlatform] = None,
1009
        ulimits: Optional[list[Ulimit]] = None,
1010
        init: Optional[bool] = None,
1011
        log_config: Optional[LogConfig] = None,
1012
    ) -> str:
1013
        """Creates a container with the given image
1014

1015
        :return: Container ID
1016
        """
1017

1018
    @abstractmethod
1✔
1019
    def run_container(
1✔
1020
        self,
1021
        image_name: str,
1022
        stdin: bytes = None,
1023
        *,
1024
        name: Optional[str] = None,
1025
        entrypoint: Optional[str] = None,
1026
        remove: bool = False,
1027
        interactive: bool = False,
1028
        tty: bool = False,
1029
        detach: bool = False,
1030
        command: Optional[Union[list[str], str]] = None,
1031
        volumes: Optional[Union[VolumeMappings, list[SimpleVolumeBind]]] = None,
1032
        ports: Optional[PortMappings] = None,
1033
        exposed_ports: Optional[list[str]] = None,
1034
        env_vars: Optional[dict[str, str]] = None,
1035
        user: Optional[str] = None,
1036
        cap_add: Optional[list[str]] = None,
1037
        cap_drop: Optional[list[str]] = None,
1038
        security_opt: Optional[list[str]] = None,
1039
        network: Optional[str] = None,
1040
        dns: Optional[str] = None,
1041
        additional_flags: Optional[str] = None,
1042
        workdir: Optional[str] = None,
1043
        labels: Optional[dict[str, str]] = None,
1044
        platform: Optional[DockerPlatform] = None,
1045
        privileged: Optional[bool] = None,
1046
        ulimits: Optional[list[Ulimit]] = None,
1047
        init: Optional[bool] = None,
1048
        log_config: Optional[LogConfig] = None,
1049
    ) -> tuple[bytes, bytes]:
1050
        """Creates and runs a given docker container
1051

1052
        :return: A tuple (stdout, stderr)
1053
        """
1054

1055
    def run_container_from_config(
1✔
1056
        self, container_config: ContainerConfiguration
1057
    ) -> tuple[bytes, bytes]:
1058
        """Like ``run_container`` but uses the parameters from the configuration."""
1059

UNCOV
1060
        return self.run_container(
×
1061
            image_name=container_config.image_name,
1062
            stdin=container_config.stdin,
1063
            name=container_config.name,
1064
            entrypoint=container_config.entrypoint,
1065
            remove=container_config.remove,
1066
            interactive=container_config.interactive,
1067
            tty=container_config.tty,
1068
            detach=container_config.detach,
1069
            command=container_config.command,
1070
            volumes=container_config.volumes,
1071
            ports=container_config.ports,
1072
            exposed_ports=container_config.exposed_ports,
1073
            env_vars=container_config.env_vars,
1074
            user=container_config.user,
1075
            cap_add=container_config.cap_add,
1076
            cap_drop=container_config.cap_drop,
1077
            security_opt=container_config.security_opt,
1078
            network=container_config.network,
1079
            dns=container_config.dns,
1080
            additional_flags=container_config.additional_flags,
1081
            workdir=container_config.workdir,
1082
            platform=container_config.platform,
1083
            privileged=container_config.privileged,
1084
            ulimits=container_config.ulimits,
1085
            init=container_config.init,
1086
            log_config=container_config.log_config,
1087
        )
1088

1089
    @abstractmethod
1✔
1090
    def exec_in_container(
1✔
1091
        self,
1092
        container_name_or_id: str,
1093
        command: Union[list[str], str],
1094
        interactive: bool = False,
1095
        detach: bool = False,
1096
        env_vars: Optional[dict[str, Optional[str]]] = None,
1097
        stdin: Optional[bytes] = None,
1098
        user: Optional[str] = None,
1099
        workdir: Optional[str] = None,
1100
    ) -> tuple[bytes, bytes]:
1101
        """Execute a given command in a container
1102

1103
        :return: A tuple (stdout, stderr)
1104
        """
1105

1106
    @abstractmethod
1✔
1107
    def start_container(
1✔
1108
        self,
1109
        container_name_or_id: str,
1110
        stdin: bytes = None,
1111
        interactive: bool = False,
1112
        attach: bool = False,
1113
        flags: Optional[str] = None,
1114
    ) -> tuple[bytes, bytes]:
1115
        """Start a given, already created container
1116

1117
        :return: A tuple (stdout, stderr) if attach or interactive is set, otherwise a tuple (b"container_name_or_id", b"")
1118
        """
1119

1120
    @abstractmethod
1✔
1121
    def attach_to_container(self, container_name_or_id: str):
1✔
1122
        """
1123
        Attach local standard input, output, and error streams to a running container
1124
        """
1125

1126
    @abstractmethod
1✔
1127
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
1128
        """
1129
        Login into an OCI registry
1130

1131
        :param username: Username for the registry
1132
        :param password: Password / token for the registry
1133
        :param registry: Registry url
1134
        """
1135

1136

1137
class Util:
1✔
1138
    MAX_ENV_ARGS_LENGTH = 20000
1✔
1139

1140
    @staticmethod
1✔
1141
    def format_env_vars(key: str, value: Optional[str]):
1✔
1142
        if value is None:
1✔
UNCOV
1143
            return key
×
1144
        return f"{key}={value}"
1✔
1145

1146
    @classmethod
1✔
1147
    def create_env_vars_file_flag(cls, env_vars: dict) -> tuple[list[str], Optional[str]]:
1✔
1148
        if not env_vars:
1✔
UNCOV
1149
            return [], None
×
1150
        result = []
1✔
1151
        env_vars = dict(env_vars)
1✔
1152
        env_file = None
1✔
1153
        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:
1✔
1154
            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...
UNCOV
1155
            env_file = cls.mountable_tmp_file()
×
UNCOV
1156
            env_content = ""
×
UNCOV
1157
            for name, value in dict(env_vars).items():
×
1158
                if len(value) > cls.MAX_ENV_ARGS_LENGTH:
×
1159
                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")
1160
                    continue
×
1161
                env_vars.pop(name)
×
UNCOV
1162
                value = value.replace("\n", "\\")
×
1163
                env_content += f"{cls.format_env_vars(name, value)}\n"
×
1164
            save_file(env_file, env_content)
×
1165
            result += ["--env-file", env_file]
×
1166

1167
        env_vars_res = [
1✔
1168
            item for k, v in env_vars.items() for item in ["-e", cls.format_env_vars(k, v)]
1169
        ]
1170
        result += env_vars_res
1✔
1171
        return result, env_file
1✔
1172

1173
    @staticmethod
1✔
1174
    def rm_env_vars_file(env_vars_file) -> None:
1✔
1175
        if env_vars_file:
1✔
UNCOV
1176
            return rm_rf(env_vars_file)
×
1177

1178
    @staticmethod
1✔
1179
    def mountable_tmp_file():
1✔
UNCOV
1180
        f = os.path.join(config.dirs.mounted_tmp, short_uid())
×
UNCOV
1181
        TMP_FILES.append(f)
×
UNCOV
1182
        return f
×
1183

1184
    @staticmethod
1✔
1185
    def append_without_latest(image_names: list[str]):
1✔
1186
        suffix = ":latest"
1✔
1187
        for image in list(image_names):
1✔
1188
            if image.endswith(suffix):
1✔
1189
                image_names.append(image[: -len(suffix)])
1✔
1190

1191
    @staticmethod
1✔
1192
    def strip_wellknown_repo_prefixes(image_names: list[str]) -> list[str]:
1✔
1193
        """
1194
        Remove well-known repo prefixes like `localhost/` or `docker.io/library/` from the list of given
1195
        image names. This is mostly to ensure compatibility of our Docker client with Podman API responses.
1196
        :return: a copy of the list of image names, with well-known repo prefixes removed
1197
        """
1198
        result = []
1✔
1199
        for image in image_names:
1✔
1200
            for prefix in WELL_KNOWN_IMAGE_REPO_PREFIXES:
1✔
1201
                if image.startswith(prefix):
1✔
UNCOV
1202
                    image = image.removeprefix(prefix)
×
1203
                    # strip only one of the matching prefixes (avoid multi-stripping)
UNCOV
1204
                    break
×
1205
            result.append(image)
1✔
1206
        return result
1✔
1207

1208
    @staticmethod
1✔
1209
    def tar_path(path: str, target_path: str, is_dir: bool):
1✔
1210
        f = tempfile.NamedTemporaryFile()
1✔
1211
        with tarfile.open(mode="w", fileobj=f) as t:
1✔
1212
            abs_path = os.path.abspath(path)
1✔
1213
            arcname = (
1✔
1214
                os.path.basename(path)
1215
                if is_dir
1216
                else (os.path.basename(target_path) or os.path.basename(path))
1217
            )
1218
            t.add(abs_path, arcname=arcname)
1✔
1219

1220
        f.seek(0)
1✔
1221
        return f
1✔
1222

1223
    @staticmethod
1✔
1224
    def untar_to_path(tardata, target_path):
1✔
1225
        target_path = Path(target_path)
1✔
1226
        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:
1✔
1227
            if target_path.is_dir():
1✔
1228
                t.extractall(path=target_path)
1✔
1229
            else:
1230
                member = t.next()
1✔
1231
                if member:
1✔
1232
                    member.name = target_path.name
1✔
1233
                    t.extract(member, target_path.parent)
1✔
1234
                else:
UNCOV
1235
                    LOG.debug("File to copy empty, ignoring...")
×
1236

1237
    @staticmethod
1✔
1238
    def _read_docker_cli_env_file(env_file: str) -> dict[str, str]:
1✔
1239
        """
1240
        Read an environment file in docker CLI format, specified here:
1241
        https://docs.docker.com/reference/cli/docker/container/run/#env
1242
        :param env_file: Path to the environment file
1243
        :return: Read environment variables
1244
        """
1245
        env_vars = {}
1✔
1246
        try:
1✔
1247
            with open(env_file) as f:
1✔
1248
                env_file_lines = f.readlines()
1✔
UNCOV
1249
        except FileNotFoundError as e:
×
UNCOV
1250
            LOG.error(
×
1251
                "Specified env file '%s' not found. Please make sure the file is properly mounted into the LocalStack container. Error: %s",
1252
                env_file,
1253
                e,
1254
            )
UNCOV
1255
            raise
×
UNCOV
1256
        except OSError as e:
×
UNCOV
1257
            LOG.error(
×
1258
                "Could not read env file '%s'. Please make sure the LocalStack container has the permissions to read it. Error: %s",
1259
                env_file,
1260
                e,
1261
            )
UNCOV
1262
            raise
×
1263
        for idx, line in enumerate(env_file_lines):
1✔
1264
            line = line.strip()
1✔
1265
            if not line or line.startswith("#"):
1✔
1266
                # skip comments or empty lines
1267
                continue
1✔
1268
            lhs, separator, rhs = line.partition("=")
1✔
1269
            if rhs or separator:
1✔
1270
                env_vars[lhs] = rhs
1✔
1271
            else:
1272
                # No "=" in the line, only the name => lookup in local env
1273
                if env_value := os.environ.get(lhs):
1✔
1274
                    env_vars[lhs] = env_value
1✔
1275
        return env_vars
1✔
1276

1277
    @staticmethod
1✔
1278
    def parse_additional_flags(
1✔
1279
        additional_flags: str,
1280
        env_vars: Optional[dict[str, str]] = None,
1281
        labels: Optional[dict[str, str]] = None,
1282
        volumes: Optional[list[SimpleVolumeBind]] = None,
1283
        network: Optional[str] = None,
1284
        platform: Optional[DockerPlatform] = None,
1285
        ports: Optional[PortMappings] = None,
1286
        privileged: Optional[bool] = None,
1287
        user: Optional[str] = None,
1288
        ulimits: Optional[list[Ulimit]] = None,
1289
        dns: Optional[Union[str, list[str]]] = None,
1290
    ) -> DockerRunFlags:
1291
        """Parses additional CLI-formatted Docker flags, which could overwrite provided defaults.
1292
        :param additional_flags: String which contains the flag definitions inspired by the Docker CLI reference:
1293
                                 https://docs.docker.com/engine/reference/commandline/run/
1294
        :param env_vars: Dict with env vars. Will be modified in place.
1295
        :param labels: Dict with labels. Will be modified in place.
1296
        :param volumes: List of mount tuples (host_path, container_path). Will be modified in place.
1297
        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.
1298
        :param platform: Platform to execute container. Warning will be printed if platform is overwritten in flags.
1299
        :param ports: PortMapping object. Will be modified in place.
1300
        :param privileged: Run the container in privileged mode. Warning will be printed if overwritten in flags.
1301
        :param ulimits: ulimit options in the format <type>=<soft limit>[:<hard limit>]
1302
        :param user: User to run first process. Warning will be printed if user is overwritten in flags.
1303
        :param dns: List of DNS servers to configure the container with.
1304
        :return: A DockerRunFlags object that will return new objects if respective parameters were None and
1305
                additional flags contained a flag for that object or the same which are passed otherwise.
1306
        """
1307
        # Argparse refactoring opportunity: custom argparse actions can be used to modularize parsing (e.g., key=value)
1308
        # https://docs.python.org/3/library/argparse.html#action
1309

1310
        # Configure parser
1311
        parser = NoExitArgumentParser(description="Docker run flags parser")
1✔
1312
        parser.add_argument(
1✔
1313
            "--add-host",
1314
            help="Add a custom host-to-IP mapping (host:ip)",
1315
            dest="add_hosts",
1316
            action="append",
1317
        )
1318
        parser.add_argument(
1✔
1319
            "--env", "-e", help="Set environment variables", dest="envs", action="append"
1320
        )
1321
        parser.add_argument(
1✔
1322
            "--env-file",
1323
            help="Set environment variables via a file",
1324
            dest="env_files",
1325
            action="append",
1326
        )
1327
        parser.add_argument(
1✔
1328
            "--compose-env-file",
1329
            help="Set environment variables via a file, with a docker-compose supported feature set.",
1330
            dest="compose_env_files",
1331
            action="append",
1332
        )
1333
        parser.add_argument(
1✔
1334
            "--label", "-l", help="Add container meta data", dest="labels", action="append"
1335
        )
1336
        parser.add_argument("--network", help="Connect a container to a network")
1✔
1337
        parser.add_argument(
1✔
1338
            "--platform",
1339
            type=DockerPlatform,
1340
            help="Docker platform (e.g., linux/amd64 or linux/arm64)",
1341
        )
1342
        parser.add_argument(
1✔
1343
            "--privileged",
1344
            help="Give extended privileges to this container",
1345
            action="store_true",
1346
        )
1347
        parser.add_argument(
1✔
1348
            "--publish",
1349
            "-p",
1350
            help="Publish container port(s) to the host",
1351
            dest="publish_ports",
1352
            action="append",
1353
        )
1354
        parser.add_argument(
1✔
1355
            "--ulimit", help="Container ulimit settings", dest="ulimits", action="append"
1356
        )
1357
        parser.add_argument("--user", "-u", help="Username or UID to execute first process")
1✔
1358
        parser.add_argument(
1✔
1359
            "--volume", "-v", help="Bind mount a volume", dest="volumes", action="append"
1360
        )
1361
        parser.add_argument("--dns", help="Set custom DNS servers", dest="dns", action="append")
1✔
1362

1363
        # Parse
1364
        flags = shlex.split(additional_flags)
1✔
1365
        args = parser.parse_args(flags)
1✔
1366

1367
        # Post-process parsed flags
1368
        extra_hosts = None
1✔
1369
        if args.add_hosts:
1✔
1370
            for add_host in args.add_hosts:
1✔
1371
                extra_hosts = extra_hosts if extra_hosts is not None else {}
1✔
1372
                hosts_split = add_host.split(":")
1✔
1373
                extra_hosts[hosts_split[0]] = hosts_split[1]
1✔
1374

1375
        # set env file values before env values, as the latter override the earlier
1376
        if args.env_files:
1✔
1377
            env_vars = env_vars if env_vars is not None else {}
1✔
1378
            for env_file in args.env_files:
1✔
1379
                env_vars.update(Util._read_docker_cli_env_file(env_file))
1✔
1380

1381
        if args.compose_env_files:
1✔
1382
            env_vars = env_vars if env_vars is not None else {}
1✔
1383
            for env_file in args.compose_env_files:
1✔
1384
                env_vars.update(dotenv.dotenv_values(env_file))
1✔
1385

1386
        if args.envs:
1✔
1387
            env_vars = env_vars if env_vars is not None else {}
1✔
1388
            for env in args.envs:
1✔
1389
                lhs, _, rhs = env.partition("=")
1✔
1390
                env_vars[lhs] = rhs
1✔
1391

1392
        if args.labels:
1✔
1393
            labels = labels if labels is not None else {}
1✔
1394
            for label in args.labels:
1✔
1395
                key, _, value = label.partition("=")
1✔
1396
                # Only consider non-empty labels
1397
                if key:
1✔
1398
                    labels[key] = value
1✔
1399

1400
        if args.network:
1✔
1401
            LOG.warning(
1✔
1402
                "Overwriting Docker container network '%s' with new value '%s'",
1403
                network,
1404
                args.network,
1405
            )
1406
            network = args.network
1✔
1407

1408
        if args.platform:
1✔
1409
            LOG.warning(
1✔
1410
                "Overwriting Docker platform '%s' with new value '%s'",
1411
                platform,
1412
                args.platform,
1413
            )
1414
            platform = args.platform
1✔
1415

1416
        if args.privileged:
1✔
1417
            LOG.warning(
1✔
1418
                "Overwriting Docker container privileged flag %s with new value %s",
1419
                privileged,
1420
                args.privileged,
1421
            )
1422
            privileged = args.privileged
1✔
1423

1424
        if args.publish_ports:
1✔
1425
            for port_mapping in args.publish_ports:
1✔
1426
                port_split = port_mapping.split(":")
1✔
1427
                protocol = "tcp"
1✔
1428
                if len(port_split) == 2:
1✔
1429
                    host_port, container_port = port_split
1✔
1430
                elif len(port_split) == 3:
1✔
1431
                    LOG.warning(
1✔
1432
                        "Host part of port mappings are ignored currently in additional flags"
1433
                    )
1434
                    _, host_port, container_port = port_split
1✔
1435
                else:
1436
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
1✔
1437
                host_port_split = host_port.split("-")
1✔
1438
                if len(host_port_split) == 2:
1✔
1439
                    host_port = [int(host_port_split[0]), int(host_port_split[1])]
1✔
1440
                elif len(host_port_split) == 1:
1✔
1441
                    host_port = int(host_port)
1✔
1442
                else:
UNCOV
1443
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
×
1444
                if "/" in container_port:
1✔
1445
                    container_port, protocol = container_port.split("/")
1✔
1446
                ports = ports if ports is not None else PortMappings()
1✔
1447
                ports.add(host_port, int(container_port), protocol)
1✔
1448

1449
        if args.ulimits:
1✔
1450
            ulimits = ulimits if ulimits is not None else []
1✔
1451
            ulimits_dict = {ul.name: ul for ul in ulimits}
1✔
1452
            for ulimit in args.ulimits:
1✔
1453
                name, _, rhs = ulimit.partition("=")
1✔
1454
                soft, _, hard = rhs.partition(":")
1✔
1455
                hard_limit = int(hard) if hard else int(soft)
1✔
1456
                new_ulimit = Ulimit(name=name, soft_limit=int(soft), hard_limit=hard_limit)
1✔
1457
                if ulimits_dict.get(name):
1✔
1458
                    LOG.warning("Overwriting Docker ulimit %s", new_ulimit)
1✔
1459
                ulimits_dict[name] = new_ulimit
1✔
1460
            ulimits = list(ulimits_dict.values())
1✔
1461

1462
        if args.user:
1✔
1463
            LOG.warning(
1✔
1464
                "Overwriting Docker user '%s' with new value '%s'",
1465
                user,
1466
                args.user,
1467
            )
1468
            user = args.user
1✔
1469

1470
        if args.volumes:
1✔
1471
            volumes = volumes if volumes is not None else []
1✔
1472
            for volume in args.volumes:
1✔
1473
                match = re.match(
1✔
1474
                    r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",
1475
                    volume,
1476
                )
1477
                if not match:
1✔
UNCOV
1478
                    LOG.warning("Unable to parse volume mount Docker flags: %s", volume)
×
UNCOV
1479
                    continue
×
1480
                host_path = match.group("host")
1✔
1481
                container_path = match.group("container")
1✔
1482
                rw_args = match.group("arg")
1✔
1483
                if rw_args:
1✔
1484
                    LOG.info("Volume options like :ro or :rw are currently ignored.")
1✔
1485
                volumes.append((host_path, container_path))
1✔
1486

1487
        dns = ensure_list(dns or [])
1✔
1488
        if args.dns:
1✔
1489
            LOG.info(
1✔
1490
                "Extending Docker container DNS servers %s with additional values %s", dns, args.dns
1491
            )
1492
            dns.extend(args.dns)
1✔
1493

1494
        return DockerRunFlags(
1✔
1495
            env_vars=env_vars,
1496
            extra_hosts=extra_hosts,
1497
            labels=labels,
1498
            volumes=volumes,
1499
            ports=ports,
1500
            network=network,
1501
            platform=platform,
1502
            privileged=privileged,
1503
            ulimits=ulimits,
1504
            user=user,
1505
            dns=dns,
1506
        )
1507

1508
    @staticmethod
1✔
1509
    def convert_mount_list_to_dict(
1✔
1510
        volumes: Union[list[SimpleVolumeBind], VolumeMappings],
1511
    ) -> dict[str, dict[str, str]]:
1512
        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""
1513

1514
        def _map_to_dict(paths: SimpleVolumeBind | BindMount | VolumeDirMount):
1✔
1515
            if isinstance(paths, (BindMount, VolumeDirMount)):
1✔
1516
                return paths.to_docker_sdk_parameters()
1✔
1517
            else:
UNCOV
1518
                return str(paths[0]), {"bind": paths[1], "mode": "rw"}
×
1519

1520
        return dict(
1✔
1521
            map(
1522
                _map_to_dict,
1523
                volumes,
1524
            )
1525
        )
1526

1527
    @staticmethod
1✔
1528
    def resolve_dockerfile_path(dockerfile_path: str) -> str:
1✔
1529
        """If the given path is a directory that contains a Dockerfile, then return the file path to it."""
1530
        rel_path = os.path.join(dockerfile_path, "Dockerfile")
1✔
1531
        if os.path.isdir(dockerfile_path) and os.path.exists(rel_path):
1✔
1532
            return rel_path
1✔
1533
        return dockerfile_path
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc