• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17197523151

23 Aug 2025 03:45PM UTC coverage: 86.837% (-0.006%) from 86.843%
17197523151

push

github

web-flow
add traceback propagation of internal errors (#13044)

4 of 5 new or added lines in 1 file covered. (80.0%)

138 existing lines in 4 files now uncovered.

67071 of 77238 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.49
/localstack-core/localstack/utils/container_utils/container_client.py
1
import dataclasses
1✔
2
import io
1✔
3
import ipaddress
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import sys
1✔
9
import tarfile
1✔
10
import tempfile
1✔
11
from abc import ABCMeta, abstractmethod
1✔
12
from enum import Enum, unique
1✔
13
from pathlib import Path
1✔
14
from typing import (
1✔
15
    Callable,
16
    Literal,
17
    NamedTuple,
18
    Optional,
19
    Protocol,
20
    TypedDict,
21
    Union,
22
    get_args,
23
)
24

25
import dotenv
1✔
26

27
from localstack import config
1✔
28
from localstack.constants import DEFAULT_VOLUME_DIR
1✔
29
from localstack.utils.collections import HashableList, ensure_list
1✔
30
from localstack.utils.files import TMP_FILES, chmod_r, rm_rf, save_file
1✔
31
from localstack.utils.no_exit_argument_parser import NoExitArgumentParser
1✔
32
from localstack.utils.strings import short_uid
1✔
33

34
LOG = logging.getLogger(__name__)
1✔
35

36
# list of well-known image repo prefixes that should be stripped off to canonicalize image names
37
WELL_KNOWN_IMAGE_REPO_PREFIXES = ("localhost/", "docker.io/library/")
1✔
38

39

40
@unique
1✔
41
class DockerContainerStatus(Enum):
1✔
42
    DOWN = -1
1✔
43
    NON_EXISTENT = 0
1✔
44
    UP = 1
1✔
45
    PAUSED = 2
1✔
46

47

48
class DockerContainerStats(TypedDict):
1✔
49
    """Container usage statistics"""
50

51
    Container: str
1✔
52
    ID: str
1✔
53
    Name: str
1✔
54
    BlockIO: tuple[int, int]
1✔
55
    CPUPerc: float
1✔
56
    MemPerc: float
1✔
57
    MemUsage: tuple[int, int]
1✔
58
    NetIO: tuple[int, int]
1✔
59
    PIDs: int
1✔
60
    SDKStats: Optional[dict]
1✔
61

62

63
class ContainerException(Exception):
1✔
64
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
65
        self.message = message or "Error during the communication with the docker daemon"
1✔
66
        self.stdout = stdout
1✔
67
        self.stderr = stderr
1✔
68

69

70
class NoSuchObject(ContainerException):
1✔
71
    def __init__(self, object_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
72
        message = message or f"Docker object {object_id} not found"
1✔
73
        super().__init__(message, stdout, stderr)
1✔
74
        self.object_id = object_id
1✔
75

76

77
class NoSuchContainer(ContainerException):
1✔
78
    def __init__(self, container_name_or_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
79
        message = message or f"Docker container {container_name_or_id} not found"
1✔
80
        super().__init__(message, stdout, stderr)
1✔
81
        self.container_name_or_id = container_name_or_id
1✔
82

83

84
class NoSuchImage(ContainerException):
1✔
85
    def __init__(self, image_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
86
        message = message or f"Docker image {image_name} not found"
1✔
87
        super().__init__(message, stdout, stderr)
1✔
88
        self.image_name = image_name
1✔
89

90

91
class NoSuchNetwork(ContainerException):
1✔
92
    def __init__(self, network_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
93
        message = message or f"Docker network {network_name} not found"
1✔
94
        super().__init__(message, stdout, stderr)
1✔
95
        self.network_name = network_name
1✔
96

97

98
class RegistryConnectionError(ContainerException):
1✔
99
    def __init__(self, details: str, message=None, stdout=None, stderr=None) -> None:
1✔
100
        message = message or f"Connection error: {details}"
1✔
101
        super().__init__(message, stdout, stderr)
1✔
102
        self.details = details
1✔
103

104

105
class DockerNotAvailable(ContainerException):
1✔
106
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
107
        message = message or "Docker not available"
1✔
108
        super().__init__(message, stdout, stderr)
1✔
109

110

111
class AccessDenied(ContainerException):
1✔
112
    def __init__(self, object_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
113
        message = message or f"Access denied to {object_name}"
1✔
114
        super().__init__(message, stdout, stderr)
1✔
115
        self.object_name = object_name
1✔
116

117

118
class CancellableStream(Protocol):
1✔
119
    """Describes a generator that can be closed. Borrowed from ``docker.types.daemon``."""
120

121
    def __iter__(self):
1✔
122
        raise NotImplementedError
123

124
    def __next__(self):
1✔
125
        raise NotImplementedError
126

127
    def close(self):
1✔
128
        raise NotImplementedError
129

130

131
class DockerPlatform(str):
1✔
132
    """Platform in the format ``os[/arch[/variant]]``"""
133

134
    linux_amd64 = "linux/amd64"
1✔
135
    linux_arm64 = "linux/arm64"
1✔
136

137

138
@dataclasses.dataclass
1✔
139
class Ulimit:
1✔
140
    """The ``ulimit`` settings for the container.
141
    See https://www.tutorialspoint.com/setting-ulimit-values-on-docker-containers
142
    """
143

144
    name: str
1✔
145
    soft_limit: int
1✔
146
    hard_limit: Optional[int] = None
1✔
147

148
    def __repr__(self):
149
        """Format: <type>=<soft limit>[:<hard limit>]"""
150
        ulimit_string = f"{self.name}={self.soft_limit}"
151
        if self.hard_limit:
152
            ulimit_string += f":{self.hard_limit}"
153
        return ulimit_string
154

155

156
# defines the type for port mappings (source->target port range)
157
PortRange = Union[list, HashableList]
1✔
158
# defines the protocol for a port range ("tcp" or "udp")
159
PortProtocol = str
1✔
160

161

162
def isinstance_union(obj, class_or_tuple):
1✔
163
    # that's some dirty hack
164
    if sys.version_info < (3, 10):
1✔
165
        return isinstance(obj, get_args(PortRange))
×
166
    else:
167
        return isinstance(obj, class_or_tuple)
1✔
168

169

170
class PortMappings:
1✔
171
    """Maps source to target port ranges for Docker port mappings."""
172

173
    # bind host to be used for defining port mappings
174
    bind_host: str
1✔
175
    # maps `from` port range to `to` port range for port mappings
176
    mappings: dict[tuple[PortRange, PortProtocol], list]
1✔
177

178
    def __init__(self, bind_host: str = None):
1✔
179
        self.bind_host = bind_host if bind_host else ""
1✔
180
        self.mappings = {}
1✔
181

182
    def add(
1✔
183
        self,
184
        port: Union[int, PortRange],
185
        mapped: Union[int, PortRange] = None,
186
        protocol: PortProtocol = "tcp",
187
    ):
188
        mapped = mapped or port
1✔
189
        if isinstance_union(port, PortRange):
1✔
190
            for i in range(port[1] - port[0] + 1):
1✔
191
                if isinstance_union(mapped, PortRange):
1✔
192
                    self.add(port[0] + i, mapped[0] + i, protocol)
1✔
193
                else:
194
                    self.add(port[0] + i, mapped, protocol)
1✔
195
            return
1✔
196
        if port is None or int(port) < 0:
1✔
197
            raise Exception(f"Unable to add mapping for invalid port: {port}")
×
198
        if self.contains(port, protocol):
1✔
199
            return
1✔
200
        bisected_host_port = None
1✔
201
        for (from_range, from_protocol), to_range in self.mappings.items():
1✔
202
            if not from_protocol == protocol:
1✔
203
                continue
1✔
204
            if not self.in_expanded_range(port, from_range):
1✔
205
                continue
1✔
206
            if not self.in_expanded_range(mapped, to_range):
1✔
207
                continue
1✔
208
            from_range_len = from_range[1] - from_range[0]
1✔
209
            to_range_len = to_range[1] - to_range[0]
1✔
210
            is_uniform = from_range_len == to_range_len
1✔
211
            if is_uniform:
1✔
212
                self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
213
                self.expand_range(mapped, to_range, protocol=protocol)
1✔
214
            else:
215
                if not self.in_range(mapped, to_range):
1✔
216
                    continue
1✔
217
                # extending a 1 to 1 mapping to be many to 1
218
                elif from_range_len == 1:
1✔
219
                    self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
220
                # splitting a uniform mapping
221
                else:
222
                    bisected_port_index = mapped - to_range[0]
1✔
223
                    bisected_host_port = from_range[0] + bisected_port_index
1✔
224
                    self.bisect_range(mapped, to_range, protocol=protocol)
1✔
225
                    self.bisect_range(bisected_host_port, from_range, protocol=protocol, remap=True)
1✔
226
                    break
1✔
227
            return
1✔
228
        if bisected_host_port is None:
1✔
229
            port_range = [port, port]
1✔
230
        elif bisected_host_port < port:
1✔
231
            port_range = [bisected_host_port, port]
1✔
232
        else:
233
            port_range = [port, bisected_host_port]
×
234
        protocol = str(protocol or "tcp").lower()
1✔
235
        self.mappings[(HashableList(port_range), protocol)] = [mapped, mapped]
1✔
236

237
    def to_str(self) -> str:
1✔
238
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
239

240
        def entry(k, v):
1✔
241
            from_range, protocol = k
1✔
242
            to_range = v
1✔
243
            # use /<protocol> suffix if the protocol is not"tcp"
244
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
245
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
246
                return f"-p {bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"
1✔
247
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
248
                return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}{protocol_suffix}"
1✔
249
            return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}"
1✔
250

251
        return " ".join([entry(k, v) for k, v in self.mappings.items()])
1✔
252

253
    def to_list(self) -> list[str]:  # TODO test
1✔
254
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
255

256
        def entry(k, v):
1✔
257
            from_range, protocol = k
1✔
258
            to_range = v
1✔
259
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
260
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
261
                return ["-p", f"{bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"]
1✔
262
            return [
1✔
263
                "-p",
264
                f"{bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}",
265
            ]
266

267
        return [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
268

269
    def to_dict(self) -> dict[str, Union[tuple[str, Union[int, list[int]]], int]]:
1✔
270
        bind_address = self.bind_host or ""
1✔
271

272
        def bind_port(bind_address, host_port):
1✔
273
            if host_port == 0:
1✔
274
                return None
1✔
275
            elif bind_address:
1✔
276
                return (bind_address, host_port)
1✔
277
            else:
278
                return host_port
1✔
279

280
        def entry(k, v):
1✔
281
            from_range, protocol = k
1✔
282
            to_range = v
1✔
283
            protocol_suffix = f"/{protocol}"
1✔
284
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
285
                container_port = to_range[0]
1✔
286
                host_ports = list(range(from_range[0], from_range[1] + 1))
1✔
287
                return [
1✔
288
                    (
289
                        f"{container_port}{protocol_suffix}",
290
                        (bind_address, host_ports) if bind_address else host_ports,
291
                    )
292
                ]
293
            return [
1✔
294
                (
295
                    f"{container_port}{protocol_suffix}",
296
                    bind_port(bind_address, host_port),
297
                )
298
                for container_port, host_port in zip(
299
                    range(to_range[0], to_range[1] + 1),
300
                    range(from_range[0], from_range[1] + 1),
301
                    strict=False,
302
                )
303
            ]
304

305
        items = [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
306
        return dict(items)
1✔
307

308
    def contains(self, port: int, protocol: PortProtocol = "tcp") -> bool:
1✔
309
        for from_range_w_protocol, to_range in self.mappings.items():
1✔
310
            from_protocol = from_range_w_protocol[1]
1✔
311
            if from_protocol == protocol:
1✔
312
                from_range = from_range_w_protocol[0]
1✔
313
                if self.in_range(port, from_range):
1✔
314
                    return True
1✔
315

316
    def in_range(self, port: int, range: PortRange) -> bool:
1✔
317
        return port >= range[0] and port <= range[1]
1✔
318

319
    def in_expanded_range(self, port: int, range: PortRange):
1✔
320
        return port >= range[0] - 1 and port <= range[1] + 1
1✔
321

322
    def expand_range(
1✔
323
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
324
    ):
325
        """
326
        Expand the given port range by the given port. If remap==True, put the updated range into self.mappings
327
        """
328
        if self.in_range(port, range):
1✔
329
            return
1✔
330
        new_range = list(range) if remap else range
1✔
331
        if port == range[0] - 1:
1✔
332
            new_range[0] = port
×
333
        elif port == range[1] + 1:
1✔
334
            new_range[1] = port
1✔
335
        else:
336
            raise Exception(f"Unable to add port {port} to existing range {range}")
×
337
        if remap:
1✔
338
            self._remap_range(range, new_range, protocol=protocol)
1✔
339

340
    def bisect_range(
1✔
341
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
342
    ):
343
        """
344
        Bisect a port range, at the provided port. This is needed in some cases when adding a
345
        non-uniform host to port mapping adjacent to an existing port range.
346
        If remap==True, put the updated range into self.mappings
347
        """
348
        if not self.in_range(port, range):
1✔
349
            return
×
350
        new_range = list(range) if remap else range
1✔
351
        if port == range[0]:
1✔
352
            new_range[0] = port + 1
×
353
        else:
354
            new_range[1] = port - 1
1✔
355
        if remap:
1✔
356
            self._remap_range(range, new_range, protocol)
1✔
357

358
    def _remap_range(self, old_key: PortRange, new_key: PortRange, protocol: PortProtocol):
1✔
359
        self.mappings[(HashableList(new_key), protocol)] = self.mappings.pop(
1✔
360
            (HashableList(old_key), protocol)
361
        )
362

363
    def __repr__(self):
364
        return f"<PortMappings: {self.to_dict()}>"
365

366

367
SimpleVolumeBind = tuple[str, str]
1✔
368
"""Type alias for a simple version of VolumeBind"""
1✔
369

370

371
@dataclasses.dataclass
1✔
372
class BindMount:
1✔
373
    """Represents a --volume argument run/create command. When using VolumeBind to bind-mount a file or directory
374
    that does not yet exist on the Docker host, -v creates the endpoint for you. It is always created as a directory.
375
    """
376

377
    host_dir: str
1✔
378
    container_dir: str
1✔
379
    read_only: bool = False
1✔
380

381
    def to_str(self) -> str:
1✔
382
        args = []
1✔
383

384
        if self.host_dir:
1✔
385
            args.append(self.host_dir)
1✔
386

387
        if not self.container_dir:
1✔
388
            raise ValueError("no container dir specified")
×
389

390
        args.append(self.container_dir)
1✔
391

392
        if self.read_only:
1✔
393
            args.append("ro")
×
394

395
        return ":".join(args)
1✔
396

397
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
398
        return str(self.host_dir), {
1✔
399
            "bind": self.container_dir,
400
            "mode": "ro" if self.read_only else "rw",
401
        }
402

403
    @classmethod
1✔
404
    def parse(cls, param: str) -> "BindMount":
1✔
405
        parts = param.split(":")
1✔
406
        if 1 > len(parts) > 3:
1✔
407
            raise ValueError(f"Cannot parse volume bind {param}")
×
408

409
        volume = cls(parts[0], parts[1])
1✔
410
        if len(parts) == 3:
1✔
411
            if "ro" in parts[2].split(","):
1✔
412
                volume.read_only = True
1✔
413
        return volume
1✔
414

415

416
@dataclasses.dataclass
1✔
417
class VolumeDirMount:
1✔
418
    volume_path: str
1✔
419
    """
1✔
420
    Absolute path inside /var/lib/localstack to mount into the container
421
    """
422
    container_path: str
1✔
423
    """
1✔
424
    Target path inside the started container
425
    """
426
    read_only: bool = False
1✔
427

428
    def to_str(self) -> str:
1✔
429
        self._validate()
1✔
430
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
431

432
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
433
        return f"{host_dir}:{self.container_path}{':ro' if self.read_only else ''}"
1✔
434

435
    def _validate(self):
1✔
436
        if not self.volume_path:
1✔
437
            raise ValueError("no volume dir specified")
×
438
        if config.is_in_docker and not self.volume_path.startswith(DEFAULT_VOLUME_DIR):
1✔
439
            raise ValueError(f"volume dir not starting with {DEFAULT_VOLUME_DIR}")
×
440
        if not self.container_path:
1✔
441
            raise ValueError("no container dir specified")
×
442

443
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
444
        self._validate()
1✔
445
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
446

447
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
448
        return host_dir, {
1✔
449
            "bind": self.container_path,
450
            "mode": "ro" if self.read_only else "rw",
451
        }
452

453

454
class VolumeMappings:
1✔
455
    mappings: list[Union[SimpleVolumeBind, BindMount]]
1✔
456

457
    def __init__(self, mappings: list[Union[SimpleVolumeBind, BindMount, VolumeDirMount]] = None):
1✔
458
        self.mappings = mappings if mappings is not None else []
1✔
459

460
    def add(self, mapping: Union[SimpleVolumeBind, BindMount, VolumeDirMount]):
1✔
461
        self.append(mapping)
1✔
462

463
    def append(
1✔
464
        self,
465
        mapping: Union[
466
            SimpleVolumeBind,
467
            BindMount,
468
            VolumeDirMount,
469
        ],
470
    ):
471
        self.mappings.append(mapping)
1✔
472

473
    def find_target_mapping(
1✔
474
        self, container_dir: str
475
    ) -> Optional[Union[SimpleVolumeBind, BindMount, VolumeDirMount]]:
476
        """
477
        Looks through the volumes and returns the one where the container dir matches ``container_dir``.
478
        Returns None if there is no volume mapping to the given container directory.
479

480
        :param container_dir: the target of the volume mapping, i.e., the path in the container
481
        :return: the volume mapping or None
482
        """
483
        for volume in self.mappings:
1✔
484
            target_dir = volume[1] if isinstance(volume, tuple) else volume.container_dir
1✔
485
            if container_dir == target_dir:
1✔
486
                return volume
×
487
        return None
1✔
488

489
    def __iter__(self):
1✔
490
        return self.mappings.__iter__()
1✔
491

492
    def __repr__(self):
493
        return self.mappings.__repr__()
494

495
    def __len__(self):
1✔
496
        return len(self.mappings)
1✔
497

498
    def __getitem__(self, item: int):
1✔
499
        return self.mappings[item]
×
500

501

502
VolumeType = Literal["bind", "volume"]
1✔
503

504

505
class VolumeInfo(NamedTuple):
1✔
506
    """Container volume information."""
507

508
    type: VolumeType
1✔
509
    source: str
1✔
510
    destination: str
1✔
511
    mode: str
1✔
512
    rw: bool
1✔
513
    propagation: str
1✔
514
    name: Optional[str] = None
1✔
515
    driver: Optional[str] = None
1✔
516

517

518
@dataclasses.dataclass
1✔
519
class LogConfig:
1✔
520
    type: Literal["json-file", "syslog", "journald", "gelf", "fluentd", "none", "awslogs", "splunk"]
1✔
521
    config: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
522

523

524
@dataclasses.dataclass
1✔
525
class ContainerConfiguration:
1✔
526
    image_name: str
1✔
527
    name: Optional[str] = None
1✔
528
    volumes: VolumeMappings = dataclasses.field(default_factory=VolumeMappings)
1✔
529
    ports: PortMappings = dataclasses.field(default_factory=PortMappings)
1✔
530
    exposed_ports: list[str] = dataclasses.field(default_factory=list)
1✔
531
    entrypoint: Optional[Union[list[str], str]] = None
1✔
532
    additional_flags: Optional[str] = None
1✔
533
    command: Optional[list[str]] = None
1✔
534
    env_vars: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
535

536
    privileged: bool = False
1✔
537
    remove: bool = False
1✔
538
    interactive: bool = False
1✔
539
    tty: bool = False
1✔
540
    detach: bool = False
1✔
541

542
    stdin: Optional[str] = None
1✔
543
    user: Optional[str] = None
1✔
544
    cap_add: Optional[list[str]] = None
1✔
545
    cap_drop: Optional[list[str]] = None
1✔
546
    security_opt: Optional[list[str]] = None
1✔
547
    network: Optional[str] = None
1✔
548
    dns: Optional[str] = None
1✔
549
    workdir: Optional[str] = None
1✔
550
    platform: Optional[str] = None
1✔
551
    ulimits: Optional[list[Ulimit]] = None
1✔
552
    labels: Optional[dict[str, str]] = None
1✔
553
    init: Optional[bool] = None
1✔
554
    log_config: Optional[LogConfig] = None
1✔
555

556

557
class ContainerConfigurator(Protocol):
1✔
558
    """Protocol for functional configurators. A ContainerConfigurator modifies, when called,
559
    a ContainerConfiguration in place."""
560

561
    def __call__(self, configuration: ContainerConfiguration):
1✔
562
        """
563
        Modify the given container configuration.
564

565
        :param configuration: the configuration to modify
566
        """
567
        ...
×
568

569

570
@dataclasses.dataclass
1✔
571
class DockerRunFlags:
1✔
572
    """Class to capture Docker run/create flags for a container.
573
    run: https://docs.docker.com/engine/reference/commandline/run/
574
    create: https://docs.docker.com/engine/reference/commandline/create/
575
    """
576

577
    env_vars: Optional[dict[str, str]]
1✔
578
    extra_hosts: Optional[dict[str, str]]
1✔
579
    labels: Optional[dict[str, str]]
1✔
580
    volumes: Optional[list[SimpleVolumeBind]]
1✔
581
    network: Optional[str]
1✔
582
    platform: Optional[DockerPlatform]
1✔
583
    privileged: Optional[bool]
1✔
584
    ports: Optional[PortMappings]
1✔
585
    ulimits: Optional[list[Ulimit]]
1✔
586
    user: Optional[str]
1✔
587
    dns: Optional[list[str]]
1✔
588

589

590
class RegistryResolverStrategy(Protocol):
1✔
591
    def resolve(self, image_name: str) -> str: ...
1✔
592

593

594
class HardCodedResolver:
1✔
595
    def resolve(self, image_name: str) -> str:  # noqa
1✔
596
        return image_name
1✔
597

598

599
# TODO: remove Docker/Podman compatibility switches (in particular strip_wellknown_repo_prefixes=...)
600
#  from the container client base interface and introduce derived Podman client implementations instead!
601
class ContainerClient(metaclass=ABCMeta):
1✔
602
    registry_resolver_strategy: RegistryResolverStrategy = HardCodedResolver()
1✔
603

604
    @abstractmethod
1✔
605
    def get_system_info(self) -> dict:
1✔
606
        """Returns the docker system-wide information as dictionary (``docker info``)."""
607

608
    def get_system_id(self) -> str:
1✔
609
        """Returns the unique and stable ID of the docker daemon."""
610
        return self.get_system_info()["ID"]
1✔
611

612
    @abstractmethod
1✔
613
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
614
        """Returns the status of the container with the given name"""
615
        pass
×
616

617
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
618
        """Returns the usage statistics of the container with the given name"""
619
        pass
×
620

621
    def get_networks(self, container_name: str) -> list[str]:
1✔
622
        LOG.debug("Getting networks for container: %s", container_name)
1✔
623
        container_attrs = self.inspect_container(container_name_or_id=container_name)
1✔
624
        return list(container_attrs["NetworkSettings"].get("Networks", {}).keys())
1✔
625

626
    def get_container_ipv4_for_network(
1✔
627
        self, container_name_or_id: str, container_network: str
628
    ) -> str:
629
        """
630
        Returns the IPv4 address for the container on the interface connected to the given network
631
        :param container_name_or_id: Container to inspect
632
        :param container_network: Network the IP address will belong to
633
        :return: IP address of the given container on the interface connected to the given network
634
        """
635
        LOG.debug(
1✔
636
            "Getting ipv4 address for container %s in network %s.",
637
            container_name_or_id,
638
            container_network,
639
        )
640
        # we always need the ID for this
641
        container_id = self.get_container_id(container_name=container_name_or_id)
1✔
642
        network_attrs = self.inspect_network(container_network)
1✔
643
        containers = network_attrs.get("Containers") or {}
1✔
644
        if container_id not in containers:
1✔
645
            LOG.debug("Network attributes: %s", network_attrs)
1✔
646
            try:
1✔
647
                inspection = self.inspect_container(container_name_or_id=container_name_or_id)
1✔
648
                LOG.debug("Container %s Attributes: %s", container_name_or_id, inspection)
1✔
649
                logs = self.get_container_logs(container_name_or_id=container_name_or_id)
1✔
650
                LOG.debug("Container %s Logs: %s", container_name_or_id, logs)
1✔
651
            except ContainerException as e:
×
652
                LOG.debug("Cannot inspect container %s: %s", container_name_or_id, e)
×
653
            raise ContainerException(
1✔
654
                "Container %s is not connected to target network %s",
655
                container_name_or_id,
656
                container_network,
657
            )
658
        try:
1✔
659
            ip = str(ipaddress.IPv4Interface(containers[container_id]["IPv4Address"]).ip)
1✔
660
        except Exception as e:
×
661
            raise ContainerException(
×
662
                f"Unable to detect IP address for container {container_name_or_id} in network {container_network}: {e}"
663
            )
664
        return ip
1✔
665

666
    @abstractmethod
1✔
667
    def stop_container(self, container_name: str, timeout: int = 10):
1✔
668
        """Stops container with given name
669
        :param container_name: Container identifier (name or id) of the container to be stopped
670
        :param timeout: Timeout after which SIGKILL is sent to the container.
671
        """
672

673
    @abstractmethod
1✔
674
    def restart_container(self, container_name: str, timeout: int = 10):
1✔
675
        """Restarts a container with the given name.
676
        :param container_name: Container identifier
677
        :param timeout: Seconds to wait for stop before killing the container
678
        """
679

680
    @abstractmethod
1✔
681
    def pause_container(self, container_name: str):
1✔
682
        """Pauses a container with the given name."""
683

684
    @abstractmethod
1✔
685
    def unpause_container(self, container_name: str):
1✔
686
        """Unpauses a container with the given name."""
687

688
    @abstractmethod
1✔
689
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
1✔
690
        """Removes container with given name"""
691

692
    @abstractmethod
1✔
693
    def remove_image(self, image: str, force: bool = True) -> None:
1✔
694
        """Removes an image with given name
695

696
        :param image: Image name and tag
697
        :param force: Force removal
698
        """
699

700
    @abstractmethod
1✔
701
    def list_containers(self, filter: Union[list[str], str, None] = None, all=True) -> list[dict]:
1✔
702
        """List all containers matching the given filters
703

704
        :return: A list of dicts with keys id, image, name, labels, status
705
        """
706

707
    def get_running_container_names(self) -> list[str]:
1✔
708
        """Returns a list of the names of all running containers"""
709
        return self.__get_container_names(return_all=False)
1✔
710

711
    def get_all_container_names(self) -> list[str]:
1✔
712
        """Returns a list of the names of all containers including stopped ones"""
713
        return self.__get_container_names(return_all=True)
1✔
714

715
    def is_container_running(self, container_name: str) -> bool:
1✔
716
        """Checks whether a container with a given name is currently running"""
717
        return container_name in self.get_running_container_names()
1✔
718

719
    def create_file_in_container(
1✔
720
        self,
721
        container_name,
722
        file_contents: bytes,
723
        container_path: str,
724
        chmod_mode: Optional[int] = None,
725
    ) -> None:
726
        """
727
        Create a file in container with the provided content. Provide the 'chmod_mode' argument if you want the file to have specific permissions.
728
        """
729
        with tempfile.NamedTemporaryFile() as tmp:
1✔
730
            tmp.write(file_contents)
1✔
731
            tmp.flush()
1✔
732
            if chmod_mode is not None:
1✔
UNCOV
733
                chmod_r(tmp.name, chmod_mode)
×
734
            self.copy_into_container(
1✔
735
                container_name=container_name,
736
                local_path=tmp.name,
737
                container_path=container_path,
738
            )
739

740
    @abstractmethod
1✔
741
    def copy_into_container(
1✔
742
        self, container_name: str, local_path: str, container_path: str
743
    ) -> None:
744
        """Copy contents of the given local path into the container"""
745

746
    @abstractmethod
1✔
747
    def copy_from_container(
1✔
748
        self, container_name: str, local_path: str, container_path: str
749
    ) -> None:
750
        """Copy contents of the given container to the host"""
751

752
    @abstractmethod
1✔
753
    def pull_image(
1✔
754
        self,
755
        docker_image: str,
756
        platform: Optional[DockerPlatform] = None,
757
        log_handler: Optional[Callable[[str], None]] = None,
758
    ) -> None:
759
        """
760
        Pulls an image with a given name from a Docker registry
761

762
        :log_handler: Optional parameter that can be used to process the logs. Logs will be streamed if possible, but this is not guaranteed.
763
        """
764

765
    @abstractmethod
1✔
766
    def push_image(self, docker_image: str) -> None:
1✔
767
        """Pushes an image with a given name to a Docker registry"""
768

769
    @abstractmethod
1✔
770
    def build_image(
1✔
771
        self,
772
        dockerfile_path: str,
773
        image_name: str,
774
        context_path: str = None,
775
        platform: Optional[DockerPlatform] = None,
776
    ) -> str:
777
        """Builds an image from the given Dockerfile
778

779
        :param dockerfile_path: Path to Dockerfile, or a directory that contains a Dockerfile
780
        :param image_name: Name of the image to be built
781
        :param context_path: Path for build context (defaults to dirname of Dockerfile)
782
        :param platform: Target platform for build (defaults to platform of Docker host)
783
        :return: Build logs as a string.
784
        """
785

786
    @abstractmethod
1✔
787
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
788
        """Tags an image with a new name
789

790
        :param source_ref: Name or ID of the image to be tagged
791
        :param target_name: New name (tag) of the tagged image
792
        """
793

794
    @abstractmethod
1✔
795
    def get_docker_image_names(
1✔
796
        self,
797
        strip_latest: bool = True,
798
        include_tags: bool = True,
799
        strip_wellknown_repo_prefixes: bool = True,
800
    ) -> list[str]:
801
        """
802
        Get all names of docker images available to the container engine
803
        :param strip_latest: return images both with and without :latest tag
804
        :param include_tags: include tags of the images in the names
805
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
806
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
807
        :return: List of image names
808
        """
809

810
    @abstractmethod
1✔
811
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
812
        """Get all logs of a given container"""
813

814
    @abstractmethod
1✔
815
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
816
        """Returns a blocking generator you can iterate over to retrieve log output as it happens."""
817

818
    @abstractmethod
1✔
819
    def inspect_container(self, container_name_or_id: str) -> dict[str, Union[dict, str]]:
1✔
820
        """Get detailed attributes of a container.
821

822
        :return: Dict containing docker attributes as returned by the daemon
823
        """
824

825
    def inspect_container_volumes(self, container_name_or_id) -> list[VolumeInfo]:
1✔
826
        """Return information about the volumes mounted into the given container.
827

828
        :param container_name_or_id: the container name or id
829
        :return: a list of volumes
830
        """
831
        volumes = []
1✔
832
        for doc in self.inspect_container(container_name_or_id)["Mounts"]:
1✔
833
            volumes.append(VolumeInfo(**{k.lower(): v for k, v in doc.items()}))
1✔
834

835
        return volumes
1✔
836

837
    @abstractmethod
1✔
838
    def inspect_image(
1✔
839
        self, image_name: str, pull: bool = True, strip_wellknown_repo_prefixes: bool = True
840
    ) -> dict[str, Union[dict, list, str]]:
841
        """Get detailed attributes of an image.
842

843
        :param image_name: Image name to inspect
844
        :param pull: Whether to pull image if not existent
845
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
846
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
847
        :return: Dict containing docker attributes as returned by the daemon
848
        """
849

850
    @abstractmethod
1✔
851
    def create_network(self, network_name: str) -> str:
1✔
852
        """
853
        Creates a network with the given name
854
        :param network_name: Name of the network
855
        :return Network ID
856
        """
857

858
    @abstractmethod
1✔
859
    def delete_network(self, network_name: str) -> None:
1✔
860
        """
861
        Delete a network with the given name
862
        :param network_name: Name of the network
863
        """
864

865
    @abstractmethod
1✔
866
    def inspect_network(self, network_name: str) -> dict[str, Union[dict, str]]:
1✔
867
        """Get detailed attributes of an network.
868

869
        :return: Dict containing docker attributes as returned by the daemon
870
        """
871

872
    @abstractmethod
1✔
873
    def connect_container_to_network(
1✔
874
        self,
875
        network_name: str,
876
        container_name_or_id: str,
877
        aliases: Optional[list] = None,
878
        link_local_ips: list[str] = None,
879
    ) -> None:
880
        """
881
        Connects a container to a given network
882
        :param network_name: Network to connect the container to
883
        :param container_name_or_id: Container to connect to the network
884
        :param aliases: List of dns names the container should be available under in the network
885
        :param link_local_ips: List of link-local (IPv4 or IPv6) addresses
886
        """
887

888
    @abstractmethod
1✔
889
    def disconnect_container_from_network(
1✔
890
        self, network_name: str, container_name_or_id: str
891
    ) -> None:
892
        """
893
        Disconnects a container from a given network
894
        :param network_name: Network to disconnect the container from
895
        :param container_name_or_id: Container to disconnect from the network
896
        """
897

898
    def get_container_name(self, container_id: str) -> str:
1✔
899
        """Get the name of a container by a given identifier"""
900
        return self.inspect_container(container_id)["Name"].lstrip("/")
1✔
901

902
    def get_container_id(self, container_name: str) -> str:
1✔
903
        """Get the id of a container by a given name"""
904
        return self.inspect_container(container_name)["Id"]
1✔
905

906
    @abstractmethod
1✔
907
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
908
        """Get the IP address of a given container
909

910
        If container has multiple networks, it will return the IP of the first
911
        """
912

913
    def get_image_cmd(self, docker_image: str, pull: bool = True) -> list[str]:
1✔
914
        """Get the command for the given image
915
        :param docker_image: Docker image to inspect
916
        :param pull: Whether to pull if image is not present
917
        :return: Image command in its array form
918
        """
919
        cmd_list = self.inspect_image(docker_image, pull)["Config"]["Cmd"] or []
1✔
920
        return cmd_list
1✔
921

922
    def get_image_entrypoint(self, docker_image: str, pull: bool = True) -> str:
1✔
923
        """Get the entry point for the given image
924
        :param docker_image: Docker image to inspect
925
        :param pull: Whether to pull if image is not present
926
        :return: Image entrypoint
927
        """
928
        LOG.debug("Getting the entrypoint for image: %s", docker_image)
1✔
929
        entrypoint_list = self.inspect_image(docker_image, pull)["Config"].get("Entrypoint") or []
1✔
930
        return shlex.join(entrypoint_list)
1✔
931

932
    @abstractmethod
1✔
933
    def has_docker(self) -> bool:
1✔
934
        """Check if system has docker available"""
935

936
    @abstractmethod
1✔
937
    def commit(
1✔
938
        self,
939
        container_name_or_id: str,
940
        image_name: str,
941
        image_tag: str,
942
    ):
943
        """Create an image from a running container.
944

945
        :param container_name_or_id: Source container
946
        :param image_name: Destination image name
947
        :param image_tag: Destination image tag
948
        """
949

950
    def create_container_from_config(self, container_config: ContainerConfiguration) -> str:
1✔
951
        """
952
        Similar to create_container, but allows passing the whole ContainerConfiguration
953
        :param container_config: ContainerConfiguration how to start the container
954
        :return: Container ID
955
        """
956
        return self.create_container(
1✔
957
            image_name=container_config.image_name,
958
            name=container_config.name,
959
            entrypoint=container_config.entrypoint,
960
            remove=container_config.remove,
961
            interactive=container_config.interactive,
962
            tty=container_config.tty,
963
            command=container_config.command,
964
            volumes=container_config.volumes,
965
            ports=container_config.ports,
966
            exposed_ports=container_config.exposed_ports,
967
            env_vars=container_config.env_vars,
968
            user=container_config.user,
969
            cap_add=container_config.cap_add,
970
            cap_drop=container_config.cap_drop,
971
            security_opt=container_config.security_opt,
972
            network=container_config.network,
973
            dns=container_config.dns,
974
            additional_flags=container_config.additional_flags,
975
            workdir=container_config.workdir,
976
            privileged=container_config.privileged,
977
            platform=container_config.platform,
978
            labels=container_config.labels,
979
            ulimits=container_config.ulimits,
980
            init=container_config.init,
981
            log_config=container_config.log_config,
982
        )
983

984
    @abstractmethod
1✔
985
    def create_container(
1✔
986
        self,
987
        image_name: str,
988
        *,
989
        name: Optional[str] = None,
990
        entrypoint: Optional[Union[list[str], str]] = None,
991
        remove: bool = False,
992
        interactive: bool = False,
993
        tty: bool = False,
994
        detach: bool = False,
995
        command: Optional[Union[list[str], str]] = None,
996
        volumes: Optional[Union[VolumeMappings, list[SimpleVolumeBind]]] = None,
997
        ports: Optional[PortMappings] = None,
998
        exposed_ports: Optional[list[str]] = None,
999
        env_vars: Optional[dict[str, str]] = None,
1000
        user: Optional[str] = None,
1001
        cap_add: Optional[list[str]] = None,
1002
        cap_drop: Optional[list[str]] = None,
1003
        security_opt: Optional[list[str]] = None,
1004
        network: Optional[str] = None,
1005
        dns: Optional[Union[str, list[str]]] = None,
1006
        additional_flags: Optional[str] = None,
1007
        workdir: Optional[str] = None,
1008
        privileged: Optional[bool] = None,
1009
        labels: Optional[dict[str, str]] = None,
1010
        platform: Optional[DockerPlatform] = None,
1011
        ulimits: Optional[list[Ulimit]] = None,
1012
        init: Optional[bool] = None,
1013
        log_config: Optional[LogConfig] = None,
1014
    ) -> str:
1015
        """Creates a container with the given image
1016

1017
        :return: Container ID
1018
        """
1019

1020
    @abstractmethod
1✔
1021
    def run_container(
1✔
1022
        self,
1023
        image_name: str,
1024
        stdin: bytes = None,
1025
        *,
1026
        name: Optional[str] = None,
1027
        entrypoint: Optional[str] = None,
1028
        remove: bool = False,
1029
        interactive: bool = False,
1030
        tty: bool = False,
1031
        detach: bool = False,
1032
        command: Optional[Union[list[str], str]] = None,
1033
        volumes: Optional[Union[VolumeMappings, list[SimpleVolumeBind]]] = None,
1034
        ports: Optional[PortMappings] = None,
1035
        exposed_ports: Optional[list[str]] = None,
1036
        env_vars: Optional[dict[str, str]] = None,
1037
        user: Optional[str] = None,
1038
        cap_add: Optional[list[str]] = None,
1039
        cap_drop: Optional[list[str]] = None,
1040
        security_opt: Optional[list[str]] = None,
1041
        network: Optional[str] = None,
1042
        dns: Optional[str] = None,
1043
        additional_flags: Optional[str] = None,
1044
        workdir: Optional[str] = None,
1045
        labels: Optional[dict[str, str]] = None,
1046
        platform: Optional[DockerPlatform] = None,
1047
        privileged: Optional[bool] = None,
1048
        ulimits: Optional[list[Ulimit]] = None,
1049
        init: Optional[bool] = None,
1050
        log_config: Optional[LogConfig] = None,
1051
    ) -> tuple[bytes, bytes]:
1052
        """Creates and runs a given docker container
1053

1054
        :return: A tuple (stdout, stderr)
1055
        """
1056

1057
    def run_container_from_config(
1✔
1058
        self, container_config: ContainerConfiguration
1059
    ) -> tuple[bytes, bytes]:
1060
        """Like ``run_container`` but uses the parameters from the configuration."""
1061

UNCOV
1062
        return self.run_container(
×
1063
            image_name=container_config.image_name,
1064
            stdin=container_config.stdin,
1065
            name=container_config.name,
1066
            entrypoint=container_config.entrypoint,
1067
            remove=container_config.remove,
1068
            interactive=container_config.interactive,
1069
            tty=container_config.tty,
1070
            detach=container_config.detach,
1071
            command=container_config.command,
1072
            volumes=container_config.volumes,
1073
            ports=container_config.ports,
1074
            exposed_ports=container_config.exposed_ports,
1075
            env_vars=container_config.env_vars,
1076
            user=container_config.user,
1077
            cap_add=container_config.cap_add,
1078
            cap_drop=container_config.cap_drop,
1079
            security_opt=container_config.security_opt,
1080
            network=container_config.network,
1081
            dns=container_config.dns,
1082
            additional_flags=container_config.additional_flags,
1083
            workdir=container_config.workdir,
1084
            platform=container_config.platform,
1085
            privileged=container_config.privileged,
1086
            ulimits=container_config.ulimits,
1087
            init=container_config.init,
1088
            log_config=container_config.log_config,
1089
        )
1090

1091
    @abstractmethod
1✔
1092
    def exec_in_container(
1✔
1093
        self,
1094
        container_name_or_id: str,
1095
        command: Union[list[str], str],
1096
        interactive: bool = False,
1097
        detach: bool = False,
1098
        env_vars: Optional[dict[str, Optional[str]]] = None,
1099
        stdin: Optional[bytes] = None,
1100
        user: Optional[str] = None,
1101
        workdir: Optional[str] = None,
1102
    ) -> tuple[bytes, bytes]:
1103
        """Execute a given command in a container
1104

1105
        :return: A tuple (stdout, stderr)
1106
        """
1107

1108
    @abstractmethod
1✔
1109
    def start_container(
1✔
1110
        self,
1111
        container_name_or_id: str,
1112
        stdin: bytes = None,
1113
        interactive: bool = False,
1114
        attach: bool = False,
1115
        flags: Optional[str] = None,
1116
    ) -> tuple[bytes, bytes]:
1117
        """Start a given, already created container
1118

1119
        :return: A tuple (stdout, stderr) if attach or interactive is set, otherwise a tuple (b"container_name_or_id", b"")
1120
        """
1121

1122
    @abstractmethod
1✔
1123
    def attach_to_container(self, container_name_or_id: str):
1✔
1124
        """
1125
        Attach local standard input, output, and error streams to a running container
1126
        """
1127

1128
    @abstractmethod
1✔
1129
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
1✔
1130
        """
1131
        Login into an OCI registry
1132

1133
        :param username: Username for the registry
1134
        :param password: Password / token for the registry
1135
        :param registry: Registry url
1136
        """
1137

1138
    def __get_container_names(self, return_all: bool) -> list[str]:
1✔
1139
        result = self.list_containers(all=return_all)
1✔
1140
        result = [container["name"] for container in result]
1✔
1141
        return result
1✔
1142

1143

1144
class Util:
1✔
1145
    MAX_ENV_ARGS_LENGTH = 20000
1✔
1146

1147
    @staticmethod
1✔
1148
    def format_env_vars(key: str, value: Optional[str]):
1✔
1149
        if value is None:
1✔
UNCOV
1150
            return key
×
1151
        return f"{key}={value}"
1✔
1152

1153
    @classmethod
1✔
1154
    def create_env_vars_file_flag(cls, env_vars: dict) -> tuple[list[str], Optional[str]]:
1✔
1155
        if not env_vars:
1✔
1156
            return [], None
×
1157
        result = []
1✔
1158
        env_vars = dict(env_vars)
1✔
1159
        env_file = None
1✔
1160
        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:
1✔
1161
            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...
1162
            env_file = cls.mountable_tmp_file()
×
1163
            env_content = ""
×
1164
            for name, value in dict(env_vars).items():
×
1165
                if len(value) > cls.MAX_ENV_ARGS_LENGTH:
×
1166
                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")
UNCOV
1167
                    continue
×
UNCOV
1168
                env_vars.pop(name)
×
UNCOV
1169
                value = value.replace("\n", "\\")
×
UNCOV
1170
                env_content += f"{cls.format_env_vars(name, value)}\n"
×
UNCOV
1171
            save_file(env_file, env_content)
×
UNCOV
1172
            result += ["--env-file", env_file]
×
1173

1174
        env_vars_res = [
1✔
1175
            item for k, v in env_vars.items() for item in ["-e", cls.format_env_vars(k, v)]
1176
        ]
1177
        result += env_vars_res
1✔
1178
        return result, env_file
1✔
1179

1180
    @staticmethod
1✔
1181
    def rm_env_vars_file(env_vars_file) -> None:
1✔
1182
        if env_vars_file:
1✔
UNCOV
1183
            return rm_rf(env_vars_file)
×
1184

1185
    @staticmethod
1✔
1186
    def mountable_tmp_file():
1✔
UNCOV
1187
        f = os.path.join(config.dirs.mounted_tmp, short_uid())
×
UNCOV
1188
        TMP_FILES.append(f)
×
UNCOV
1189
        return f
×
1190

1191
    @staticmethod
1✔
1192
    def append_without_latest(image_names: list[str]):
1✔
1193
        suffix = ":latest"
1✔
1194
        for image in list(image_names):
1✔
1195
            if image.endswith(suffix):
1✔
1196
                image_names.append(image[: -len(suffix)])
1✔
1197

1198
    @staticmethod
1✔
1199
    def strip_wellknown_repo_prefixes(image_names: list[str]) -> list[str]:
1✔
1200
        """
1201
        Remove well-known repo prefixes like `localhost/` or `docker.io/library/` from the list of given
1202
        image names. This is mostly to ensure compatibility of our Docker client with Podman API responses.
1203
        :return: a copy of the list of image names, with well-known repo prefixes removed
1204
        """
1205
        result = []
1✔
1206
        for image in image_names:
1✔
1207
            for prefix in WELL_KNOWN_IMAGE_REPO_PREFIXES:
1✔
1208
                if image.startswith(prefix):
1✔
UNCOV
1209
                    image = image.removeprefix(prefix)
×
1210
                    # strip only one of the matching prefixes (avoid multi-stripping)
UNCOV
1211
                    break
×
1212
            result.append(image)
1✔
1213
        return result
1✔
1214

1215
    @staticmethod
1✔
1216
    def tar_path(path: str, target_path: str, is_dir: bool):
1✔
1217
        f = tempfile.NamedTemporaryFile()
1✔
1218
        with tarfile.open(mode="w", fileobj=f) as t:
1✔
1219
            abs_path = os.path.abspath(path)
1✔
1220
            arcname = (
1✔
1221
                os.path.basename(path)
1222
                if is_dir
1223
                else (os.path.basename(target_path) or os.path.basename(path))
1224
            )
1225
            t.add(abs_path, arcname=arcname)
1✔
1226

1227
        f.seek(0)
1✔
1228
        return f
1✔
1229

1230
    @staticmethod
1✔
1231
    def untar_to_path(tardata, target_path):
1✔
1232
        target_path = Path(target_path)
1✔
1233
        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:
1✔
1234
            if target_path.is_dir():
1✔
1235
                t.extractall(path=target_path)
1✔
1236
            else:
1237
                member = t.next()
1✔
1238
                if member:
1✔
1239
                    member.name = target_path.name
1✔
1240
                    t.extract(member, target_path.parent)
1✔
1241
                else:
UNCOV
1242
                    LOG.debug("File to copy empty, ignoring...")
×
1243

1244
    @staticmethod
1✔
1245
    def _read_docker_cli_env_file(env_file: str) -> dict[str, str]:
1✔
1246
        """
1247
        Read an environment file in docker CLI format, specified here:
1248
        https://docs.docker.com/reference/cli/docker/container/run/#env
1249
        :param env_file: Path to the environment file
1250
        :return: Read environment variables
1251
        """
1252
        env_vars = {}
1✔
1253
        try:
1✔
1254
            with open(env_file) as f:
1✔
1255
                env_file_lines = f.readlines()
1✔
1256
        except FileNotFoundError as e:
×
1257
            LOG.error(
×
1258
                "Specified env file '%s' not found. Please make sure the file is properly mounted into the LocalStack container. Error: %s",
1259
                env_file,
1260
                e,
1261
            )
1262
            raise
×
UNCOV
1263
        except OSError as e:
×
UNCOV
1264
            LOG.error(
×
1265
                "Could not read env file '%s'. Please make sure the LocalStack container has the permissions to read it. Error: %s",
1266
                env_file,
1267
                e,
1268
            )
UNCOV
1269
            raise
×
1270
        for idx, line in enumerate(env_file_lines):
1✔
1271
            line = line.strip()
1✔
1272
            if not line or line.startswith("#"):
1✔
1273
                # skip comments or empty lines
1274
                continue
1✔
1275
            lhs, separator, rhs = line.partition("=")
1✔
1276
            if rhs or separator:
1✔
1277
                env_vars[lhs] = rhs
1✔
1278
            else:
1279
                # No "=" in the line, only the name => lookup in local env
1280
                if env_value := os.environ.get(lhs):
1✔
1281
                    env_vars[lhs] = env_value
1✔
1282
        return env_vars
1✔
1283

1284
    @staticmethod
1✔
1285
    def parse_additional_flags(
1✔
1286
        additional_flags: str,
1287
        env_vars: Optional[dict[str, str]] = None,
1288
        labels: Optional[dict[str, str]] = None,
1289
        volumes: Optional[list[SimpleVolumeBind]] = None,
1290
        network: Optional[str] = None,
1291
        platform: Optional[DockerPlatform] = None,
1292
        ports: Optional[PortMappings] = None,
1293
        privileged: Optional[bool] = None,
1294
        user: Optional[str] = None,
1295
        ulimits: Optional[list[Ulimit]] = None,
1296
        dns: Optional[Union[str, list[str]]] = None,
1297
    ) -> DockerRunFlags:
1298
        """Parses additional CLI-formatted Docker flags, which could overwrite provided defaults.
1299
        :param additional_flags: String which contains the flag definitions inspired by the Docker CLI reference:
1300
                                 https://docs.docker.com/engine/reference/commandline/run/
1301
        :param env_vars: Dict with env vars. Will be modified in place.
1302
        :param labels: Dict with labels. Will be modified in place.
1303
        :param volumes: List of mount tuples (host_path, container_path). Will be modified in place.
1304
        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.
1305
        :param platform: Platform to execute container. Warning will be printed if platform is overwritten in flags.
1306
        :param ports: PortMapping object. Will be modified in place.
1307
        :param privileged: Run the container in privileged mode. Warning will be printed if overwritten in flags.
1308
        :param ulimits: ulimit options in the format <type>=<soft limit>[:<hard limit>]
1309
        :param user: User to run first process. Warning will be printed if user is overwritten in flags.
1310
        :param dns: List of DNS servers to configure the container with.
1311
        :return: A DockerRunFlags object that will return new objects if respective parameters were None and
1312
                additional flags contained a flag for that object or the same which are passed otherwise.
1313
        """
1314
        # Argparse refactoring opportunity: custom argparse actions can be used to modularize parsing (e.g., key=value)
1315
        # https://docs.python.org/3/library/argparse.html#action
1316

1317
        # Configure parser
1318
        parser = NoExitArgumentParser(description="Docker run flags parser")
1✔
1319
        parser.add_argument(
1✔
1320
            "--add-host",
1321
            help="Add a custom host-to-IP mapping (host:ip)",
1322
            dest="add_hosts",
1323
            action="append",
1324
        )
1325
        parser.add_argument(
1✔
1326
            "--env", "-e", help="Set environment variables", dest="envs", action="append"
1327
        )
1328
        parser.add_argument(
1✔
1329
            "--env-file",
1330
            help="Set environment variables via a file",
1331
            dest="env_files",
1332
            action="append",
1333
        )
1334
        parser.add_argument(
1✔
1335
            "--compose-env-file",
1336
            help="Set environment variables via a file, with a docker-compose supported feature set.",
1337
            dest="compose_env_files",
1338
            action="append",
1339
        )
1340
        parser.add_argument(
1✔
1341
            "--label", "-l", help="Add container meta data", dest="labels", action="append"
1342
        )
1343
        parser.add_argument("--network", help="Connect a container to a network")
1✔
1344
        parser.add_argument(
1✔
1345
            "--platform",
1346
            type=DockerPlatform,
1347
            help="Docker platform (e.g., linux/amd64 or linux/arm64)",
1348
        )
1349
        parser.add_argument(
1✔
1350
            "--privileged",
1351
            help="Give extended privileges to this container",
1352
            action="store_true",
1353
        )
1354
        parser.add_argument(
1✔
1355
            "--publish",
1356
            "-p",
1357
            help="Publish container port(s) to the host",
1358
            dest="publish_ports",
1359
            action="append",
1360
        )
1361
        parser.add_argument(
1✔
1362
            "--ulimit", help="Container ulimit settings", dest="ulimits", action="append"
1363
        )
1364
        parser.add_argument("--user", "-u", help="Username or UID to execute first process")
1✔
1365
        parser.add_argument(
1✔
1366
            "--volume", "-v", help="Bind mount a volume", dest="volumes", action="append"
1367
        )
1368
        parser.add_argument("--dns", help="Set custom DNS servers", dest="dns", action="append")
1✔
1369

1370
        # Parse
1371
        flags = shlex.split(additional_flags)
1✔
1372
        args = parser.parse_args(flags)
1✔
1373

1374
        # Post-process parsed flags
1375
        extra_hosts = None
1✔
1376
        if args.add_hosts:
1✔
1377
            for add_host in args.add_hosts:
1✔
1378
                extra_hosts = extra_hosts if extra_hosts is not None else {}
1✔
1379
                hosts_split = add_host.split(":")
1✔
1380
                extra_hosts[hosts_split[0]] = hosts_split[1]
1✔
1381

1382
        # set env file values before env values, as the latter override the earlier
1383
        if args.env_files:
1✔
1384
            env_vars = env_vars if env_vars is not None else {}
1✔
1385
            for env_file in args.env_files:
1✔
1386
                env_vars.update(Util._read_docker_cli_env_file(env_file))
1✔
1387

1388
        if args.compose_env_files:
1✔
1389
            env_vars = env_vars if env_vars is not None else {}
1✔
1390
            for env_file in args.compose_env_files:
1✔
1391
                env_vars.update(dotenv.dotenv_values(env_file))
1✔
1392

1393
        if args.envs:
1✔
1394
            env_vars = env_vars if env_vars is not None else {}
1✔
1395
            for env in args.envs:
1✔
1396
                lhs, _, rhs = env.partition("=")
1✔
1397
                env_vars[lhs] = rhs
1✔
1398

1399
        if args.labels:
1✔
1400
            labels = labels if labels is not None else {}
1✔
1401
            for label in args.labels:
1✔
1402
                key, _, value = label.partition("=")
1✔
1403
                # Only consider non-empty labels
1404
                if key:
1✔
1405
                    labels[key] = value
1✔
1406

1407
        if args.network:
1✔
1408
            LOG.warning(
1✔
1409
                "Overwriting Docker container network '%s' with new value '%s'",
1410
                network,
1411
                args.network,
1412
            )
1413
            network = args.network
1✔
1414

1415
        if args.platform:
1✔
1416
            LOG.warning(
1✔
1417
                "Overwriting Docker platform '%s' with new value '%s'",
1418
                platform,
1419
                args.platform,
1420
            )
1421
            platform = args.platform
1✔
1422

1423
        if args.privileged:
1✔
1424
            LOG.warning(
1✔
1425
                "Overwriting Docker container privileged flag %s with new value %s",
1426
                privileged,
1427
                args.privileged,
1428
            )
1429
            privileged = args.privileged
1✔
1430

1431
        if args.publish_ports:
1✔
1432
            for port_mapping in args.publish_ports:
1✔
1433
                port_split = port_mapping.split(":")
1✔
1434
                protocol = "tcp"
1✔
1435
                if len(port_split) == 2:
1✔
1436
                    host_port, container_port = port_split
1✔
1437
                elif len(port_split) == 3:
1✔
1438
                    LOG.warning(
1✔
1439
                        "Host part of port mappings are ignored currently in additional flags"
1440
                    )
1441
                    _, host_port, container_port = port_split
1✔
1442
                else:
1443
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
1✔
1444
                host_port_split = host_port.split("-")
1✔
1445
                if len(host_port_split) == 2:
1✔
1446
                    host_port = [int(host_port_split[0]), int(host_port_split[1])]
1✔
1447
                elif len(host_port_split) == 1:
1✔
1448
                    host_port = int(host_port)
1✔
1449
                else:
UNCOV
1450
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
×
1451
                if "/" in container_port:
1✔
1452
                    container_port, protocol = container_port.split("/")
1✔
1453
                ports = ports if ports is not None else PortMappings()
1✔
1454
                ports.add(host_port, int(container_port), protocol)
1✔
1455

1456
        if args.ulimits:
1✔
1457
            ulimits = ulimits if ulimits is not None else []
1✔
1458
            ulimits_dict = {ul.name: ul for ul in ulimits}
1✔
1459
            for ulimit in args.ulimits:
1✔
1460
                name, _, rhs = ulimit.partition("=")
1✔
1461
                soft, _, hard = rhs.partition(":")
1✔
1462
                hard_limit = int(hard) if hard else int(soft)
1✔
1463
                new_ulimit = Ulimit(name=name, soft_limit=int(soft), hard_limit=hard_limit)
1✔
1464
                if ulimits_dict.get(name):
1✔
1465
                    LOG.warning("Overwriting Docker ulimit %s", new_ulimit)
1✔
1466
                ulimits_dict[name] = new_ulimit
1✔
1467
            ulimits = list(ulimits_dict.values())
1✔
1468

1469
        if args.user:
1✔
1470
            LOG.warning(
1✔
1471
                "Overwriting Docker user '%s' with new value '%s'",
1472
                user,
1473
                args.user,
1474
            )
1475
            user = args.user
1✔
1476

1477
        if args.volumes:
1✔
1478
            volumes = volumes if volumes is not None else []
1✔
1479
            for volume in args.volumes:
1✔
1480
                match = re.match(
1✔
1481
                    r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",
1482
                    volume,
1483
                )
1484
                if not match:
1✔
UNCOV
1485
                    LOG.warning("Unable to parse volume mount Docker flags: %s", volume)
×
UNCOV
1486
                    continue
×
1487
                host_path = match.group("host")
1✔
1488
                container_path = match.group("container")
1✔
1489
                rw_args = match.group("arg")
1✔
1490
                if rw_args:
1✔
1491
                    LOG.info("Volume options like :ro or :rw are currently ignored.")
1✔
1492
                volumes.append((host_path, container_path))
1✔
1493

1494
        dns = ensure_list(dns or [])
1✔
1495
        if args.dns:
1✔
1496
            LOG.info(
1✔
1497
                "Extending Docker container DNS servers %s with additional values %s", dns, args.dns
1498
            )
1499
            dns.extend(args.dns)
1✔
1500

1501
        return DockerRunFlags(
1✔
1502
            env_vars=env_vars,
1503
            extra_hosts=extra_hosts,
1504
            labels=labels,
1505
            volumes=volumes,
1506
            ports=ports,
1507
            network=network,
1508
            platform=platform,
1509
            privileged=privileged,
1510
            ulimits=ulimits,
1511
            user=user,
1512
            dns=dns,
1513
        )
1514

1515
    @staticmethod
1✔
1516
    def convert_mount_list_to_dict(
1✔
1517
        volumes: Union[list[SimpleVolumeBind], VolumeMappings],
1518
    ) -> dict[str, dict[str, str]]:
1519
        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""
1520

1521
        def _map_to_dict(paths: SimpleVolumeBind | BindMount | VolumeDirMount):
1✔
1522
            if isinstance(paths, (BindMount, VolumeDirMount)):
1✔
1523
                return paths.to_docker_sdk_parameters()
1✔
1524
            else:
UNCOV
1525
                return str(paths[0]), {"bind": paths[1], "mode": "rw"}
×
1526

1527
        return dict(
1✔
1528
            map(
1529
                _map_to_dict,
1530
                volumes,
1531
            )
1532
        )
1533

1534
    @staticmethod
1✔
1535
    def resolve_dockerfile_path(dockerfile_path: str) -> str:
1✔
1536
        """If the given path is a directory that contains a Dockerfile, then return the file path to it."""
1537
        rel_path = os.path.join(dockerfile_path, "Dockerfile")
1✔
1538
        if os.path.isdir(dockerfile_path) and os.path.exists(rel_path):
1✔
1539
            return rel_path
1✔
1540
        return dockerfile_path
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc