SwissDataScienceCenter / renku-data-services / build 18123243513

30 Sep 2025 08:10AM UTC coverage: 86.702% (-0.01%) from 86.714%

Pull Request #1019: feat: Attempt to support dockerhub private images
Merge e726c4543 into 0690bab65

70 of 101 new or added lines in 9 files covered. (69.31%)
106 existing lines in 6 files now uncovered.
22357 of 25786 relevant lines covered (86.7%)
1.52 hits per line

Source File: /components/renku_data_services/notebooks/core_sessions.py
"""A selection of core functions for AmaltheaSessions."""

import base64
import json
import os
import random
import string
from collections.abc import AsyncIterator, Sequence
from datetime import timedelta
from pathlib import PurePosixPath
from typing import Protocol, TypeVar, cast
from urllib.parse import urljoin, urlparse

import httpx
from kubernetes.client import V1ObjectMeta, V1Secret
from sanic import Request
from toml import dumps
from ulid import ULID
from yaml import safe_dump

import renku_data_services.notebooks.image_check as ic
from renku_data_services.app_config import logging
from renku_data_services.base_models import AnonymousAPIUser, APIUser, AuthenticatedAPIUser
from renku_data_services.base_models.metrics import MetricsService
from renku_data_services.connected_services.db import ConnectedServicesRepository
from renku_data_services.connected_services.models import ProviderKind
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository
from renku_data_services.crc.models import GpuKind, ResourceClass, ResourcePool
from renku_data_services.data_connectors.db import (
    DataConnectorSecretRepository,
)
from renku_data_services.data_connectors.models import DataConnectorSecret, DataConnectorWithSecrets
from renku_data_services.errors import errors
from renku_data_services.k8s.models import K8sSecret, sanitizer
from renku_data_services.notebooks import apispec, core
from renku_data_services.notebooks.api.amalthea_patches import git_proxy, init_containers
from renku_data_services.notebooks.api.amalthea_patches.init_containers import user_secrets_extras
from renku_data_services.notebooks.api.classes.image import Image
from renku_data_services.notebooks.api.classes.repository import GitProvider, Repository
from renku_data_services.notebooks.api.schemas.cloud_storage import RCloneStorage
from renku_data_services.notebooks.config import NotebooksConfig
from renku_data_services.notebooks.cr_amalthea_session import TlsSecret
from renku_data_services.notebooks.crs import (
    AmaltheaSessionSpec,
    AmaltheaSessionV1Alpha1,
    AmaltheaSessionV1Alpha1MetadataPatch,
    AmaltheaSessionV1Alpha1Patch,
    AmaltheaSessionV1Alpha1SpecPatch,
    AmaltheaSessionV1Alpha1SpecSessionPatch,
    Authentication,
    AuthenticationType,
    Culling,
    DataSource,
    ExtraContainer,
    ExtraVolume,
    ExtraVolumeMount,
    ImagePullPolicy,
    ImagePullSecret,
    Ingress,
    InitContainer,
    Limits,
    LimitsStr,
    Metadata,
    ReconcileStrategy,
    Requests,
    RequestsStr,
    Resources,
    SecretAsVolume,
    SecretAsVolumeItem,
    Session,
    SessionEnvItem,
    ShmSizeStr,
    SizeStr,
    State,
    Storage,
)
from renku_data_services.notebooks.models import ExtraSecret, SessionExtraResources
from renku_data_services.notebooks.util.kubernetes_ import (
    renku_2_make_server_name,
)
from renku_data_services.notebooks.utils import (
    node_affinity_from_resource_class,
    tolerations_from_resource_class,
)
from renku_data_services.project.db import ProjectRepository, ProjectSessionSecretRepository
from renku_data_services.project.models import Project, SessionSecret
from renku_data_services.session.db import SessionRepository
from renku_data_services.session.models import SessionLauncher
from renku_data_services.users.db import UserRepo
from renku_data_services.utils.cryptography import get_encryption_key

logger = logging.getLogger(__name__)


async def get_extra_init_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
    storage_mount: PurePosixPath,
    work_dir: PurePosixPath,
    uid: int = 1000,
    gid: int = 1000,
) -> SessionExtraResources:
    """Get all extra init containers that should be added to an amalthea session."""
    # TODO: The above statement is not correct: the init container for user secrets is not included here
    cert_init, cert_vols = init_containers.certificates_container(nb_config)
    session_init_containers = [InitContainer.model_validate(sanitizer(cert_init))]
    extra_volumes = [ExtraVolume.model_validate(sanitizer(volume)) for volume in cert_vols]
    git_clone = await init_containers.git_clone_container_v2(
        user=user,
        config=nb_config,
        repositories=repositories,
        git_providers=git_providers,
        workspace_mount_path=storage_mount,
        work_dir=work_dir,
        uid=uid,
        gid=gid,
    )
    if git_clone is not None:
        session_init_containers.append(InitContainer.model_validate(git_clone))
    return SessionExtraResources(
        init_containers=session_init_containers,
        volumes=extra_volumes,
    )


async def get_extra_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
) -> SessionExtraResources:
    """Get the extra containers added to amalthea sessions."""
    conts: list[ExtraContainer] = []
    git_proxy_container = await git_proxy.main_container(
        user=user, config=nb_config, repositories=repositories, git_providers=git_providers
    )
    if git_proxy_container:
        conts.append(ExtraContainer.model_validate(sanitizer(git_proxy_container)))
    return SessionExtraResources(containers=conts)


async def get_auth_secret_authenticated(
    nb_config: NotebooksConfig,
    user: AuthenticatedAPIUser,
    server_name: str,
    base_server_url: str,
    base_server_https_url: str,
    base_server_path: str,
) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an authenticated user."""
    secret_data = {}

    parsed_proxy_url = urlparse(urljoin(base_server_url + "/", "oauth2"))
    vol = ExtraVolume(
        name="renku-authorized-emails",
        secret=SecretAsVolume(
            secretName=server_name,
            items=[SecretAsVolumeItem(key="authorized_emails", path="authorized_emails")],
        ),
    )
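    # The "auth" entry is rendered as TOML (`dumps` here is `toml.dumps`) because it is
    # consumed as the oauth2proxy configuration for the session (authenticated sessions use
    # AuthenticationType.oauth2proxy, see start_session below).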
    secret_data["auth"] = dumps(
        {
            "provider": "oidc",
            "client_id": nb_config.sessions.oidc.client_id,
            "oidc_issuer_url": nb_config.sessions.oidc.issuer_url,
            "session_cookie_minimal": True,
            "skip_provider_button": True,
            # NOTE: If the redirect url is not HTTPS then some identity providers will fail.
            "redirect_url": urljoin(base_server_https_url + "/", "oauth2/callback"),
            "cookie_path": base_server_path,
            "proxy_prefix": parsed_proxy_url.path,
            "authenticated_emails_file": "/authorized_emails",
            "client_secret": nb_config.sessions.oidc.client_secret,
            "cookie_secret": base64.urlsafe_b64encode(os.urandom(32)).decode(),
            "insecure_oidc_allow_unverified_email": nb_config.sessions.oidc.allow_unverified_email,
        }
    )
    secret_data["authorized_emails"] = user.email
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    vol_mount = ExtraVolumeMount(
        name="renku-authorized-emails",
        mountPath="/authorized_emails",
        subPath="authorized_emails",
    )
    return ExtraSecret(secret, vol, vol_mount)


def get_auth_secret_anonymous(nb_config: NotebooksConfig, server_name: str, request: Request) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an anonymous user."""
    # NOTE: We extract the session cookie value here in order to avoid creating a cookie.
    # The gateway encrypts and signs cookies so the user ID injected in the request headers does not
    # match the value of the session cookie.
    session_id = cast(str | None, request.cookies.get(nb_config.session_id_cookie_name))
    if not session_id:
        raise errors.UnauthorizedError(
            message=f"You have to have a renku session cookie at {nb_config.session_id_cookie_name} "
            "in order to launch an anonymous session."
        )
    # NOTE: Amalthea looks for the token value first in the cookie and then in the authorization header
    secret_data = {
        "auth": safe_dump(
            {
                "authproxy": {
                    "token": session_id,
                    "cookie_key": nb_config.session_id_cookie_name,
                    "verbose": True,
                }
            }
        )
    }
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    return ExtraSecret(secret)


def __get_gitlab_image_pull_secret(
    nb_config: NotebooksConfig, user: AuthenticatedAPIUser, image_pull_secret_name: str, access_token: str
) -> ExtraSecret:
    """Create a Kubernetes secret for private GitLab registry authentication."""

    k8s_namespace = nb_config.k8s_client.namespace()

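    # Secrets of type kubernetes.io/dockerconfigjson hold a Docker client config under the
    # ".dockerconfigjson" key, i.e. {"auths": {<registry>: {<credentials>}}}; the kubelet
    # reads it when the secret is referenced as an imagePullSecret.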
    registry_secret = {
        "auths": {
            nb_config.git.registry: {
                "Username": "oauth2",
                "Password": access_token,
                "Email": user.email,
            }
        }
    }
    registry_secret = json.dumps(registry_secret)

    secret_data = {".dockerconfigjson": registry_secret}
    secret = V1Secret(
        metadata=V1ObjectMeta(name=image_pull_secret_name, namespace=k8s_namespace),
        string_data=secret_data,
        type="kubernetes.io/dockerconfigjson",
    )

    return ExtraSecret(secret)


async def get_data_sources(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    server_name: str,
    data_connectors_stream: AsyncIterator[DataConnectorWithSecrets],
    work_dir: PurePosixPath,
    cloud_storage_overrides: list[apispec.SessionCloudStoragePost],
    user_repo: UserRepo,
) -> SessionExtraResources:
    """Generate cloud storage related resources."""
    data_sources: list[DataSource] = []
    secrets: list[ExtraSecret] = []
    dcs: dict[str, RCloneStorage] = {}
    dcs_secrets: dict[str, list[DataConnectorSecret]] = {}
    user_secret_key: str | None = None
    async for dc in data_connectors_stream:
        mount_folder = (
            dc.data_connector.storage.target_path
            if PurePosixPath(dc.data_connector.storage.target_path).is_absolute()
            else (work_dir / dc.data_connector.storage.target_path).as_posix()
        )
        dcs[str(dc.data_connector.id)] = RCloneStorage(
            source_path=dc.data_connector.storage.source_path,
            mount_folder=mount_folder,
            configuration=dc.data_connector.storage.configuration,
            readonly=dc.data_connector.storage.readonly,
            name=dc.data_connector.name,
            secrets={str(secret.secret_id): secret.name for secret in dc.secrets},
            storage_class=nb_config.cloud_storage.storage_class,
        )
        if len(dc.secrets) > 0:
            dcs_secrets[str(dc.data_connector.id)] = dc.secrets
    if isinstance(user, AuthenticatedAPIUser) and len(dcs_secrets) > 0:
        secret_key = await user_repo.get_or_create_user_secret_key(user)
        user_secret_key = get_encryption_key(secret_key.encode(), user.id.encode()).decode("utf-8")
    # NOTE: Check the cloud storage overrides from the request body and if any match
    # then overwrite the project's cloud storages
    # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error
    # NOTE: Overriding the configuration when a saved secret is there will cause a 422 error
    for csr in cloud_storage_overrides:
        csr_id = csr.storage_id
        if csr_id not in dcs:
            raise errors.MissingResourceError(
                message=f"You have requested a cloud storage with ID {csr_id} which does not exist "
                "or which you do not have access to."
            )
        if csr.target_path is not None and not PurePosixPath(csr.target_path).is_absolute():
            csr.target_path = (work_dir / csr.target_path).as_posix()
        dcs[csr_id] = dcs[csr_id].with_override(csr)

    # Handle potential duplicate target_path
    dcs = _deduplicate_target_paths(dcs)

    for cs_id, cs in dcs.items():
        secret_name = f"{server_name}-ds-{cs_id.lower()}"
        secret_key_needed = len(dcs_secrets.get(cs_id, [])) > 0
        if secret_key_needed and user_secret_key is None:
            raise errors.ProgrammingError(
                message=f"You have saved storage secrets for data connector {cs_id} "
                f"associated with your user ID {user.id} but no key to decrypt them, "
                "therefore we cannot mount the requested data connector. "
                "Please report this to the renku administrators."
            )
        secret = ExtraSecret(
            cs.secret(
                secret_name,
                await nb_config.k8s_client.namespace(),
                user_secret_key=user_secret_key if secret_key_needed else None,
            )
        )
        secrets.append(secret)
        data_sources.append(
            DataSource(
                mountPath=cs.mount_folder,
                secretRef=secret.ref(),
                accessMode="ReadOnlyMany" if cs.readonly else "ReadWriteOnce",
            )
        )
    return SessionExtraResources(
        data_sources=data_sources,
        secrets=secrets,
        data_connector_secrets=dcs_secrets,
    )


async def request_dc_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    dc_secrets: dict[str, list[DataConnectorSecret]],
) -> None:
    """Request the specified data connector secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
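    # The owner reference ties the k8s secrets created by the secret service to this session
    # custom resource, so Kubernetes garbage-collects them when the session is deleted.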
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    for s_id, secrets in dc_secrets.items():
        if len(secrets) == 0:
            continue
        request_data = {
            "name": f"{manifest.metadata.name}-ds-{s_id.lower()}-secrets",
            "namespace": namespace,
            "secret_ids": [str(secret.secret_id) for secret in secrets],
            "owner_references": [owner_reference],
            "key_mapping": {str(secret.secret_id): secret.name for secret in secrets},
            "cluster_id": str(cluster_id),
        }
        async with httpx.AsyncClient(timeout=10) as client:
            res = await client.post(secrets_url, headers=headers, json=request_data)
            if res.status_code >= 300 or res.status_code < 200:
                raise errors.ProgrammingError(
                    message=f"The secret for data connector with ID {s_id} could not be "
                    f"successfully created, the status code was {res.status_code}. "
                    "Please contact a Renku administrator.",
                    detail=res.text,
                )


def get_launcher_env_variables(launcher: SessionLauncher, body: apispec.SessionPostRequest) -> list[SessionEnvItem]:
    """Get the environment variables from the launcher, with overrides from the request."""
    output: list[SessionEnvItem] = []
    env_overrides = {i.name: i.value for i in body.env_variable_overrides or []}
    for env in launcher.env_variables or []:
        if env.name in env_overrides:
            output.append(SessionEnvItem(name=env.name, value=env_overrides[env.name]))
        else:
            output.append(SessionEnvItem(name=env.name, value=env.value))
    return output


def verify_launcher_env_variable_overrides(launcher: SessionLauncher, body: apispec.SessionPostRequest) -> None:
    """Raise an error if there are env variables that are not defined in the launcher."""
    env_overrides = {i.name: i.value for i in body.env_variable_overrides or []}
    known_env_names = {i.name for i in launcher.env_variables or []}
    unknown_env_names = set(env_overrides.keys()) - known_env_names
    if unknown_env_names:
        message = f"""The following environment variables are not defined in the session launcher: {unknown_env_names}.
            Please remove them from the launch request or add them to the session launcher."""
        raise errors.ValidationError(message=message)


async def request_session_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    session_secrets: list[SessionSecret],
) -> None:
    """Request the specified user session secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    if not session_secrets:
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    key_mapping: dict[str, list[str]] = dict()
    for s in session_secrets:
        secret_id = str(s.secret_id)
        if secret_id not in key_mapping:
            key_mapping[secret_id] = list()
        key_mapping[secret_id].append(s.secret_slot.filename)

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    request_data = {
        "name": f"{manifest.metadata.name}-secrets",
        "namespace": namespace,
        "secret_ids": [str(s.secret_id) for s in session_secrets],
        "owner_references": [owner_reference],
        "key_mapping": key_mapping,
        "cluster_id": str(cluster_id),
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}
    async with httpx.AsyncClient(timeout=10) as client:
        res = await client.post(secrets_url, headers=headers, json=request_data)
        if res.status_code >= 300 or res.status_code < 200:
            raise errors.ProgrammingError(
                message="The session secrets could not be successfully created, "
                f"the status code was {res.status_code}. "
                "Please contact a Renku administrator.",
                detail=res.text,
            )


def resources_from_resource_class(resource_class: ResourceClass) -> Resources:
    """Convert the resource class to a k8s resources spec."""
    requests: dict[str, Requests | RequestsStr] = {
        "cpu": RequestsStr(str(round(resource_class.cpu * 1000)) + "m"),
        "memory": RequestsStr(f"{resource_class.memory}Gi"),
    }
    limits: dict[str, Limits | LimitsStr] = {"memory": LimitsStr(f"{resource_class.memory}Gi")}
    if resource_class.gpu > 0:
        gpu_name = GpuKind.NVIDIA.value + "/gpu"
        requests[gpu_name] = Requests(resource_class.gpu)
        # NOTE: GPUs have to be set in limits too since GPUs cannot be overcommitted;
        # otherwise, on some clusters the session will fail to start.
        limits[gpu_name] = Limits(resource_class.gpu)
    return Resources(requests=requests, limits=limits if len(limits) > 0 else None)


def repositories_from_project(project: Project, git_providers: list[GitProvider]) -> list[Repository]:
    """Get the list of git repositories from a project."""
    repositories: list[Repository] = []
    for repo in project.repositories:
        found_provider_id: str | None = None
        for provider in git_providers:
            if urlparse(provider.url).netloc == urlparse(repo).netloc:
                found_provider_id = provider.id
                break
        repositories.append(Repository(url=repo, provider=found_provider_id))
    return repositories


async def repositories_from_session(
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    session: AmaltheaSessionV1Alpha1,
    project_repo: ProjectRepository,
    git_providers: list[GitProvider],
) -> list[Repository]:
    """Get the list of git repositories from a session."""
    try:
        project = await project_repo.get_project(user, session.project_id)
    except errors.MissingResourceError:
        return []
    return repositories_from_project(project, git_providers)


def get_culling(
    user: AuthenticatedAPIUser | AnonymousAPIUser, resource_pool: ResourcePool, nb_config: NotebooksConfig
) -> Culling:
    """Create the culling specification for an AmaltheaSession."""
    idle_threshold_seconds = resource_pool.idle_threshold or nb_config.sessions.culling.registered.idle_seconds
    if user.is_anonymous:
        # NOTE: Anonymous sessions should not be hibernated at all, but there is no such option in Amalthea.
        # So in this case we set a very low hibernation threshold so the session is deleted quickly after
        # it is hibernated.
        hibernation_threshold_seconds = 1
    else:
        hibernation_threshold_seconds = (
            resource_pool.hibernation_threshold or nb_config.sessions.culling.registered.hibernated_seconds
        )
    return Culling(
        maxAge=timedelta(seconds=nb_config.sessions.culling.registered.max_age_seconds),
        maxFailedDuration=timedelta(seconds=nb_config.sessions.culling.registered.failed_seconds),
        maxHibernatedDuration=timedelta(seconds=hibernation_threshold_seconds),
        maxIdleDuration=timedelta(seconds=idle_threshold_seconds),
        maxStartingDuration=timedelta(seconds=nb_config.sessions.culling.registered.pending_seconds),
    )


async def __requires_image_pull_secret(nb_config: NotebooksConfig, image: str, internal_gitlab_user: APIUser) -> bool:
    """Determines if an image requires a pull secret based on its visibility and the user's GitLab access token."""

    parsed_image = Image.from_path(image)
    image_repo = parsed_image.repo_api()

    image_exists_publicly = await image_repo.image_exists(parsed_image)
    if image_exists_publicly:
        return False

    if parsed_image.hostname == nb_config.git.registry and internal_gitlab_user.access_token:
        image_repo = image_repo.with_oauth2_token(internal_gitlab_user.access_token)
        image_exists_privately = await image_repo.image_exists(parsed_image)
        if image_exists_privately:
            return True
    # No pull secret needed if the image is private and the user cannot access it
    return False


def __format_image_pull_secret(secret_name: str, token: str, registry_domain: str) -> ExtraSecret:
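    # In a Docker config file the "auth" value holds base64("<username>:<password>"), so
    # callers are expected to pass `token` already encoded in that form.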
    registry_secret = {"auths": {registry_domain: {"auth": token}}}
    registry_secret = json.dumps(registry_secret)
    registry_secret = base64.b64encode(registry_secret.encode()).decode()
    return ExtraSecret(
        V1Secret(
            data={".dockerconfigjson": registry_secret},
            metadata=V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
        )
    )


async def __get_connected_services_image_pull_secret(
    secret_name: str, connected_svcs_repo: ConnectedServicesRepository, image: str, user: APIUser
) -> ExtraSecret | None:
    """Return a secret for accessing the image if one is available for the given user."""
    image_parsed = Image.from_path(image)
    image_check_result = await ic.check_image(image_parsed, user, connected_svcs_repo)
    logger.debug(f"Set pull secret for {image} to connection {image_check_result.image_provider}")
    if not image_check_result.token:
        return None

    if not image_check_result.image_provider:
        return None

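    # Docker Hub is special-cased: in a Docker config file, Docker Hub credentials are keyed
    # under the legacy registry endpoint "https://index.docker.io/v1/" rather than the
    # registry URL reported by the provider, so that endpoint is used as the key here.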
    registry_url = image_check_result.image_provider.registry_url
    if image_check_result.image_provider.provider.kind == ProviderKind.dockerhub:
        registry_url = "https://index.docker.io/v1/"

    return __format_image_pull_secret(
        secret_name=secret_name,
        token=image_check_result.token,
        registry_domain=registry_url,
    )


async def get_image_pull_secret(
    image: str,
    server_name: str,
    nb_config: NotebooksConfig,
    user: APIUser,
    internal_gitlab_user: APIUser,
    connected_svcs_repo: ConnectedServicesRepository,
) -> ExtraSecret | None:
    """Get an image pull secret."""

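    # Prefer credentials from a connected-services registry connection (v2); fall back to the
    # internal GitLab registry token (v1) only when internal GitLab support is enabled.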
    v2_secret = await __get_connected_services_image_pull_secret(
        f"{server_name}-image-secret", connected_svcs_repo, image, user
    )
    if v2_secret:
        return v2_secret

    if (
        nb_config.enable_internal_gitlab
        and isinstance(user, AuthenticatedAPIUser)
        and internal_gitlab_user.access_token is not None
    ):
        needs_pull_secret = await __requires_image_pull_secret(nb_config, image, internal_gitlab_user)
        if needs_pull_secret:
            v1_secret = __get_gitlab_image_pull_secret(
                nb_config, user, f"{server_name}-image-secret-v1", internal_gitlab_user.access_token
            )
            return v1_secret

    return None


async def start_session(
    request: Request,
    body: apispec.SessionPostRequest,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    cluster_repo: ClusterRepository,
    data_connector_secret_repo: DataConnectorSecretRepository,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    user_repo: UserRepo,
    metrics: MetricsService,
    connected_svcs_repo: ConnectedServicesRepository,
) -> tuple[AmaltheaSessionV1Alpha1, bool]:
    """Start an Amalthea session.

    Returns a tuple where the first item is an instance of an Amalthea session
    and the second item is a boolean set to true iff a new session was created.
    """
    launcher = await session_repo.get_launcher(user, ULID.from_str(body.launcher_id))
    project = await project_repo.get_project(user=user, project_id=launcher.project_id)

    # Determine resource_class_id: the class can be overridden at the user's request
    resource_class_id = body.resource_class_id or launcher.resource_class_id

    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(resource_class_id, user)

    server_name = renku_2_make_server_name(
        user=user, project_id=str(launcher.project_id), launcher_id=body.launcher_id, cluster_id=str(cluster.id)
    )
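    # The server name is deterministic for a given user, project, launcher and cluster, which
    # makes launches idempotent: a repeated launch request returns the existing session.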
    existing_session = await nb_config.k8s_v2_client.get_session(server_name, user.id)
    if existing_session is not None and existing_session.spec is not None:
        return existing_session, False

    # Fully determine the resource pool and resource class
    if resource_class_id is None:
        resource_pool = await rp_repo.get_default_resource_pool()
        resource_class = resource_pool.get_default_resource_class()
        if not resource_class and len(resource_pool.classes) > 0:
            resource_class = resource_pool.classes[0]
        if not resource_class or not resource_class.id:
            raise errors.ProgrammingError(message="Cannot find any resource classes in the default pool.")
        resource_class_id = resource_class.id
    else:
        resource_pool = await rp_repo.get_resource_pool_from_class(user, resource_class_id)
        resource_class = resource_pool.get_resource_class(resource_class_id)
        if not resource_class or not resource_class.id:
            raise errors.MissingResourceError(message=f"The resource class with ID {resource_class_id} does not exist.")
    await nb_config.crc_validator.validate_class_storage(user, resource_class.id, body.disk_storage)

    environment = launcher.environment
    image = environment.container_image
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    data_connectors_stream = data_connector_secret_repo.get_data_connectors_with_secrets(user, project.id)
    git_providers = await nb_config.git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors
    session_extras = session_extras.concat(
        await get_data_sources(
            nb_config=nb_config,
            server_name=server_name,
            user=user,
            data_connectors_stream=data_connectors_stream,
            work_dir=work_dir,
            cloud_storage_overrides=body.cloudstorage or [],
            user_repo=user_repo,
        )
    )

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Ingress
    try:
        cluster_settings = await cluster_repo.select(cluster.id)
    except errors.MissingResourceError:
        cluster_settings = None

    if cluster_settings is not None:
        (
            base_server_path,
            base_server_url,
            base_server_https_url,
            host,
            tls_secret,
            ingress_annotations,
        ) = cluster_settings.get_ingress_parameters(server_name)
        storage_class = cluster_settings.get_storage_class()
        service_account_name = cluster_settings.service_account_name
    else:
        # Fallback to global, main cluster parameters
        host = nb_config.sessions.ingress.host
        base_server_path = nb_config.sessions.ingress.base_path(server_name)
        base_server_url = nb_config.sessions.ingress.base_url(server_name)
        base_server_https_url = nb_config.sessions.ingress.base_url(server_name, force_https=True)
        storage_class = nb_config.sessions.storage.pvs_storage_class
        service_account_name = None
        ingress_annotations = nb_config.sessions.ingress.annotations

        tls_name = nb_config.sessions.ingress.tls_secret
        tls_secret = None if tls_name is None else TlsSecret(adopt=False, name=tls_name)

    ui_path = f"{base_server_path}/{environment.default_url.lstrip('/')}"

    ingress = Ingress(
        host=host,
        ingressClassName=ingress_annotations.get("kubernetes.io/ingress.class"),
        annotations=ingress_annotations,
        tlsSecret=tls_secret,
        pathPrefix=base_server_path,
    )

    # Annotations
    annotations: dict[str, str] = {
        "renku.io/project_id": str(launcher.project_id),
        "renku.io/launcher_id": body.launcher_id,
        "renku.io/resource_class_id": str(resource_class_id),
    }

    # Authentication
    if isinstance(user, AuthenticatedAPIUser):
        auth_secret = await get_auth_secret_authenticated(
            nb_config, user, server_name, base_server_url, base_server_https_url, base_server_path
        )
    else:
        auth_secret = get_auth_secret_anonymous(nb_config, server_name, request)
    session_extras = session_extras.concat(
        SessionExtraResources(
            secrets=[auth_secret],
            volumes=[auth_secret.volume] if auth_secret.volume else [],
        )
    )

    image_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
        connected_svcs_repo=connected_svcs_repo,
    )
    if image_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_secret]))

    # Raise an error if there are invalid environment variables in the request body
    verify_launcher_env_variable_overrides(launcher, body)
    env = [
        SessionEnvItem(name="RENKU_BASE_URL_PATH", value=base_server_path),
        SessionEnvItem(name="RENKU_BASE_URL", value=base_server_url),
        SessionEnvItem(name="RENKU_MOUNT_DIR", value=storage_mount.as_posix()),
        SessionEnvItem(name="RENKU_SESSION", value="1"),
        SessionEnvItem(name="RENKU_SESSION_IP", value="0.0.0.0"),  # nosec B104
        SessionEnvItem(name="RENKU_SESSION_PORT", value=f"{environment.port}"),
        SessionEnvItem(name="RENKU_WORKING_DIR", value=work_dir.as_posix()),
        SessionEnvItem(name="RENKU_SECRETS_PATH", value=project.secrets_mount_directory.as_posix()),
        SessionEnvItem(name="RENKU_PROJECT_ID", value=str(project.id)),
        SessionEnvItem(name="RENKU_PROJECT_PATH", value=project.path.serialize()),
        SessionEnvItem(name="RENKU_LAUNCHER_ID", value=str(launcher.id)),
    ]
    launcher_env_variables = get_launcher_env_variables(launcher, body)
    if launcher_env_variables:
        env.extend(launcher_env_variables)

    session = AmaltheaSessionV1Alpha1(
        metadata=Metadata(name=server_name, annotations=annotations),
        spec=AmaltheaSessionSpec(
            imagePullSecrets=[ImagePullSecret(name=image_secret.name, adopt=True)] if image_secret else [],
            codeRepositories=[],
            hibernated=False,
            reconcileStrategy=ReconcileStrategy.whenFailedOrHibernated,
            priorityClassName=resource_class.quota,
            session=Session(
                image=image,
                imagePullPolicy=ImagePullPolicy.Always,
                urlPath=ui_path,
                port=environment.port,
                storage=Storage(
                    className=storage_class,
                    size=SizeStr(str(body.disk_storage) + "G"),
                    mountPath=storage_mount.as_posix(),
                ),
                workingDir=work_dir.as_posix(),
                runAsUser=environment.uid,
                runAsGroup=environment.gid,
                resources=resources_from_resource_class(resource_class),
                extraVolumeMounts=session_extras.volume_mounts,
                command=environment.command,
                args=environment.args,
                shmSize=ShmSizeStr("1G"),
                stripURLPath=environment.strip_path_prefix,
                env=env,
            ),
            ingress=ingress,
            extraContainers=session_extras.containers,
            initContainers=session_extras.init_containers,
            extraVolumes=session_extras.volumes,
            culling=get_culling(user, resource_pool, nb_config),
            authentication=Authentication(
                enabled=True,
                type=AuthenticationType.oauth2proxy
                if isinstance(user, AuthenticatedAPIUser)
                else AuthenticationType.token,
                secretRef=auth_secret.key_ref("auth"),
                extraVolumeMounts=[auth_secret.volume_mount] if auth_secret.volume_mount else None,
            ),
            dataSources=session_extras.data_sources,
            tolerations=tolerations_from_resource_class(resource_class, nb_config.sessions.tolerations_model),
            affinity=node_affinity_from_resource_class(resource_class, nb_config.sessions.affinity_model),
            serviceAccountName=service_account_name,
        ),
    )
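    # Create the extra secrets first; if creating the session custom resource fails afterwards,
    # roll the secrets back so no orphans are left behind.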
    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_secret(K8sSecret.from_v1_secret(s.secret, cluster))
    try:
        session = await nb_config.k8s_v2_client.create_session(session, user)
    except Exception as err:
        for s in secrets_to_create:
            await nb_config.k8s_v2_client.delete_secret(K8sSecret.from_v1_secret(s.secret, cluster))
        raise errors.ProgrammingError(message="Could not start the amalthea session") from err
    else:
        try:
            await request_session_secret_creation(user, nb_config, session, session_secrets)
            data_connector_secrets = session_extras.data_connector_secrets or dict()
            await request_dc_secret_creation(user, nb_config, session, data_connector_secrets)
        except Exception:
            await nb_config.k8s_v2_client.delete_session(server_name, user.id)
            raise

    await metrics.user_requested_session_launch(
        user=user,
        metadata={
            "cpu": int(resource_class.cpu * 1000),
            "memory": resource_class.memory,
            "gpu": resource_class.gpu,
            "storage": body.disk_storage,
            "resource_class_id": resource_class.id,
            "resource_pool_id": resource_pool.id or "",
            "resource_class_name": f"{resource_pool.name}.{resource_class.name}",
            "session_id": server_name,
        },
    )
    return session, True


async def patch_session(
    body: apispec.SessionPatchRequest,
    session_id: str,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    connected_svcs_repo: ConnectedServicesRepository,
    metrics: MetricsService,
) -> AmaltheaSessionV1Alpha1:
    """Patch an Amalthea session."""
    session = await nb_config.k8s_v2_client.get_session(session_id, user.id)
    if session is None:
        raise errors.MissingResourceError(message=f"The session with ID {session_id} does not exist")
    if session.spec is None:
        raise errors.ProgrammingError(
            message=f"The session {session_id} being patched is missing the expected 'spec' field.", quiet=True
        )
    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(session.resource_class_id(), user)

    patch = AmaltheaSessionV1Alpha1Patch(spec=AmaltheaSessionV1Alpha1SpecPatch())
    is_getting_hibernated: bool = False

    # Hibernation
    # TODO: Some patching should only be done when the session is in some states to avoid inadvertent restarts
    # Refresh tokens for git proxy
    if (
        body.state is not None
        and body.state.value.lower() == State.Hibernated.value.lower()
        and body.state.value.lower() != session.status.state.value.lower()
    ):
        # Session is being hibernated
        patch.spec.hibernated = True
        is_getting_hibernated = True
    elif (
        body.state is not None
        and body.state.value.lower() == State.Running.value.lower()
        and session.status.state.value.lower() != body.state.value.lower()
    ):
        # Session is being resumed
        patch.spec.hibernated = False
        await metrics.user_requested_session_resume(user, metadata={"session_id": session_id})

    # Resource class
    if body.resource_class_id is not None:
        new_cluster = await nb_config.k8s_v2_client.cluster_by_class_id(body.resource_class_id, user)
        if new_cluster.id != cluster.id:
            raise errors.ValidationError(
                message=(
                    f"The requested resource class {body.resource_class_id} is not in the "
                    f"same cluster {cluster.id} as the current resource class {session.resource_class_id()}."
                )
            )
        rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
        rc = rp.get_resource_class(body.resource_class_id)
        if not rc:
            raise errors.MissingResourceError(
                message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
            )
        # TODO: reject session classes which change the cluster
        if not patch.metadata:
            patch.metadata = AmaltheaSessionV1Alpha1MetadataPatch()
        # Patch the resource class ID in the annotations
        patch.metadata.annotations = {"renku.io/resource_class_id": str(body.resource_class_id)}
        if not patch.spec.session:
            patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
        patch.spec.session.resources = resources_from_resource_class(rc)
        # Tolerations
        tolerations = tolerations_from_resource_class(rc, nb_config.sessions.tolerations_model)
        patch.spec.tolerations = tolerations
        # Affinities
        patch.spec.affinity = node_affinity_from_resource_class(rc, nb_config.sessions.affinity_model)
        # Priority class (if a quota is being used)
        patch.spec.priorityClassName = rc.quota
        patch.spec.culling = get_culling(user, rp, nb_config)
        if rp.cluster is not None:
            patch.spec.service_account_name = rp.cluster.service_account_name

    # If the session is being hibernated we do not need to patch anything else that is
    # not specifically called for in the request body, we can refresh things when the user resumes.
    if is_getting_hibernated:
        return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch.to_rfc7386())

    server_name = session.metadata.name
    launcher = await session_repo.get_launcher(user, session.launcher_id)
    project = await project_repo.get_project(user=user, project_id=session.project_id)
    environment = launcher.environment
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    git_providers = await nb_config.git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors: skip
    # TODO: How can we patch data connectors? Should we even patch them?
    # TODO: The fact that `start_session()` accepts overrides for data connectors
    # TODO: but that we do not save these overrides (e.g. as annotations) means that
    # TODO: we cannot patch data connectors upon resume.
    # TODO: If we did, we would lose the user's provided overrides (e.g. unsaved credentials).

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Patching the image pull secret
    image = session.spec.session.image
    image_pull_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        connected_svcs_repo=connected_svcs_repo,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
    )
    if image_pull_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_pull_secret]))
        patch.spec.imagePullSecrets = [ImagePullSecret(name=image_pull_secret.name, adopt=image_pull_secret.adopt)]

    # Construct session patch
    patch.spec.extraContainers = _make_patch_spec_list(
        existing=session.spec.extraContainers or [], updated=session_extras.containers
    )
    patch.spec.initContainers = _make_patch_spec_list(
        existing=session.spec.initContainers or [], updated=session_extras.init_containers
    )
    patch.spec.extraVolumes = _make_patch_spec_list(
        existing=session.spec.extraVolumes or [], updated=session_extras.volumes
    )
    if not patch.spec.session:
        patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
    patch.spec.session.extraVolumeMounts = _make_patch_spec_list(
        existing=session.spec.session.extraVolumeMounts or [], updated=session_extras.volume_mounts
    )

    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_secret(K8sSecret.from_v1_secret(s.secret, cluster))

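    # Serialize to an RFC 7386 JSON merge patch; an empty patch means nothing changed, so the
    # call to the k8s API can be skipped entirely.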
    patch_serialized = patch.to_rfc7386()
    if len(patch_serialized) == 0:
        return session

    return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch_serialized)


def _deduplicate_target_paths(dcs: dict[str, RCloneStorage]) -> dict[str, RCloneStorage]:
    """Ensures that the target paths for all storages are unique.

    This method will attempt to de-duplicate the target_path for all items passed in,
    and raise an error if it fails to generate a unique target_path.
    """
    result_dcs: dict[str, RCloneStorage] = {}
    mount_folders: dict[str, list[str]] = {}

    def _find_mount_folder(dc: RCloneStorage) -> str:
        mount_folder = dc.mount_folder
        if mount_folder not in mount_folders:
            return mount_folder
        # 1. Try with a "-1", "-2", etc. suffix
        mount_folder_try = f"{mount_folder}-{len(mount_folders[mount_folder])}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        # 2. Try with a random suffix
        suffix = "".join([random.choice(string.ascii_lowercase + string.digits) for _ in range(4)])  # nosec B311
        mount_folder_try = f"{mount_folder}-{suffix}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        raise errors.ValidationError(
            message=f"Could not start session because two or more data connectors ({', '.join(mount_folders[mount_folder])}) share the same mount point '{mount_folder}'"  # noqa E501
        )

    for dc_id, dc in dcs.items():
        original_mount_folder = dc.mount_folder
        new_mount_folder = _find_mount_folder(dc)
        # Keep track of the original mount folder here
        if new_mount_folder != original_mount_folder:
            logger.warning(f"Re-assigning data connector {dc_id} to mount point '{new_mount_folder}'")
            dc_ids = mount_folders.get(original_mount_folder, [])
            dc_ids.append(dc_id)
            mount_folders[original_mount_folder] = dc_ids
        # Keep track of the assigned mount folder here
        dc_ids = mount_folders.get(new_mount_folder, [])
        dc_ids.append(dc_id)
        mount_folders[new_mount_folder] = dc_ids
        result_dcs[dc_id] = dc.with_override(
            override=apispec.SessionCloudStoragePost(storage_id=dc_id, target_path=new_mount_folder)
        )

    return result_dcs


class _NamedResource(Protocol):
    """Represents a resource with a name."""

    name: str


_T = TypeVar("_T", bound=_NamedResource)


def _make_patch_spec_list(existing: Sequence[_T], updated: Sequence[_T]) -> list[_T] | None:
    """Merges updated into existing by upserting items identified by their name.

    This method is used to construct session patches, merging session resources by name (containers, volumes, etc.).
    """
1128
    patch_list = None
1✔
1129
    if updated:
1✔
1130
        patch_list = list(existing)
1✔
1131
        upsert_list = list(updated)
1✔
1132
        for upsert_item in upsert_list:
1✔
1133
            # Find out if the upsert_item needs to be added or updated
1134
            # found = next(enumerate(filter(lambda item: item.name == upsert_item.name, patch_list)), None)
1135
            found = next(filter(lambda t: t[1].name == upsert_item.name, enumerate(patch_list)), None)
1✔
1136
            if found is not None:
1✔
1137
                idx, _ = found
1✔
1138
                patch_list[idx] = upsert_item
1✔
1139
            else:
1140
                patch_list.append(upsert_item)
1✔
1141
    return patch_list
1✔