• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 17405457612

02 Sep 2025 01:42PM UTC coverage: 86.876% (-0.1%) from 86.992%
17405457612

push

github

web-flow
feat: handle private images from gitlab (#996)

Co-authored-by: Mohammad Alisafaee <mohammad.alisafaee@epfl.ch>

68 of 136 new or added lines in 13 files covered. (50.0%)

2 existing lines in 2 files now uncovered.

21831 of 25129 relevant lines covered (86.88%)

1.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.87
/components/renku_data_services/notebooks/api/classes/image.py
1
"""Used to get information about docker images used in jupyter servers."""
2

3
import base64
2✔
4
import re
2✔
5
from dataclasses import dataclass, field
2✔
6
from enum import Enum
2✔
7
from pathlib import PurePosixPath
2✔
8
from typing import Any, Optional, Self, cast
2✔
9

10
import httpx
2✔
11
from werkzeug.datastructures import WWWAuthenticate
2✔
12

13
from renku_data_services.errors import errors
2✔
14

15

16
class ManifestTypes(Enum):
    """Media (MIME) types of the image manifest formats this module requests and recognises."""

    # Docker distribution schema 2: a single-image manifest and a manifest list.
    docker_v2 = "application/vnd.docker.distribution.manifest.v2+json"
    docker_v2_list = "application/vnd.docker.distribution.manifest.list.v2+json"
    # OCI image spec v1: a single-image manifest and an image index.
    oci_v1_manifest = "application/vnd.oci.image.manifest.v1+json"
    oci_v1_index = "application/vnd.oci.image.index.v1+json"
2✔
23

24

25
# Platform used to pick an entry from a multi-platform manifest list / index
# when the caller does not ask for a specific architecture or operating system.
DEFAULT_PLATFORM_ARCHITECTURE = "amd64"
DEFAULT_PLATFORM_OS = "linux"
2✔
27

28

29
@dataclass
class ImageRepoDockerAPI:
    """Used to query the docker image repository API.

    Please note that all image repositories use this API, not just Dockerhub.
    """

    hostname: str
    oauth2_token: Optional[str] = field(default=None, repr=False)
    # NOTE: We need to follow redirects so that we can authenticate with the image repositories properly.
    # NOTE: If we do not use default_factory to create the client here requests will fail because it can happen
    # that the client gets created in the wrong asyncio loop.
    client: httpx.AsyncClient = field(default_factory=lambda: httpx.AsyncClient(timeout=10, follow_redirects=True))
    scheme: str = "https"

    def __post_init__(self) -> None:
        """Strip trailing slashes from the hostname and default an empty scheme to https."""
        self.hostname = self.hostname.rstrip("/")
        if self.scheme == "":
            self.scheme = "https"

    async def _get_docker_token(self, image: "Image") -> Optional[str]:
        """Get an authorization token from the docker v2 API.

        An unauthenticated manifest request is sent first; if it is answered with a 401 and a
        ``Www-Authenticate`` challenge, a token is requested from the realm named in the challenge.
        When an oauth2 token is configured it is sent to the realm as HTTP Basic credentials
        with the user name ``oauth2``.

        Returns the token provided by the API, or None if no token could be obtained
        (connection failure, no challenge, no realm, or a response without a token).
        """
        image_digest_url = f"{self.scheme}://{self.hostname}/v2/{image.name}/manifests/{image.tag}"
        try:
            auth_req = await self.client.get(image_digest_url)
        except httpx.ConnectError:
            return None
        if auth_req.status_code != 401 or "Www-Authenticate" not in auth_req.headers:
            # the request status code and header are not what is expected
            return None
        www_auth = WWWAuthenticate.from_header(auth_req.headers["Www-Authenticate"])
        if not www_auth:
            return None
        params = {**www_auth.parameters}
        # A challenge without a realm gives us nowhere to request a token from.
        realm = params.pop("realm", None)
        if not realm:
            return None
        headers = {"Accept": "application/json"}
        if self.oauth2_token:
            # HTTP Basic credentials use the standard base64 alphabet, not the URL-safe one.
            creds = base64.b64encode(f"oauth2:{self.oauth2_token}".encode()).decode()
            headers["Authorization"] = f"Basic {creds}"
        token_req = await self.client.get(realm, params=params, headers=headers)
        try:
            payload = token_req.json()
        except ValueError:
            # The realm did not answer with JSON (e.g. an HTML error page).
            return None
        if not isinstance(payload, dict):
            return None
        # Token servers may return the token under "token" and/or "access_token".
        token = payload.get("token") or payload.get("access_token")
        # Never stringify a missing token: "None" would be sent as a bogus bearer token.
        return str(token) if token else None

    async def get_image_manifest(
        self,
        image: "Image",
        platform_architecture: str = DEFAULT_PLATFORM_ARCHITECTURE,
        platform_os: str = DEFAULT_PLATFORM_OS,
    ) -> Optional[dict[str, Any]]:
        """Query the docker API to get the manifest of an image.

        The manifest is requested by tag, trying the docker v2, OCI manifest and OCI index media
        types in turn. If the response is a manifest list / index, the entry matching the
        requested platform is looked up and its manifest is fetched by digest.

        Returns the parsed single-image manifest, or None if it cannot be retrieved, no entry
        matches the platform, or the response is not a docker v2 / OCI v1 manifest.

        Raises:
            errors.ValidationError: if the image hostname differs from this repository's hostname.
        """
        if image.hostname != self.hostname:
            raise errors.ValidationError(
                message=f"The image hostname {image.hostname} does not match the image repository {self.hostname}"
            )
        token = await self._get_docker_token(image)
        image_digest_url = f"{self.scheme}://{image.hostname}/v2/{image.name}/manifests/{image.tag}"
        headers: dict[str, str] = {}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        # Try each accepted media type in turn until the registry answers with a 200.
        for manifest_type in (ManifestTypes.docker_v2, ManifestTypes.oci_v1_manifest, ManifestTypes.oci_v1_index):
            headers["Accept"] = manifest_type.value
            res = await self.client.get(image_digest_url, headers=headers)
            if res.status_code == 200:
                break
        else:
            return None

        content_type = res.headers.get("Content-Type")
        if content_type in [ManifestTypes.docker_v2_list.value, ManifestTypes.oci_v1_index.value]:
            index_parsed = res.json()

            def platform_matches(manifest: dict[str, Any]) -> bool:
                platform: dict[str, Any] = manifest.get("platform", {})
                return platform.get("architecture") == platform_architecture and platform.get("os") == platform_os

            manifest: dict[str, Any] = next(filter(platform_matches, index_parsed.get("manifests", [])), {})
            image_digest: str | None = manifest.get("digest")
            if not manifest or not image_digest:
                return None
            image_digest_url = f"{self.scheme}://{image.hostname}/v2/{image.name}/manifests/{image_digest}"
            media_type = manifest.get("mediaType")
            # Ask for the media type the index advertises when it is one we understand,
            # otherwise start with docker v2 and fall back to the OCI manifest type.
            headers["Accept"] = ManifestTypes.docker_v2.value
            if media_type in [
                ManifestTypes.docker_v2.value,
                ManifestTypes.oci_v1_manifest.value,
            ]:
                headers["Accept"] = media_type
            res = await self.client.get(image_digest_url, headers=headers)
            if res.status_code != 200:
                headers["Accept"] = ManifestTypes.oci_v1_manifest.value
                res = await self.client.get(image_digest_url, headers=headers)
            if res.status_code != 200:
                return None

        if res.headers.get("Content-Type") not in [
            ManifestTypes.docker_v2.value,
            ManifestTypes.oci_v1_manifest.value,
        ]:
            return None

        return cast(dict[str, Any], res.json())

    async def image_exists(self, image: "Image") -> bool:
        """Check the docker repo API if the image exists."""
        return await self.get_image_manifest(image) is not None

    async def get_image_config(self, image: "Image") -> Optional[dict[str, Any]]:
        """Query the docker API to get the configuration of an image.

        Returns the parsed config blob referenced by the image manifest, or None if the manifest
        is unavailable, has no config digest, or the blob request does not return a 200.
        """
        manifest = await self.get_image_manifest(image)
        if manifest is None:
            return None
        config_digest = manifest.get("config", {}).get("digest")
        if config_digest is None:
            return None
        token = await self._get_docker_token(image)
        headers = {"Accept": "application/json"}
        if token:
            # Only authenticate when a token was actually obtained; anonymous access otherwise.
            headers["Authorization"] = f"Bearer {token}"
        res = await self.client.get(
            f"{self.scheme}://{image.hostname}/v2/{image.name}/blobs/{config_digest}",
            headers=headers,
        )
        if res.status_code != 200:
            return None
        return cast(dict[str, Any], res.json())

    async def image_workdir(self, image: "Image") -> Optional[PurePosixPath]:
        """Query the docker API to get the workdir of an image.

        Returns None when the image config is unavailable or its "config" entry is null;
        a missing, null or empty "WorkingDir" is reported as "/".
        """
        config = await self.get_image_config(image)
        if config is None:
            return None
        nested_config = config.get("config", {})
        if nested_config is None:
            return None
        # ``or`` covers a missing key, an explicit null and an empty string alike.
        workdir = nested_config.get("WorkingDir") or "/"
        return PurePosixPath(workdir)

    def with_oauth2_token(self, oauth2_token: str) -> "ImageRepoDockerAPI":
        """Return a docker API instance with the token as authentication.

        The new instance keeps this instance's hostname and scheme and gets its own HTTP client.
        """
        return ImageRepoDockerAPI(self.hostname, oauth2_token, scheme=self.scheme)

    def maybe_with_oauth2_token(self, token_hostname: str | None, oauth2_token: str | None) -> "ImageRepoDockerAPI":
        """Return a docker API instance with the token as authentication.

        The token is used only if the image hostname matches the token hostname;
        otherwise this instance is returned unchanged.
        """
        if isinstance(token_hostname, str) and self.hostname == token_hostname and oauth2_token:
            return self.with_oauth2_token(oauth2_token)
        return self
1✔
187

188

189
@dataclass
class Image:
    """A docker image reference split into registry hostname, repository name and tag."""

    hostname: str
    name: str
    tag: str

    @classmethod
    def from_path(cls, path: str) -> Self:
        """Parse an image reference such as 'nginx:1.28' into an Image.

        The path is tried against every supported layout (with or without a registry hostname,
        user name, tag or digest). Parts that are absent are filled in with the defaults attached
        to the matching layout. A digest given after '@' is stored in the ``tag`` field.

        Raises:
            errors.ValidationError: if no layout, or more than one layout, matches the path.
        """

        def anchored(*fragments: str) -> re.Pattern:
            """Join the fragments into one pattern anchored at both ends."""
            return re.compile(r"^" + r"".join(fragments) + r"$")

        def interpret(found: re.Match, defaults: dict[str, str]) -> dict[str, str]:
            """Merge the captured groups with the defaults and fold the user name into the image name."""
            parsed = {**found.groupdict(), **defaults}
            # The user name is only captured separately because it makes matching easier;
            # it is not needed on its own, so it becomes the first segment of the image name.
            owner = parsed.pop("username")
            parsed["image"] = owner + "/" + parsed["image"]
            return parsed

        slash, colon, at = r"\/", r":", r"@"

        host_part = r"(?P<hostname>(?<=^)[a-zA-Z0-9_\-]{1,}\.[a-zA-Z0-9\._\-:]{1,}(?=\/))"
        hub_user_part = r"(?P<username>(?<=^)[a-zA-Z0-9]{1,}(?=\/))"
        user_part = r"(?P<username>(?<=\/)[a-zA-Z0-9\._\-]{1,}(?=\/))"
        hub_image_part = r"(?P<image>(?:(?<=\/)|(?<=^))[a-zA-Z0-9\._\-]{1,}(?:(?=:)|(?=@)|(?=$)))"
        image_part = r"(?P<image>(?:(?<=\/)|(?<=^))[a-zA-Z0-9\._\-\/]{1,}(?:(?=:)|(?=@)|(?=$)))"
        digest_part = r"(?P<tag>(?<=@)[a-zA-Z0-9\._\-:]{1,}(?=$))"
        tag_part = r"(?P<tag>(?<=:)[a-zA-Z0-9\._\-]{1,}(?=$))"

        # Each entry pairs a layout with the values to fill in when that layout matches.
        layouts: list[tuple[re.Pattern, dict[str, str]]] = [
            # nginx
            (
                anchored(hub_image_part),
                {"hostname": "registry-1.docker.io", "username": "library", "tag": "latest"},
            ),
            # username/image
            (
                anchored(hub_user_part, slash, hub_image_part),
                {"hostname": "registry-1.docker.io", "tag": "latest"},
            ),
            # nginx:1.28
            (
                anchored(hub_image_part, colon, tag_part),
                {"hostname": "registry-1.docker.io", "username": "library"},
            ),
            # username/image:1.0.0
            (
                anchored(hub_user_part, slash, hub_image_part, colon, tag_part),
                {"hostname": "registry-1.docker.io"},
            ),
            # nginx@sha256:24235rt2rewg345ferwf
            (
                anchored(hub_image_part, at, digest_part),
                {"hostname": "registry-1.docker.io", "username": "library"},
            ),
            # username/image@sha256:fdsaf345tre3412t1413r
            (
                anchored(hub_user_part, slash, hub_image_part, at, digest_part),
                {"hostname": "registry-1.docker.io"},
            ),
            # gitlab.com/username/project
            # gitlab.com/username/project/image/subimage
            (anchored(host_part, slash, user_part, slash, image_part), {"tag": "latest"}),
            # gitlab.com/username/project:1.2.3
            # gitlab.com/username/project/image/subimage:1.2.3
            (anchored(host_part, slash, user_part, slash, image_part, colon, tag_part), {}),
            # gitlab.com/username/project@sha256:324fet13t4
            # gitlab.com/username/project/image/subimage@sha256:324fet13t4
            (anchored(host_part, slash, user_part, slash, image_part, at, digest_part), {}),
        ]

        matches = [
            interpret(found, defaults)
            for pattern, defaults in layouts
            if (found := pattern.match(path)) is not None
        ]

        if not matches:
            raise errors.ValidationError(message=f"Cannot parse the image {path}")
        if len(matches) > 1:
            raise errors.ValidationError(message=f"Cannot parse the image {path}, too many interpretations {matches}")
        (parsed,) = matches
        return cls(parsed["hostname"], parsed["image"], parsed["tag"])

    def repo_api(self) -> ImageRepoDockerAPI:
        """Build an ImageRepoDockerAPI for the registry that hosts this image."""
        return ImageRepoDockerAPI(self.hostname)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc