• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20420150350

19 Dec 2025 10:27AM UTC coverage: 86.92% (+0.007%) from 86.913%
20420150350

push

github

web-flow
Fix Lambda CI log pollution issues (#13546)

2 of 4 new or added lines in 1 file covered. (50.0%)

75 existing lines in 6 files now uncovered.

70016 of 80552 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.52
/localstack-core/localstack/utils/container_utils/container_client.py
1
import dataclasses
1✔
2
import io
1✔
3
import ipaddress
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import shlex
1✔
8
import tarfile
1✔
9
import tempfile
1✔
10
from abc import ABCMeta, abstractmethod
1✔
11
from collections.abc import Callable
1✔
12
from enum import Enum, unique
1✔
13
from pathlib import Path
1✔
14
from typing import (
1✔
15
    Literal,
16
    NamedTuple,
17
    Protocol,
18
    TypeAlias,
19
    TypedDict,
20
)
21

22
import dotenv
1✔
23

24
from localstack import config
1✔
25
from localstack.constants import DEFAULT_VOLUME_DIR
1✔
26
from localstack.utils.collections import HashableList, ensure_list
1✔
27
from localstack.utils.files import TMP_FILES, chmod_r, rm_rf, save_file
1✔
28
from localstack.utils.no_exit_argument_parser import NoExitArgumentParser
1✔
29
from localstack.utils.strings import short_uid
1✔
30

31
LOG = logging.getLogger(__name__)
1✔
32

33
# list of well-known image repo prefixes that should be stripped off to canonicalize image names
34
WELL_KNOWN_IMAGE_REPO_PREFIXES = ("localhost/", "docker.io/library/")
1✔
35

36

37
@unique
1✔
38
class DockerContainerStatus(Enum):
1✔
39
    DOWN = -1
1✔
40
    NON_EXISTENT = 0
1✔
41
    UP = 1
1✔
42
    PAUSED = 2
1✔
43

44

45
class DockerContainerStats(TypedDict):
1✔
46
    """Container usage statistics"""
47

48
    Container: str
1✔
49
    ID: str
1✔
50
    Name: str
1✔
51
    BlockIO: tuple[int, int]
1✔
52
    CPUPerc: float
1✔
53
    MemPerc: float
1✔
54
    MemUsage: tuple[int, int]
1✔
55
    NetIO: tuple[int, int]
1✔
56
    PIDs: int
1✔
57
    SDKStats: dict | None
1✔
58

59

60
class ContainerException(Exception):
1✔
61
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
62
        self.message = message or "Error during the communication with the docker daemon"
1✔
63
        self.stdout = stdout
1✔
64
        self.stderr = stderr
1✔
65

66

67
class NoSuchObject(ContainerException):
1✔
68
    def __init__(self, object_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
69
        message = message or f"Docker object {object_id} not found"
1✔
70
        super().__init__(message, stdout, stderr)
1✔
71
        self.object_id = object_id
1✔
72

73

74
class NoSuchContainer(ContainerException):
1✔
75
    def __init__(self, container_name_or_id: str, message=None, stdout=None, stderr=None) -> None:
1✔
76
        message = message or f"Docker container {container_name_or_id} not found"
1✔
77
        super().__init__(message, stdout, stderr)
1✔
78
        self.container_name_or_id = container_name_or_id
1✔
79

80

81
class NoSuchImage(ContainerException):
1✔
82
    def __init__(self, image_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
83
        message = message or f"Docker image {image_name} not found"
1✔
84
        super().__init__(message, stdout, stderr)
1✔
85
        self.image_name = image_name
1✔
86

87

88
class NoSuchNetwork(ContainerException):
1✔
89
    def __init__(self, network_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
90
        message = message or f"Docker network {network_name} not found"
1✔
91
        super().__init__(message, stdout, stderr)
1✔
92
        self.network_name = network_name
1✔
93

94

95
class RegistryConnectionError(ContainerException):
1✔
96
    def __init__(self, details: str, message=None, stdout=None, stderr=None) -> None:
1✔
97
        message = message or f"Connection error: {details}"
1✔
98
        super().__init__(message, stdout, stderr)
1✔
99
        self.details = details
1✔
100

101

102
class DockerNotAvailable(ContainerException):
1✔
103
    def __init__(self, message=None, stdout=None, stderr=None) -> None:
1✔
104
        message = message or "Docker not available"
1✔
105
        super().__init__(message, stdout, stderr)
1✔
106

107

108
class AccessDenied(ContainerException):
1✔
109
    def __init__(self, object_name: str, message=None, stdout=None, stderr=None) -> None:
1✔
110
        message = message or f"Access denied to {object_name}"
1✔
111
        super().__init__(message, stdout, stderr)
1✔
112
        self.object_name = object_name
1✔
113

114

115
class CancellableStream(Protocol):
1✔
116
    """Describes a generator that can be closed. Borrowed from ``docker.types.daemon``."""
117

118
    def __iter__(self):
1✔
119
        raise NotImplementedError
120

121
    def __next__(self):
1✔
122
        raise NotImplementedError
123

124
    def close(self):
1✔
125
        raise NotImplementedError
126

127

128
class DockerPlatform(str):
1✔
129
    """Platform in the format ``os[/arch[/variant]]``"""
130

131
    linux_amd64 = "linux/amd64"
1✔
132
    linux_arm64 = "linux/arm64"
1✔
133

134

135
@dataclasses.dataclass
1✔
136
class Ulimit:
1✔
137
    """The ``ulimit`` settings for the container.
138
    See https://www.tutorialspoint.com/setting-ulimit-values-on-docker-containers
139
    """
140

141
    name: str
1✔
142
    soft_limit: int
1✔
143
    hard_limit: int | None = None
1✔
144

145
    def __repr__(self):
146
        """Format: <type>=<soft limit>[:<hard limit>]"""
147
        ulimit_string = f"{self.name}={self.soft_limit}"
148
        if self.hard_limit:
149
            ulimit_string += f":{self.hard_limit}"
150
        return ulimit_string
151

152

153
# defines the type for port mappings (source->target port range)
154
PortRange = list | HashableList
1✔
155
# defines the protocol for a port range ("tcp" or "udp")
156
PortProtocol = str
1✔
157

158

159
class PortMappings:
1✔
160
    """Maps source to target port ranges for Docker port mappings."""
161

162
    # bind host to be used for defining port mappings
163
    bind_host: str
1✔
164
    # maps `from` port range to `to` port range for port mappings
165
    mappings: dict[tuple[PortRange, PortProtocol], list]
1✔
166

167
    def __init__(self, bind_host: str = None):
1✔
168
        self.bind_host = bind_host if bind_host else ""
1✔
169
        self.mappings = {}
1✔
170

171
    def add(
1✔
172
        self,
173
        port: int | PortRange,
174
        mapped: int | PortRange = None,
175
        protocol: PortProtocol = "tcp",
176
    ):
177
        mapped = mapped or port
1✔
178
        if isinstance(port, PortRange):
1✔
179
            for i in range(port[1] - port[0] + 1):
1✔
180
                if isinstance(mapped, PortRange):
1✔
181
                    self.add(port[0] + i, mapped[0] + i, protocol)
1✔
182
                else:
183
                    self.add(port[0] + i, mapped, protocol)
1✔
184
            return
1✔
185
        if port is None or int(port) < 0:
1✔
186
            raise Exception(f"Unable to add mapping for invalid port: {port}")
×
187
        if self.contains(port, protocol):
1✔
188
            return
1✔
189
        bisected_host_port = None
1✔
190
        for (from_range, from_protocol), to_range in self.mappings.items():
1✔
191
            if not from_protocol == protocol:
1✔
192
                continue
1✔
193
            if not self.in_expanded_range(port, from_range):
1✔
194
                continue
1✔
195
            if not self.in_expanded_range(mapped, to_range):
1✔
196
                continue
1✔
197
            from_range_len = from_range[1] - from_range[0]
1✔
198
            to_range_len = to_range[1] - to_range[0]
1✔
199
            is_uniform = from_range_len == to_range_len
1✔
200
            if is_uniform:
1✔
201
                self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
202
                self.expand_range(mapped, to_range, protocol=protocol)
1✔
203
            else:
204
                if not self.in_range(mapped, to_range):
1✔
205
                    continue
1✔
206
                # extending a 1 to 1 mapping to be many to 1
207
                elif from_range_len == 1:
1✔
208
                    self.expand_range(port, from_range, protocol=protocol, remap=True)
1✔
209
                # splitting a uniform mapping
210
                else:
211
                    bisected_port_index = mapped - to_range[0]
1✔
212
                    bisected_host_port = from_range[0] + bisected_port_index
1✔
213
                    self.bisect_range(mapped, to_range, protocol=protocol)
1✔
214
                    self.bisect_range(bisected_host_port, from_range, protocol=protocol, remap=True)
1✔
215
                    break
1✔
216
            return
1✔
217
        if bisected_host_port is None:
1✔
218
            port_range = [port, port]
1✔
219
        elif bisected_host_port < port:
1✔
220
            port_range = [bisected_host_port, port]
1✔
221
        else:
222
            port_range = [port, bisected_host_port]
×
223
        protocol = str(protocol or "tcp").lower()
1✔
224
        self.mappings[(HashableList(port_range), protocol)] = [mapped, mapped]
1✔
225

226
    def to_str(self) -> str:
1✔
227
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
228

229
        def entry(k, v):
1✔
230
            from_range, protocol = k
1✔
231
            to_range = v
1✔
232
            # use /<protocol> suffix if the protocol is not"tcp"
233
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
234
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
235
                return f"-p {bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"
1✔
236
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
237
                return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}{protocol_suffix}"
1✔
238
            return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}"
1✔
239

240
        return " ".join([entry(k, v) for k, v in self.mappings.items()])
1✔
241

242
    def to_list(self) -> list[str]:  # TODO test
1✔
243
        bind_address = f"{self.bind_host}:" if self.bind_host else ""
1✔
244

245
        def entry(k, v):
1✔
246
            from_range, protocol = k
1✔
247
            to_range = v
1✔
248
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
1✔
249
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
1✔
250
                return ["-p", f"{bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"]
1✔
251
            return [
1✔
252
                "-p",
253
                f"{bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}",
254
            ]
255

256
        return [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
257

258
    def to_dict(self) -> dict[str, tuple[str, int | list[int]] | int]:
1✔
259
        bind_address = self.bind_host or ""
1✔
260

261
        def bind_port(bind_address, host_port):
1✔
262
            if host_port == 0:
1✔
263
                return None
1✔
264
            elif bind_address:
1✔
265
                return (bind_address, host_port)
1✔
266
            else:
267
                return host_port
1✔
268

269
        def entry(k, v):
1✔
270
            from_range, protocol = k
1✔
271
            to_range = v
1✔
272
            protocol_suffix = f"/{protocol}"
1✔
273
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
1✔
274
                container_port = to_range[0]
1✔
275
                host_ports = list(range(from_range[0], from_range[1] + 1))
1✔
276
                return [
1✔
277
                    (
278
                        f"{container_port}{protocol_suffix}",
279
                        (bind_address, host_ports) if bind_address else host_ports,
280
                    )
281
                ]
282
            return [
1✔
283
                (
284
                    f"{container_port}{protocol_suffix}",
285
                    bind_port(bind_address, host_port),
286
                )
287
                for container_port, host_port in zip(
288
                    range(to_range[0], to_range[1] + 1),
289
                    range(from_range[0], from_range[1] + 1),
290
                    strict=False,
291
                )
292
            ]
293

294
        items = [item for k, v in self.mappings.items() for item in entry(k, v)]
1✔
295
        return dict(items)
1✔
296

297
    def contains(self, port: int, protocol: PortProtocol = "tcp") -> bool:
1✔
298
        for from_range_w_protocol, to_range in self.mappings.items():
1✔
299
            from_protocol = from_range_w_protocol[1]
1✔
300
            if from_protocol == protocol:
1✔
301
                from_range = from_range_w_protocol[0]
1✔
302
                if self.in_range(port, from_range):
1✔
303
                    return True
1✔
304

305
    def in_range(self, port: int, range: PortRange) -> bool:
1✔
306
        return port >= range[0] and port <= range[1]
1✔
307

308
    def in_expanded_range(self, port: int, range: PortRange):
1✔
309
        return port >= range[0] - 1 and port <= range[1] + 1
1✔
310

311
    def expand_range(
1✔
312
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
313
    ):
314
        """
315
        Expand the given port range by the given port. If remap==True, put the updated range into self.mappings
316
        """
317
        if self.in_range(port, range):
1✔
318
            return
1✔
319
        new_range = list(range) if remap else range
1✔
320
        if port == range[0] - 1:
1✔
321
            new_range[0] = port
×
322
        elif port == range[1] + 1:
1✔
323
            new_range[1] = port
1✔
324
        else:
325
            raise Exception(f"Unable to add port {port} to existing range {range}")
×
326
        if remap:
1✔
327
            self._remap_range(range, new_range, protocol=protocol)
1✔
328

329
    def bisect_range(
1✔
330
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
331
    ):
332
        """
333
        Bisect a port range, at the provided port. This is needed in some cases when adding a
334
        non-uniform host to port mapping adjacent to an existing port range.
335
        If remap==True, put the updated range into self.mappings
336
        """
337
        if not self.in_range(port, range):
1✔
338
            return
×
339
        new_range = list(range) if remap else range
1✔
340
        if port == range[0]:
1✔
341
            new_range[0] = port + 1
×
342
        else:
343
            new_range[1] = port - 1
1✔
344
        if remap:
1✔
345
            self._remap_range(range, new_range, protocol)
1✔
346

347
    def _remap_range(self, old_key: PortRange, new_key: PortRange, protocol: PortProtocol):
1✔
348
        self.mappings[(HashableList(new_key), protocol)] = self.mappings.pop(
1✔
349
            (HashableList(old_key), protocol)
350
        )
351

352
    def __repr__(self):
353
        return f"<PortMappings: {self.to_dict()}>"
354

355

356
SimpleVolumeBind = tuple[str, str]
1✔
357
"""Type alias for a simple version of VolumeBind"""
1✔
358

359

360
@dataclasses.dataclass
1✔
361
class Mount:
1✔
362
    def to_str(self) -> str:
1✔
363
        return str(self)
×
364

365

366
@dataclasses.dataclass
1✔
367
class BindMount(Mount):
1✔
368
    """Represents a --volume argument run/create command. When using VolumeBind to bind-mount a file or directory
369
    that does not yet exist on the Docker host, -v creates the endpoint for you. It is always created as a directory.
370
    """
371

372
    host_dir: str
1✔
373
    container_dir: str
1✔
374
    read_only: bool = False
1✔
375

376
    def to_str(self) -> str:
1✔
377
        args = []
1✔
378

379
        if self.host_dir:
1✔
380
            args.append(self.host_dir)
1✔
381

382
        if not self.container_dir:
1✔
383
            raise ValueError("no container dir specified")
×
384

385
        args.append(self.container_dir)
1✔
386

387
        if self.read_only:
1✔
388
            args.append("ro")
×
389

390
        return ":".join(args)
1✔
391

392
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
393
        return str(self.host_dir), {
1✔
394
            "bind": self.container_dir,
395
            "mode": "ro" if self.read_only else "rw",
396
        }
397

398
    @classmethod
1✔
399
    def parse(cls, param: str) -> "BindMount":
1✔
400
        parts = param.split(":")
1✔
401
        if 1 > len(parts) > 3:
1✔
402
            raise ValueError(f"Cannot parse volume bind {param}")
×
403

404
        volume = cls(parts[0], parts[1])
1✔
405
        if len(parts) == 3:
1✔
406
            if "ro" in parts[2].split(","):
1✔
407
                volume.read_only = True
1✔
408
        return volume
1✔
409

410

411
@dataclasses.dataclass
1✔
412
class VolumeDirMount(Mount):
1✔
413
    volume_path: str
1✔
414
    """
1✔
415
    Absolute path inside /var/lib/localstack to mount into the container
416
    """
417
    container_path: str
1✔
418
    """
1✔
419
    Target path inside the started container
420
    """
421
    read_only: bool = False
1✔
422

423
    def to_str(self) -> str:
1✔
424
        self._validate()
1✔
425
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
426

427
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
428
        return f"{host_dir}:{self.container_path}{':ro' if self.read_only else ''}"
1✔
429

430
    def _validate(self):
1✔
431
        if not self.volume_path:
1✔
432
            raise ValueError("no volume dir specified")
×
433
        if config.is_in_docker and not self.volume_path.startswith(DEFAULT_VOLUME_DIR):
1✔
434
            raise ValueError(f"volume dir not starting with {DEFAULT_VOLUME_DIR}")
×
435
        if not self.container_path:
1✔
436
            raise ValueError("no container dir specified")
×
437

438
    def to_docker_sdk_parameters(self) -> tuple[str, dict[str, str]]:
1✔
439
        self._validate()
1✔
440
        from localstack.utils.docker_utils import get_host_path_for_path_in_docker
1✔
441

442
        host_dir = get_host_path_for_path_in_docker(self.volume_path)
1✔
443
        return host_dir, {
1✔
444
            "bind": self.container_path,
445
            "mode": "ro" if self.read_only else "rw",
446
        }
447

448

449
VolumeMappingSpecification: TypeAlias = SimpleVolumeBind | Mount
1✔
450

451

452
class VolumeMappings:
1✔
453
    mappings: list[VolumeMappingSpecification]
1✔
454

455
    def __init__(
1✔
456
        self,
457
        mappings: list[VolumeMappingSpecification] = None,
458
    ):
459
        self.mappings = mappings if mappings is not None else []
1✔
460

461
    def add(self, mapping: VolumeMappingSpecification):
1✔
462
        self.append(mapping)
1✔
463

464
    def append(self, mapping: VolumeMappingSpecification):
1✔
465
        self.mappings.append(mapping)
1✔
466

467
    def find_target_mapping(self, container_dir: str) -> VolumeMappingSpecification | None:
1✔
468
        """
469
        Looks through the volumes and returns the one where the container dir matches ``container_dir``.
470
        Returns None if there is no volume mapping to the given container directory.
471

472
        :param container_dir: the target of the volume mapping, i.e., the path in the container
473
        :return: the volume mapping or None
474
        """
475
        for volume in self.mappings:
1✔
476
            target_dir = volume[1] if isinstance(volume, tuple) else volume.container_dir
1✔
477
            if container_dir == target_dir:
1✔
478
                return volume
×
479
        return None
1✔
480

481
    def __iter__(self):
1✔
482
        return self.mappings.__iter__()
1✔
483

484
    def __repr__(self):
485
        return self.mappings.__repr__()
486

487
    def __len__(self):
1✔
488
        return len(self.mappings)
1✔
489

490
    def __getitem__(self, item: int):
1✔
491
        return self.mappings[item]
×
492

493

494
VolumeType = Literal["bind", "volume"]
1✔
495

496

497
class VolumeInfo(NamedTuple):
1✔
498
    """Container volume information."""
499

500
    type: VolumeType
1✔
501
    source: str
1✔
502
    destination: str
1✔
503
    mode: str
1✔
504
    rw: bool
1✔
505
    propagation: str
1✔
506
    name: str | None = None
1✔
507
    driver: str | None = None
1✔
508

509

510
@dataclasses.dataclass
1✔
511
class LogConfig:
1✔
512
    type: Literal["json-file", "syslog", "journald", "gelf", "fluentd", "none", "awslogs", "splunk"]
1✔
513
    config: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
514

515

516
@dataclasses.dataclass
1✔
517
class ContainerConfiguration:
1✔
518
    image_name: str
1✔
519
    name: str | None = None
1✔
520
    volumes: VolumeMappings = dataclasses.field(default_factory=VolumeMappings)
1✔
521
    ports: PortMappings = dataclasses.field(default_factory=PortMappings)
1✔
522
    exposed_ports: list[str] = dataclasses.field(default_factory=list)
1✔
523
    entrypoint: list[str] | str | None = None
1✔
524
    additional_flags: str | None = None
1✔
525
    command: list[str] | None = None
1✔
526
    env_vars: dict[str, str] = dataclasses.field(default_factory=dict)
1✔
527

528
    privileged: bool = False
1✔
529
    remove: bool = False
1✔
530
    interactive: bool = False
1✔
531
    tty: bool = False
1✔
532
    detach: bool = False
1✔
533

534
    stdin: str | None = None
1✔
535
    user: str | None = None
1✔
536
    cap_add: list[str] | None = None
1✔
537
    cap_drop: list[str] | None = None
1✔
538
    security_opt: list[str] | None = None
1✔
539
    network: str | None = None
1✔
540
    dns: str | None = None
1✔
541
    workdir: str | None = None
1✔
542
    platform: str | None = None
1✔
543
    ulimits: list[Ulimit] | None = None
1✔
544
    labels: dict[str, str] | None = None
1✔
545
    init: bool | None = None
1✔
546
    log_config: LogConfig | None = None
1✔
547
    cpu_shares: int | None = None
1✔
548
    mem_limit: int | str | None = None
1✔
549

550

551
class ContainerConfigurator(Protocol):
1✔
552
    """Protocol for functional configurators. A ContainerConfigurator modifies, when called,
553
    a ContainerConfiguration in place."""
554

555
    def __call__(self, configuration: ContainerConfiguration):
1✔
556
        """
557
        Modify the given container configuration.
558

559
        :param configuration: the configuration to modify
560
        """
UNCOV
561
        ...
×
562

563

564
@dataclasses.dataclass
1✔
565
class DockerRunFlags:
1✔
566
    """Class to capture Docker run/create flags for a container.
567
    run: https://docs.docker.com/engine/reference/commandline/run/
568
    create: https://docs.docker.com/engine/reference/commandline/create/
569
    """
570

571
    env_vars: dict[str, str] | None
1✔
572
    extra_hosts: dict[str, str] | None
1✔
573
    labels: dict[str, str] | None
1✔
574
    volumes: list[SimpleVolumeBind] | None
1✔
575
    network: str | None
1✔
576
    platform: DockerPlatform | None
1✔
577
    privileged: bool | None
1✔
578
    ports: PortMappings | None
1✔
579
    ulimits: list[Ulimit] | None
1✔
580
    user: str | None
1✔
581
    dns: list[str] | None
1✔
582

583

584
class RegistryResolverStrategy(Protocol):
1✔
585
    def resolve(self, image_name: str) -> str: ...
1✔
586

587

588
class HardCodedResolver:
1✔
589
    def resolve(self, image_name: str) -> str:  # noqa
1✔
590
        return image_name
1✔
591

592

593
# TODO: remove Docker/Podman compatibility switches (in particular strip_wellknown_repo_prefixes=...)
594
#  from the container client base interface and introduce derived Podman client implementations instead!
595
class ContainerClient(metaclass=ABCMeta):
1✔
596
    registry_resolver_strategy: RegistryResolverStrategy = HardCodedResolver()
1✔
597

598
    @abstractmethod
1✔
599
    def get_system_info(self) -> dict:
1✔
600
        """Returns the docker system-wide information as dictionary (``docker info``)."""
601

602
    def get_system_id(self) -> str:
1✔
603
        """Returns the unique and stable ID of the docker daemon."""
604
        return self.get_system_info()["ID"]
1✔
605

606
    @abstractmethod
1✔
607
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
1✔
608
        """Returns the status of the container with the given name"""
UNCOV
609
        pass
×
610

611
    def get_container_stats(self, container_name: str) -> DockerContainerStats:
1✔
612
        """Returns the usage statistics of the container with the given name"""
UNCOV
613
        pass
×
614

615
    def get_networks(self, container_name: str) -> list[str]:
1✔
616
        LOG.debug("Getting networks for container: %s", container_name)
1✔
617
        container_attrs = self.inspect_container(container_name_or_id=container_name)
1✔
618
        return list(container_attrs["NetworkSettings"].get("Networks", {}).keys())
1✔
619

620
    def get_container_ipv4_for_network(
1✔
621
        self, container_name_or_id: str, container_network: str
622
    ) -> str:
623
        """
624
        Returns the IPv4 address for the container on the interface connected to the given network
625
        :param container_name_or_id: Container to inspect
626
        :param container_network: Network the IP address will belong to
627
        :return: IP address of the given container on the interface connected to the given network
628
        """
629
        LOG.debug(
1✔
630
            "Getting ipv4 address for container %s in network %s.",
631
            container_name_or_id,
632
            container_network,
633
        )
634
        # we always need the ID for this
635
        container_id = self.get_container_id(container_name=container_name_or_id)
1✔
636
        network_attrs = self.inspect_network(container_network)
1✔
637
        containers = network_attrs.get("Containers") or {}
1✔
638
        if container_id not in containers:
1✔
639
            LOG.debug("Network attributes: %s", network_attrs)
1✔
640
            try:
1✔
641
                inspection = self.inspect_container(container_name_or_id=container_name_or_id)
1✔
642
                LOG.debug("Container %s Attributes: %s", container_name_or_id, inspection)
1✔
643
                logs = self.get_container_logs(container_name_or_id=container_name_or_id)
1✔
644
                LOG.debug("Container %s Logs: %s", container_name_or_id, logs)
1✔
UNCOV
645
            except ContainerException as e:
×
UNCOV
646
                LOG.debug("Cannot inspect container %s: %s", container_name_or_id, e)
×
647
            raise ContainerException(
1✔
648
                "Container %s is not connected to target network %s",
649
                container_name_or_id,
650
                container_network,
651
            )
652
        try:
1✔
653
            ip = str(ipaddress.IPv4Interface(containers[container_id]["IPv4Address"]).ip)
1✔
UNCOV
654
        except Exception as e:
×
UNCOV
655
            raise ContainerException(
×
656
                f"Unable to detect IP address for container {container_name_or_id} in network {container_network}: {e}"
657
            )
658
        return ip
1✔
659

660
    @abstractmethod
1✔
661
    def stop_container(self, container_name: str, timeout: int = 10):
1✔
662
        """Stops container with given name
663
        :param container_name: Container identifier (name or id) of the container to be stopped
664
        :param timeout: Timeout after which SIGKILL is sent to the container.
665
        """
666

667
    @abstractmethod
1✔
668
    def restart_container(self, container_name: str, timeout: int = 10):
1✔
669
        """Restarts a container with the given name.
670
        :param container_name: Container identifier
671
        :param timeout: Seconds to wait for stop before killing the container
672
        """
673

674
    @abstractmethod
1✔
675
    def pause_container(self, container_name: str):
1✔
676
        """Pauses a container with the given name."""
677

678
    @abstractmethod
1✔
679
    def unpause_container(self, container_name: str):
1✔
680
        """Unpauses a container with the given name."""
681

682
    @abstractmethod
1✔
683
    def remove_container(
1✔
684
        self, container_name: str, force=True, check_existence=False, volumes=False
685
    ) -> None:
686
        """Removes container
687

688
        :param container_name: Name of the container
689
        :param force: Force the removal of a running container (uses SIGKILL)
690
        :param check_existence: Return if container doesn't exist
691
        :param volumes: Remove anonymous volumes associated with the container
692
        """
693

694
    @abstractmethod
1✔
695
    def remove_image(self, image: str, force: bool = True) -> None:
1✔
696
        """Removes an image with given name
697

698
        :param image: Image name and tag
699
        :param force: Force removal
700
        """
701

702
    @abstractmethod
1✔
703
    def list_containers(self, filter: list[str] | str | None = None, all=True) -> list[dict]:
1✔
704
        """List all containers matching the given filters
705

706
        :return: A list of dicts with keys id, image, name, labels, status
707
        """
708

709
    def get_running_container_names(self) -> list[str]:
1✔
710
        """Returns a list of the names of all running containers"""
711
        return self.__get_container_names(return_all=False)
1✔
712

713
    def get_all_container_names(self) -> list[str]:
1✔
714
        """Returns a list of the names of all containers including stopped ones"""
715
        return self.__get_container_names(return_all=True)
1✔
716

717
    def is_container_running(self, container_name: str) -> bool:
1✔
718
        """Checks whether a container with a given name is currently running"""
719
        return container_name in self.get_running_container_names()
1✔
720

721
    def create_file_in_container(
1✔
722
        self,
723
        container_name,
724
        file_contents: bytes,
725
        container_path: str,
726
        chmod_mode: int | None = None,
727
    ) -> None:
728
        """
729
        Create a file in container with the provided content. Provide the 'chmod_mode' argument if you want the file to have specific permissions.
730
        """
731
        with tempfile.NamedTemporaryFile() as tmp:
1✔
732
            tmp.write(file_contents)
1✔
733
            tmp.flush()
1✔
734
            if chmod_mode is not None:
1✔
UNCOV
735
                chmod_r(tmp.name, chmod_mode)
×
736
            self.copy_into_container(
1✔
737
                container_name=container_name,
738
                local_path=tmp.name,
739
                container_path=container_path,
740
            )
741

742
    @abstractmethod
1✔
743
    def copy_into_container(
1✔
744
        self, container_name: str, local_path: str, container_path: str
745
    ) -> None:
746
        """Copy contents of the given local path into the container"""
747

748
    @abstractmethod
1✔
749
    def copy_from_container(
1✔
750
        self, container_name: str, local_path: str, container_path: str
751
    ) -> None:
752
        """Copy contents of the given container to the host"""
753

754
    @abstractmethod
1✔
755
    def pull_image(
1✔
756
        self,
757
        docker_image: str,
758
        platform: DockerPlatform | None = None,
759
        log_handler: Callable[[str], None] | None = None,
760
    ) -> None:
761
        """
762
        Pulls an image with a given name from a Docker registry
763

764
        :log_handler: Optional parameter that can be used to process the logs. Logs will be streamed if possible, but this is not guaranteed.
765
        """
766

767
    @abstractmethod
1✔
768
    def push_image(self, docker_image: str) -> None:
1✔
769
        """Pushes an image with a given name to a Docker registry"""
770

771
    @abstractmethod
1✔
772
    def build_image(
1✔
773
        self,
774
        dockerfile_path: str,
775
        image_name: str,
776
        context_path: str = None,
777
        platform: DockerPlatform | None = None,
778
    ) -> str:
779
        """Builds an image from the given Dockerfile
780

781
        :param dockerfile_path: Path to Dockerfile, or a directory that contains a Dockerfile
782
        :param image_name: Name of the image to be built
783
        :param context_path: Path for build context (defaults to dirname of Dockerfile)
784
        :param platform: Target platform for build (defaults to platform of Docker host)
785
        :return: Build logs as a string.
786
        """
787

788
    @abstractmethod
1✔
789
    def tag_image(self, source_ref: str, target_name: str) -> None:
1✔
790
        """Tags an image with a new name
791

792
        :param source_ref: Name or ID of the image to be tagged
793
        :param target_name: New name (tag) of the tagged image
794
        """
795

796
    @abstractmethod
1✔
797
    def get_docker_image_names(
1✔
798
        self,
799
        strip_latest: bool = True,
800
        include_tags: bool = True,
801
        strip_wellknown_repo_prefixes: bool = True,
802
    ) -> list[str]:
803
        """
804
        Get all names of docker images available to the container engine
805
        :param strip_latest: return images both with and without :latest tag
806
        :param include_tags: include tags of the images in the names
807
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
808
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
809
        :return: List of image names
810
        """
811

812
    @abstractmethod
1✔
813
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
1✔
814
        """Get all logs of a given container"""
815

816
    @abstractmethod
1✔
817
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
1✔
818
        """Returns a blocking generator you can iterate over to retrieve log output as it happens."""
819

820
    @abstractmethod
1✔
821
    def inspect_container(self, container_name_or_id: str) -> dict[str, dict | str]:
1✔
822
        """Get detailed attributes of a container.
823

824
        :return: Dict containing docker attributes as returned by the daemon
825
        """
826

827
    def inspect_container_volumes(self, container_name_or_id) -> list[VolumeInfo]:
1✔
828
        """Return information about the volumes mounted into the given container.
829

830
        :param container_name_or_id: the container name or id
831
        :return: a list of volumes
832
        """
833
        volumes = []
1✔
834
        for doc in self.inspect_container(container_name_or_id)["Mounts"]:
1✔
835
            volumes.append(VolumeInfo(**{k.lower(): v for k, v in doc.items()}))
1✔
836

837
        return volumes
1✔
838

839
    @abstractmethod
1✔
840
    def inspect_image(
1✔
841
        self, image_name: str, pull: bool = True, strip_wellknown_repo_prefixes: bool = True
842
    ) -> dict[str, dict | list | str]:
843
        """Get detailed attributes of an image.
844

845
        :param image_name: Image name to inspect
846
        :param pull: Whether to pull image if not existent
847
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
848
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
849
        :return: Dict containing docker attributes as returned by the daemon
850
        """
851

852
    @abstractmethod
1✔
853
    def create_network(self, network_name: str) -> str:
1✔
854
        """
855
        Creates a network with the given name
856
        :param network_name: Name of the network
857
        :return Network ID
858
        """
859

860
    @abstractmethod
1✔
861
    def delete_network(self, network_name: str) -> None:
1✔
862
        """
863
        Delete a network with the given name
864
        :param network_name: Name of the network
865
        """
866

867
    @abstractmethod
1✔
868
    def inspect_network(self, network_name: str) -> dict[str, dict | str]:
1✔
869
        """Get detailed attributes of an network.
870

871
        :return: Dict containing docker attributes as returned by the daemon
872
        """
873

874
    @abstractmethod
1✔
875
    def connect_container_to_network(
1✔
876
        self,
877
        network_name: str,
878
        container_name_or_id: str,
879
        aliases: list | None = None,
880
        link_local_ips: list[str] = None,
881
    ) -> None:
882
        """
883
        Connects a container to a given network
884
        :param network_name: Network to connect the container to
885
        :param container_name_or_id: Container to connect to the network
886
        :param aliases: List of dns names the container should be available under in the network
887
        :param link_local_ips: List of link-local (IPv4 or IPv6) addresses
888
        """
889

890
    @abstractmethod
1✔
891
    def disconnect_container_from_network(
1✔
892
        self, network_name: str, container_name_or_id: str
893
    ) -> None:
894
        """
895
        Disconnects a container from a given network
896
        :param network_name: Network to disconnect the container from
897
        :param container_name_or_id: Container to disconnect from the network
898
        """
899

900
    def get_container_name(self, container_id: str) -> str:
1✔
901
        """Get the name of a container by a given identifier"""
902
        return self.inspect_container(container_id)["Name"].lstrip("/")
1✔
903

904
    def get_container_id(self, container_name: str) -> str:
1✔
905
        """Get the id of a container by a given name"""
906
        return self.inspect_container(container_name)["Id"]
1✔
907

908
    @abstractmethod
1✔
909
    def get_container_ip(self, container_name_or_id: str) -> str:
1✔
910
        """Get the IP address of a given container
911

912
        If container has multiple networks, it will return the IP of the first
913
        """
914

915
    def get_image_cmd(self, docker_image: str, pull: bool = True) -> list[str]:
1✔
916
        """Get the command for the given image
917
        :param docker_image: Docker image to inspect
918
        :param pull: Whether to pull if image is not present
919
        :return: Image command in its array form
920
        """
921
        cmd_list = self.inspect_image(docker_image, pull)["Config"]["Cmd"] or []
1✔
922
        return cmd_list
1✔
923

924
    def get_image_entrypoint(self, docker_image: str, pull: bool = True) -> str:
1✔
925
        """Get the entry point for the given image
926
        :param docker_image: Docker image to inspect
927
        :param pull: Whether to pull if image is not present
928
        :return: Image entrypoint
929
        """
930
        LOG.debug("Getting the entrypoint for image: %s", docker_image)
1✔
931
        entrypoint_list = self.inspect_image(docker_image, pull)["Config"].get("Entrypoint") or []
1✔
932
        return shlex.join(entrypoint_list)
1✔
933

934
    @abstractmethod
1✔
935
    def has_docker(self) -> bool:
1✔
936
        """Check if system has docker available"""
937

938
    @abstractmethod
1✔
939
    def commit(
1✔
940
        self,
941
        container_name_or_id: str,
942
        image_name: str,
943
        image_tag: str,
944
    ):
945
        """Create an image from a running container.
946

947
        :param container_name_or_id: Source container
948
        :param image_name: Destination image name
949
        :param image_tag: Destination image tag
950
        """
951

952
    def create_container_from_config(self, container_config: ContainerConfiguration) -> str:
1✔
953
        """
954
        Similar to create_container, but allows passing the whole ContainerConfiguration
955
        :param container_config: ContainerConfiguration how to start the container
956
        :return: Container ID
957
        """
958
        return self.create_container(
1✔
959
            image_name=container_config.image_name,
960
            name=container_config.name,
961
            entrypoint=container_config.entrypoint,
962
            remove=container_config.remove,
963
            interactive=container_config.interactive,
964
            tty=container_config.tty,
965
            command=container_config.command,
966
            volumes=container_config.volumes,
967
            ports=container_config.ports,
968
            exposed_ports=container_config.exposed_ports,
969
            env_vars=container_config.env_vars,
970
            user=container_config.user,
971
            cap_add=container_config.cap_add,
972
            cap_drop=container_config.cap_drop,
973
            security_opt=container_config.security_opt,
974
            network=container_config.network,
975
            dns=container_config.dns,
976
            additional_flags=container_config.additional_flags,
977
            workdir=container_config.workdir,
978
            privileged=container_config.privileged,
979
            platform=container_config.platform,
980
            labels=container_config.labels,
981
            ulimits=container_config.ulimits,
982
            init=container_config.init,
983
            log_config=container_config.log_config,
984
            cpu_shares=container_config.cpu_shares,
985
            mem_limit=container_config.mem_limit,
986
        )
987

988
    @abstractmethod
1✔
989
    def create_container(
1✔
990
        self,
991
        image_name: str,
992
        *,
993
        name: str | None = None,
994
        entrypoint: list[str] | str | None = None,
995
        remove: bool = False,
996
        interactive: bool = False,
997
        tty: bool = False,
998
        detach: bool = False,
999
        command: list[str] | str | None = None,
1000
        volumes: VolumeMappings | list[SimpleVolumeBind] | None = None,
1001
        ports: PortMappings | None = None,
1002
        exposed_ports: list[str] | None = None,
1003
        env_vars: dict[str, str] | None = None,
1004
        user: str | None = None,
1005
        cap_add: list[str] | None = None,
1006
        cap_drop: list[str] | None = None,
1007
        security_opt: list[str] | None = None,
1008
        network: str | None = None,
1009
        dns: str | list[str] | None = None,
1010
        additional_flags: str | None = None,
1011
        workdir: str | None = None,
1012
        privileged: bool | None = None,
1013
        labels: dict[str, str] | None = None,
1014
        platform: DockerPlatform | None = None,
1015
        ulimits: list[Ulimit] | None = None,
1016
        init: bool | None = None,
1017
        log_config: LogConfig | None = None,
1018
        cpu_shares: int | None = None,
1019
        mem_limit: int | str | None = None,
1020
    ) -> str:
1021
        """Creates a container with the given image
1022

1023
        :return: Container ID
1024
        """
1025

1026
    @abstractmethod
1✔
1027
    def run_container(
1✔
1028
        self,
1029
        image_name: str,
1030
        stdin: bytes = None,
1031
        *,
1032
        name: str | None = None,
1033
        entrypoint: str | None = None,
1034
        remove: bool = False,
1035
        interactive: bool = False,
1036
        tty: bool = False,
1037
        detach: bool = False,
1038
        command: list[str] | str | None = None,
1039
        volumes: VolumeMappings | list[SimpleVolumeBind] | None = None,
1040
        ports: PortMappings | None = None,
1041
        exposed_ports: list[str] | None = None,
1042
        env_vars: dict[str, str] | None = None,
1043
        user: str | None = None,
1044
        cap_add: list[str] | None = None,
1045
        cap_drop: list[str] | None = None,
1046
        security_opt: list[str] | None = None,
1047
        network: str | None = None,
1048
        dns: str | None = None,
1049
        additional_flags: str | None = None,
1050
        workdir: str | None = None,
1051
        labels: dict[str, str] | None = None,
1052
        platform: DockerPlatform | None = None,
1053
        privileged: bool | None = None,
1054
        ulimits: list[Ulimit] | None = None,
1055
        init: bool | None = None,
1056
        log_config: LogConfig | None = None,
1057
        cpu_shares: int | None = None,
1058
        mem_limit: int | str | None = None,
1059
    ) -> tuple[bytes, bytes]:
1060
        """Creates and runs a given docker container
1061

1062
        :return: A tuple (stdout, stderr)
1063
        """
1064

1065
    def run_container_from_config(
1✔
1066
        self, container_config: ContainerConfiguration
1067
    ) -> tuple[bytes, bytes]:
1068
        """Like ``run_container`` but uses the parameters from the configuration."""
1069

UNCOV
1070
        return self.run_container(
×
1071
            image_name=container_config.image_name,
1072
            stdin=container_config.stdin,
1073
            name=container_config.name,
1074
            entrypoint=container_config.entrypoint,
1075
            remove=container_config.remove,
1076
            interactive=container_config.interactive,
1077
            tty=container_config.tty,
1078
            detach=container_config.detach,
1079
            command=container_config.command,
1080
            volumes=container_config.volumes,
1081
            ports=container_config.ports,
1082
            exposed_ports=container_config.exposed_ports,
1083
            env_vars=container_config.env_vars,
1084
            user=container_config.user,
1085
            cap_add=container_config.cap_add,
1086
            cap_drop=container_config.cap_drop,
1087
            security_opt=container_config.security_opt,
1088
            network=container_config.network,
1089
            dns=container_config.dns,
1090
            additional_flags=container_config.additional_flags,
1091
            workdir=container_config.workdir,
1092
            platform=container_config.platform,
1093
            privileged=container_config.privileged,
1094
            ulimits=container_config.ulimits,
1095
            init=container_config.init,
1096
            log_config=container_config.log_config,
1097
            cpu_shares=container_config.cpu_shares,
1098
            mem_limit=container_config.mem_limit,
1099
        )
1100

1101
    @abstractmethod
1✔
1102
    def exec_in_container(
1✔
1103
        self,
1104
        container_name_or_id: str,
1105
        command: list[str] | str,
1106
        interactive: bool = False,
1107
        detach: bool = False,
1108
        env_vars: dict[str, str | None] | None = None,
1109
        stdin: bytes | None = None,
1110
        user: str | None = None,
1111
        workdir: str | None = None,
1112
    ) -> tuple[bytes, bytes]:
1113
        """Execute a given command in a container
1114

1115
        :return: A tuple (stdout, stderr)
1116
        """
1117

1118
    @abstractmethod
1✔
1119
    def start_container(
1✔
1120
        self,
1121
        container_name_or_id: str,
1122
        stdin: bytes = None,
1123
        interactive: bool = False,
1124
        attach: bool = False,
1125
        flags: str | None = None,
1126
    ) -> tuple[bytes, bytes]:
1127
        """Start a given, already created container
1128

1129
        :return: A tuple (stdout, stderr) if attach or interactive is set, otherwise a tuple (b"container_name_or_id", b"")
1130
        """
1131

1132
    @abstractmethod
1✔
1133
    def attach_to_container(self, container_name_or_id: str):
1✔
1134
        """
1135
        Attach local standard input, output, and error streams to a running container
1136
        """
1137

1138
    @abstractmethod
1✔
1139
    def login(self, username: str, password: str, registry: str | None = None) -> None:
1✔
1140
        """
1141
        Login into an OCI registry
1142

1143
        :param username: Username for the registry
1144
        :param password: Password / token for the registry
1145
        :param registry: Registry url
1146
        """
1147

1148
    def __get_container_names(self, return_all: bool) -> list[str]:
1✔
1149
        result = self.list_containers(all=return_all)
1✔
1150
        result = [container["name"] for container in result]
1✔
1151
        return result
1✔
1152

1153

1154
class Util:
1✔
1155
    MAX_ENV_ARGS_LENGTH = 20000
1✔
1156

1157
    @staticmethod
1✔
1158
    def format_env_vars(key: str, value: str | None):
1✔
1159
        if value is None:
1✔
UNCOV
1160
            return key
×
1161
        return f"{key}={value}"
1✔
1162

1163
    @classmethod
1✔
1164
    def create_env_vars_file_flag(cls, env_vars: dict) -> tuple[list[str], str | None]:
1✔
1165
        if not env_vars:
1✔
UNCOV
1166
            return [], None
×
1167
        result = []
1✔
1168
        env_vars = dict(env_vars)
1✔
1169
        env_file = None
1✔
1170
        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:
1✔
1171
            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...
1172
            env_file = cls.mountable_tmp_file()
×
UNCOV
1173
            env_content = ""
×
UNCOV
1174
            for name, value in dict(env_vars).items():
×
UNCOV
1175
                if len(value) > cls.MAX_ENV_ARGS_LENGTH:
×
1176
                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")
UNCOV
1177
                    continue
×
UNCOV
1178
                env_vars.pop(name)
×
UNCOV
1179
                value = value.replace("\n", "\\")
×
UNCOV
1180
                env_content += f"{cls.format_env_vars(name, value)}\n"
×
UNCOV
1181
            save_file(env_file, env_content)
×
UNCOV
1182
            result += ["--env-file", env_file]
×
1183

1184
        env_vars_res = [
1✔
1185
            item for k, v in env_vars.items() for item in ["-e", cls.format_env_vars(k, v)]
1186
        ]
1187
        result += env_vars_res
1✔
1188
        return result, env_file
1✔
1189

1190
    @staticmethod
1✔
1191
    def rm_env_vars_file(env_vars_file) -> None:
1✔
1192
        if env_vars_file:
1✔
UNCOV
1193
            return rm_rf(env_vars_file)
×
1194

1195
    @staticmethod
1✔
1196
    def mountable_tmp_file():
1✔
UNCOV
1197
        f = os.path.join(config.dirs.mounted_tmp, short_uid())
×
UNCOV
1198
        TMP_FILES.append(f)
×
UNCOV
1199
        return f
×
1200

1201
    @staticmethod
1✔
1202
    def append_without_latest(image_names: list[str]):
1✔
1203
        suffix = ":latest"
1✔
1204
        for image in list(image_names):
1✔
1205
            if image.endswith(suffix):
1✔
1206
                image_names.append(image[: -len(suffix)])
1✔
1207

1208
    @staticmethod
1✔
1209
    def strip_wellknown_repo_prefixes(image_names: list[str]) -> list[str]:
1✔
1210
        """
1211
        Remove well-known repo prefixes like `localhost/` or `docker.io/library/` from the list of given
1212
        image names. This is mostly to ensure compatibility of our Docker client with Podman API responses.
1213
        :return: a copy of the list of image names, with well-known repo prefixes removed
1214
        """
1215
        result = []
1✔
1216
        for image in image_names:
1✔
1217
            for prefix in WELL_KNOWN_IMAGE_REPO_PREFIXES:
1✔
1218
                if image.startswith(prefix):
1✔
UNCOV
1219
                    image = image.removeprefix(prefix)
×
1220
                    # strip only one of the matching prefixes (avoid multi-stripping)
UNCOV
1221
                    break
×
1222
            result.append(image)
1✔
1223
        return result
1✔
1224

1225
    @staticmethod
1✔
1226
    def tar_path(path: str, target_path: str, is_dir: bool):
1✔
1227
        f = tempfile.NamedTemporaryFile()
1✔
1228
        with tarfile.open(mode="w", fileobj=f) as t:
1✔
1229
            abs_path = os.path.abspath(path)
1✔
1230
            arcname = (
1✔
1231
                os.path.basename(path)
1232
                if is_dir
1233
                else (os.path.basename(target_path) or os.path.basename(path))
1234
            )
1235
            t.add(abs_path, arcname=arcname)
1✔
1236

1237
        f.seek(0)
1✔
1238
        return f
1✔
1239

1240
    @staticmethod
1✔
1241
    def untar_to_path(tardata, target_path):
1✔
1242
        target_path = Path(target_path)
1✔
1243
        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:
1✔
1244
            if target_path.is_dir():
1✔
1245
                t.extractall(path=target_path)
1✔
1246
            else:
1247
                member = t.next()
1✔
1248
                if member:
1✔
1249
                    member.name = target_path.name
1✔
1250
                    t.extract(member, target_path.parent)
1✔
1251
                else:
UNCOV
1252
                    LOG.debug("File to copy empty, ignoring...")
×
1253

1254
    @staticmethod
1✔
1255
    def _read_docker_cli_env_file(env_file: str) -> dict[str, str]:
1✔
1256
        """
1257
        Read an environment file in docker CLI format, specified here:
1258
        https://docs.docker.com/reference/cli/docker/container/run/#env
1259
        :param env_file: Path to the environment file
1260
        :return: Read environment variables
1261
        """
1262
        env_vars = {}
1✔
1263
        try:
1✔
1264
            with open(env_file) as f:
1✔
1265
                env_file_lines = f.readlines()
1✔
UNCOV
1266
        except FileNotFoundError as e:
×
UNCOV
1267
            LOG.error(
×
1268
                "Specified env file '%s' not found. Please make sure the file is properly mounted into the LocalStack container. Error: %s",
1269
                env_file,
1270
                e,
1271
            )
UNCOV
1272
            raise
×
UNCOV
1273
        except OSError as e:
×
UNCOV
1274
            LOG.error(
×
1275
                "Could not read env file '%s'. Please make sure the LocalStack container has the permissions to read it. Error: %s",
1276
                env_file,
1277
                e,
1278
            )
UNCOV
1279
            raise
×
1280
        for idx, line in enumerate(env_file_lines):
1✔
1281
            line = line.strip()
1✔
1282
            if not line or line.startswith("#"):
1✔
1283
                # skip comments or empty lines
1284
                continue
1✔
1285
            lhs, separator, rhs = line.partition("=")
1✔
1286
            if rhs or separator:
1✔
1287
                env_vars[lhs] = rhs
1✔
1288
            else:
1289
                # No "=" in the line, only the name => lookup in local env
1290
                if env_value := os.environ.get(lhs):
1✔
1291
                    env_vars[lhs] = env_value
1✔
1292
        return env_vars
1✔
1293

1294
    @staticmethod
1✔
1295
    def parse_additional_flags(
1✔
1296
        additional_flags: str,
1297
        env_vars: dict[str, str] | None = None,
1298
        labels: dict[str, str] | None = None,
1299
        volumes: list[SimpleVolumeBind] | None = None,
1300
        network: str | None = None,
1301
        platform: DockerPlatform | None = None,
1302
        ports: PortMappings | None = None,
1303
        privileged: bool | None = None,
1304
        user: str | None = None,
1305
        ulimits: list[Ulimit] | None = None,
1306
        dns: str | list[str] | None = None,
1307
    ) -> DockerRunFlags:
1308
        """Parses additional CLI-formatted Docker flags, which could overwrite provided defaults.
1309
        :param additional_flags: String which contains the flag definitions inspired by the Docker CLI reference:
1310
                                 https://docs.docker.com/engine/reference/commandline/run/
1311
        :param env_vars: Dict with env vars. Will be modified in place.
1312
        :param labels: Dict with labels. Will be modified in place.
1313
        :param volumes: List of mount tuples (host_path, container_path). Will be modified in place.
1314
        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.
1315
        :param platform: Platform to execute container. Warning will be printed if platform is overwritten in flags.
1316
        :param ports: PortMapping object. Will be modified in place.
1317
        :param privileged: Run the container in privileged mode. Warning will be printed if overwritten in flags.
1318
        :param ulimits: ulimit options in the format <type>=<soft limit>[:<hard limit>]
1319
        :param user: User to run first process. Warning will be printed if user is overwritten in flags.
1320
        :param dns: List of DNS servers to configure the container with.
1321
        :return: A DockerRunFlags object that will return new objects if respective parameters were None and
1322
                additional flags contained a flag for that object or the same which are passed otherwise.
1323
        """
1324
        # Argparse refactoring opportunity: custom argparse actions can be used to modularize parsing (e.g., key=value)
1325
        # https://docs.python.org/3/library/argparse.html#action
1326

1327
        # Configure parser
1328
        parser = NoExitArgumentParser(description="Docker run flags parser")
1✔
1329
        parser.add_argument(
1✔
1330
            "--add-host",
1331
            help="Add a custom host-to-IP mapping (host:ip)",
1332
            dest="add_hosts",
1333
            action="append",
1334
        )
1335
        parser.add_argument(
1✔
1336
            "--env", "-e", help="Set environment variables", dest="envs", action="append"
1337
        )
1338
        parser.add_argument(
1✔
1339
            "--env-file",
1340
            help="Set environment variables via a file",
1341
            dest="env_files",
1342
            action="append",
1343
        )
1344
        parser.add_argument(
1✔
1345
            "--compose-env-file",
1346
            help="Set environment variables via a file, with a docker-compose supported feature set.",
1347
            dest="compose_env_files",
1348
            action="append",
1349
        )
1350
        parser.add_argument(
1✔
1351
            "--label", "-l", help="Add container meta data", dest="labels", action="append"
1352
        )
1353
        parser.add_argument("--network", help="Connect a container to a network")
1✔
1354
        parser.add_argument(
1✔
1355
            "--platform",
1356
            type=DockerPlatform,
1357
            help="Docker platform (e.g., linux/amd64 or linux/arm64)",
1358
        )
1359
        parser.add_argument(
1✔
1360
            "--privileged",
1361
            help="Give extended privileges to this container",
1362
            action="store_true",
1363
        )
1364
        parser.add_argument(
1✔
1365
            "--publish",
1366
            "-p",
1367
            help="Publish container port(s) to the host",
1368
            dest="publish_ports",
1369
            action="append",
1370
        )
1371
        parser.add_argument(
1✔
1372
            "--ulimit", help="Container ulimit settings", dest="ulimits", action="append"
1373
        )
1374
        parser.add_argument("--user", "-u", help="Username or UID to execute first process")
1✔
1375
        parser.add_argument(
1✔
1376
            "--volume", "-v", help="Bind mount a volume", dest="volumes", action="append"
1377
        )
1378
        parser.add_argument("--dns", help="Set custom DNS servers", dest="dns", action="append")
1✔
1379

1380
        # Parse
1381
        flags = shlex.split(additional_flags)
1✔
1382
        args = parser.parse_args(flags)
1✔
1383

1384
        # Post-process parsed flags
1385
        extra_hosts = None
1✔
1386
        if args.add_hosts:
1✔
1387
            for add_host in args.add_hosts:
1✔
1388
                extra_hosts = extra_hosts if extra_hosts is not None else {}
1✔
1389
                hosts_split = add_host.split(":")
1✔
1390
                extra_hosts[hosts_split[0]] = hosts_split[1]
1✔
1391

1392
        # set env file values before env values, as the latter override the earlier
1393
        if args.env_files:
1✔
1394
            env_vars = env_vars if env_vars is not None else {}
1✔
1395
            for env_file in args.env_files:
1✔
1396
                env_vars.update(Util._read_docker_cli_env_file(env_file))
1✔
1397

1398
        if args.compose_env_files:
1✔
1399
            env_vars = env_vars if env_vars is not None else {}
1✔
1400
            for env_file in args.compose_env_files:
1✔
1401
                env_vars.update(dotenv.dotenv_values(env_file))
1✔
1402

1403
        if args.envs:
1✔
1404
            env_vars = env_vars if env_vars is not None else {}
1✔
1405
            for env in args.envs:
1✔
1406
                lhs, _, rhs = env.partition("=")
1✔
1407
                env_vars[lhs] = rhs
1✔
1408

1409
        if args.labels:
1✔
1410
            labels = labels if labels is not None else {}
1✔
1411
            for label in args.labels:
1✔
1412
                key, _, value = label.partition("=")
1✔
1413
                # Only consider non-empty labels
1414
                if key:
1✔
1415
                    labels[key] = value
1✔
1416

1417
        if args.network:
1✔
1418
            LOG.warning(
1✔
1419
                "Overwriting Docker container network '%s' with new value '%s'",
1420
                network,
1421
                args.network,
1422
            )
1423
            network = args.network
1✔
1424

1425
        if args.platform:
1✔
1426
            LOG.warning(
1✔
1427
                "Overwriting Docker platform '%s' with new value '%s'",
1428
                platform,
1429
                args.platform,
1430
            )
1431
            platform = args.platform
1✔
1432

1433
        if args.privileged:
1✔
1434
            LOG.warning(
1✔
1435
                "Overwriting Docker container privileged flag %s with new value %s",
1436
                privileged,
1437
                args.privileged,
1438
            )
1439
            privileged = args.privileged
1✔
1440

1441
        if args.publish_ports:
1✔
1442
            for port_mapping in args.publish_ports:
1✔
1443
                port_split = port_mapping.split(":")
1✔
1444
                protocol = "tcp"
1✔
1445
                if len(port_split) == 2:
1✔
1446
                    host_port, container_port = port_split
1✔
1447
                elif len(port_split) == 3:
1✔
1448
                    LOG.warning(
1✔
1449
                        "Host part of port mappings are ignored currently in additional flags"
1450
                    )
1451
                    _, host_port, container_port = port_split
1✔
1452
                else:
1453
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
1✔
1454
                host_port_split = host_port.split("-")
1✔
1455
                if len(host_port_split) == 2:
1✔
1456
                    host_port = [int(host_port_split[0]), int(host_port_split[1])]
1✔
1457
                elif len(host_port_split) == 1:
1✔
1458
                    host_port = int(host_port)
1✔
1459
                else:
UNCOV
1460
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
×
1461
                if "/" in container_port:
1✔
1462
                    container_port, protocol = container_port.split("/")
1✔
1463
                ports = ports if ports is not None else PortMappings()
1✔
1464
                ports.add(host_port, int(container_port), protocol)
1✔
1465

1466
        if args.ulimits:
1✔
1467
            ulimits = ulimits if ulimits is not None else []
1✔
1468
            ulimits_dict = {ul.name: ul for ul in ulimits}
1✔
1469
            for ulimit in args.ulimits:
1✔
1470
                name, _, rhs = ulimit.partition("=")
1✔
1471
                soft, _, hard = rhs.partition(":")
1✔
1472
                hard_limit = int(hard) if hard else int(soft)
1✔
1473
                new_ulimit = Ulimit(name=name, soft_limit=int(soft), hard_limit=hard_limit)
1✔
1474
                if ulimits_dict.get(name):
1✔
1475
                    LOG.warning("Overwriting Docker ulimit %s", new_ulimit)
1✔
1476
                ulimits_dict[name] = new_ulimit
1✔
1477
            ulimits = list(ulimits_dict.values())
1✔
1478

1479
        if args.user:
1✔
1480
            LOG.warning(
1✔
1481
                "Overwriting Docker user '%s' with new value '%s'",
1482
                user,
1483
                args.user,
1484
            )
1485
            user = args.user
1✔
1486

1487
        if args.volumes:
1✔
1488
            volumes = volumes if volumes is not None else []
1✔
1489
            for volume in args.volumes:
1✔
1490
                match = re.match(
1✔
1491
                    r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",
1492
                    volume,
1493
                )
1494
                if not match:
1✔
UNCOV
1495
                    LOG.warning("Unable to parse volume mount Docker flags: %s", volume)
×
UNCOV
1496
                    continue
×
1497
                host_path = match.group("host")
1✔
1498
                container_path = match.group("container")
1✔
1499
                rw_args = match.group("arg")
1✔
1500
                if rw_args:
1✔
1501
                    LOG.info("Volume options like :ro or :rw are currently ignored.")
1✔
1502
                volumes.append((host_path, container_path))
1✔
1503

1504
        dns = ensure_list(dns or [])
1✔
1505
        if args.dns:
1✔
1506
            LOG.info(
1✔
1507
                "Extending Docker container DNS servers %s with additional values %s", dns, args.dns
1508
            )
1509
            dns.extend(args.dns)
1✔
1510

1511
        return DockerRunFlags(
1✔
1512
            env_vars=env_vars,
1513
            extra_hosts=extra_hosts,
1514
            labels=labels,
1515
            volumes=volumes,
1516
            ports=ports,
1517
            network=network,
1518
            platform=platform,
1519
            privileged=privileged,
1520
            ulimits=ulimits,
1521
            user=user,
1522
            dns=dns,
1523
        )
1524

1525
    @staticmethod
1✔
1526
    def convert_mount_list_to_dict(
1✔
1527
        volumes: list[SimpleVolumeBind] | VolumeMappings,
1528
    ) -> dict[str, dict[str, str]]:
1529
        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""
1530

1531
        def _map_to_dict(paths: VolumeMappingSpecification):
1✔
1532
            # TODO: move this logic to the `Mount` base class
1533
            if isinstance(paths, (BindMount, VolumeDirMount)):
1✔
1534
                return paths.to_docker_sdk_parameters()
1✔
1535
            else:
UNCOV
1536
                return str(paths[0]), {"bind": paths[1], "mode": "rw"}
×
1537

1538
        return dict(
1✔
1539
            map(
1540
                _map_to_dict,
1541
                volumes,
1542
            )
1543
        )
1544

1545
    @staticmethod
1✔
1546
    def resolve_dockerfile_path(dockerfile_path: str) -> str:
1✔
1547
        """If the given path is a directory that contains a Dockerfile, then return the file path to it."""
1548
        rel_path = os.path.join(dockerfile_path, "Dockerfile")
1✔
1549
        if os.path.isdir(dockerfile_path) and os.path.exists(rel_path):
1✔
1550
            return rel_path
1✔
1551
        return dockerfile_path
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc