• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DataBiosphere / azul / 14395083375

11 Apr 2025 03:38AM UTC coverage: 85.683% (-0.01%) from 85.693%
14395083375

push

github

achave11-ucsc
Fix CVE-2024-24790 in BQ emulator image, add arm64 version (DataBiosphere/azul-private#229, #6090, PR #7029)

18673 of 21793 relevant lines covered (85.68%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.71
src/azul/docker.py
1
from abc import (
1✔
2
    ABCMeta,
3
    abstractmethod,
4
)
5
from base64 import (
1✔
6
    b64decode,
7
    b64encode,
8
    urlsafe_b64encode,
9
)
10
from collections import (
1✔
11
    defaultdict,
12
)
13
from contextlib import (
1✔
14
    contextmanager,
15
)
16
from hashlib import (
1✔
17
    sha1,
18
    sha256,
19
)
20
import json
1✔
21
import logging
1✔
22
import os
1✔
23
import re
1✔
24
import subprocess
1✔
25
import tempfile
1✔
26
from typing import (
1✔
27
    Any,
28
    Iterable,
29
    Literal,
30
    Optional,
31
    Self,
32
    TypedDict,
33
    cast,
34
)
35

36
import attrs
1✔
37
import docker
1✔
38
from docker.models.images import (
1✔
39
    Image,
40
)
41
from dxf import (
1✔
42
    DXF,
43
    DXFBase,
44
)
45
from more_itertools import (
1✔
46
    one,
47
    padded,
48
)
49
import requests
1✔
50

51
from azul import (
1✔
52
    R,
53
    cached_property,
54
    config,
55
)
56
from azul.types import (
1✔
57
    JSONs,
58
    json_int,
59
    json_str,
60
)
61

62
log = logging.getLogger(__name__)
1✔
63

64

65
@attrs.define(frozen=True)
class ImageRef(metaclass=ABCMeta):
    """
    A fully qualified reference to a Docker image in a registry.

    Instances always carry an explicit registry, username and repository. When
    parsing or creating a reference from a name with fewer than three
    slash-separated parts, the registry defaults to ``docker.io`` and the
    username to ``library``. Omitting the qualifier (tag or digest) is not
    supported.
    """

    #: The part before the first slash. This is usually the domain name of image
    #: registry e.g., ``"docker.io"``
    registry: str

    #: The part between the first and second slash. This is usually the name of
    #: the user or organisation owning the image. It can also be a generic term
    #: such as ``"library"``.
    username: str

    #: The part after the second slash, split on the remaining slashes. Will
    #: have at least one element.
    repository: tuple[str, ...]

    @classmethod
    def parse(cls, image_ref: str) -> 'ImageRef':
        """
        Parse the given string into a :py:class:`DigestImageRef` if it contains
        an ``@``, or into a :py:class:`TagImageRef` otherwise.

        >>> ImageRef.parse('2@1')
        DigestImageRef(registry='docker.io', username='library', repository=('2',), digest='1')
        >>> ImageRef.parse('3/2:1')
        TagImageRef(registry='docker.io', username='3', repository=('2',), tag='1')
        >>> ImageRef.parse('4/3/2:1')
        TagImageRef(registry='4', username='3', repository=('2',), tag='1')
        >>> ImageRef.parse('5/4/3/2:1')
        TagImageRef(registry='5', username='4', repository=('3', '2'), tag='1')
        >>> ImageRef.parse('localhost:5000/docker.io/ucscgi/azul-pycharm:2023.3.4-15')
        ... # doctest: +NORMALIZE_WHITESPACE
        TagImageRef(registry='localhost:5000',
                    username='docker.io',
                    repository=('ucscgi', 'azul-pycharm'),
                    tag='2023.3.4-15')
        """
        if '@' in image_ref:
            return DigestImageRef.parse(image_ref)
        else:
            return TagImageRef.parse(image_ref)

    @classmethod
    def _create(cls, name: str, **kwargs) -> Self:
        """
        Create an instance from the given image name (the reference without the
        qualifier), passing the keyword arguments on to the constructor.

        A name with one part is taken to be the repository, a name with two
        parts to be username and repository. The missing parts default to
        ``docker.io`` for the registry and ``library`` for the username.
        """
        parts = name.split('/')
        if len(parts) == 1:
            registry, username, repository = 'docker.io', 'library', parts
        elif len(parts) == 2:
            registry, (username, *repository) = 'docker.io', parts
        else:
            # str.split() never returns an empty list so there must be three
            # or more parts
            registry, username, *repository = parts
        # noinspection PyArgumentList
        return cls(registry=registry,
                   username=username,
                   repository=tuple(repository),
                   **kwargs)

    @property
    def name(self) -> str:
        """
        The name of the image, starting with the registry, up to, but not
        including, the tag.
        """
        return '/'.join((self.registry, self.relative_name))

    @property
    def relative_name(self) -> str:
        """
        The name of the image relative to the registry.
        """
        return '/'.join((self.username, *self.repository))

    @property
    def registry_host(self) -> str:
        """
        Same as :py:attr:``registry`` with hacks for DockerHub.

        https://github.com/docker/cli/issues/3793#issuecomment-1269051403
        """
        registry = self.registry
        return 'registry-1.docker.io' if registry == 'docker.io' else registry

    def with_digest(self, digest: str) -> 'DigestImageRef':
        """
        Return a reference to the image of the same name, qualified with the
        given digest.
        """
        return DigestImageRef.create(self.name, digest)

    def with_tag(self, tag: str) -> 'TagImageRef':
        """
        Return a reference to the image of the same name, qualified with the
        given tag.
        """
        return TagImageRef.create(self.name, tag)

    # Deliberately not annotated so that attrs does not mistake it for a field
    ecr_registry_host_re = re.compile(r'[\d]+\.dkr\.ecr\.[^.]+\.amazonaws\.com')

    @property
    def is_mirrored(self) -> bool:
        """
        True if the registry host of this reference has the form of an ECR
        registry host name.
        """
        return self.ecr_registry_host_re.fullmatch(self.registry_host) is not None

    def port_to(self, registry: str) -> Self:
        """
        Return a reference with the given registry prepended to this one. An
        empty registry yields this reference unchanged.

        >>> ref = ImageRef.parse('a/b/c:d')
        >>> ref.port_to('e')
        TagImageRef(registry='e', username='a', repository=('b', 'c'), tag='d')
        >>> ref.port_to('')
        TagImageRef(registry='a', username='b', repository=('c',), tag='d')
        >>> ref.port_to('a')
        ... # doctest: +NORMALIZE_WHITESPACE
        Traceback (most recent call last):
        ...
        AssertionError: R('Reference already ported to registry',
        TagImageRef(registry='a', username='b', repository=('c',), tag='d'),
        'a')
        """
        if registry:
            assert self.registry != registry, R(
                'Reference already ported to registry',
                self, registry)
            other = type(self).parse(registry + '/' + str(self))
            assert isinstance(other, type(self))
            return other
        else:
            return self

    def port_from(self, registry: str) -> Self:
        """
        The inverse of :py:meth:`port_to`. Return a reference with the given
        registry removed from the front of this one. An empty registry yields
        this reference unchanged.

        >>> ref = ImageRef.parse('a/b/c:d')
        >>> ref.port_to('e').port_from('e')
        TagImageRef(registry='a', username='b', repository=('c',), tag='d')
        >>> ref.port_to('').port_from('')
        TagImageRef(registry='a', username='b', repository=('c',), tag='d')
        >>> ref.port_from('e')
        ... # doctest: +NORMALIZE_WHITESPACE
        Traceback (most recent call last):
        ...
        AssertionError: R('Reference does not use the registry to port from',
        TagImageRef(registry='a', username='b', repository=('c',), tag='d'), 'e')
        """
        if registry:
            assert self.registry == registry, R(
                'Reference does not use the registry to port from',
                self, registry)
            other = type(self).parse(str(self).removeprefix(registry + '/'))
            assert isinstance(other, type(self))
            return other
        else:
            return self

    @property
    def auth_server_url(self) -> str:
        """
        The Docker client tracks credentials in ~/.docker/config.json using the
        URL or hostname of the server requesting authentication. Similarly, the
        credential helpers expect the same value on standard input. This method
        returns that value for this repository.
        """
        if self.registry == 'docker.io':
            return 'https://index.docker.io/v1/'
        else:
            return self.registry_host

    @property
    def tf_repository(self) -> str:
        """
        A string suitable for identifying (in Terraform config) the ECR
        repository resource holding this image.
        """
        # A SHA-1 digest is 20 bytes long, so its Base64 encoding ends in
        # exactly one padding character, which is dropped here
        hash = urlsafe_b64encode(sha1(self.name.encode()).digest()).decode()[:-1]
        return 'repository_' + hash

    @property
    def tf_alnum_repository(self) -> str:
        """
        An alphanumeric string suitable for identifying (in Terraform config)
        the ECR repository resource holding this image. Unlike `tf_repository`,
        the string may only contain characters in [0-9a-zA-Z].
        """
        return 'repository' + sha1(self.name.encode()).hexdigest()

    @property
    def tf_image(self) -> str:
        """
        A string suitable for identifying (in Terraform config) any resource
        specific to this image.
        """
        # Hashes the complete reference, including the qualifier. See
        # tf_repository for why the last character is dropped.
        hash = urlsafe_b64encode(sha1(str(self).encode()).digest()).decode()[:-1]
        return 'image_' + hash

    @property
    @abstractmethod
    def qualifier(self) -> str:
        """
        The part of the reference that selects a particular image among those
        sharing the same name. Provided by the concrete subclasses.
        """
        raise NotImplementedError
258

259

260
@attrs.define(frozen=True)
class DigestImageRef(ImageRef):
    """
    A fully qualified image reference that is stable because it identifies the
    image by digest instead of by tag.
    """

    #: The part following the '@', a hash of the image manifest. It uniquely
    #: identifies an image within one registry but is not consistent across
    #: registries: the same image can have different digests in different
    #: registries.
    digest: str

    @classmethod
    def parse(cls, image_ref: str) -> Self:
        """
        Parse a reference of the form ``name@digest``. The unpacking fails
        with a ValueError unless the string contains exactly one ``@``.
        """
        image_name, manifest_digest = image_ref.split('@')
        return cls.create(image_name, manifest_digest)

    @classmethod
    def create(cls, name: str, digest: str) -> Self:
        """
        Create a reference from an image name and a digest.
        """
        return super()._create(name, digest=digest)

    def __str__(self) -> str:
        """
        The inverse of :py:meth:`parse`.
        """
        return '@'.join((self.name, self.digest))

    @property
    def qualifier(self) -> str:
        """
        The digest of this reference.
        """
        return self.digest
1✔
290

291

292
@attrs.define(frozen=True)
class TagImageRef(ImageRef):
    """
    A fully qualified reference to a tagged Docker image in a registry.
    """

    #: The part after the colon in an image name. This is the name of a tag
    #: associated with the image. Tags refer to digests and are mutable. For a
    #: stable references to images in a registry use :py:class:`DigestImageRef`.
    tag: str

    @classmethod
    def parse(cls, image_ref: str) -> Self:
        """
        Parse a reference of the form ``name:tag``.

        The tag is mandatory. A reference without one fails the assertion
        below instead of silently yielding a bogus instance.
        """
        # Split on the last colon because a colon in the first part of the name
        # might separate host and port
        name, _, tag = image_ref.rpartition(':')
        # If the tag were missing, the name would be empty (no colon at all),
        # or the port of the registry and everything after it would have been
        # mistaken for the tag, which would then contain a slash.
        assert name and tag and '/' not in tag, R(
            'Image reference lacks a tag', image_ref)
        return cls.create(name, tag)

    @classmethod
    def create(cls, name: str, tag: str) -> Self:
        """
        Create a reference from an image name and a tag.
        """
        return super()._create(name, tag=tag)

    def __str__(self) -> str:
        """
        The inverse of :py:meth:`parse`.
        """
        return self.name + ':' + self.tag

    @property
    def qualifier(self) -> str:
        """
        The tag of this reference.
        """
        return self.tag
×
322

323

324
@attrs.define(frozen=True)
class Platform:
    """
    The type of system an image runs on: an operating system, an architecture
    and an optional variant of that architecture.
    """

    os: str
    arch: str
    variant: Optional[str]

    def normalize(self) -> Self:
        """
        Return a copy of this platform with all three fields in the form
        produced by ``_normalize_os`` and ``_normalize_arch``.
        """
        normal_os = _normalize_os(self.os)
        normal_arch, normal_variant = _normalize_arch(self.arch, self.variant)
        return attrs.evolve(self,
                            os=normal_os,
                            arch=normal_arch,
                            variant=normal_variant)

    @classmethod
    def parse(cls, platform: str) -> Self:
        """
        Parse a string of the form ``os/arch`` or ``os/arch/variant``.
        """
        components = platform.split('/')
        # Pad with None so that a missing variant, or architecture, can be
        # unpacked and told apart from an empty one
        os, arch, variant = padded(components, None, 3)
        assert os, R('Invalid operating system', platform)
        assert arch, R('Invalid architecture', platform)
        assert variant is None or variant, R('Invalid variant', platform)
        return cls(os=os, arch=arch, variant=variant)

    @classmethod
    def from_json(cls, platform, config: bool = False) -> Self:
        """
        Create an instance from a JSON object with the properties ``os``,
        ``architecture`` and, optionally, ``variant``. If ``config`` is true,
        the capitalized form of these property names is used instead.
        """
        keys = ('os', 'architecture', 'variant')
        if config:
            keys = tuple(key.capitalize() for key in keys)
        os_key, arch_key, variant_key = keys
        return cls(os=platform[os_key],
                   arch=platform[arch_key],
                   variant=platform.get(variant_key))

    def __str__(self) -> str:
        """
        The inverse of :py:meth:`parse`.
        """
        if self.variant is None:
            components = (self.os, self.arch)
        else:
            components = (self.os, self.arch, self.variant)
        return '/'.join(components)
1✔
357

358

359
#: The configured Docker images, by their alias in the configuration
images_by_alias = {
    alias: TagImageRef.parse(spec['ref'])
    for alias, spec in config.docker_images.items()
}

#: A view of the references to all configured Docker images
images = images_by_alias.values()

#: The configured platforms
platforms = list(map(Platform.parse, config.docker_platforms))

#: The configured images grouped by name, i.e., the reference without the tag
images_by_name: dict[str, list[TagImageRef]] = defaultdict(list)
# Bind the loop variable up front so that the `del` statement below does not
# raise a NameError when no images are configured
image = None
for image in images:
    images_by_name[image.name].append(image)
del image

#: The configured images grouped by name and Terraform repository identifier.
#: The use of `one` enforces that all images of the same name yield the same
#: identifier.
images_by_tf_repository: dict[tuple[str, str], list[TagImageRef]] = {
    (name, one(set(image.tf_repository for image in images))): images
    for name, images in images_by_name.items()
}
377

378

379
# https://github.com/containerd/containerd/blob/1fbd70374134b891f97ce19c70b6e50c7b9f4e0d/platforms/database.go#L62
380

381
def _normalize_os(os: str) -> str:
    """
    Return the given operating system name in lower case, with ``macos``
    replaced by ``darwin``. A false value (None or the empty string) is
    returned as is.
    """
    if not os:
        return os
    lowered = os.lower()
    return 'darwin' if lowered == 'macos' else lowered
1✔
386

387

388
# https://github.com/containerd/containerd/blob/1fbd70374134b891f97ce19c70b6e50c7b9f4e0d/platforms/database.go#L76
389

390
def _normalize_arch(arch: str,
1✔
391
                    variant: Optional[str]
392
                    ) -> tuple[str, Optional[str]]:
393
    arch = arch.lower()
1✔
394
    variant = variant and variant.lower()
1✔
395
    if arch == 'i386':
1✔
396
        arch = '386'
×
397
        variant = None
×
398
    elif arch in ('x86_64', 'x86-64', 'amd64'):
1✔
399
        arch = 'amd64'
1✔
400
        if variant == 'v1':
1✔
401
            variant = None
×
402
    elif arch in ('aarch64', 'arm64'):
×
403
        arch = 'arm64'
×
404
        if variant in ('8', 'v8'):
×
405
            variant = None
×
406
    elif arch == 'armhf':
×
407
        arch = 'arm'
×
408
        variant = 'v7'
×
409
    elif arch == 'armel':
×
410
        arch = 'arm'
×
411
        variant = 'v6'
×
412
    elif arch == 'arm':
×
413
        if variant in (None, '7'):
×
414
            variant = 'v7'
×
415
        elif variant in ('5', '6', '8'):
×
416
            variant = 'v' + variant
×
417
    return arch, variant
1✔
418

419

420
class Gist(TypedDict):
    """
    The essential facts about a Docker artifact that has a digest, such as an
    image manifest or a blob.
    """

    #: A hash of the artifact's content, typically starting in `sha256:`
    digest: str
1✔
427

428

429
class ImageGist(Gist):
    """
    The gist of a Docker image for a single platform
    """

    #: The type of system the image runs on, either as `os/arch` or as
    #: `os/arch/variant`
    platform: str

    #: The hash of the image config JSON, most likely starting in `sha256:`.
    #: Commonly referred to as the "image ID", it is consistent across
    #: registries and includes the hashes of the uncompressed, binary content
    #: of the image.
    id: str
1✔
441

442

443
class IndexImageGist(Gist):
    """
    The gist of a multi-platform image, also known as an image index
    """

    #: The digest of the image in the mirror registry, i.e. ECR, whereas the
    #: inherited ``digest`` property pertains to the original registry. Even
    #: though the digests of the platform-specific parts of a multi-platform
    #: image are the same in both registries, the digest of the mirrored
    #: multi-platform image usually differs from the original because 1) the
    #: mirror only includes a subset of the original parts and 2) the digest
    #: algorithm is generally sensitive to insignificant JSON differences in
    #: whitespace or property order.
    mirror_digest: str

    #: The images that make up the index, by platform (`os/arch` or
    #: `os/arch/variant`)
    parts: dict[str, ImageGist]
1✔
459

460

461
@attrs.define(frozen=True, slots=False)
class Repository:
    """
    Provides access to the repository, in a Docker registry, that holds the
    image with the given reference.
    """

    image_ref: ImageRef

    @cached_property
    def host(self) -> str:
        """
        The host name of the registry containing this repository.
        """
        ref = self.image_ref
        return ref.registry_host

    @cached_property
    def name(self) -> str:
        """
        The name of this repository, relative to the registry.
        """
        ref = self.image_ref
        return ref.relative_name

    @classmethod
    def get_gists(cls) -> dict[str, ImageGist | IndexImageGist]:
        """
        Retrieve the gist of every configured image from its repository and
        return the gists by image reference.
        """
        result = {}
        for alias, ref in images_by_alias.items():
            log.info('Getting information for %r (%s)', alias, ref)
            repo = cls(ref)
            tag_digest = repo.get_tag(ref.tag)
            result[str(ref)] = repo.get_gist(tag_digest)
        return result

    def get_tag(self, tag: str) -> str:
        """
        Return the manifest digest associated with the given tag.
        """
        log.info('Getting tag %r', tag)
        manifest_digest, _response = self._client.head_manifest_and_response(tag)
        return manifest_digest

    def get_gist(self, digest: str) -> ImageGist | IndexImageGist:
        """
        Return the gist of the manifest with the given digest. For an image
        index or manifest list this is an :py:class:`IndexImageGist`, limited
        to the configured platforms. For an image manifest it is an
        :py:class:`ImageGist`. Any other media type results in a
        NotImplementedError.
        """
        log.info('Getting manifest %r', digest)
        body, _response = self._client.get_manifest_and_response(digest)
        manifest = json.loads(body)
        media_type = manifest['mediaType']
        index_types = (
            'application/vnd.oci.image.index.v1+json',
            'application/vnd.docker.distribution.manifest.list.v2+json',
        )
        image_types = (
            'application/vnd.docker.distribution.manifest.v2+json',
            'application/vnd.oci.image.manifest.v1+json',
        )
        if media_type in index_types:
            mirrored = self._get_mirrored_parts(manifest['manifests'])
            index_parts, gist_parts = {}, {}
            for platform, (part, size) in mirrored.items():
                index_parts[platform] = ImageIndexPart(digest=part['digest'],
                                                       size=size)
                gist_parts[str(platform)] = part
            mirror_manifest = ImageIndexManifest.create(index_parts)
            return {
                'digest': digest,
                'mirror_digest': mirror_manifest.digest,
                'parts': gist_parts
            }
        elif media_type in image_types:
            config_digest = manifest['config']['digest']
            image_config = json.loads(self.get_blob(config_digest))
            platform = Platform.from_json(image_config).normalize()
            return {
                'digest': digest,
                'id': config_digest,
                'platform': str(platform)
            }
        else:
            raise NotImplementedError(media_type)

    def _get_mirrored_parts(self,
                            manifests: JSONs
                            ) -> dict[Platform, tuple[ImageGist, int]]:
        """
        Given the entries of an image index, return the gist and the manifest
        size of each entry whose platform is one of the configured platforms,
        by platform.
        """
        selected = {}
        for entry in manifests:
            platform = Platform.from_json(entry['platform']).normalize()
            if platform not in platforms:
                continue
            digest = json_str(entry['digest'])
            size = json_int(entry['size'])
            gist = self.get_gist(digest)
            assert gist.get('platform') == str(platform), R(
                'Inconsistent platform between manifest and manifest list',
                entry, gist)
            selected[platform] = cast(ImageGist, gist), size
        return selected

    def get_blob(self, digest: str) -> bytes:
        """
        Return the content for the given digest.
        """
        log.info('Getting blob %r', digest)
        return b''.join(self._client.pull_blob(digest))

    @cached_property
    def _client(self):
        """
        The registry client for this repository. TLS is only dispensed with
        for a registry on localhost.
        """
        host = self.host
        is_local = host.startswith('localhost:') or host == 'localhost'
        return DXF(host=host,
                   repo=self.name,
                   auth=self._dxf_auth,
                   insecure=is_local)

    def _dxf_auth(self, dxf: DXFBase, response: requests.Response):
        """
        Passed to the registry client as its ``auth`` argument. Authenticates
        the given client with the credentials for this repository.
        """
        user, secret = self._auth
        dxf.authenticate(username=user,
                         password=secret,
                         response=response)

    @cached_property
    def _auth(self) -> tuple[str, str]:
        """
        The username and secret for this repository, obtained either from the
        credentials helper named by the ``credsStore`` entry in
        ~/.docker/config.json or, in the absence of that entry, from the
        ``auths`` entry in the same file.
        """
        auth_server_url = self.image_ref.auth_server_url
        with open(os.path.expanduser('~/.docker/config.json')) as f:
            docker_config = json.load(f)
        if 'credsStore' not in docker_config:
            entry = docker_config['auths'][auth_server_url]
            return self._decode_auth(entry['auth'])
        helper = 'docker-credential-' + docker_config['credsStore']
        # The helper expects the server URL on standard input
        output = subprocess.check_output(args=[helper, 'get'],
                                         input=auth_server_url.encode('ascii'))
        credentials = json.loads(output)
        return credentials['Username'], credentials['Secret']

    @property
    def encoded_auth(self) -> str:
        """
        The credentials for this repository in the encoding used for the
        ``auth`` property in ~/.docker/config.json.
        """
        username, secret = self._auth
        return self._encode_auth(username, secret)

    def _decode_auth(self, auth: str) -> tuple[str, str]:
        """
        The inverse of :py:meth:`_encode_auth`.
        """
        decoded = b64decode(auth.encode('ascii')).decode()
        # Only the first colon separates the two, the secret may contain more
        username, _, secret = decoded.partition(':')
        return username, secret

    def _encode_auth(self, username: str, secret: str) -> str:
        """
        Return the Base64 encoding of the given username and secret, joined by
        a colon.
        """
        plain = ':'.join((username, secret))
        return b64encode(plain.encode()).decode('ascii')

    @classmethod
    @contextmanager
    def temporary_auth_file(cls, *refs: ImageRef):
        """
        While some utilities in the Docker/OCI ecosystem are able to read
        plain-text credentials from the Docker client's configuration file
        (~/.docker/config.json), they often lack support for the credential
        helpers that can be configured there. Removing the credsStore entry
        from that configuration file would disable these helpers, but a
        prominent Docker client distribution (Docker Desktop for macOS and
        Windows) reinserts the entry every time it starts up.

        This context manager provides a temporary containers-auth.json [1] with
        plain-text credentials for the repositories hosting the given images.
        The credentials are obtained by extracting plain-text credentials from
        ~/.docker/config.json or by invoking the credsStore helper configured
        there. The path to the temporary file is passed to the context on entry
        and the file is deleted when the context is exited.

        [1] https://github.com/containers/image/blob/main/docs/containers-auth.json.5.md
        """
        with tempfile.NamedTemporaryFile() as auth_file:
            entries = {}
            for ref in refs:
                entries[ref.auth_server_url] = {'auth': cls(ref).encoded_auth}
            content = json.dumps({'auths': entries})
            auth_file.write(content.encode())
            # Make sure the content is on disk before anyone reads the file
            auth_file.flush()
            yield auth_file.name
×
621

622

623
@attrs.frozen(kw_only=True)
class ImageIndexPart:
    """
    The digest and size of one platform-specific image listed in an image
    index.
    """

    #: The value of the ``digest`` property of the entry for the part
    digest: str

    #: The value of the ``size`` property of the entry for the part
    size: int
1✔
627

628

629
@attrs.frozen(kw_only=True)
class ImageIndexManifest:
    """
    A manifest list in serialized form, along with its digest.
    """

    #: The serialized manifest
    json: str

    #: The SHA-256 hash of the serialized manifest, prefixed with `sha256:`
    digest: str

    @classmethod
    def create(cls, parts: dict[Platform, ImageIndexPart]) -> Self:
        """
        Create the manifest list that consists of the given parts, by platform.
        """

        # The digest is computed from the serialized form so the order of the
        # properties must not be changed
        def entry(platform: Platform, part: ImageIndexPart) -> dict:
            return {
                'mediaType': 'application/vnd.docker.distribution.manifest.v2+json',
                'size': part.size,
                'digest': part.digest,
                'platform': {
                    'architecture': platform.arch,
                    'os': platform.os
                }
            }

        document = {
            'schemaVersion': 2,
            'mediaType': 'application/vnd.docker.distribution.manifest.list.v2+json',
            'manifests': [
                entry(platform, part)
                for platform, part in parts.items()
            ]
        }
        serialized = json.dumps(document, indent=4)
        checksum = sha256(serialized.encode()).hexdigest()
        return cls(json=serialized, digest='sha256:' + checksum)
×
655

656

657
def pull_docker_image(ref: ImageRef) -> Image:
    """
    Pull the image with the given reference and return it.
    """
    image = _push_or_pull(ref, 'pull')
    return image
1✔
659

660

661
def push_docker_image(ref: ImageRef) -> Image:
    """
    Push the image with the given reference and return it.
    """
    image = _push_or_pull(ref, 'push')
    return image
×
663

664

665
def _push_or_pull(ref: ImageRef,
                  direction: Literal['push'] | Literal['pull']
                  ) -> Image:
    """
    Push or pull the image with the given reference, using a Docker client
    configured from the environment. The output of the operation is logged
    line by line. Returns the image as looked up by reference afterwards.
    """
    verb = direction.capitalize()
    log.info('%sing image %r …', verb, ref)
    docker_client = docker.client.from_env()
    transfer = getattr(docker_client.api, direction)
    # Despite its name, the `tag` keyword argument can be a digest, too
    lines = transfer(ref.name, tag=ref.qualifier, stream=True)
    log_lines(ref, direction, lines)
    log.info('%sed image %r', verb, ref)
    return docker_client.images.get(str(ref))
1✔
676

677

678
def log_lines(context: Any, command: str, output: Iterable[bytes]):
    """
    Log each line of the given output of a Docker command at debug level,
    prefixed with the given context and the name of the command.
    """
    for raw_line in output:
        text = raw_line.decode().strip()
        log.debug('%s: docker %s %s', context, command, text)
1✔
681

682

683
def get_docker_image_gist(ref: TagImageRef) -> ImageGist | IndexImageGist:
    """
    Return the gist recorded for the image with the given reference. Raises a
    KeyError if there is none.
    """
    gists = get_docker_image_gists()
    return gists[str(ref)]
1✔
685

686

687
def get_docker_image_gists() -> dict[str, ImageGist | IndexImageGist]:
    """
    Load the image gists from the JSON file at the configured path and return
    them as parsed. The file is read anew on every invocation.
    """
    with open(config.docker_image_gists_path) as gists_file:
        gists = json.load(gists_file)
    return gists
1✔
690

691

692
def resolve_docker_image_for_launch(alias: str) -> str:
    """
    Return an image reference that can be used to launch a container from the
    image with the given alias. The alias is the top level key in the JSON
    object contained in the environment variable `azul_docker_images`.

    The image is pulled and its ID is compared against the ID recorded in the
    gist of the image. An AssertionError is raised if the two differ. The
    comparison is made explicitly, as opposed to with an `assert` statement,
    so that it also occurs when Python runs with assertions disabled.
    """
    ref_to_pull, gist = resolve_docker_image_for_pull(alias)
    image = pull_docker_image(ref_to_pull)
    # In either case, the verification below ensures that the image we pulled
    # has the expected ID.
    if 'parts' in gist:
        # To determine the expected ID for images that are part of a multi-
        # platform image aka "manifest list" aka "image index", we need to know
        # what specific platform was pulled since we left it to Docker to
        # determine the best match.
        parts = cast(IndexImageGist, gist)['parts']
        platform = Platform.from_json(image.attrs, config=True).normalize()
        expected_id = parts[str(platform)]['id']
    else:
        # For single-platform images, this is straight forward.
        expected_id = cast(ImageGist, gist)['id']
    if image.id != expected_id:
        raise AssertionError(R('Pulled image does not have the expected ID',
                               ref_to_pull, image.id, expected_id))
    # Returning the image ID means that the container will be launched using
    # exactly the image we just pulled and verified.
    return image.id
1✔
717

718

719
def resolve_docker_image_for_pull(alias: str
                                  ) -> tuple[DigestImageRef, ImageGist | IndexImageGist]:
    """
    Look up the image configured under the given alias and return a digest
    reference suitable for pulling that image, ported to the configured
    registry, together with the gist of the image.
    """
    tag_ref = TagImageRef.parse(config.docker_images[alias]['ref'])
    log.info('Resolving %r image %r …', alias, tag_ref)
    gist = get_docker_image_gist(tag_ref)
    ported_ref = tag_ref.port_to(config.docker_registry)
    # For multi-arch images, we need to use the digest of the mirrored image, if
    # we're pulling from a mirror.  For single-arch images, the digest is the
    # same between the upstream and mirror registries.
    use_mirror_digest = 'parts' in gist and ported_ref.is_mirrored
    digest = (
        cast(IndexImageGist, gist)['mirror_digest']
        if use_mirror_digest else
        gist['digest']
    )
    digest_ref = ported_ref.with_digest(digest)
    log.info('Resolved %r image to %r', alias, digest_ref)
    return digest_ref, gist
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc