• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DataBiosphere / azul / 23174016491

17 Mar 2026 01:25AM UTC coverage: 85.09% (-0.04%) from 85.126%
23174016491

push

github

dsotirho-ucsc
[R] Expand mypy coverage (#6821, #2743, #6779, PR #7870)

20032 of 23542 relevant lines covered (85.09%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.58
src/azul/docker.py
1
from abc import (
1✔
2
    ABCMeta,
3
    abstractmethod,
4
)
5
from base64 import (
1✔
6
    b64decode,
7
    b64encode,
8
    urlsafe_b64encode,
9
)
10
from collections import (
1✔
11
    defaultdict,
12
)
13
from contextlib import (
1✔
14
    contextmanager,
15
)
16
from hashlib import (
1✔
17
    sha1,
18
    sha256,
19
)
20
import json
1✔
21
import logging
1✔
22
import os
1✔
23
import re
1✔
24
import subprocess
1✔
25
import tempfile
1✔
26
from typing import (
1✔
27
    Any,
28
    Iterable,
29
    Literal,
30
    Optional,
31
    Self,
32
    TypedDict,
33
    cast,
34
)
35

36
import attrs
1✔
37
import docker
1✔
38
from docker.models.images import (
1✔
39
    Image,
40
)
41
from dxf import (
1✔
42
    DXF,
43
    DXFBase,
44
)
45
from more_itertools import (
1✔
46
    one,
47
    padded,
48
)
49
import requests
1✔
50

51
from azul import (
1✔
52
    config,
53
)
54
from azul.lib import (
1✔
55
    R,
56
    cache,
57
    cached_property,
58
)
59
from azul.lib.types import (
1✔
60
    JSONs,
61
    json_int,
62
    json_str,
63
)
64

65
# Module-level logger, named after this module
log = logging.getLogger(__name__)
1✔
66

67

68
@attrs.define(frozen=True)
class ImageRef(metaclass=ABCMeta):
    """
    A fully qualified reference to a Docker image in a registry.

    Instances always carry an explicit registry, username and repository. When
    a name is parsed, one with fewer than three slash-separated parts is
    completed with the registry ``docker.io`` and, if necessary, the username
    ``library``. No default is applied for the qualifier (tag or digest).
    """

    #: The part before the first slash. This is usually the domain name of image
    #: registry e.g., ``"docker.io"``
    registry: str

    #: The part between the first and second slash. This is usually the name of
    #: the user or organisation owning the image. It can also be a generic term
    #: such as ``"library"``.
    username: str

    #: The part after the second slash, split on the remaining slashes. Will
    #: have at least one element.
    repository: tuple[str, ...]

    @classmethod
    def parse(cls, image_ref: str) -> ImageRef:
        """
        Parse the given string into a :py:class:`DigestImageRef` if it contains
        an ``@`` and into a :py:class:`TagImageRef` otherwise.

        >>> ImageRef.parse('2@1')
        DigestImageRef(registry='docker.io', username='library', repository=('2',), digest='1')
        >>> ImageRef.parse('3/2:1')
        TagImageRef(registry='docker.io', username='3', repository=('2',), tag='1')
        >>> ImageRef.parse('4/3/2:1')
        TagImageRef(registry='4', username='3', repository=('2',), tag='1')
        >>> ImageRef.parse('5/4/3/2:1')
        TagImageRef(registry='5', username='4', repository=('3', '2'), tag='1')
        >>> ImageRef.parse('localhost:5000/docker.io/ucscgi/azul-pycharm:2023.3.4-15')
        ... # doctest: +NORMALIZE_WHITESPACE
        TagImageRef(registry='localhost:5000',
                    username='docker.io',
                    repository=('ucscgi', 'azul-pycharm'),
                    tag='2023.3.4-15')
        """
        if '@' in image_ref:
            return DigestImageRef.parse(image_ref)
        else:
            return TagImageRef.parse(image_ref)

    @classmethod
    def _create(cls, name: str, **kwargs) -> Self:
        """
        Instantiate this class from an image name (without the qualifier) and
        the keyword arguments specific to the concrete subclass.

        :param name: slash-separated image name, optionally starting with the
                     registry

        :param kwargs: passed to the constructor in addition to the fields
                       derived from the name
        """
        # str.split() returns at least one element so the three cases below
        # are exhaustive
        parts = name.split('/')
        if len(parts) == 1:
            registry, username, repository = 'docker.io', 'library', parts
        elif len(parts) == 2:
            registry, username, repository = 'docker.io', parts[0], parts[1:]
        else:
            registry, username, *repository = parts
        # noinspection PyArgumentList
        return cls(registry=registry,
                   username=username,
                   repository=tuple(repository),
                   **kwargs)

    @property
    def name(self) -> str:
        """
        The name of the image, starting with the registry, up to, but not
        including, the tag.
        """
        return '/'.join((self.registry, self.relative_name))

    @property
    def relative_name(self) -> str:
        """
        The name of the image relative to the registry.
        """
        return '/'.join((self.username, *self.repository))

    @property
    def registry_host(self) -> str:
        """
        Same as :py:attr:``registry`` with hacks for DockerHub.

        https://github.com/docker/cli/issues/3793#issuecomment-1269051403
        """
        registry = self.registry
        return 'registry-1.docker.io' if registry == 'docker.io' else registry

    def with_digest(self, digest: str) -> DigestImageRef:
        """
        Return a reference to the image of the same name, qualified by the
        given digest.
        """
        return DigestImageRef.create(self.name, digest)

    def with_tag(self, tag: str) -> TagImageRef:
        """
        Return a reference to the image of the same name, qualified by the
        given tag.
        """
        return TagImageRef.create(self.name, tag)

    ecr_registry_host_re = re.compile(r'[\d]+\.dkr\.ecr\.[^.]+\.amazonaws\.com')

    @property
    def is_mirrored(self) -> bool:
        """
        True if the registry host of this reference has the shape of an ECR
        registry host name, as defined by :py:attr:`ecr_registry_host_re`.
        """
        return self.ecr_registry_host_re.fullmatch(self.registry_host) is not None

    def port_to(self, registry: str) -> Self:
        """
        Return a reference of the same type with the given registry prepended
        to this reference. An empty registry yields this reference unchanged.

        >>> ref = ImageRef.parse('a/b/c:d')
        >>> ref.port_to('e')
        TagImageRef(registry='e', username='a', repository=('b', 'c'), tag='d')
        >>> ref.port_to('')
        TagImageRef(registry='a', username='b', repository=('c',), tag='d')
        >>> ref.port_to('a')
        ... # doctest: +NORMALIZE_WHITESPACE
        Traceback (most recent call last):
        ...
        AssertionError: R('Reference already ported to registry',
        TagImageRef(registry='a', username='b', repository=('c',), tag='d'),
        'a')
        """
        if registry:
            assert self.registry != registry, R(
                'Reference already ported to registry',
                self, registry)
            other = type(self).parse(registry + '/' + str(self))
            assert isinstance(other, type(self))
            return other
        else:
            return self

    def port_from(self, registry: str) -> Self:
        """
        The inverse of :py:meth:`port_to`: return a reference of the same type
        with the given registry removed from the front of this reference. An
        empty registry yields this reference unchanged.

        >>> ref = ImageRef.parse('a/b/c:d')
        >>> ref.port_to('e').port_from('e')
        TagImageRef(registry='a', username='b', repository=('c',), tag='d')
        >>> ref.port_to('').port_from('')
        TagImageRef(registry='a', username='b', repository=('c',), tag='d')
        >>> ref.port_from('e')
        ... # doctest: +NORMALIZE_WHITESPACE
        Traceback (most recent call last):
        ...
        AssertionError: R('Reference does not use the registry to port from',
        TagImageRef(registry='a', username='b', repository=('c',), tag='d'), 'e')
        """
        if registry:
            assert self.registry == registry, R(
                'Reference does not use the registry to port from',
                self, registry)
            other = type(self).parse(str(self).removeprefix(registry + '/'))
            assert isinstance(other, type(self))
            return other
        else:
            return self

    @property
    def auth_server_url(self) -> str:
        """
        The Docker client tracks credentials in ~/.docker/config.json using the
        URL or hostname of the server requesting authentication. Similarly, the
        credential helpers expect the same value on standard input. This method
        returns that value for this repository.
        """
        if self.registry == 'docker.io':
            return 'https://index.docker.io/v1/'
        else:
            return self.registry_host

    @property
    def tf_repository(self) -> str:
        """
        A string suitable for identifying (in Terraform config) the ECR
        repository resource holding this image.
        """
        name_hash = sha1(self.name.encode()).digest()
        # The 20 bytes of a SHA-1 hash encode to 28 characters, the last of
        # which is a padding character that we strip
        return 'repository_' + urlsafe_b64encode(name_hash).decode()[:-1]

    @property
    def tf_alnum_repository(self) -> str:
        """
        An alphanumeric string suitable for identifying (in Terraform config)
        the ECR repository resource holding this image. Unlike `tf_repository`,
        the string may only contain characters in [0-9a-zA-Z].
        """
        return 'repository' + sha1(self.name.encode()).hexdigest()

    @property
    def tf_image(self) -> str:
        """
        A string suitable for identifying (in Terraform config) any resource
        specific to this image.
        """
        ref_hash = sha1(str(self).encode()).digest()
        # See tf_repository for why the last character is stripped
        return 'image_' + urlsafe_b64encode(ref_hash).decode()[:-1]

    @property
    @abstractmethod
    def qualifier(self) -> str:
        """
        The part of the reference that selects a particular image among those
        sharing :py:attr:`name`. Defined by the concrete subclasses.
        """
        raise NotImplementedError
261

262

263
@attrs.define(frozen=True)
class DigestImageRef(ImageRef):
    """
    A fully qualified and stable reference to a Docker image in a registry.
    """

    #: The part after the '@', a hash of the image manifest. While it uniquely
    #: identifies an image within a registry, it is not consistent across
    #: registries. The same image can have different digests in different
    #: registries.
    digest: str

    @classmethod
    def parse(cls, image_ref: str) -> Self:
        """
        Parse a reference of the form ``name@digest``. The unpacking below
        fails unless the reference contains exactly one ``@``.
        """
        halves = image_ref.split('@')
        name, digest = halves
        return cls.create(name, digest)

    @classmethod
    def create(cls, name: str, digest: str) -> Self:
        """
        Create an instance from an image name and the digest qualifying it.
        """
        return super()._create(name, digest=digest)

    def __str__(self) -> str:
        """
        The inverse of :py:meth:`parse`.
        """
        return '@'.join((self.name, self.digest))

    @property
    def qualifier(self) -> str:
        """
        The digest.
        """
        return self.digest
1✔
293

294

295
@attrs.define(frozen=True)
class TagImageRef(ImageRef):
    """
    A fully qualified reference to a tagged Docker image in a registry.
    """

    #: The part after the colon in an image name. This is the name of a tag
    #: associated with the image. Tags refer to digests and are mutable. For a
    #: stable references to images in a registry use :py:class:`DigestImageRef`.
    tag: str

    @classmethod
    def parse(cls, image_ref: str) -> Self:
        """
        Parse a reference of the form ``name:tag``.

        The tag is mandatory. A reference without one is rejected instead of
        being split in the wrong place, which would otherwise yield an empty
        name or, if the registry part includes a port, a tag that contains
        parts of the name.
        """
        # Split at the last colon because a colon in the first part of the name
        # might separate host and port
        name, _, tag = image_ref.rpartition(':')
        # Without any colon, `name` is empty. If the only colon is the one
        # before a port, the alleged tag contains a slash.
        is_tagged = bool(name) and bool(tag) and '/' not in tag
        assert is_tagged, R('Not a tagged image reference', image_ref)
        return cls.create(name, tag)

    @classmethod
    def create(cls, name: str, tag: str) -> Self:
        """
        Create an instance from an image name and the tag qualifying it.
        """
        return super()._create(name, tag=tag)

    def __str__(self) -> str:
        """
        The inverse of :py:meth:`parse`.
        """
        return self.name + ':' + self.tag

    @property
    def qualifier(self) -> str:
        """
        The tag.
        """
        return self.tag
×
325

326

327
@attrs.define(frozen=True)
class Platform:
    """
    The type of system an image runs on, written as ``os/arch`` or
    ``os/arch/variant``.
    """
    os: str
    arch: str
    variant: Optional[str]

    def normalize(self) -> Self:
        """
        Return a copy of this platform with the operating system, architecture
        and variant replaced by their normalized forms.
        """
        normalized_os = _normalize_os(self.os)
        normalized_arch, normalized_variant = _normalize_arch(self.arch, self.variant)
        return attrs.evolve(self,
                            os=normalized_os,
                            arch=normalized_arch,
                            variant=normalized_variant)

    @classmethod
    def parse(cls, platform: str) -> Self:
        """
        Parse a string of the form ``os/arch`` or ``os/arch/variant``. A
        missing variant is represented as None, empty components are rejected.
        """
        components = platform.split('/')
        # Pad with None so that the variant is optional
        os, arch, variant = padded(components, None, 3)
        assert os, R('Invalid operating system', platform)
        assert arch, R('Invalid architecture', platform)
        assert variant is None or variant, R('Invalid variant', platform)
        return cls(os=os, arch=arch, variant=variant)

    @classmethod
    def from_json(cls, platform, config: bool = False) -> Self:
        """
        Create an instance from a JSON object with the keys ``os``,
        ``architecture`` and, optionally, ``variant``.

        :param platform: the JSON object to read the properties from

        :param config: if True, look the properties up under the capitalized
                       form of each key, e.g. ``Os`` instead of ``os``
        """
        keys = ('os', 'architecture', 'variant')
        if config:
            keys = tuple(key.capitalize() for key in keys)
        os_key, arch_key, variant_key = keys
        return cls(os=platform[os_key],
                   arch=platform[arch_key],
                   variant=platform.get(variant_key))

    def __str__(self) -> str:
        """
        The inverse of :py:meth:`parse`.
        """
        suffix = () if self.variant is None else (self.variant,)
        return '/'.join((self.os, self.arch, *suffix))
1✔
360

361

362
#: References to the configured images, by alias
images_by_alias = {
    alias: TagImageRef.parse(spec['ref'])
    for alias, spec in config.docker_images.items()
}

#: A live view of the references in `images_by_alias`
images = images_by_alias.values()

#: The configured platforms, as parsed, i.e., without normalization
platforms = list(map(Platform.parse, config.docker_platforms))

#: References to the configured images, grouped by image name. More than one
#: tag of an image may be configured.
images_by_name: dict[str, list[TagImageRef]] = defaultdict(list)
for image in images:
    images_by_name[image.name].append(image)
# Don't leave the loop variable behind in the module namespace. It is only
# bound if the loop body ran, so an unconditional `del` would raise a NameError
# when no images are configured.
if images:
    del image

#: The same groups, keyed by both the image name and the Terraform identifier of
#: the repository holding the images of that name
images_by_tf_repository: dict[tuple[str, str], list[TagImageRef]] = {
    (name, one({ref.tf_repository for ref in refs})): refs
    for name, refs in images_by_name.items()
}
380

381

382
# https://github.com/containerd/containerd/blob/1fbd70374134b891f97ce19c70b6e50c7b9f4e0d/platforms/database.go#L62
383

384
def _normalize_os(os: str) -> str:
1✔
385
    os = os and os.lower()
1✔
386
    if os == 'macos':
1✔
387
        os = 'darwin'
×
388
    return os
1✔
389

390

391
# https://github.com/containerd/containerd/blob/1fbd70374134b891f97ce19c70b6e50c7b9f4e0d/platforms/database.go#L76
392

393
def _normalize_arch(arch: str,
1✔
394
                    variant: Optional[str]
395
                    ) -> tuple[str, Optional[str]]:
396
    arch = arch.lower()
1✔
397
    variant = variant and variant.lower()
1✔
398
    if arch == 'i386':
1✔
399
        arch = '386'
×
400
        variant = None
×
401
    elif arch in ('x86_64', 'x86-64', 'amd64'):
1✔
402
        arch = 'amd64'
1✔
403
        if variant == 'v1':
1✔
404
            variant = None
×
405
    elif arch in ('aarch64', 'arm64'):
×
406
        arch = 'arm64'
×
407
        if variant in ('8', 'v8'):
×
408
            variant = None
×
409
    elif arch == 'armhf':
×
410
        arch = 'arm'
×
411
        variant = 'v7'
×
412
    elif arch == 'armel':
×
413
        arch = 'arm'
×
414
        variant = 'v6'
×
415
    elif arch == 'arm':
×
416
        if variant in (None, '7'):
×
417
            variant = 'v7'
×
418
        elif variant in ('5', '6', '8'):
×
419
            variant = 'v' + variant
×
420
    return arch, variant
1✔
421

422

423
class Gist(TypedDict):
    """
    Represents an image manifest or a blob, or any Docker artifact with a
    digest.
    """

    #: A hash of the content, typically starting in `sha256:`
    digest: str
430

431

432
class ImageGist(Gist):
    """
    A Docker image for a single platform
    """
    #: Type of system to run the image on, as in `os/arch` or `os/arch/variant`
    platform: str

    #: The hash of the image config JSON, most likely starting in `sha256:`.
    #: This is consistent across registries and includes the hashes of the
    #: uncompressed, binary content of the image, and is commonly referred to as
    #: the "image ID".
    id: str
444

445

446
class IndexImageGist(Gist):
    """
    A multi-platform image, also known as an image index
    """
    #: While the inherited ``digest`` property pertains to the original
    #: registry, this property contains the digest of the image in the mirror
    #: registry, i.e. ECR.  Even though the digests of the platform-specific
    #: parts of a multi-platform image are the same in both registries, the
    #: digest of the mirrored multi-platform image usually differs from the
    #: original because 1) the mirror only includes a subset of the original
    #: parts and 2) the digest algorithm is generally sensitive to insignificant
    #: JSON differences in whitespace or property order.
    mirror_digest: str

    #: The images in the list, by platform (`os/arch` or `os/arch/variant`)
    parts: dict[str, ImageGist]
462

463

464
@attrs.define(frozen=True, slots=False)
class Repository:
    """
    Provides access to the manifests and blobs in the registry repository that
    hosts a given image, and to the credentials for that registry.
    """
    #: A reference to an image in the repository
    image_ref: ImageRef

    @cached_property
    def host(self) -> str:
        """
        The host name of the registry to connect to.
        """
        return self.image_ref.registry_host

    @cached_property
    def name(self) -> str:
        """
        The name of the repository, relative to the registry.
        """
        return self.image_ref.relative_name

    @classmethod
    def get_gists(cls) -> dict[str, ImageGist | IndexImageGist]:
        """
        Retrieve the gist of every configured image from the registry hosting
        it. The result is keyed by the string form of the image reference.
        """
        result = {}
        for alias, image_ref in images_by_alias.items():
            log.info('Getting information for %r (%s)', alias, image_ref)
            repository = cls(image_ref)
            # Resolve the mutable tag to a digest first
            manifest_digest = repository.get_tag(image_ref.tag)
            result[str(image_ref)] = repository.get_gist(manifest_digest)
        return result

    def get_tag(self, tag: str) -> str:
        """
        Return the manifest digest associated with the given tag.
        """
        log.info('Getting tag %r', tag)
        digest_and_response = self._client.head_manifest_and_response(tag)
        digest, _ = digest_and_response
        return digest

    def get_gist(self, digest: str) -> ImageGist | IndexImageGist:
        """
        Return the gist of the manifest with the given digest.

        For an image index or manifest list, the result describes the index and
        those of its parts that are for a configured platform. For a
        single-platform image manifest, the result describes that image.

        :raise NotImplementedError: if the manifest is of any other media type
        """
        log.info('Getting manifest %r', digest)
        manifest_json, _ = self._client.get_manifest_and_response(digest)
        manifest = json.loads(manifest_json)
        media_type = manifest['mediaType']
        index_types = (
            'application/vnd.oci.image.index.v1+json',
            'application/vnd.docker.distribution.manifest.list.v2+json',
        )
        image_types = (
            'application/vnd.docker.distribution.manifest.v2+json',
            'application/vnd.oci.image.manifest.v1+json',
        )
        if media_type in index_types:
            parts = self._get_mirrored_parts(manifest['manifests'])
            # The manifest of the mirrored index lists only the parts selected
            # above so we need to compute its digest separately
            mirror_manifest = ImageIndexManifest.create({
                platform: ImageIndexPart(digest=part['digest'], size=size)
                for platform, (part, size) in parts.items()
            })
            parts_by_name = {
                str(platform): part
                for platform, (part, size) in parts.items()
            }
            return {
                'digest': digest,
                'mirror_digest': mirror_manifest.digest,
                'parts': parts_by_name
            }
        elif media_type in image_types:
            config_digest = manifest['config']['digest']
            image_config = json.loads(self.get_blob(config_digest))
            platform = Platform.from_json(image_config).normalize()
            return {
                'digest': digest,
                'id': config_digest,
                'platform': str(platform)
            }
        else:
            raise NotImplementedError(media_type)

    def _get_mirrored_parts(self,
                            manifests: JSONs
                            ) -> dict[Platform, tuple[ImageGist, int]]:
        """
        Select the entries for a configured platform from the given entries of
        a manifest list and return, by platform, a tuple of the gist of the
        manifest an entry refers to and the size listed in that entry.
        """
        selected = {}
        for entry in manifests:
            platform = Platform.from_json(entry['platform']).normalize()
            if platform not in platforms:
                continue
            digest = json_str(entry['digest'])
            size = json_int(entry['size'])
            gist = self.get_gist(digest)
            # The platform in the image config must agree with the platform
            # declared by the entry in the manifest list
            assert gist.get('platform') == str(platform), R(
                'Inconsistent platform between manifest and manifest list',
                entry, gist)
            selected[platform] = cast(ImageGist, gist), size
        return selected

    def get_blob(self, digest: str) -> bytes:
        """
        Return the content for the given digest.
        """
        log.info('Getting blob %r', digest)
        return b''.join(self._client.pull_blob(digest))

    @cached_property
    def _client(self):
        """
        The registry client for this repository. The connection is marked as
        insecure if the registry host is ``localhost``, with or without a port.
        """
        host = self.host
        is_local = host.startswith('localhost:') or host == 'localhost'
        return DXF(host=host,
                   repo=self.name,
                   auth=self._dxf_auth,
                   insecure=is_local)

    def _dxf_auth(self, dxf: DXFBase, response: requests.models.Response):
        """
        Callback passed to the registry client as its ``auth`` argument.
        Authenticates the client using the credentials from :py:attr:`_auth`.
        """
        username, password = self._auth
        dxf.authenticate(username=username,
                         password=password,
                         response=response)

    @cached_property
    def _auth(self) -> tuple[str, str]:
        """
        The username and secret for the registry hosting the image, taken from
        ~/.docker/config.json. If that file has a ``credsStore`` entry, the
        corresponding credential helper is run. Otherwise, the credentials are
        decoded from the ``auths`` section of the file.
        """
        auth_server_url = self.image_ref.auth_server_url
        with open(os.path.expanduser('~/.docker/config.json')) as f:
            docker_config = json.load(f)
        try:
            creds_store = docker_config['credsStore']
        except KeyError:
            # No credential helper, the credentials are stored in the file
            encoded = docker_config['auths'][auth_server_url]['auth']
            return self._decode_auth(encoded)
        command = [('docker-credential-' + creds_store), 'get']
        helper_input = auth_server_url.encode('ascii')
        log.info('Running %r with input %r', command, helper_input)
        process = subprocess.run(args=command,
                                 stdout=subprocess.PIPE,
                                 input=helper_input)
        output = process.stdout
        assert process.returncode == 0, R(
            f'Command {command} failed with status code {process.returncode}',
            output, 'You may need to login into Docker Desktop')
        credentials = json.loads(output)
        return credentials['Username'], credentials['Secret']

    @property
    def encoded_auth(self) -> str:
        """
        The credentials from :py:attr:`_auth`, encoded as ``username:secret``
        in Base64.
        """
        username, secret = self._auth
        return self._encode_auth(username, secret)

    def _decode_auth(self, auth: str) -> tuple[str, str]:
        """
        The inverse of :py:meth:`_encode_auth`. The decoded string is split on
        the first colon.
        """
        decoded = b64decode(auth.encode('ascii')).decode()
        username, _, secret = decoded.partition(':')
        return username, secret

    def _encode_auth(self, username: str, secret: str) -> str:
        """
        Join username and secret with a colon and encode the result in Base64.
        """
        plain = username + ':' + secret
        return b64encode(plain.encode()).decode('ascii')

    @classmethod
    @contextmanager
    def temporary_auth_file(cls, *refs: ImageRef):
        """
        While some utilities in the Docker/OCI ecosystem are able to read
        plain-text credentials from the Docker client's configuration file
        (~/.docker/config.json), they often lack support for the credential
        helpers that can be configured there. Removing the credsStore entry
        from that configuration file would disable these helpers, but a
        prominent Docker client distribution (Docker Desktop for macOS and
        Windows) reinserts the entry every time it starts up.

        This context manager provides a temporary containers-auth.json [1] with
        plain-text credentials for the repositories hosting the given images.
        The credentials are obtained by extracting plain-text credentials from
        ~/.docker/config.json or by invoking the credsStore helper configured
        there. The path to the temporary file is passed to the context on entry
        and the file is deleted when the context is exited.

        [1] https://github.com/containers/image/blob/main/docs/containers-auth.json.5.md
        """
        with tempfile.NamedTemporaryFile() as auth_file:
            auths_by_server = {
                ref.auth_server_url: {'auth': cls(ref).encoded_auth}
                for ref in refs
            }
            content = json.dumps({'auths': auths_by_server})
            auth_file.write(content.encode())
            # Make sure the content is on disk before handing out the path
            auth_file.flush()
            yield auth_file.name
×
629

630

631
@attrs.frozen(kw_only=True)
class ImageIndexPart:
    """
    The properties of a platform-specific image that are needed for listing
    that image in the manifest of an image index.
    """
    #: The digest of the manifest of the platform-specific image
    digest: str
    #: The size listed for the platform-specific image in the image index
    size: int
635

636

637
@attrs.frozen(kw_only=True)
class ImageIndexManifest:
    """
    The manifest of a multi-platform image, in the form of a Docker manifest
    list, together with its digest.
    """
    #: The serialized manifest
    json: str
    #: The SHA-256 hash of the serialized manifest, prefixed with `sha256:`
    digest: str

    @classmethod
    def create(cls, parts: dict[Platform, ImageIndexPart]) -> Self:
        """
        Create the manifest of an image index consisting of the given parts.

        :param parts: the platform-specific images to list, by platform. The
                      manifest lists them in the iteration order of the
                      argument.
        """

        def platform_json(platform: Platform) -> dict[str, str]:
            result = {
                'architecture': platform.arch,
                'os': platform.os
            }
            # The variant is what distinguishes entries that share the
            # architecture. Leaving the property out for platforms without
            # a variant keeps their serialization, and hence the digest of
            # the manifest, the same as if variants weren't supported.
            if platform.variant is not None:
                result['variant'] = platform.variant
            return result

        manifest = {
            'schemaVersion': 2,
            'mediaType': 'application/vnd.docker.distribution.manifest.list.v2+json',
            'manifests': [
                {
                    'mediaType': 'application/vnd.docker.distribution.manifest.v2+json',
                    'size': part.size,
                    'digest': part.digest,
                    'platform': platform_json(platform)
                }
                for platform, part in parts.items()
            ]
        }
        # The digest is computed over the exact serialization, so the
        # formatting parameters are significant
        manifest_json = json.dumps(manifest, indent=4)
        digest = 'sha256:' + sha256(manifest_json.encode()).hexdigest()
        return cls(json=manifest_json, digest=digest)
×
663

664

665
def pull_docker_image(ref: ImageRef) -> Image:
    """
    Pull the referenced image and return the pulled image.
    """
    image = _push_or_pull(ref, 'pull')
    return image
1✔
667

668

669
def push_docker_image(ref: ImageRef) -> Image:
    """
    Push the referenced image and return the pushed image.
    """
    image = _push_or_pull(ref, 'push')
    return image
×
671

672

673
def _push_or_pull(ref: ImageRef,
                  direction: Literal['push'] | Literal['pull']
                  ) -> Image:
    """
    Push or pull the referenced image using a Docker client configured from the
    environment, log the output of the operation and return the image.

    :param ref: the image to transfer

    :param direction: the name of the client API method to invoke
    """
    verb = direction.capitalize()
    log.info('%sing image %r …', verb, ref)
    client = docker.client.from_env()
    transfer = getattr(client.api, direction)
    # Despite its name, the `tag` keyword argument can be a digest, too
    progress = transfer(ref.name, tag=ref.qualifier, stream=True)
    log_lines(ref, direction, progress)
    log.info('%sed image %r', verb, ref)
    return client.images.get(str(ref))
1✔
684

685

686
def log_lines(context: Any, command: str, output: Iterable[bytes]):
    """
    Log each line of the output of a Docker command at debug level.

    :param context: what the command was operating on, prefixes each message

    :param command: the name of the Docker command that produced the output

    :param output: the lines of output
    """
    for raw_line in output:
        text = raw_line.decode().strip()
        log.debug('%s: docker %s %s', context, command, text)
1✔
689

690

691
def get_docker_image_gist(ref: TagImageRef) -> ImageGist | IndexImageGist:
    """
    Return the recorded gist of the referenced image. The gists are looked up
    by the string form of the reference.
    """
    gists = get_docker_image_gists()
    return gists[str(ref)]
1✔
693

694

695
def get_docker_image_gists() -> dict[str, ImageGist | IndexImageGist]:
    """
    Load the recorded image gists from the JSON file at the configured path.
    """
    with open(config.docker_image_gists_path) as gists_file:
        gists = json.load(gists_file)
    return gists
1✔
698

699

700
@cache
def resolve_docker_image_for_launch(alias: str) -> str:
    """
    Return an image reference that can be used to launch a container from the
    image with the given alias. The alias is the top level key in the JSON
    object contained in the environment variable `azul_docker_images`.

    The image is pulled and its ID is verified against the recorded gist of the
    image. The return value is the ID of the pulled image.
    """
    ref_to_pull, gist = resolve_docker_image_for_pull(alias)
    image = pull_docker_image(ref_to_pull)
    # Either branch below verifies that the image we pulled has the expected ID
    if 'parts' in gist:
        # When pulling multi-platform images into the `containerd` image store
        # (which is the default for fresh installations of Docker 29 or newer),
        # the ID of the pulled image is set to the digest of the manifest aka
        # image index. On installations that still use the older `overlay2`
        # image store, the ID of the pulled image is set to the config digest of
        # the respective platform-specific image. For now, we'll accept either
        # to support both image stores. For overlay2, we need to determine what
        # specific platform was pulled since we left it to Docker to pick the
        # best match.
        parts = cast(IndexImageGist, gist)['parts']
        gist = cast(IndexImageGist, gist)
        if ref_to_pull.is_mirrored:
            index_digest = gist['mirror_digest']
        else:
            index_digest = gist['digest']
        pulled_platform = Platform.from_json(image.attrs, config=True).normalize()
        config_digest = parts[str(pulled_platform)]['id']
        assert image.id in (index_digest, config_digest), (image.id, gist)
    else:
        # For single-platform images, this is straightforward
        assert image.id == cast(ImageGist, gist)['id']
    # Returning the image ID means that the container will be launched using
    # exactly the image we just pulled and verified.
    return image.id
1✔
737

738

739
def resolve_docker_image_for_pull(alias: str
                                  ) -> tuple[DigestImageRef, ImageGist | IndexImageGist]:
    """
    Return a reference to, and the gist of, the image with the given alias, for
    the purpose of pulling said image.

    The reference is ported to the configured registry and qualified with the
    digest recorded in the gist instead of the configured tag.
    """
    tag_ref = TagImageRef.parse(config.docker_images[alias]['ref'])
    log.info('Resolving %r image %r …', alias, tag_ref)
    gist = get_docker_image_gist(tag_ref)
    ported_ref = tag_ref.port_to(config.docker_registry)
    # For multi-arch images, we need to use the digest of the mirrored image, if
    # we're pulling from a mirror.  For single-arch images, the digest is the
    # same between the upstream and mirror registries.
    use_mirror_digest = 'parts' in gist and ported_ref.is_mirrored
    if use_mirror_digest:
        digest = cast(IndexImageGist, gist)['mirror_digest']
    else:
        digest = gist['digest']
    digest_ref = ported_ref.with_digest(digest)
    log.info('Resolved %r image to %r', alias, digest_ref)
    return digest_ref, gist
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc