SwissDataScienceCenter / renku-data-services, build 19145569596
06 Nov 2025 06:17PM UTC, coverage: 86.819% (-0.03%) from 86.85%

Pull Request #1101: feat: indicate supported platforms when checking a session image
Merge 252d68849 into fabb4724c (github / web-flow)

241 of 314 new or added lines in 12 files covered (76.75%).
8 existing lines in 4 files now uncovered.
23054 of 26554 relevant lines covered (86.82%).
1.52 hits per line.

Source file: /components/renku_data_services/notebooks/core_sessions.py (17.22% covered)

"""A selection of core functions for AmaltheaSessions."""

import base64
import json
import os
import random
import string
from collections.abc import AsyncIterator, Sequence
from datetime import timedelta
from pathlib import PurePosixPath
from typing import Protocol, TypeVar, cast
from urllib.parse import urljoin, urlparse

import httpx
from kubernetes.client import V1ObjectMeta, V1Secret
from sanic import Request
from toml import dumps
from ulid import ULID
from yaml import safe_dump

from renku_data_services.app_config import logging
from renku_data_services.base_models import AnonymousAPIUser, APIUser, AuthenticatedAPIUser
from renku_data_services.base_models.metrics import MetricsService
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository
from renku_data_services.crc.models import (
    ClusterSettings,
    GpuKind,
    RemoteConfigurationFirecrest,
    ResourceClass,
    ResourcePool,
)
from renku_data_services.data_connectors.db import (
    DataConnectorSecretRepository,
)
from renku_data_services.data_connectors.models import DataConnectorSecret, DataConnectorWithSecrets
from renku_data_services.errors import errors
from renku_data_services.k8s.models import K8sSecret, sanitizer
from renku_data_services.notebooks import apispec, core
from renku_data_services.notebooks.api.amalthea_patches import git_proxy, init_containers
from renku_data_services.notebooks.api.amalthea_patches.init_containers import user_secrets_extras
from renku_data_services.notebooks.api.classes.image import Image
from renku_data_services.notebooks.api.classes.repository import GitProvider, Repository
from renku_data_services.notebooks.api.schemas.cloud_storage import RCloneStorage
from renku_data_services.notebooks.config import GitProviderHelperProto, NotebooksConfig
from renku_data_services.notebooks.crs import (
    AmaltheaSessionSpec,
    AmaltheaSessionV1Alpha1,
    AmaltheaSessionV1Alpha1MetadataPatch,
    AmaltheaSessionV1Alpha1Patch,
    AmaltheaSessionV1Alpha1SpecPatch,
    AmaltheaSessionV1Alpha1SpecSessionPatch,
    Authentication,
    AuthenticationType,
    Culling,
    DataSource,
    ExtraContainer,
    ExtraVolume,
    ExtraVolumeMount,
    ImagePullPolicy,
    ImagePullSecret,
    Ingress,
    InitContainer,
    Limits,
    LimitsStr,
    Metadata,
    ReconcileStrategy,
    Requests,
    RequestsStr,
    Resources,
    SecretAsVolume,
    SecretAsVolumeItem,
    Session,
    SessionEnvItem,
    SessionLocation,
    ShmSizeStr,
    SizeStr,
    State,
    Storage,
)
from renku_data_services.notebooks.image_check import ImageCheckRepository
from renku_data_services.notebooks.models import (
    ExtraSecret,
    SessionDataConnectorOverride,
    SessionEnvVar,
    SessionExtraResources,
    SessionLaunchRequest,
)
from renku_data_services.notebooks.util.kubernetes_ import (
    renku_2_make_server_name,
)
from renku_data_services.notebooks.utils import (
    node_affinity_from_resource_class,
    tolerations_from_resource_class,
)
from renku_data_services.project.db import ProjectRepository, ProjectSessionSecretRepository
from renku_data_services.project.models import Project, SessionSecret
from renku_data_services.session.db import SessionRepository
from renku_data_services.session.models import SessionLauncher
from renku_data_services.users.db import UserRepo
from renku_data_services.utils.cryptography import get_encryption_key

logger = logging.getLogger(__name__)


async def get_extra_init_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
    storage_mount: PurePosixPath,
    work_dir: PurePosixPath,
    uid: int = 1000,
    gid: int = 1000,
) -> SessionExtraResources:
    """Get all extra init containers that should be added to an amalthea session."""
    # TODO: The above statement is not correct: the init container for user secrets is not included here
    cert_init, cert_vols = init_containers.certificates_container(nb_config)
    session_init_containers = [InitContainer.model_validate(sanitizer(cert_init))]
    extra_volumes = [ExtraVolume.model_validate(sanitizer(volume)) for volume in cert_vols]
    git_clone = await init_containers.git_clone_container_v2(
        user=user,
        config=nb_config,
        repositories=repositories,
        git_providers=git_providers,
        workspace_mount_path=storage_mount,
        work_dir=work_dir,
        uid=uid,
        gid=gid,
    )
    if git_clone is not None:
        session_init_containers.append(InitContainer.model_validate(git_clone))
    return SessionExtraResources(
        init_containers=session_init_containers,
        volumes=extra_volumes,
    )


async def get_extra_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
) -> SessionExtraResources:
    """Get the extra containers added to amalthea sessions."""
    conts: list[ExtraContainer] = []
    git_proxy_container = await git_proxy.main_container(
        user=user, config=nb_config, repositories=repositories, git_providers=git_providers
    )
    if git_proxy_container:
        conts.append(ExtraContainer.model_validate(sanitizer(git_proxy_container)))
    return SessionExtraResources(containers=conts)


async def get_auth_secret_authenticated(
    nb_config: NotebooksConfig,
    user: AuthenticatedAPIUser,
    server_name: str,
    base_server_url: str,
    base_server_https_url: str,
    base_server_path: str,
) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an authenticated user."""
    secret_data = {}

    parsed_proxy_url = urlparse(urljoin(base_server_url + "/", "oauth2"))
    vol = ExtraVolume(
        name="renku-authorized-emails",
        secret=SecretAsVolume(
            secretName=server_name,
            items=[SecretAsVolumeItem(key="authorized_emails", path="authorized_emails")],
        ),
    )
    secret_data["auth"] = dumps(
        {
            "provider": "oidc",
            "client_id": nb_config.sessions.oidc.client_id,
            "oidc_issuer_url": nb_config.sessions.oidc.issuer_url,
            "session_cookie_minimal": True,
            "skip_provider_button": True,
            # NOTE: If the redirect url is not HTTPS then some identity providers will fail.
            "redirect_url": urljoin(base_server_https_url + "/", "oauth2/callback"),
            "cookie_path": base_server_path,
            "proxy_prefix": parsed_proxy_url.path,
            "authenticated_emails_file": "/authorized_emails",
            "client_secret": nb_config.sessions.oidc.client_secret,
            "cookie_secret": base64.urlsafe_b64encode(os.urandom(32)).decode(),
            "insecure_oidc_allow_unverified_email": nb_config.sessions.oidc.allow_unverified_email,
        }
    )
    secret_data["authorized_emails"] = user.email
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    vol_mount = ExtraVolumeMount(
        name="renku-authorized-emails",
        mountPath="/authorized_emails",
        subPath="authorized_emails",
    )
    return ExtraSecret(secret, vol, vol_mount)


def get_auth_secret_anonymous(nb_config: NotebooksConfig, server_name: str, request: Request) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an anonymous user."""
    # NOTE: We extract the session cookie value here in order to avoid creating a cookie.
    # The gateway encrypts and signs cookies so the user ID injected in the request headers does not
    # match the value of the session cookie.
    session_id = cast(str | None, request.cookies.get(nb_config.session_id_cookie_name))
    if not session_id:
        raise errors.UnauthorizedError(
            message=f"You have to have a renku session cookie at {nb_config.session_id_cookie_name} "
            "in order to launch an anonymous session."
        )
    # NOTE: Amalthea looks for the token value first in the cookie and then in the authorization header
    secret_data = {
        "auth": safe_dump(
            {
                "authproxy": {
                    "token": session_id,
                    "cookie_key": nb_config.session_id_cookie_name,
                    "verbose": True,
                }
            }
        )
    }
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    return ExtraSecret(secret)
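
# Illustrative sketch (hypothetical values, not from the codebase): for a cookie
# named "_renku_session" with value "abc123", the "auth" entry above serializes
# to the following YAML for Amalthea's auth proxy (safe_dump sorts the keys):
#
#   authproxy:
#     cookie_key: _renku_session
#     token: abc123
#     verbose: true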


async def __get_gitlab_image_pull_secret(
    nb_config: NotebooksConfig, user: AuthenticatedAPIUser, image_pull_secret_name: str, access_token: str
) -> ExtraSecret:
    """Create a Kubernetes secret for private GitLab registry authentication."""

    k8s_namespace = await nb_config.k8s_client.namespace()

    registry_secret = {
        "auths": {
            nb_config.git.registry: {
                "Username": "oauth2",
                "Password": access_token,
                "Email": user.email,
            }
        }
    }
    registry_secret = json.dumps(registry_secret)

    secret_data = {".dockerconfigjson": registry_secret}
    secret = V1Secret(
        metadata=V1ObjectMeta(name=image_pull_secret_name, namespace=k8s_namespace),
        string_data=secret_data,
        type="kubernetes.io/dockerconfigjson",
    )

    return ExtraSecret(secret)


async def get_data_sources(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    server_name: str,
    data_connectors_stream: AsyncIterator[DataConnectorWithSecrets],
    work_dir: PurePosixPath,
    data_connectors_overrides: list[SessionDataConnectorOverride],
    user_repo: UserRepo,
) -> SessionExtraResources:
    """Generate cloud storage related resources."""
    data_sources: list[DataSource] = []
    secrets: list[ExtraSecret] = []
    dcs: dict[str, RCloneStorage] = {}
    dcs_secrets: dict[str, list[DataConnectorSecret]] = {}
    user_secret_key: str | None = None
    async for dc in data_connectors_stream:
        mount_folder = (
            dc.data_connector.storage.target_path
            if PurePosixPath(dc.data_connector.storage.target_path).is_absolute()
            else (work_dir / dc.data_connector.storage.target_path).as_posix()
        )
        dcs[str(dc.data_connector.id)] = RCloneStorage(
            source_path=dc.data_connector.storage.source_path,
            mount_folder=mount_folder,
            configuration=dc.data_connector.storage.configuration,
            readonly=dc.data_connector.storage.readonly,
            name=dc.data_connector.name,
            secrets={str(secret.secret_id): secret.name for secret in dc.secrets},
            storage_class=nb_config.cloud_storage.storage_class,
        )
        if len(dc.secrets) > 0:
            dcs_secrets[str(dc.data_connector.id)] = dc.secrets
    if isinstance(user, AuthenticatedAPIUser) and len(dcs_secrets) > 0:
        secret_key = await user_repo.get_or_create_user_secret_key(user)
        user_secret_key = get_encryption_key(secret_key.encode(), user.id.encode()).decode("utf-8")
    # NOTE: Check the cloud storage overrides from the request body and if any match
    # then overwrite the project's cloud storages
    # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error
    # TODO: Is this correct? -> NOTE: Overriding the configuration when a saved secret is there will cause a 422 error
    for dco in data_connectors_overrides:
        dc_id = str(dco.data_connector_id)
        if dc_id not in dcs:
            raise errors.MissingResourceError(
                message=f"You have requested a data connector with ID {dc_id} which does not exist "
                "or which you do not have access to."
            )
        # NOTE: if 'skip' is true, we do not mount that data connector
        if dco.skip:
            del dcs[dc_id]
            continue
        if dco.target_path is not None and not PurePosixPath(dco.target_path).is_absolute():
            dco.target_path = (work_dir / dco.target_path).as_posix()
        dcs[dc_id] = dcs[dc_id].with_override(dco)

    # Handle potential duplicate target_path
    dcs = _deduplicate_target_paths(dcs)

    for cs_id, cs in dcs.items():
        secret_name = f"{server_name}-ds-{cs_id.lower()}"
        secret_key_needed = len(dcs_secrets.get(cs_id, [])) > 0
        if secret_key_needed and user_secret_key is None:
            raise errors.ProgrammingError(
                message=f"You have saved storage secrets for data connector {cs_id} "
                f"associated with your user ID {user.id} but no key to decrypt them, "
                "therefore we cannot mount the requested data connector. "
                "Please report this to the renku administrators."
            )
        secret = ExtraSecret(
            cs.secret(
                secret_name,
                await nb_config.k8s_client.namespace(),
                user_secret_key=user_secret_key if secret_key_needed else None,
            )
        )
        secrets.append(secret)
        data_sources.append(
            DataSource(
                mountPath=cs.mount_folder,
                secretRef=secret.ref(),
                accessMode="ReadOnlyMany" if cs.readonly else "ReadWriteOnce",
            )
        )
    return SessionExtraResources(
        data_sources=data_sources,
        secrets=secrets,
        data_connector_secrets=dcs_secrets,
    )


async def request_dc_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    dc_secrets: dict[str, list[DataConnectorSecret]],
) -> None:
    """Request the specified data connector secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    for s_id, secrets in dc_secrets.items():
        if len(secrets) == 0:
            continue
        request_data = {
            "name": f"{manifest.metadata.name}-ds-{s_id.lower()}-secrets",
            "namespace": namespace,
            "secret_ids": [str(secret.secret_id) for secret in secrets],
            "owner_references": [owner_reference],
            "key_mapping": {str(secret.secret_id): secret.name for secret in secrets},
            "cluster_id": str(cluster_id),
        }
        async with httpx.AsyncClient(timeout=10) as client:
            res = await client.post(secrets_url, headers=headers, json=request_data)
            if res.status_code >= 300 or res.status_code < 200:
                raise errors.ProgrammingError(
                    message=f"The secret for data connector with ID {s_id} could not be "
                    f"successfully created, the status code was {res.status_code}. "
                    "Please contact a Renku administrator.",
                    detail=res.text,
                )


def get_launcher_env_variables(launcher: SessionLauncher, launch_request: SessionLaunchRequest) -> list[SessionEnvItem]:
    """Get the environment variables from the launcher, with overrides from the request."""
    output: list[SessionEnvItem] = []
    env_overrides = {i.name: i.value for i in launch_request.env_variable_overrides or []}
    for env in launcher.env_variables or []:
        if env.name in env_overrides:
            output.append(SessionEnvItem(name=env.name, value=env_overrides[env.name]))
        else:
            output.append(SessionEnvItem(name=env.name, value=env.value))
    return output
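
# Worked example (hypothetical variables): a launcher defining DEBUG="0" and
# LANG="C" combined with a launch request overriding DEBUG="1" yields
# [DEBUG="1", LANG="C"]. Overrides for names the launcher does not define are
# ignored here; verify_launcher_env_variable_overrides() below rejects them.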


def verify_launcher_env_variable_overrides(launcher: SessionLauncher, launch_request: SessionLaunchRequest) -> None:
    """Raise an error if there are env variables that are not defined in the launcher."""
    env_overrides = {i.name: i.value for i in launch_request.env_variable_overrides or []}
    known_env_names = {i.name for i in launcher.env_variables or []}
    unknown_env_names = set(env_overrides.keys()) - known_env_names
    if unknown_env_names:
        message = f"""The following environment variables are not defined in the session launcher: {unknown_env_names}.
            Please remove them from the launch request or add them to the session launcher."""
        raise errors.ValidationError(message=message)


async def request_session_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    session_secrets: list[SessionSecret],
) -> None:
    """Request the specified user session secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    if not session_secrets:
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    key_mapping: dict[str, list[str]] = dict()
    for s in session_secrets:
        secret_id = str(s.secret_id)
        if secret_id not in key_mapping:
            key_mapping[secret_id] = list()
        key_mapping[secret_id].append(s.secret_slot.filename)

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    request_data = {
        "name": f"{manifest.metadata.name}-secrets",
        "namespace": namespace,
        "secret_ids": [str(s.secret_id) for s in session_secrets],
        "owner_references": [owner_reference],
        "key_mapping": key_mapping,
        "cluster_id": str(cluster_id),
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}
    async with httpx.AsyncClient(timeout=10) as client:
        res = await client.post(secrets_url, headers=headers, json=request_data)
        if res.status_code >= 300 or res.status_code < 200:
            raise errors.ProgrammingError(
                message="The session secrets could not be successfully created, "
                f"the status code was {res.status_code}. "
                "Please contact a Renku administrator.",
                detail=res.text,
            )


def resources_from_resource_class(resource_class: ResourceClass) -> Resources:
    """Convert the resource class to a k8s resources spec."""
    requests: dict[str, Requests | RequestsStr] = {
        "cpu": RequestsStr(str(round(resource_class.cpu * 1000)) + "m"),
        "memory": RequestsStr(f"{resource_class.memory}Gi"),
    }
    limits: dict[str, Limits | LimitsStr] = {"memory": LimitsStr(f"{resource_class.memory}Gi")}
    if resource_class.gpu > 0:
        gpu_name = GpuKind.NVIDIA.value + "/gpu"
        requests[gpu_name] = Requests(resource_class.gpu)
        # NOTE: GPUs have to be set in limits too since GPUs cannot be overcommitted;
        # otherwise, on some clusters the session will fail to start.
        limits[gpu_name] = Limits(resource_class.gpu)
    return Resources(requests=requests, limits=limits if len(limits) > 0 else None)
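
# Worked example (hypothetical resource class, assuming GpuKind.NVIDIA.value is
# "nvidia.com"): cpu=0.5, memory=2, gpu=1 produces
#   requests = {"cpu": "500m", "memory": "2Gi", "nvidia.com/gpu": 1}
#   limits = {"memory": "2Gi", "nvidia.com/gpu": 1}
# so CPU is requested in millicores and left without a limit, while memory and
# GPU limits mirror the requests.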


def repositories_from_project(project: Project, git_providers: list[GitProvider]) -> list[Repository]:
    """Get the list of git repositories from a project."""
    repositories: list[Repository] = []
    for repo in project.repositories:
        found_provider_id: str | None = None
        for provider in git_providers:
            if urlparse(provider.url).netloc == urlparse(repo).netloc:
                found_provider_id = provider.id
                break
        repositories.append(Repository(url=repo, provider=found_provider_id))
    return repositories
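
# Example of the netloc matching above (hypothetical URLs): a provider with
# url="https://gitlab.example.org" matches the repository
# "https://gitlab.example.org/group/repo.git" because both parse to the netloc
# "gitlab.example.org"; a repository on any other host keeps provider=None.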


async def repositories_from_session(
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    session: AmaltheaSessionV1Alpha1,
    project_repo: ProjectRepository,
    git_providers: list[GitProvider],
) -> list[Repository]:
    """Get the list of git repositories from a session."""
    try:
        project = await project_repo.get_project(user, session.project_id)
    except errors.MissingResourceError:
        return []
    return repositories_from_project(project, git_providers)


def get_culling(
    user: AuthenticatedAPIUser | AnonymousAPIUser, resource_pool: ResourcePool, nb_config: NotebooksConfig
) -> Culling:
    """Create the culling specification for an AmaltheaSession."""
    idle_threshold_seconds = resource_pool.idle_threshold or nb_config.sessions.culling.registered.idle_seconds
    if user.is_anonymous:
        # NOTE: Anonymous sessions should not be hibernated at all, but there is no such option in Amalthea.
        # So in this case we set a very low hibernation threshold so the session is deleted quickly after
        # it is hibernated.
        hibernation_threshold_seconds = 1
    else:
        hibernation_threshold_seconds = (
            resource_pool.hibernation_threshold or nb_config.sessions.culling.registered.hibernated_seconds
        )
    return Culling(
        maxAge=timedelta(seconds=nb_config.sessions.culling.registered.max_age_seconds),
        maxFailedDuration=timedelta(seconds=nb_config.sessions.culling.registered.failed_seconds),
        maxHibernatedDuration=timedelta(seconds=hibernation_threshold_seconds),
        maxIdleDuration=timedelta(seconds=idle_threshold_seconds),
        maxStartingDuration=timedelta(seconds=nb_config.sessions.culling.registered.pending_seconds),
    )
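
# Sketch of the resulting thresholds (hypothetical pool values, not actual
# deployment settings): for an authenticated user with idle_threshold=3600 and
# hibernation_threshold=86400, a session idle for 1h is hibernated and a
# hibernated session is removed after 24h; for an anonymous user the
# hibernation threshold is forced to 1 second, so a hibernated anonymous
# session is deleted almost immediately.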


async def __requires_image_pull_secret(nb_config: NotebooksConfig, image: str, internal_gitlab_user: APIUser) -> bool:
    """Determines if an image requires a pull secret based on its visibility and the user's GitLab access token."""

    parsed_image = Image.from_path(image)
    image_repo = parsed_image.repo_api()

    image_exists_publicly = await image_repo.image_exists(parsed_image)
    if image_exists_publicly:
        return False

    if parsed_image.hostname == nb_config.git.registry and internal_gitlab_user.access_token:
        image_repo = image_repo.with_oauth2_token(internal_gitlab_user.access_token)
        image_exists_privately = await image_repo.image_exists(parsed_image)
        if image_exists_privately:
            return True
    # No pull secret needed if the image is private and the user cannot access it
    return False


def __format_image_pull_secret(secret_name: str, access_token: str, registry_domain: str) -> ExtraSecret:
    """Format a registry access token as a kubernetes.io/dockerconfigjson pull secret."""
    registry_secret = {
        "auths": {registry_domain: {"auth": base64.b64encode(f"oauth2:{access_token}".encode()).decode()}}
    }
    registry_secret = json.dumps(registry_secret)
    registry_secret = base64.b64encode(registry_secret.encode()).decode()
    return ExtraSecret(
        V1Secret(
            data={".dockerconfigjson": registry_secret},
            metadata=V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
        )
    )
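
# Sketch of the generated secret (hypothetical inputs): with
# registry_domain="registry.example.org" and access_token="tok", the
# ".dockerconfigjson" data key holds the base64 encoding of
#   {"auths": {"registry.example.org": {"auth": b64("oauth2:tok")}}}
# which is the layout expected for kubernetes.io/dockerconfigjson secrets.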


async def __get_connected_services_image_pull_secret(
    secret_name: str, image_check_repo: ImageCheckRepository, image: str, user: APIUser
) -> ExtraSecret | None:
    """Return a secret for accessing the image if one is available for the given user."""
    image_parsed = Image.from_path(image)
    image_check_result = await image_check_repo.check_image(user=user, gitlab_user=None, image=image_parsed)
    logger.debug(f"Set pull secret for {image} to connection {image_check_result.image_provider}")
    if not image_check_result.token:
        return None

    if not image_check_result.image_provider:
        return None

    return __format_image_pull_secret(
        secret_name=secret_name,
        access_token=image_check_result.token,
        registry_domain=image_check_result.image_provider.registry_url,
    )


async def get_image_pull_secret(
    image: str,
    server_name: str,
    nb_config: NotebooksConfig,
    user: APIUser,
    internal_gitlab_user: APIUser,
    image_check_repo: ImageCheckRepository,
) -> ExtraSecret | None:
    """Get an image pull secret."""

    v2_secret = await __get_connected_services_image_pull_secret(
        f"{server_name}-image-secret", image_check_repo, image, user
    )
    if v2_secret:
        return v2_secret

    if (
        nb_config.enable_internal_gitlab
        and isinstance(user, AuthenticatedAPIUser)
        and internal_gitlab_user.access_token is not None
    ):
        needs_pull_secret = await __requires_image_pull_secret(nb_config, image, internal_gitlab_user)
        if needs_pull_secret:
            v1_secret = await __get_gitlab_image_pull_secret(
                nb_config, user, f"{server_name}-image-secret-v1", internal_gitlab_user.access_token
            )
            return v1_secret

    return None


def get_remote_secret(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    config: NotebooksConfig,
    server_name: str,
    remote_provider_id: str,
    git_providers: list[GitProvider],
) -> ExtraSecret | None:
    """Returns the secret containing the configuration for the remote session controller."""
    if not user.is_authenticated or user.access_token is None or user.refresh_token is None:
        return None
    remote_provider = next(filter(lambda p: p.id == remote_provider_id, git_providers), None)
    if not remote_provider:
        return None
    renku_base_url = "https://" + config.sessions.ingress.host
    renku_base_url = renku_base_url.rstrip("/")
    renku_auth_token_uri = f"{renku_base_url}/auth/realms/{config.keycloak_realm}/protocol/openid-connect/token"
    secret_data = {
        "RSC_AUTH_KIND": "renku",
        "RSC_AUTH_TOKEN_URI": remote_provider.access_token_url,
        "RSC_AUTH_RENKU_ACCESS_TOKEN": user.access_token,
        "RSC_AUTH_RENKU_REFRESH_TOKEN": user.refresh_token,
        "RSC_AUTH_RENKU_TOKEN_URI": renku_auth_token_uri,
        "RSC_AUTH_RENKU_CLIENT_ID": config.sessions.git_proxy.renku_client_id,
        "RSC_AUTH_RENKU_CLIENT_SECRET": config.sessions.git_proxy.renku_client_secret,
    }
    secret_name = f"{server_name}-remote-secret"
    secret = V1Secret(metadata=V1ObjectMeta(name=secret_name), string_data=secret_data)
    return ExtraSecret(secret)


def get_remote_env(
    remote: RemoteConfigurationFirecrest,
) -> list[SessionEnvItem]:
    """Returns env variables used for remote sessions."""
    env = [
        SessionEnvItem(name="RSC_REMOTE_KIND", value=remote.kind.value),
        SessionEnvItem(name="RSC_FIRECREST_API_URL", value=remote.api_url),
        SessionEnvItem(name="RSC_FIRECREST_SYSTEM_NAME", value=remote.system_name),
    ]
    if remote.partition:
        env.append(SessionEnvItem(name="RSC_FIRECREST_PARTITION", value=remote.partition))
    return env
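
# Example output (hypothetical Firecrest setup): for a remote configuration
# with api_url="https://firecrest.example.org", system_name="cluster1" and
# partition="normal", this returns env items for RSC_REMOTE_KIND,
# RSC_FIRECREST_API_URL, RSC_FIRECREST_SYSTEM_NAME and RSC_FIRECREST_PARTITION;
# the partition item is omitted when no partition is configured.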


async def start_session(
    request: Request,
    launch_request: SessionLaunchRequest,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    git_provider_helper: GitProviderHelperProto,
    cluster_repo: ClusterRepository,
    data_connector_secret_repo: DataConnectorSecretRepository,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    user_repo: UserRepo,
    metrics: MetricsService,
    image_check_repo: ImageCheckRepository,
) -> tuple[AmaltheaSessionV1Alpha1, bool]:
    """Start an Amalthea session.

    Returns a tuple where the first item is an instance of an Amalthea session
    and the second item is a boolean set to true iff a new session was created.
    """
    launcher = await session_repo.get_launcher(user=user, launcher_id=launch_request.launcher_id)
    launcher_id = launcher.id
    project = await project_repo.get_project(user=user, project_id=launcher.project_id)

    # Determine resource_class_id: the class can be overwritten at the user's request
    resource_class_id = launch_request.resource_class_id or launcher.resource_class_id

    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(resource_class_id, user)

    server_name = renku_2_make_server_name(
        user=user, project_id=str(launcher.project_id), launcher_id=str(launcher_id), cluster_id=str(cluster.id)
    )
    existing_session = await nb_config.k8s_v2_client.get_session(name=server_name, safe_username=user.id)
    if existing_session is not None and existing_session.spec is not None:
        return existing_session, False

    # Fully determine the resource pool and resource class
    if resource_class_id is None:
        resource_pool = await rp_repo.get_default_resource_pool()
        resource_class = resource_pool.get_default_resource_class()
        if not resource_class and len(resource_pool.classes) > 0:
            resource_class = resource_pool.classes[0]
        if not resource_class or not resource_class.id:
            raise errors.ProgrammingError(message="Cannot find any resource classes in the default pool.")
        resource_class_id = resource_class.id
    else:
        resource_pool = await rp_repo.get_resource_pool_from_class(user, resource_class_id)
        resource_class = resource_pool.get_resource_class(resource_class_id)
        if not resource_class or not resource_class.id:
            raise errors.MissingResourceError(message=f"The resource class with ID {resource_class_id} does not exist.")
    await nb_config.crc_validator.validate_class_storage(user, resource_class.id, launch_request.disk_storage)
    disk_storage = launch_request.disk_storage or resource_class.default_storage

    # Determine session location
    session_location = SessionLocation.remote if resource_pool.remote else SessionLocation.local
    if session_location == SessionLocation.remote and not user.is_authenticated:
        raise errors.ValidationError(message="Anonymous users cannot start remote sessions.")

    environment = launcher.environment
    image = environment.container_image
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    data_connectors_stream = data_connector_secret_repo.get_data_connectors_with_secrets(user, project.id)
    git_providers = await git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors
    session_extras = session_extras.concat(
        await get_data_sources(
            nb_config=nb_config,
            server_name=server_name,
            user=user,
            data_connectors_stream=data_connectors_stream,
            work_dir=work_dir,
            data_connectors_overrides=launch_request.data_connectors_overrides or [],
            user_repo=user_repo,
        )
    )

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Cluster settings (ingress, storage class, etc)
    cluster_settings: ClusterSettings
    try:
        cluster_settings = await cluster_repo.select(cluster.id)
    except errors.MissingResourceError:
        # Fallback to global, main cluster parameters
        cluster_settings = nb_config.local_cluster_settings()

    (
        base_server_path,
        base_server_url,
        base_server_https_url,
        host,
        tls_secret,
        ingress_class_name,
        ingress_annotations,
    ) = cluster_settings.get_ingress_parameters(server_name)
    storage_class = cluster_settings.get_storage_class()
    service_account_name = cluster_settings.service_account_name

    ui_path = f"{base_server_path}/{environment.default_url.lstrip('/')}"

    ingress = Ingress(
        host=host,
        ingressClassName=ingress_class_name,
        annotations=ingress_annotations,
        tlsSecret=tls_secret,
        pathPrefix=base_server_path,
    )

    # Annotations
    annotations: dict[str, str] = {
        "renku.io/project_id": str(launcher.project_id),
        "renku.io/launcher_id": str(launcher_id),
        "renku.io/resource_class_id": str(resource_class_id),
    }

    # Authentication
    if isinstance(user, AuthenticatedAPIUser):
        auth_secret = await get_auth_secret_authenticated(
            nb_config, user, server_name, base_server_url, base_server_https_url, base_server_path
        )
    else:
        auth_secret = get_auth_secret_anonymous(nb_config, server_name, request)
    session_extras = session_extras.concat(
        SessionExtraResources(
            secrets=[auth_secret],
            volumes=[auth_secret.volume] if auth_secret.volume else [],
        )
    )
    authn_extra_volume_mounts: list[ExtraVolumeMount] = []
    if auth_secret.volume_mount:
        authn_extra_volume_mounts.append(auth_secret.volume_mount)

    cert_vol_mounts = init_containers.certificates_volume_mounts(nb_config)
    if cert_vol_mounts:
        authn_extra_volume_mounts.extend(cert_vol_mounts)

    image_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
        image_check_repo=image_check_repo,
    )
    if image_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_secret]))

    # Remote session configuration
    remote_secret = None
    if session_location == SessionLocation.remote:
        assert resource_pool.remote is not None
        if resource_pool.remote.provider_id is None:
            raise errors.ProgrammingError(
                message=f"The resource pool {resource_pool.id} configuration is not valid (missing field 'remote_provider_id')."  # noqa E501
            )
        remote_secret = get_remote_secret(
            user=user,
            config=nb_config,
            server_name=server_name,
            remote_provider_id=resource_pool.remote.provider_id,
            git_providers=git_providers,
        )
    if remote_secret is not None:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[remote_secret]))

    # Raise an error if there are invalid environment variables in the request body
    verify_launcher_env_variable_overrides(launcher, launch_request)
    env = [
        SessionEnvItem(name="RENKU_BASE_URL_PATH", value=base_server_path),
        SessionEnvItem(name="RENKU_BASE_URL", value=base_server_url),
        SessionEnvItem(name="RENKU_MOUNT_DIR", value=storage_mount.as_posix()),
        SessionEnvItem(name="RENKU_SESSION", value="1"),
        SessionEnvItem(name="RENKU_SESSION_IP", value="0.0.0.0"),  # nosec B104
        SessionEnvItem(name="RENKU_SESSION_PORT", value=f"{environment.port}"),
        SessionEnvItem(name="RENKU_WORKING_DIR", value=work_dir.as_posix()),
        SessionEnvItem(name="RENKU_SECRETS_PATH", value=project.secrets_mount_directory.as_posix()),
        SessionEnvItem(name="RENKU_PROJECT_ID", value=str(project.id)),
        SessionEnvItem(name="RENKU_PROJECT_PATH", value=project.path.serialize()),
        SessionEnvItem(name="RENKU_LAUNCHER_ID", value=str(launcher.id)),
    ]
    if session_location == SessionLocation.remote:
        assert resource_pool.remote is not None
        env.extend(
            get_remote_env(
                remote=resource_pool.remote,
            )
        )
    launcher_env_variables = get_launcher_env_variables(launcher, launch_request)
    env.extend(launcher_env_variables)

    session = AmaltheaSessionV1Alpha1(
        metadata=Metadata(name=server_name, annotations=annotations),
        spec=AmaltheaSessionSpec(
            location=session_location,
            imagePullSecrets=[ImagePullSecret(name=image_secret.name, adopt=True)] if image_secret else [],
            codeRepositories=[],
            hibernated=False,
            reconcileStrategy=ReconcileStrategy.whenFailedOrHibernated,
            priorityClassName=resource_class.quota,
            session=Session(
                image=image,
                imagePullPolicy=ImagePullPolicy.Always,
                urlPath=ui_path,
                port=environment.port,
                storage=Storage(
                    className=storage_class,
                    size=SizeStr(str(disk_storage) + "G"),
                    mountPath=storage_mount.as_posix(),
                ),
                workingDir=work_dir.as_posix(),
                runAsUser=environment.uid,
                runAsGroup=environment.gid,
                resources=resources_from_resource_class(resource_class),
                extraVolumeMounts=session_extras.volume_mounts,
                command=environment.command,
                args=environment.args,
                shmSize=ShmSizeStr("1G"),
                stripURLPath=environment.strip_path_prefix,
                env=env,
                remoteSecretRef=remote_secret.ref() if remote_secret else None,
            ),
            ingress=ingress,
            extraContainers=session_extras.containers,
            initContainers=session_extras.init_containers,
            extraVolumes=session_extras.volumes,
            culling=get_culling(user, resource_pool, nb_config),
            authentication=Authentication(
                enabled=True,
                type=AuthenticationType.oauth2proxy
                if isinstance(user, AuthenticatedAPIUser)
                else AuthenticationType.token,
                secretRef=auth_secret.key_ref("auth"),
                extraVolumeMounts=authn_extra_volume_mounts,
            ),
            dataSources=session_extras.data_sources,
            tolerations=tolerations_from_resource_class(resource_class, nb_config.sessions.tolerations_model),
            affinity=node_affinity_from_resource_class(resource_class, nb_config.sessions.affinity_model),
            serviceAccountName=service_account_name,
        ),
    )
    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_secret(K8sSecret.from_v1_secret(s.secret, cluster))
    try:
        session = await nb_config.k8s_v2_client.create_session(session, user)
    except Exception as err:
        for s in secrets_to_create:
            await nb_config.k8s_v2_client.delete_secret(K8sSecret.from_v1_secret(s.secret, cluster))
        raise errors.ProgrammingError(message="Could not start the amalthea session") from err
    else:
        try:
            await request_session_secret_creation(user, nb_config, session, session_secrets)
            data_connector_secrets = session_extras.data_connector_secrets or dict()
            await request_dc_secret_creation(user, nb_config, session, data_connector_secrets)
        except Exception:
            await nb_config.k8s_v2_client.delete_session(server_name, user.id)
            raise

    await metrics.user_requested_session_launch(
        user=user,
        metadata={
            "cpu": int(resource_class.cpu * 1000),
            "memory": resource_class.memory,
            "gpu": resource_class.gpu,
            "storage": disk_storage,
            "resource_class_id": resource_class.id,
            "resource_pool_id": resource_pool.id or "",
            "resource_class_name": f"{resource_pool.name}.{resource_class.name}",
            "session_id": server_name,
        },
    )
    return session, True


async def patch_session(
    body: apispec.SessionPatchRequest,
    session_id: str,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    git_provider_helper: GitProviderHelperProto,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    image_check_repo: ImageCheckRepository,
    metrics: MetricsService,
) -> AmaltheaSessionV1Alpha1:
    """Patch an Amalthea session."""
    session = await nb_config.k8s_v2_client.get_session(session_id, user.id)
    if session is None:
        raise errors.MissingResourceError(message=f"The session with ID {session_id} does not exist")
    if session.spec is None:
        raise errors.ProgrammingError(
            message=f"The session {session_id} being patched is missing the expected 'spec' field.", quiet=True
        )
    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(session.resource_class_id(), user)

    patch = AmaltheaSessionV1Alpha1Patch(spec=AmaltheaSessionV1Alpha1SpecPatch())
    is_getting_hibernated: bool = False

    # Hibernation
    # TODO: Some patching should only be done when the session is in some states to avoid inadvertent restarts
    # Refresh tokens for git proxy
    if (
        body.state is not None
        and body.state.value.lower() == State.Hibernated.value.lower()
        and body.state.value.lower() != session.status.state.value.lower()
    ):
        # Session is being hibernated
        patch.spec.hibernated = True
        is_getting_hibernated = True
    elif (
        body.state is not None
        and body.state.value.lower() == State.Running.value.lower()
        and session.status.state.value.lower() != body.state.value.lower()
    ):
        # Session is being resumed
        patch.spec.hibernated = False
        await metrics.user_requested_session_resume(user, metadata={"session_id": session_id})

    # Resource class
    if body.resource_class_id is not None:
        new_cluster = await nb_config.k8s_v2_client.cluster_by_class_id(body.resource_class_id, user)
        if new_cluster.id != cluster.id:
            raise errors.ValidationError(
                message=(
                    f"The requested resource class {body.resource_class_id} is not in the "
                    f"same cluster {cluster.id} as the current resource class {session.resource_class_id()}."
                )
            )
        rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
        rc = rp.get_resource_class(body.resource_class_id)
        if not rc:
            raise errors.MissingResourceError(
                message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
            )
        # TODO: reject session classes which change the cluster
        if not patch.metadata:
            patch.metadata = AmaltheaSessionV1Alpha1MetadataPatch()
        # Patch the resource class ID in the annotations
        patch.metadata.annotations = {"renku.io/resource_class_id": str(body.resource_class_id)}
        if not patch.spec.session:
            patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
        patch.spec.session.resources = resources_from_resource_class(rc)
        # Tolerations
        tolerations = tolerations_from_resource_class(rc, nb_config.sessions.tolerations_model)
        patch.spec.tolerations = tolerations
        # Affinities
        patch.spec.affinity = node_affinity_from_resource_class(rc, nb_config.sessions.affinity_model)
        # Priority class (if a quota is being used)
        patch.spec.priorityClassName = rc.quota
        patch.spec.culling = get_culling(user, rp, nb_config)
        if rp.cluster is not None:
            patch.spec.service_account_name = rp.cluster.service_account_name

    # If the session is being hibernated we do not need to patch anything else that is
    # not specifically called for in the request body, we can refresh things when the user resumes.
    if is_getting_hibernated:
        return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch.to_rfc7386())

    server_name = session.metadata.name
    launcher = await session_repo.get_launcher(user, session.launcher_id)
    project = await project_repo.get_project(user=user, project_id=session.project_id)
    environment = launcher.environment
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    git_providers = await git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors: skip
    # TODO: How can we patch data connectors? Should we even patch them?
    # TODO: The fact that `start_session()` accepts overrides for data connectors
    # TODO: but that we do not save these overrides (e.g. as annotations) means that
    # TODO: we cannot patch data connectors upon resume.
    # TODO: If we did, we would lose the user's provided overrides (e.g. unsaved credentials).

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Patching the image pull secret
    image = session.spec.session.image
    image_pull_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        image_check_repo=image_check_repo,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
    )
    if image_pull_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_pull_secret]))
        patch.spec.imagePullSecrets = [ImagePullSecret(name=image_pull_secret.name, adopt=image_pull_secret.adopt)]

    # Construct session patch
    patch.spec.extraContainers = _make_patch_spec_list(
        existing=session.spec.extraContainers or [], updated=session_extras.containers
    )
    patch.spec.initContainers = _make_patch_spec_list(
        existing=session.spec.initContainers or [], updated=session_extras.init_containers
    )
    patch.spec.extraVolumes = _make_patch_spec_list(
        existing=session.spec.extraVolumes or [], updated=session_extras.volumes
    )
    if not patch.spec.session:
        patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
    patch.spec.session.extraVolumeMounts = _make_patch_spec_list(
        existing=session.spec.session.extraVolumeMounts or [], updated=session_extras.volume_mounts
    )

    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_secret(K8sSecret.from_v1_secret(s.secret, cluster))

    patch_serialized = patch.to_rfc7386()
    if len(patch_serialized) == 0:
        return session

    return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch_serialized)


def _deduplicate_target_paths(dcs: dict[str, RCloneStorage]) -> dict[str, RCloneStorage]:
    """Ensures that the target paths for all storages are unique.

    This method will attempt to de-duplicate the target_path for all items passed in,
    and raise an error if it fails to generate a unique target_path.
    """
    result_dcs: dict[str, RCloneStorage] = {}
    mount_folders: dict[str, list[str]] = {}

    def _find_mount_folder(dc: RCloneStorage) -> str:
        mount_folder = dc.mount_folder
        if mount_folder not in mount_folders:
            return mount_folder
        # 1. Try with a "-1", "-2", etc. suffix
        mount_folder_try = f"{mount_folder}-{len(mount_folders[mount_folder])}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        # 2. Try with a random suffix
        suffix = "".join([random.choice(string.ascii_lowercase + string.digits) for _ in range(4)])  # nosec B311
        mount_folder_try = f"{mount_folder}-{suffix}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        raise errors.ValidationError(
            message=f"Could not start session because two or more data connectors ({', '.join(mount_folders[mount_folder])}) share the same mount point '{mount_folder}'"  # noqa E501
        )

    for dc_id, dc in dcs.items():
        original_mount_folder = dc.mount_folder
        new_mount_folder = _find_mount_folder(dc)
        # Keep track of the original mount folder here
        if new_mount_folder != original_mount_folder:
            logger.warning(f"Re-assigning data connector {dc_id} to mount point '{new_mount_folder}'")
            dc_ids = mount_folders.get(original_mount_folder, [])
            dc_ids.append(dc_id)
            mount_folders[original_mount_folder] = dc_ids
        # Keep track of the assigned mount folder here
        dc_ids = mount_folders.get(new_mount_folder, [])
        dc_ids.append(dc_id)
        mount_folders[new_mount_folder] = dc_ids
        result_dcs[dc_id] = dc.with_override(
            override=SessionDataConnectorOverride(
                skip=False,
                data_connector_id=ULID.from_str(dc_id),
                target_path=new_mount_folder,
                configuration=None,
                source_path=None,
                readonly=None,
            )
        )

    return result_dcs
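
# Sketch of the de-duplication above (hypothetical IDs and paths): three data
# connectors that all target "/home/jovyan/work/data" are assigned
# "/home/jovyan/work/data", "/home/jovyan/work/data-1" and
# "/home/jovyan/work/data-2" in iteration order; a ValidationError is raised
# only if both the numeric and the random suffix collide.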


class _NamedResource(Protocol):
    """Represents a resource with a name."""

    name: str


_T = TypeVar("_T", bound=_NamedResource)


def _make_patch_spec_list(existing: Sequence[_T], updated: Sequence[_T]) -> list[_T] | None:
    """Merges updated into existing by upserting items identified by their name.

    This method is used to construct session patches, merging session resources by name (containers, volumes, etc.).
    """
    patch_list = None
    if updated:
        patch_list = list(existing)
        upsert_list = list(updated)
        for upsert_item in upsert_list:
            # Find out if the upsert_item needs to be added or updated
            # found = next(enumerate(filter(lambda item: item.name == upsert_item.name, patch_list)), None)
            found = next(filter(lambda t: t[1].name == upsert_item.name, enumerate(patch_list)), None)
            if found is not None:
                idx, _ = found
                patch_list[idx] = upsert_item
            else:
                patch_list.append(upsert_item)
    return patch_list
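
# Usage sketch (hypothetical container names): merging
#   existing=[git-proxy(v1), certs]  with  updated=[git-proxy(v2), oauth2]
# returns [git-proxy(v2), certs, oauth2]: the updated git-proxy replaces the
# existing entry at the same index and oauth2 is appended. When `updated` is
# empty the function returns None, so the corresponding field can be omitted
# from the serialized merge patch instead of overwriting it with an empty list.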


def validate_session_post_request(body: apispec.SessionPostRequest) -> SessionLaunchRequest:
    """Validate a session launch request."""
    data_connectors_overrides = (
        [
            SessionDataConnectorOverride(
                skip=dc.skip,
                data_connector_id=ULID.from_str(dc.data_connector_id),
                configuration=dc.configuration,
                source_path=dc.source_path,
                target_path=dc.target_path,
                readonly=dc.readonly,
            )
            for dc in body.data_connectors_overrides
        ]
        if body.data_connectors_overrides
        else None
    )
    env_variable_overrides = (
        [SessionEnvVar(name=ev.name, value=ev.value) for ev in body.env_variable_overrides]
        if body.env_variable_overrides
        else None
    )
    return SessionLaunchRequest(
        launcher_id=ULID.from_str(body.launcher_id),
        disk_storage=body.disk_storage,
        resource_class_id=body.resource_class_id,
        data_connectors_overrides=data_connectors_overrides,
        env_variable_overrides=env_variable_overrides,
    )