• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d166ec6d-e321-44a4-963c-9d9dd94aa89d

11 Mar 2025 05:46PM UTC coverage: 86.936% (+0.04%) from 86.901%
d166ec6d-e321-44a4-963c-9d9dd94aa89d

push

circleci

web-flow
chore: improve test snapshot (#12367)

62152 of 71492 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/localstack-core/localstack/utils/container_utils/container_client.py
1
import dataclasses
1✔
2
import io
1✔
3
import ipaddress
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import sys
1✔
9
import tarfile
1✔
10
import tempfile
1✔
11
from abc import ABCMeta, abstractmethod
1✔
12
from enum import Enum, unique
1✔
13
from pathlib import Path
1✔
14
from typing import (
1✔
15
    Dict,
16
    List,
17
    Literal,
18
    NamedTuple,
19
    Optional,
20
    Protocol,
21
    Tuple,
22
    TypedDict,
23
    Union,
24
    get_args,
25
)
26

27
import dotenv
1✔
28

29
from localstack import config
1✔
30
from localstack.utils.collections import HashableList, ensure_list
1✔
31
from localstack.utils.files import TMP_FILES, chmod_r, rm_rf, save_file
1✔
32
from localstack.utils.no_exit_argument_parser import NoExitArgumentParser
1✔
33
from localstack.utils.strings import short_uid
1✔
34

35
# module-level logger, named after this module
LOG = logging.getLogger(__name__)

# list of well-known image repo prefixes that should be stripped off to canonicalize image names
# (the client docstrings in this file note that Podman adds these prefixes while Docker does not)
WELL_KNOWN_IMAGE_REPO_PREFIXES = ("localhost/", "docker.io/library/")
39

40

41
@unique
class DockerContainerStatus(Enum):
    """State of a container, as returned by ``ContainerClient.get_container_status``.

    ``@unique`` guarantees that no two members share the same integer value.
    """

    DOWN = -1
    NON_EXISTENT = 0
    UP = 1
    PAUSED = 2
47

48

49
class DockerContainerStats(TypedDict):
    """Container usage statistics"""

    # NOTE(review): the key names look like the columns of `docker stats` output;
    # the meaning/units of the tuple elements are not visible here - confirm against the
    # client implementation that builds this dict.
    Container: str
    ID: str
    Name: str
    BlockIO: tuple[int, int]
    CPUPerc: float
    MemPerc: float
    MemUsage: tuple[int, int]
    NetIO: tuple[int, int]
    PIDs: int
    # optional raw stats dict; presumably only filled by an SDK-based client - TODO confirm
    SDKStats: Optional[dict]
62

63

64
class ContainerException(Exception):
    """Base error of this module; carries a message plus optional captured stdout/stderr.

    :param message: human-readable description; a generic daemon-communication message is
        used when it is missing or empty
    :param stdout: captured standard output, stored as-is
    :param stderr: captured standard error, stored as-is
    """

    def __init__(self, message=None, stdout=None, stderr=None) -> None:
        if not message:
            message = "Error during the communication with the docker daemon"
        self.message, self.stdout, self.stderr = message, stdout, stderr
69

70

71
class NoSuchObject(ContainerException):
    """Error for a Docker object that could not be found; the id is kept in ``object_id``."""

    def __init__(self, object_id: str, message=None, stdout=None, stderr=None) -> None:
        # fall back to a generated message when the caller supplies none
        if not message:
            message = f"Docker object {object_id} not found"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
        self.object_id = object_id
76

77

78
class NoSuchContainer(ContainerException):
    """Error for a container that could not be found; the name or id is kept in ``container_name_or_id``."""

    def __init__(self, container_name_or_id: str, message=None, stdout=None, stderr=None) -> None:
        # fall back to a generated message when the caller supplies none
        if not message:
            message = f"Docker container {container_name_or_id} not found"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
        self.container_name_or_id = container_name_or_id
83

84

85
class NoSuchImage(ContainerException):
    """Error for an image that could not be found; the name is kept in ``image_name``."""

    def __init__(self, image_name: str, message=None, stdout=None, stderr=None) -> None:
        # fall back to a generated message when the caller supplies none
        if not message:
            message = f"Docker image {image_name} not found"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
        self.image_name = image_name
90

91

92
class NoSuchNetwork(ContainerException):
    """Error for a network that could not be found; the name is kept in ``network_name``."""

    def __init__(self, network_name: str, message=None, stdout=None, stderr=None) -> None:
        # fall back to a generated message when the caller supplies none
        if not message:
            message = f"Docker network {network_name} not found"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
        self.network_name = network_name
97

98

99
class RegistryConnectionError(ContainerException):
    """Connection error; the caller-supplied description is kept in ``details``."""

    def __init__(self, details: str, message=None, stdout=None, stderr=None) -> None:
        # fall back to a generated message when the caller supplies none
        if not message:
            message = f"Connection error: {details}"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
        self.details = details
104

105

106
class DockerNotAvailable(ContainerException):
    """Error signalling that Docker is not available."""

    def __init__(self, message=None, stdout=None, stderr=None) -> None:
        # fall back to a fixed message when the caller supplies none
        if not message:
            message = "Docker not available"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
110

111

112
class AccessDenied(ContainerException):
    """Error for a denied access; the affected object's name is kept in ``object_name``."""

    def __init__(self, object_name: str, message=None, stdout=None, stderr=None) -> None:
        # fall back to a generated message when the caller supplies none
        if not message:
            message = f"Access denied to {object_name}"
        super().__init__(message=message, stdout=stdout, stderr=stderr)
        self.object_name = object_name
117

118

119
class CancellableStream(Protocol):
    """Describes a generator that can be closed. Borrowed from ``docker.types.daemon``."""

    def __iter__(self):
        """Iteration hook; the protocol itself provides no implementation."""
        raise NotImplementedError()

    def __next__(self):
        """Next-item hook; the protocol itself provides no implementation."""
        raise NotImplementedError()

    def close(self):
        """Close hook; the protocol itself provides no implementation."""
        raise NotImplementedError()
130

131

132
class DockerPlatform(str):
    """Platform in the format ``os[/arch[/variant]]``"""

    # predefined platform strings; note these are plain ``str`` class attributes,
    # not ``DockerPlatform`` instances
    linux_amd64 = "linux/amd64"
    linux_arm64 = "linux/arm64"
137

138

139
@dataclasses.dataclass
class Ulimit:
    """The ``ulimit`` settings for the container.
    See https://www.tutorialspoint.com/setting-ulimit-values-on-docker-containers

    The custom ``__repr__`` renders the value in ``<type>=<soft limit>[:<hard limit>]`` form.
    """

    name: str
    soft_limit: int
    hard_limit: Optional[int] = None

    def __repr__(self):
        """Format: <type>=<soft limit>[:<hard limit>]"""
        # a falsy hard limit (None or 0) is left out of the rendered string
        hard_suffix = f":{self.hard_limit}" if self.hard_limit else ""
        return f"{self.name}={self.soft_limit}{hard_suffix}"
155

156

157
# defines the type for port mappings (source->target port range)
# a range is a two-element sequence [first, last]; PortMappings treats both ends as inclusive
PortRange = Union[List, HashableList]
# defines the protocol for a port range ("tcp" or "udp")
PortProtocol = str
161

162

163
def isinstance_union(obj, class_or_tuple):
    """``isinstance`` variant that also accepts a ``typing.Union`` as second argument.

    :param obj: the object to check
    :param class_or_tuple: a class, a tuple of classes, or a ``Union[...]`` of classes
    :return: True if ``obj`` is an instance of (one of) the given type(s)
    """
    if sys.version_info < (3, 10):
        # isinstance() rejects typing.Union before Python 3.10, so unpack the union members.
        # Previously this branch ignored ``class_or_tuple`` and always checked against PortRange.
        # Plain classes/tuples have no type args and are passed through unchanged.
        return isinstance(obj, get_args(class_or_tuple) or class_or_tuple)
    else:
        return isinstance(obj, class_or_tuple)
169

170

171
class PortMappings:
    """Maps source to target port ranges for Docker port mappings.

    Mappings are stored as ``(from_range, protocol) -> to_range`` where both ranges are
    two-element ``[first, last]`` sequences with inclusive bounds. The ``to_str``/``to_list``
    renderers emit the ``from`` side before the colon, and ``to_dict`` treats the ``from`` range
    as host ports and the ``to`` range as container ports.
    """

    # bind host to be used for defining port mappings
    bind_host: str
    # maps `from` port range to `to` port range for port mappings
    mappings: Dict[Tuple[PortRange, PortProtocol], List]

    def __init__(self, bind_host: str = None):
        """
        :param bind_host: optional host/IP prefix for rendered mappings; falsy values become ""
        """
        self.bind_host = bind_host if bind_host else ""
        self.mappings = {}

    def add(
        self,
        port: Union[int, PortRange],
        mapped: Union[int, PortRange] = None,
        protocol: PortProtocol = "tcp",
    ):
        """
        Add a mapping from ``port`` to ``mapped`` for the given protocol.

        A port range is added port by port; if ``mapped`` is a single port, every port of the
        range is mapped to it. A port already covered by an existing ``from`` range of the same
        protocol is ignored. Where possible the new port is merged into an adjacent existing
        range instead of creating a new entry.

        :param port: source port, or ``[first, last]`` range of source ports
        :param mapped: target port or range; falsy values (including 0) default to ``port``
        :param protocol: protocol name; normalized to lower case, falsy values become "tcp"
        :raises Exception: if ``port`` is None or negative
        """
        mapped = mapped or port
        # Normalize up front so that lookups below compare against the same form that is stored
        # as key (previously only the stored key was normalized, so e.g. "TCP" never matched).
        protocol = str(protocol or "tcp").lower()
        if isinstance_union(port, PortRange):
            for i in range(port[1] - port[0] + 1):
                if isinstance_union(mapped, PortRange):
                    self.add(port[0] + i, mapped[0] + i, protocol)
                else:
                    self.add(port[0] + i, mapped, protocol)
            return
        if port is None or int(port) < 0:
            raise Exception(f"Unable to add mapping for invalid port: {port}")
        if self.contains(port, protocol):
            return
        bisected_host_port = None
        # NOTE: the helpers below may re-key self.mappings; this is only safe because every such
        # path leaves the loop (return/break) before the next iteration step.
        for (from_range, from_protocol), to_range in self.mappings.items():
            if not from_protocol == protocol:
                continue
            if not self.in_expanded_range(port, from_range):
                continue
            if not self.in_expanded_range(mapped, to_range):
                continue
            from_range_len = from_range[1] - from_range[0]
            to_range_len = to_range[1] - to_range[0]
            is_uniform = from_range_len == to_range_len
            if is_uniform:
                # grow both sides of the existing mapping by one port
                self.expand_range(port, from_range, protocol=protocol, remap=True)
                self.expand_range(mapped, to_range, protocol=protocol)
            else:
                if not self.in_range(mapped, to_range):
                    continue
                # extending a 1 to 1 mapping to be many to 1
                elif from_range_len == 1:
                    self.expand_range(port, from_range, protocol=protocol, remap=True)
                # splitting a uniform mapping
                else:
                    bisected_port_index = mapped - to_range[0]
                    bisected_host_port = from_range[0] + bisected_port_index
                    self.bisect_range(mapped, to_range, protocol=protocol)
                    self.bisect_range(bisected_host_port, from_range, protocol=protocol, remap=True)
                    break
            return
        if bisected_host_port is None:
            port_range = [port, port]
        elif bisected_host_port < port:
            port_range = [bisected_host_port, port]
        else:
            port_range = [port, bisected_host_port]
        self.mappings[(HashableList(port_range), protocol)] = [mapped, mapped]

    def to_str(self) -> str:
        """Render all mappings as one space-separated string of ``-p <spec>`` flags."""
        bind_address = f"{self.bind_host}:" if self.bind_host else ""

        def entry(k, v):
            from_range, protocol = k
            to_range = v
            # use /<protocol> suffix if the protocol is not "tcp"
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
                return f"-p {bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
                return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}{protocol_suffix}"
            return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}"

        return " ".join([entry(k, v) for k, v in self.mappings.items()])

    def to_list(self) -> List[str]:  # TODO test
        """Render all mappings as a flat argument list: ``["-p", "<spec>", "-p", "<spec>", ...]``.

        Unlike ``to_str``, a many-to-one mapping is rendered with an explicit ``to`` range
        (``a-b:c-c``).
        """
        bind_address = f"{self.bind_host}:" if self.bind_host else ""

        def entry(k, v):
            from_range, protocol = k
            to_range = v
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
                return ["-p", f"{bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"]
            return [
                "-p",
                f"{bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}",
            ]

        return [item for k, v in self.mappings.items() for item in entry(k, v)]

    def to_dict(self) -> Dict[str, Union[Tuple[str, Union[int, List[int]]], int]]:
        """Render the mappings as a dict keyed by ``"<container port>/<protocol>"``.

        Values are the host port, ``(bind_host, host port)`` if a bind host is set, a list of
        host ports for many-to-one mappings, or None for host port 0.
        """
        bind_address = self.bind_host or ""

        def bind_port(bind_address, host_port):
            if host_port == 0:
                return None
            elif bind_address:
                return (bind_address, host_port)
            else:
                return host_port

        def entry(k, v):
            from_range, protocol = k
            to_range = v
            protocol_suffix = f"/{protocol}"
            # many host ports mapped onto one container port
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
                container_port = to_range[0]
                host_ports = list(range(from_range[0], from_range[1] + 1))
                return [
                    (
                        f"{container_port}{protocol_suffix}",
                        (bind_address, host_ports) if bind_address else host_ports,
                    )
                ]
            # otherwise pair up container and host ports one by one
            return [
                (
                    f"{container_port}{protocol_suffix}",
                    bind_port(bind_address, host_port),
                )
                for container_port, host_port in zip(
                    range(to_range[0], to_range[1] + 1),
                    range(from_range[0], from_range[1] + 1),
                    strict=False,
                )
            ]

        items = [item for k, v in self.mappings.items() for item in entry(k, v)]
        return dict(items)

    def contains(self, port: int, protocol: PortProtocol = "tcp") -> bool:
        """Return True if ``port`` lies within an existing ``from`` range of ``protocol``.

        Always returns a real bool (previously fell through to an implicit None on a miss).
        """
        return any(
            from_protocol == protocol and self.in_range(port, from_range)
            for from_range, from_protocol in self.mappings
        )

    def in_range(self, port: int, range: PortRange) -> bool:
        """Return True if ``port`` is within ``range`` (both bounds inclusive)."""
        return port >= range[0] and port <= range[1]

    def in_expanded_range(self, port: int, range: PortRange):
        """Return True if ``port`` is within ``range`` or directly adjacent to either end."""
        return port >= range[0] - 1 and port <= range[1] + 1

    def expand_range(
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
    ):
        """
        Expand the given port range by the given port. If remap==True, put the updated range into self.mappings

        Without remap the given ``range`` object is modified in place.

        :raises Exception: if ``port`` is neither inside nor directly adjacent to ``range``
        """
        if self.in_range(port, range):
            return
        # with remap, work on a copy: the original is still needed as the old mapping key
        new_range = list(range) if remap else range
        if port == range[0] - 1:
            new_range[0] = port
        elif port == range[1] + 1:
            new_range[1] = port
        else:
            raise Exception(f"Unable to add port {port} to existing range {range}")
        if remap:
            self._remap_range(range, new_range, protocol=protocol)

    def bisect_range(
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
    ):
        """
        Bisect a port range, at the provided port. This is needed in some cases when adding a
        non-uniform host to port mapping adjacent to an existing port range.
        If remap==True, put the updated range into self.mappings

        Only one side of the split is kept: if ``port`` is the first port of the range, the range
        then starts at ``port + 1``, otherwise it then ends at ``port - 1``.
        """
        if not self.in_range(port, range):
            return
        new_range = list(range) if remap else range
        if port == range[0]:
            new_range[0] = port + 1
        else:
            new_range[1] = port - 1
        if remap:
            self._remap_range(range, new_range, protocol)

    def _remap_range(self, old_key: PortRange, new_key: PortRange, protocol: PortProtocol):
        """Move the mapping stored under ``(old_key, protocol)`` to ``(new_key, protocol)``."""
        self.mappings[(HashableList(new_key), protocol)] = self.mappings.pop(
            (HashableList(old_key), protocol)
        )

    def __repr__(self):
        return f"<PortMappings: {self.to_dict()}>"
366

367

368
# two-string tuple; VolumeMappings.find_target_mapping reads index 1 as the container directory
# (index 0 is presumably the host path - confirm against callers)
SimpleVolumeBind = Tuple[str, str]
"""Type alias for a simple version of VolumeBind"""
370

371

372
@dataclasses.dataclass
class VolumeBind:
    """Represents a --volume argument run/create command. When using VolumeBind to bind-mount a file or directory
    that does not yet exist on the Docker host, -v creates the endpoint for you. It is always created as a directory.
    """

    host_dir: str
    container_dir: str
    read_only: bool = False

    def to_str(self) -> str:
        """Render the bind as ``[host_dir:]container_dir[:ro]``.

        :raises ValueError: if no container dir is set
        """
        args = []

        if self.host_dir:
            args.append(self.host_dir)

        if not self.container_dir:
            raise ValueError("no container dir specified")

        args.append(self.container_dir)

        if self.read_only:
            args.append("ro")

        return ":".join(args)

    @classmethod
    def parse(cls, param: str) -> "VolumeBind":
        """Parse a ``host_dir:container_dir[:options]`` string.

        ``options`` is a comma-separated list; only ``ro`` is evaluated.

        :param param: the volume specification to parse
        :return: the parsed volume bind
        :raises ValueError: if ``param`` does not consist of two or three ``:``-separated parts
        """
        parts = param.split(":")
        # The previous guard (`1 > len(parts) > 3`) could never be true, so a single part
        # surfaced as IndexError and surplus parts were silently dropped.
        if len(parts) < 2 or len(parts) > 3:
            raise ValueError(f"Cannot parse volume bind {param}")

        volume = cls(parts[0], parts[1])
        if len(parts) == 3:
            if "ro" in parts[2].split(","):
                volume.read_only = True
        return volume
409

410

411
class VolumeMappings:
    """Thin list wrapper that collects volume binds (plain tuples or ``VolumeBind`` objects)."""

    mappings: List[Union[SimpleVolumeBind, VolumeBind]]

    def __init__(self, mappings: List[Union[SimpleVolumeBind, VolumeBind]] = None):
        """
        :param mappings: initial list of binds; it is stored as-is (not copied). A new empty
            list is created when omitted.
        """
        if mappings is None:
            mappings = []
        self.mappings = mappings

    def add(self, mapping: Union[SimpleVolumeBind, VolumeBind]):
        """Alias for :meth:`append`."""
        self.append(mapping)

    def append(
        self,
        mapping: Union[
            SimpleVolumeBind,
            VolumeBind,
        ],
    ):
        """Add the given bind at the end of the list."""
        self.mappings.append(mapping)

    def find_target_mapping(
        self, container_dir: str
    ) -> Optional[Union[SimpleVolumeBind, VolumeBind]]:
        """
        Return the first volume whose container-side directory equals ``container_dir``.

        :param container_dir: the target of the volume mapping, i.e., the path in the container
        :return: the volume mapping or None if no volume targets that directory
        """
        for candidate in self.mappings:
            # tuples carry the container dir at index 1, VolumeBind objects as attribute
            if isinstance(candidate, tuple):
                target_dir = candidate[1]
            else:
                target_dir = candidate.container_dir
            if container_dir == target_dir:
                return candidate
        return None

    def __iter__(self):
        return iter(self.mappings)

    def __repr__(self):
        return repr(self.mappings)
450

451

452
# allowed values for the `type` field of VolumeInfo
VolumeType = Literal["bind", "volume"]
453

454

455
class VolumeInfo(NamedTuple):
    """Container volume information.

    ``ContainerClient.inspect_container_volumes`` builds instances from the entries of the
    ``"Mounts"`` list of a container inspection, using the lower-cased keys of each entry as
    field names.
    """

    type: VolumeType
    source: str
    destination: str
    mode: str
    rw: bool
    propagation: str
    # the last two fields are optional and default to None when missing from the mount entry
    name: Optional[str] = None
    driver: Optional[str] = None
466

467

468
@dataclasses.dataclass
class LogConfig:
    """Logging settings for a container (used by ``ContainerConfiguration.log_config``).

    NOTE(review): ``type`` presumably names the Docker logging driver and ``config`` its
    driver options - confirm against the client implementations.
    """

    type: Literal["json-file", "syslog", "journald", "gelf", "fluentd", "none", "awslogs", "splunk"]
    # each instance gets its own empty dict by default
    config: Dict[str, str] = dataclasses.field(default_factory=dict)
472

473

474
@dataclasses.dataclass
class ContainerConfiguration:
    """Mutable collection of settings describing a container.

    Only ``image_name`` is required. Instances are modified in place by ``ContainerConfigurator``
    callables. NOTE(review): the field names presumably correspond to ``docker run``/``create``
    options - confirm against the client implementations that consume this class.
    """

    image_name: str
    name: Optional[str] = None
    # container-valued fields use default factories so instances never share state
    volumes: VolumeMappings = dataclasses.field(default_factory=VolumeMappings)
    ports: PortMappings = dataclasses.field(default_factory=PortMappings)
    exposed_ports: List[str] = dataclasses.field(default_factory=list)
    entrypoint: Optional[Union[List[str], str]] = None
    additional_flags: Optional[str] = None
    command: Optional[List[str]] = None
    env_vars: Dict[str, str] = dataclasses.field(default_factory=dict)

    # boolean switches, all off by default
    privileged: bool = False
    remove: bool = False
    interactive: bool = False
    tty: bool = False
    detach: bool = False

    # optional settings; None means "not set"
    stdin: Optional[str] = None
    user: Optional[str] = None
    cap_add: Optional[List[str]] = None
    cap_drop: Optional[List[str]] = None
    security_opt: Optional[List[str]] = None
    network: Optional[str] = None
    dns: Optional[str] = None
    workdir: Optional[str] = None
    platform: Optional[str] = None
    ulimits: Optional[List[Ulimit]] = None
    labels: Optional[Dict[str, str]] = None
    init: Optional[bool] = None
    log_config: Optional[LogConfig] = None
505

506

507
class ContainerConfigurator(Protocol):
    """Protocol for functional configurators. A ContainerConfigurator modifies, when called,
    a ContainerConfiguration in place.

    Any callable accepting a single ``ContainerConfiguration`` argument matches this protocol.
    """

    def __call__(self, configuration: ContainerConfiguration):
        """
        Modify the given container configuration.

        :param configuration: the configuration to modify
        """
        ...
518

519

520
@dataclasses.dataclass
class DockerRunFlags:
    """Class to capture Docker run/create flags for a container.
    run: https://docs.docker.com/engine/reference/commandline/run/
    create: https://docs.docker.com/engine/reference/commandline/create/

    None of the fields has a default, so all of them must be passed (possibly as None)
    when constructing an instance.
    """

    env_vars: Optional[Dict[str, str]]
    extra_hosts: Optional[Dict[str, str]]
    labels: Optional[Dict[str, str]]
    volumes: Optional[List[SimpleVolumeBind]]
    network: Optional[str]
    platform: Optional[DockerPlatform]
    privileged: Optional[bool]
    ports: Optional[PortMappings]
    ulimits: Optional[List[Ulimit]]
    user: Optional[str]
    dns: Optional[List[str]]
538

539

540
# TODO: remove Docker/Podman compatibility switches (in particular strip_wellknown_repo_prefixes=...)
541
#  from the container client base interface and introduce derived Podman client implementations instead!
542
class ContainerClient(metaclass=ABCMeta):
1✔
543
    @abstractmethod
    def get_system_info(self) -> dict:
        """Returns the docker system-wide information as dictionary (``docker info``).

        ``get_system_id`` reads the ``"ID"`` key of the returned dictionary.
        """
546

547
    def get_system_id(self) -> str:
        """Returns the unique and stable ID of the docker daemon.

        The value is taken from the ``"ID"`` entry of :meth:`get_system_info`.
        """
        system_info = self.get_system_info()
        return system_info["ID"]
550

551
    @abstractmethod
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
        """Returns the status of the container with the given name

        :param container_name: name of the container to check
        :return: one of the ``DockerContainerStatus`` members
        """
        pass
555

556
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
        """Returns the usage statistics of the container with the given name

        NOTE(review): unlike its neighbours this method is not marked abstract; the base
        implementation does nothing and returns None, so subclasses are expected to override it.

        :param container_name: name of the container to get the statistics for
        """
        pass
559

560
    def get_networks(self, container_name: str) -> List[str]:
        """Returns the names of the networks the given container is attached to.

        The names are the keys of ``NetworkSettings.Networks`` in the container inspection.

        :param container_name: name of the container to inspect
        :return: list of network names (empty if the container reports no networks)
        """
        LOG.debug("Getting networks for container: %s", container_name)
        container_attrs = self.inspect_container(container_name_or_id=container_name)
        # "Networks" may be missing or null; treat both as "no networks", mirroring the handling
        # of "Containers" in get_container_ipv4_for_network
        networks = container_attrs["NetworkSettings"].get("Networks") or {}
        return list(networks)
564

565
    def get_container_ipv4_for_network(
        self, container_name_or_id: str, container_network: str
    ) -> str:
        """
        Returns the IPv4 address for the container on the interface connected to the given network

        :param container_name_or_id: Container to inspect
        :param container_network: Network the IP address will belong to
        :return: IP address of the given container on the interface connected to the given network
        :raises ContainerException: if the container is not listed in the network's containers,
            or if its address cannot be parsed as an IPv4 interface
        """
        LOG.debug(
            "Getting ipv4 address for container %s in network %s.",
            container_name_or_id,
            container_network,
        )
        # we always need the ID for this
        container_id = self.get_container_id(container_name=container_name_or_id)
        network_attrs = self.inspect_network(container_network)
        containers = network_attrs.get("Containers") or {}
        if container_id not in containers:
            # best-effort diagnostics before failing; inspection errors are only logged
            LOG.debug("Network attributes: %s", network_attrs)
            try:
                inspection = self.inspect_container(container_name_or_id=container_name_or_id)
                LOG.debug("Container %s Attributes: %s", container_name_or_id, inspection)
                logs = self.get_container_logs(container_name_or_id=container_name_or_id)
                LOG.debug("Container %s Logs: %s", container_name_or_id, logs)
            except ContainerException as e:
                LOG.debug("Cannot inspect container %s: %s", container_name_or_id, e)
            # ContainerException does not interpolate logging-style arguments (the extra
            # positional arguments would end up as stdout/stderr), so format the message here
            raise ContainerException(
                f"Container {container_name_or_id} is not connected to target network {container_network}"
            )
        try:
            ip = str(ipaddress.IPv4Interface(containers[container_id]["IPv4Address"]).ip)
        except Exception as e:
            raise ContainerException(
                f"Unable to detect IP address for container {container_name_or_id} in network {container_network}: {e}"
            ) from e
        return ip
604

605
    @abstractmethod
    def stop_container(self, container_name: str, timeout: int = 10):
        """Stops container with given name

        :param container_name: Container identifier (name or id) of the container to be stopped
        :param timeout: Timeout after which SIGKILL is sent to the container (default: 10;
            presumably seconds, as for ``restart_container`` - confirm).
        """
611

612
    @abstractmethod
    def restart_container(self, container_name: str, timeout: int = 10):
        """Restarts a container with the given name.

        :param container_name: Container identifier
        :param timeout: Seconds to wait for stop before killing the container (default: 10)
        """
618

619
    @abstractmethod
    def pause_container(self, container_name: str):
        """Pauses a container with the given name.

        :param container_name: name of the container to pause
        """
622

623
    @abstractmethod
    def unpause_container(self, container_name: str):
        """Unpauses a container with the given name.

        :param container_name: name of the container to unpause
        """
626

627
    @abstractmethod
    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:
        """Removes container with given name

        :param container_name: name of the container to remove
        :param force: defaults to True; presumably forces removal of a running container -
            confirm against the implementations
        :param check_existence: defaults to False; presumably checks that the container exists
            before attempting removal - confirm against the implementations
        """
630

631
    @abstractmethod
    def remove_image(self, image: str, force: bool = True) -> None:
        """Removes an image with given name

        :param image: Image name and tag
        :param force: Force removal (default: True)
        """
638

639
    @abstractmethod
    def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:
        """List all containers matching the given filters

        :param filter: a single filter string, a list of filter strings, or None for no filtering
            (the filter syntax is defined by the implementations)
        :param all: defaults to True; ``get_running_container_names`` passes False to obtain the
            running containers only
        :return: A list of dicts with keys id, image, name, labels, status
        """
645

646
    def get_running_container_names(self) -> List[str]:
        """Returns a list of the names of all running containers

        The names are read from the ``"name"`` entry of each container returned by
        ``list_containers(all=False)``.
        """
        return [entry["name"] for entry in self.list_containers(all=False)]
651

652
    def is_container_running(self, container_name: str) -> bool:
        """Checks whether a container with a given name is currently running

        :param container_name: name to look up among the running container names
        """
        running_names = self.get_running_container_names()
        return container_name in running_names
655

656
    def create_file_in_container(
        self,
        container_name,
        file_contents: bytes,
        container_path: str,
        chmod_mode: Optional[int] = None,
    ) -> None:
        """
        Create a file in container with the provided content. Provide the 'chmod_mode' argument if you want the file to have specific permissions.

        The content is staged in a local temporary file, which is then copied into the container
        via ``copy_into_container``; the temporary file is removed afterwards.

        :param container_name: container to create the file in
        :param file_contents: raw bytes to write
        :param container_path: destination path inside the container
        :param chmod_mode: if given, applied to the staged file before it is copied
        """
        with tempfile.NamedTemporaryFile() as staging_file:
            staging_file.write(file_contents)
            # flush so the content is on disk before the file is accessed by its path
            staging_file.flush()
            staged_path = staging_file.name
            if chmod_mode is not None:
                chmod_r(staged_path, chmod_mode)
            self.copy_into_container(
                container_name=container_name,
                local_path=staged_path,
                container_path=container_path,
            )
676

677
    @abstractmethod
    def copy_into_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:
        """Copy contents of the given local path into the container

        :param container_name: container to copy into
        :param local_path: source path on the local side
        :param container_path: destination path inside the container
        """
682

683
    @abstractmethod
    def copy_from_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:
        """Copy contents of the given container to the host

        :param container_name: container to copy from
        :param local_path: destination path on the host
        :param container_path: source path inside the container
        """
688

689
    @abstractmethod
    def pull_image(self, docker_image: str, platform: Optional[DockerPlatform] = None) -> None:
        """Pulls an image with a given name from a Docker registry

        :param docker_image: name of the image to pull
        :param platform: optional platform of the image to pull (``os[/arch[/variant]]``)
        """
692

693
    @abstractmethod
    def push_image(self, docker_image: str) -> None:
        """Pushes an image with a given name to a Docker registry

        :param docker_image: name of the image to push
        """
696

697
    @abstractmethod
    def build_image(
        self,
        dockerfile_path: str,
        image_name: str,
        context_path: str = None,
        platform: Optional[DockerPlatform] = None,
    ) -> None:
        """Builds an image from the given Dockerfile

        :param dockerfile_path: Path to Dockerfile, or a directory that contains a Dockerfile
        :param image_name: Name of the image to be built
        :param context_path: Path for build context (defaults to dirname of Dockerfile)
        :param platform: Target platform for build, in the form ``os[/arch[/variant]]``
               (defaults to platform of Docker host)
        """
712

713
    @abstractmethod
    def tag_image(self, source_ref: str, target_name: str) -> None:
        """Tags an existing image with a new name

        :param source_ref: Name or ID of the image to be tagged
        :param target_name: New name (tag) of the tagged image
        """
720

721
    @abstractmethod
    def get_docker_image_names(
        self,
        strip_latest: bool = True,
        include_tags: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ) -> List[str]:
        """
        Get all names of docker images available to the container engine

        :param strip_latest: return images both with and without :latest tag
        :param include_tags: include tags of the images in the names
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
               (see ``WELL_KNOWN_IMAGE_REPO_PREFIXES``)
        :return: List of image names
        """
736

737
    @abstractmethod
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
        """Get all logs of a given container

        :param container_name_or_id: name or id of the container
        :param safe: defaults to False; presumably suppresses errors when the logs cannot be
            retrieved - confirm against the implementations
        :return: the container logs as a single string
        """
740

741
    @abstractmethod
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
        """Returns a blocking generator you can iterate over to retrieve log output as it happens.

        :param container_name_or_id: name or id of the container
        :return: a stream that can be iterated and closed (see ``CancellableStream``)
        """
744

745
    @abstractmethod
    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
        """Get detailed attributes of a container.

        Other methods of this class read the ``"NetworkSettings"`` and ``"Mounts"`` entries of
        the result.

        :param container_name_or_id: name or id of the container
        :return: Dict containing docker attributes as returned by the daemon
        """
751

752
    def inspect_container_volumes(self, container_name_or_id) -> List[VolumeInfo]:
        """Return information about the volumes mounted into the given container.

        Each entry of the ``"Mounts"`` list of the container inspection is turned into a
        ``VolumeInfo``, with the entry's keys lower-cased to match the field names.

        :param container_name_or_id: the container name or id
        :return: a list of volumes
        """
        mounts = self.inspect_container(container_name_or_id)["Mounts"]
        return [
            VolumeInfo(**{key.lower(): value for key, value in mount.items()}) for mount in mounts
        ]
763

764
    @abstractmethod
    def inspect_image(
        self, image_name: str, pull: bool = True, strip_wellknown_repo_prefixes: bool = True
    ) -> Dict[str, Union[dict, list, str]]:
        """Look up the detailed attributes of an image.

        :param image_name: name of the image to inspect
        :param pull: whether to pull the image if it does not exist
        :param strip_wellknown_repo_prefixes: whether to remove well-known repo prefixes such as
               "localhost/" or "docker.io/library/", which the Podman API adds but Docker does not
        :return: Dict with the docker attributes, as returned by the daemon
        """
776

777
    @abstractmethod
    def create_network(self, network_name: str) -> str:
        """
        Create a new network.

        :param network_name: name to give to the network
        :return: ID of the created network
        """
784

785
    @abstractmethod
    def delete_network(self, network_name: str) -> None:
        """
        Remove the network that has the given name.

        :param network_name: name of the network to delete
        """
791

792
    @abstractmethod
    def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:
        """Look up the detailed attributes of a network.

        :param network_name: name of the network to inspect
        :return: Dict with the docker attributes, as returned by the daemon
        """
798

799
    @abstractmethod
    def connect_container_to_network(
        self,
        network_name: str,
        container_name_or_id: str,
        aliases: Optional[List] = None,
        link_local_ips: Optional[List[str]] = None,
    ) -> None:
        """
        Connects a container to a given network

        :param network_name: Network to connect the container to
        :param container_name_or_id: Container to connect to the network
        :param aliases: List of dns names the container should be available under in the network
        :param link_local_ips: List of link-local (IPv4 or IPv6) addresses
        """
814

815
    @abstractmethod
    def disconnect_container_from_network(
        self, network_name: str, container_name_or_id: str
    ) -> None:
        """
        Detach a container from a network.

        :param network_name: the network the container should be disconnected from
        :param container_name_or_id: the container to disconnect from the network
        """
824

825
    def get_container_name(self, container_id: str) -> str:
        """Resolve a container identifier to the container name (without leading slashes)"""
        container_details = self.inspect_container(container_id)
        reported_name = container_details["Name"]
        return reported_name.lstrip("/")
828

829
    def get_container_id(self, container_name: str) -> str:
        """Resolve a container name to the container id"""
        container_details = self.inspect_container(container_name)
        return container_details["Id"]
832

833
    @abstractmethod
    def get_container_ip(self, container_name_or_id: str) -> str:
        """Look up the IP address of a container.

        For a container that is attached to multiple networks, the IP of the first one is returned.

        :param container_name_or_id: name or ID of the container
        """
839

840
    def get_image_cmd(self, docker_image: str, pull: bool = True) -> List[str]:
        """Get the command for the given image

        :param docker_image: Docker image to inspect
        :param pull: Whether to pull if image is not present
        :return: Image command in its array form (empty list if the image config has no command)
        """
        image_config = self.inspect_image(docker_image, pull)["Config"]
        # use .get() (like get_image_entrypoint) to tolerate a missing "Cmd" key as well as a null value
        return image_config.get("Cmd") or []
848

849
    def get_image_entrypoint(self, docker_image: str, pull: bool = True) -> str:
        """Look up the entry point of the given image

        :param docker_image: Docker image to inspect
        :param pull: Whether to pull if image is not present
        :return: Image entrypoint, shell-joined into a single string (empty if none is configured)
        """
        LOG.debug("Getting the entrypoint for image: %s", docker_image)
        image_config = self.inspect_image(docker_image, pull)["Config"]
        entrypoint_parts = image_config.get("Entrypoint")
        if not entrypoint_parts:
            entrypoint_parts = []
        return shlex.join(entrypoint_parts)
858

859
    @abstractmethod
    def has_docker(self) -> bool:
        """Tell whether docker is available on this system"""
862

863
    @abstractmethod
    def commit(
        self,
        container_name_or_id: str,
        image_name: str,
        image_tag: str,
    ):
        """Turn a running container into an image.

        :param container_name_or_id: the container to create the image from
        :param image_name: name of the resulting image
        :param image_tag: tag of the resulting image
        """
876

877
    def create_container_from_config(self, container_config: ContainerConfiguration) -> str:
        """
        Variant of ``create_container`` that takes all settings from a ContainerConfiguration

        :param container_config: ContainerConfiguration describing the container to create
        :return: Container ID
        """
        cfg = container_config
        container_id = self.create_container(
            image_name=cfg.image_name,
            name=cfg.name,
            entrypoint=cfg.entrypoint,
            remove=cfg.remove,
            interactive=cfg.interactive,
            tty=cfg.tty,
            command=cfg.command,
            volumes=cfg.volumes,
            ports=cfg.ports,
            exposed_ports=cfg.exposed_ports,
            env_vars=cfg.env_vars,
            user=cfg.user,
            cap_add=cfg.cap_add,
            cap_drop=cfg.cap_drop,
            security_opt=cfg.security_opt,
            network=cfg.network,
            dns=cfg.dns,
            additional_flags=cfg.additional_flags,
            workdir=cfg.workdir,
            privileged=cfg.privileged,
            platform=cfg.platform,
            labels=cfg.labels,
            ulimits=cfg.ulimits,
            init=cfg.init,
            log_config=cfg.log_config,
        )
        return container_id
910

911
    @abstractmethod
    def create_container(
        self,
        image_name: str,
        *,
        name: Optional[str] = None,
        entrypoint: Optional[Union[List[str], str]] = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: Optional[Union[List[str], str]] = None,
        volumes: Optional[Union[VolumeMappings, List[SimpleVolumeBind]]] = None,
        ports: Optional[PortMappings] = None,
        exposed_ports: Optional[List[str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        user: Optional[str] = None,
        cap_add: Optional[List[str]] = None,
        cap_drop: Optional[List[str]] = None,
        security_opt: Optional[List[str]] = None,
        network: Optional[str] = None,
        dns: Optional[Union[str, List[str]]] = None,
        additional_flags: Optional[str] = None,
        workdir: Optional[str] = None,
        privileged: Optional[bool] = None,
        labels: Optional[Dict[str, str]] = None,
        platform: Optional[DockerPlatform] = None,
        ulimits: Optional[List[Ulimit]] = None,
        init: Optional[bool] = None,
        log_config: Optional[LogConfig] = None,
    ) -> str:
        """Create (but do not start) a container from the given image.

        All settings besides ``image_name`` are keyword-only.

        :param image_name: image to create the container from
        :return: Container ID
        """
946

947
    @abstractmethod
    def run_container(
        self,
        image_name: str,
        stdin: Optional[bytes] = None,
        *,
        name: Optional[str] = None,
        entrypoint: Optional[str] = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: Optional[Union[List[str], str]] = None,
        volumes: Optional[Union[VolumeMappings, List[SimpleVolumeBind]]] = None,
        ports: Optional[PortMappings] = None,
        exposed_ports: Optional[List[str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        user: Optional[str] = None,
        cap_add: Optional[List[str]] = None,
        cap_drop: Optional[List[str]] = None,
        security_opt: Optional[List[str]] = None,
        network: Optional[str] = None,
        dns: Optional[str] = None,
        additional_flags: Optional[str] = None,
        workdir: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        platform: Optional[DockerPlatform] = None,
        privileged: Optional[bool] = None,
        ulimits: Optional[List[Ulimit]] = None,
        init: Optional[bool] = None,
        log_config: Optional[LogConfig] = None,
    ) -> Tuple[bytes, bytes]:
        """Creates and runs a given docker container

        :param image_name: image to run the container from
        :param stdin: optional bytes for the container's standard input (may be ``None``)
        :return: A tuple (stdout, stderr)
        """
983

984
    def run_container_from_config(
        self, container_config: ContainerConfiguration
    ) -> Tuple[bytes, bytes]:
        """Like ``run_container`` but uses the parameters from the configuration.

        :param container_config: ContainerConfiguration describing the container to run
        :return: A tuple (stdout, stderr), as returned by ``run_container``
        """

        return self.run_container(
            image_name=container_config.image_name,
            stdin=container_config.stdin,
            name=container_config.name,
            entrypoint=container_config.entrypoint,
            remove=container_config.remove,
            interactive=container_config.interactive,
            tty=container_config.tty,
            detach=container_config.detach,
            command=container_config.command,
            volumes=container_config.volumes,
            ports=container_config.ports,
            exposed_ports=container_config.exposed_ports,
            env_vars=container_config.env_vars,
            user=container_config.user,
            cap_add=container_config.cap_add,
            cap_drop=container_config.cap_drop,
            security_opt=container_config.security_opt,
            network=container_config.network,
            dns=container_config.dns,
            additional_flags=container_config.additional_flags,
            workdir=container_config.workdir,
            # labels were previously dropped here, unlike in create_container_from_config
            labels=container_config.labels,
            platform=container_config.platform,
            privileged=container_config.privileged,
            ulimits=container_config.ulimits,
            init=container_config.init,
            log_config=container_config.log_config,
        )
1017

1018
    @abstractmethod
    def exec_in_container(
        self,
        container_name_or_id: str,
        command: Union[List[str], str],
        interactive: bool = False,
        detach: bool = False,
        env_vars: Optional[Dict[str, Optional[str]]] = None,
        stdin: Optional[bytes] = None,
        user: Optional[str] = None,
        workdir: Optional[str] = None,
    ) -> Tuple[bytes, bytes]:
        """Run a command inside of a container.

        :param container_name_or_id: name or ID of the container to execute the command in
        :param command: the command to execute, as a single string or as a list of arguments
        :return: A tuple (stdout, stderr)
        """
1034

1035
    @abstractmethod
    def start_container(
        self,
        container_name_or_id: str,
        stdin: Optional[bytes] = None,
        interactive: bool = False,
        attach: bool = False,
        flags: Optional[str] = None,
    ) -> Tuple[bytes, bytes]:
        """Start a given, already created container

        :param container_name_or_id: name or ID of the container to start
        :param stdin: optional bytes for the container's standard input (may be ``None``)
        :return: A tuple (stdout, stderr) if attach or interactive is set, otherwise a tuple (b"container_name_or_id", b"")
        """
1048

1049
    @abstractmethod
    def attach_to_container(self, container_name_or_id: str):
        """
        Attach the local standard input, output and error streams to a running container.

        :param container_name_or_id: name or ID of the running container
        """
1054

1055
    @abstractmethod
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
        """
        Log in to an OCI registry.

        :param username: user name to authenticate with at the registry
        :param password: password / token belonging to the user name
        :param registry: url of the registry
        """
1064

1065

1066
class Util:
    """Static helpers for building, parsing and converting container engine arguments."""

    # threshold (in characters) for the stringified env vars, and for a single env file value
    MAX_ENV_ARGS_LENGTH = 20000

    @staticmethod
    def format_env_vars(key: str, value: Optional[str]):
        """Render an env var as ``KEY=value``, or as the bare ``KEY`` if the value is ``None``."""
        if value is None:
            return key
        return f"{key}={value}"

    @classmethod
    def create_env_vars_file_flag(cls, env_vars: Dict) -> Tuple[List[str], Optional[str]]:
        """Build the CLI flags that pass the given env vars to a container.

        If the stringified env vars exceed ``MAX_ENV_ARGS_LENGTH``, the variables are written to an
        env file that is referenced via ``--env-file``. Variables whose value is ``None`` or longer
        than ``MAX_ENV_ARGS_LENGTH`` are not written to the file and stay ``-e`` flags.

        :param env_vars: env var names mapped to their values (the passed dict is not modified)
        :return: tuple (list of CLI flags, path of the created env file or ``None``)
        """
        if not env_vars:
            return [], None
        result = []
        env_vars = dict(env_vars)
        env_file = None
        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:
            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...
            env_file = cls.mountable_tmp_file()
            env_lines = []
            for name, value in dict(env_vars).items():
                if value is None:
                    # name-only variables have no length to check; keep them as "-e NAME" flags
                    continue
                if len(value) > cls.MAX_ENV_ARGS_LENGTH:
                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")
                    continue
                env_vars.pop(name)
                # the env file holds one variable per line: newlines in values are replaced by a backslash
                value = value.replace("\n", "\\")
                env_lines.append(f"{cls.format_env_vars(name, value)}\n")
            save_file(env_file, "".join(env_lines))
            result += ["--env-file", env_file]

        for key, value in env_vars.items():
            result += ["-e", cls.format_env_vars(key, value)]
        return result, env_file

    @staticmethod
    def rm_env_vars_file(env_vars_file) -> None:
        """Remove the given env file via ``rm_rf``; does nothing for an empty or ``None`` path."""
        if env_vars_file:
            return rm_rf(env_vars_file)

    @staticmethod
    def mountable_tmp_file():
        """Return a new, uniquely named path below ``config.dirs.mounted_tmp`` and register it in ``TMP_FILES``.

        The file itself is not created here.
        """
        f = os.path.join(config.dirs.mounted_tmp, short_uid())
        TMP_FILES.append(f)
        return f

    @staticmethod
    def append_without_latest(image_names: List[str]):
        """For each image name ending in ``:latest``, append the name without that tag (in place)."""
        suffix = ":latest"
        # iterate over a copy, as the list is extended while looping
        for image in list(image_names):
            if image.endswith(suffix):
                image_names.append(image[: -len(suffix)])

    @staticmethod
    def strip_wellknown_repo_prefixes(image_names: List[str]) -> List[str]:
        """
        Remove well-known repo prefixes like `localhost/` or `docker.io/library/` from the list of given
        image names. This is mostly to ensure compatibility of our Docker client with Podman API responses.
        :return: a copy of the list of image names, with well-known repo prefixes removed
        """
        result = []
        for image in image_names:
            for prefix in WELL_KNOWN_IMAGE_REPO_PREFIXES:
                if image.startswith(prefix):
                    image = image.removeprefix(prefix)
                    # strip only one of the matching prefixes (avoid multi-stripping)
                    break
            result.append(image)
        return result

    @staticmethod
    def tar_path(path: str, target_path: str, is_dir: bool):
        """Pack the given path into a tar archive that is stored in a named temporary file.

        :param path: file or directory to add to the archive
        :param target_path: for files, its base name is used as the name of the archive entry
               (falling back to the base name of ``path`` if it is empty)
        :param is_dir: if set, the archive entry is named after the base name of ``path``
        :return: the open temporary file, positioned at the start of the archive. The file is
                 removed when it is closed, so the caller should close it once done.
        """
        f = tempfile.NamedTemporaryFile()
        try:
            with tarfile.open(mode="w", fileobj=f) as t:
                abs_path = os.path.abspath(path)
                arcname = (
                    os.path.basename(path)
                    if is_dir
                    else (os.path.basename(target_path) or os.path.basename(path))
                )
                t.add(abs_path, arcname=arcname)
        except Exception:
            # nobody receives the file handle if archiving fails, so release it here
            f.close()
            raise

        f.seek(0)
        return f

    @staticmethod
    def untar_to_path(tardata, target_path):
        """Extract tar data to the given target path.

        :param tardata: iterable of byte chunks that make up the tar archive
        :param target_path: if this is an existing directory, the whole archive is extracted into
               it. Otherwise only the first archive member is extracted, under the name of
               ``target_path``, into its parent directory.
        """
        target_path = Path(target_path)
        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(tardata))) as t:
            if target_path.is_dir():
                t.extractall(path=target_path)
            else:
                member = t.next()
                if member:
                    member.name = target_path.name
                    t.extract(member, target_path.parent)
                else:
                    LOG.debug("File to copy empty, ignoring...")

    @staticmethod
    def _read_docker_cli_env_file(env_file: str) -> Dict[str, str]:
        """
        Read an environment file in docker CLI format, specified here:
        https://docs.docker.com/reference/cli/docker/container/run/#env

        Lines of the form ``NAME=value`` are taken as is. Lines that only consist of a name are
        looked up in the local environment, and are skipped if the variable is not set there.

        :param env_file: Path to the environment file
        :return: Read environment variables
        :raises OSError: if the file does not exist or cannot be read
        """
        env_vars = {}
        try:
            with open(env_file, mode="rt") as f:
                env_file_lines = f.readlines()
        except FileNotFoundError as e:
            LOG.error(
                "Specified env file '%s' not found. Please make sure the file is properly mounted into the LocalStack container. Error: %s",
                env_file,
                e,
            )
            raise
        except OSError as e:
            LOG.error(
                "Could not read env file '%s'. Please make sure the LocalStack container has the permissions to read it. Error: %s",
                env_file,
                e,
            )
            raise
        for line in env_file_lines:
            line = line.strip()
            if not line or line.startswith("#"):
                # skip comments or empty lines
                continue
            lhs, separator, rhs = line.partition("=")
            if separator:
                env_vars[lhs] = rhs
            else:
                # No "=" in the line, only the name => lookup in local env
                # (variables that are set to an empty string are passed on as well)
                env_value = os.environ.get(lhs)
                if env_value is not None:
                    env_vars[lhs] = env_value
        return env_vars

    @staticmethod
    def parse_additional_flags(
        additional_flags: str,
        env_vars: Optional[Dict[str, str]] = None,
        labels: Optional[Dict[str, str]] = None,
        volumes: Optional[List[SimpleVolumeBind]] = None,
        network: Optional[str] = None,
        platform: Optional[DockerPlatform] = None,
        ports: Optional[PortMappings] = None,
        privileged: Optional[bool] = None,
        user: Optional[str] = None,
        ulimits: Optional[List[Ulimit]] = None,
        dns: Optional[Union[str, List[str]]] = None,
    ) -> DockerRunFlags:
        """Parses additional CLI-formatted Docker flags, which could overwrite provided defaults.
        :param additional_flags: String which contains the flag definitions inspired by the Docker CLI reference:
                                 https://docs.docker.com/engine/reference/commandline/run/
        :param env_vars: Dict with env vars. Will be modified in place.
        :param labels: Dict with labels. Will be modified in place.
        :param volumes: List of mount tuples (host_path, container_path). Will be modified in place.
        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.
        :param platform: Platform to execute container. Warning will be printed if platform is overwritten in flags.
        :param ports: PortMapping object. Will be modified in place.
        :param privileged: Run the container in privileged mode. Warning will be printed if overwritten in flags.
        :param ulimits: ulimit options in the format <type>=<soft limit>[:<hard limit>]
        :param user: User to run first process. Warning will be printed if user is overwritten in flags.
        :param dns: List of DNS servers to configure the container with.
        :return: A DockerRunFlags object that will return new objects if respective parameters were None and
                additional flags contained a flag for that object or the same which are passed otherwise.
        :raises ValueError: if a port mapping or an add-host entry is malformed
        """
        # Argparse refactoring opportunity: custom argparse actions can be used to modularize parsing (e.g., key=value)
        # https://docs.python.org/3/library/argparse.html#action

        # Configure parser
        parser = NoExitArgumentParser(description="Docker run flags parser")
        parser.add_argument(
            "--add-host",
            help="Add a custom host-to-IP mapping (host:ip)",
            dest="add_hosts",
            action="append",
        )
        parser.add_argument(
            "--env", "-e", help="Set environment variables", dest="envs", action="append"
        )
        parser.add_argument(
            "--env-file",
            help="Set environment variables via a file",
            dest="env_files",
            action="append",
        )
        parser.add_argument(
            "--compose-env-file",
            help="Set environment variables via a file, with a docker-compose supported feature set.",
            dest="compose_env_files",
            action="append",
        )
        parser.add_argument(
            "--label", "-l", help="Add container meta data", dest="labels", action="append"
        )
        parser.add_argument("--network", help="Connect a container to a network")
        parser.add_argument(
            "--platform",
            type=DockerPlatform,
            help="Docker platform (e.g., linux/amd64 or linux/arm64)",
        )
        parser.add_argument(
            "--privileged",
            help="Give extended privileges to this container",
            action="store_true",
        )
        parser.add_argument(
            "--publish",
            "-p",
            help="Publish container port(s) to the host",
            dest="publish_ports",
            action="append",
        )
        parser.add_argument(
            "--ulimit", help="Container ulimit settings", dest="ulimits", action="append"
        )
        parser.add_argument("--user", "-u", help="Username or UID to execute first process")
        parser.add_argument(
            "--volume", "-v", help="Bind mount a volume", dest="volumes", action="append"
        )
        parser.add_argument("--dns", help="Set custom DNS servers", dest="dns", action="append")

        # Parse
        flags = shlex.split(additional_flags)
        args = parser.parse_args(flags)

        # Post-process parsed flags
        extra_hosts = None
        if args.add_hosts:
            extra_hosts = {}
            for add_host in args.add_hosts:
                # split at the first colon only, to keep IPv6 addresses (e.g., "myhost:::1") intact
                host, separator, ip_address = add_host.partition(":")
                if not separator:
                    raise ValueError(f"Invalid add-host string provided: {add_host}")
                extra_hosts[host] = ip_address

        # set env file values before env values, as the latter override the earlier
        if args.env_files:
            env_vars = env_vars if env_vars is not None else {}
            for env_file in args.env_files:
                env_vars.update(Util._read_docker_cli_env_file(env_file))

        if args.compose_env_files:
            env_vars = env_vars if env_vars is not None else {}
            for env_file in args.compose_env_files:
                env_vars.update(dotenv.dotenv_values(env_file))

        if args.envs:
            env_vars = env_vars if env_vars is not None else {}
            for env in args.envs:
                lhs, _, rhs = env.partition("=")
                env_vars[lhs] = rhs

        if args.labels:
            labels = labels if labels is not None else {}
            for label in args.labels:
                key, _, value = label.partition("=")
                # Only consider non-empty labels
                if key:
                    labels[key] = value

        if args.network:
            LOG.warning(
                "Overwriting Docker container network '%s' with new value '%s'",
                network,
                args.network,
            )
            network = args.network

        if args.platform:
            LOG.warning(
                "Overwriting Docker platform '%s' with new value '%s'",
                platform,
                args.platform,
            )
            platform = args.platform

        if args.privileged:
            LOG.warning(
                "Overwriting Docker container privileged flag %s with new value %s",
                privileged,
                args.privileged,
            )
            privileged = args.privileged

        if args.publish_ports:
            for port_mapping in args.publish_ports:
                port_split = port_mapping.split(":")
                protocol = "tcp"
                if len(port_split) == 2:
                    host_port, container_port = port_split
                elif len(port_split) == 3:
                    LOG.warning(
                        "Host part of port mappings are ignored currently in additional flags"
                    )
                    _, host_port, container_port = port_split
                else:
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
                # the host port is either a single port or a range in the form "<start>-<end>"
                host_port_split = host_port.split("-")
                if len(host_port_split) == 2:
                    host_port = [int(host_port_split[0]), int(host_port_split[1])]
                elif len(host_port_split) == 1:
                    host_port = int(host_port)
                else:
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
                if "/" in container_port:
                    container_port, protocol = container_port.split("/")
                ports = ports if ports is not None else PortMappings()
                ports.add(host_port, int(container_port), protocol)

        if args.ulimits:
            ulimits = ulimits if ulimits is not None else []
            ulimits_dict = {ul.name: ul for ul in ulimits}
            for ulimit in args.ulimits:
                name, _, rhs = ulimit.partition("=")
                soft, _, hard = rhs.partition(":")
                # without an explicit hard limit, the soft limit is used for both
                hard_limit = int(hard) if hard else int(soft)
                new_ulimit = Ulimit(name=name, soft_limit=int(soft), hard_limit=hard_limit)
                if ulimits_dict.get(name):
                    LOG.warning("Overwriting Docker ulimit %s", new_ulimit)
                ulimits_dict[name] = new_ulimit
            ulimits = list(ulimits_dict.values())

        if args.user:
            LOG.warning(
                "Overwriting Docker user '%s' with new value '%s'",
                user,
                args.user,
            )
            user = args.user

        if args.volumes:
            volumes = volumes if volumes is not None else []
            for volume in args.volumes:
                match = re.match(
                    r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",
                    volume,
                )
                if not match:
                    LOG.warning("Unable to parse volume mount Docker flags: %s", volume)
                    continue
                host_path = match.group("host")
                container_path = match.group("container")
                rw_args = match.group("arg")
                if rw_args:
                    LOG.info("Volume options like :ro or :rw are currently ignored.")
                volumes.append((host_path, container_path))

        dns = ensure_list(dns or [])
        if args.dns:
            LOG.info(
                "Extending Docker container DNS servers %s with additional values %s", dns, args.dns
            )
            dns.extend(args.dns)

        return DockerRunFlags(
            env_vars=env_vars,
            extra_hosts=extra_hosts,
            labels=labels,
            volumes=volumes,
            ports=ports,
            network=network,
            platform=platform,
            privileged=privileged,
            ulimits=ulimits,
            user=user,
            dns=dns,
        )

    @staticmethod
    def convert_mount_list_to_dict(
        volumes: Union[List[SimpleVolumeBind], VolumeMappings],
    ) -> Dict[str, Dict[str, str]]:
        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""

        def _map_to_dict(paths: SimpleVolumeBind | VolumeBind):
            if isinstance(paths, VolumeBind):
                return str(paths.host_dir), {
                    "bind": paths.container_dir,
                    "mode": "ro" if paths.read_only else "rw",
                }
            else:
                # plain (host_path, container_path) tuples are always mapped as read-write
                return str(paths[0]), {"bind": paths[1], "mode": "rw"}

        return dict(
            map(
                _map_to_dict,
                volumes,
            )
        )

    @staticmethod
    def resolve_dockerfile_path(dockerfile_path: str) -> str:
        """If the given path is a directory that contains a Dockerfile, then return the file path to it.

        Any other path is returned unchanged.
        """
        rel_path = os.path.join(dockerfile_path, "Dockerfile")
        if os.path.isdir(dockerfile_path) and os.path.exists(rel_path):
            return rel_path
        return dockerfile_path
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc