SwissDataScienceCenter / renku-data-services / build 19181077268

07 Nov 2025 09:00PM UTC coverage: 86.352% (-0.5%) from 86.841%
Pull Request #1059: fix: patching of session custom resources (merge fb47045e6 into 58a6e4765)

89 of 104 new or added lines in 5 files covered (85.58%)
577 existing lines in 33 files now uncovered
22791 of 26393 relevant lines covered (86.35%)
1.52 hits per line

Source file: /components/renku_data_services/notebooks/core_sessions.py (file coverage: 62.6%)

"""A selection of core functions for AmaltheaSessions."""

import base64
import json
import os
import random
import string
from collections.abc import AsyncIterator, Mapping, Sequence
from datetime import timedelta
from pathlib import PurePosixPath
from typing import Protocol, TypeVar, cast
from urllib.parse import urljoin, urlparse

import httpx
from kubernetes.client import V1ObjectMeta, V1Secret
from sanic import Request
from toml import dumps
from ulid import ULID
from yaml import safe_dump

import renku_data_services.notebooks.image_check as ic
from renku_data_services.app_config import logging
from renku_data_services.base_models import RESET, AnonymousAPIUser, APIUser, AuthenticatedAPIUser, ResetType
from renku_data_services.base_models.metrics import MetricsService
from renku_data_services.connected_services.db import ConnectedServicesRepository
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository
from renku_data_services.crc.models import (
    ClusterSettings,
    GpuKind,
    RemoteConfigurationFirecrest,
    ResourceClass,
    ResourcePool,
)
from renku_data_services.data_connectors.db import (
    DataConnectorSecretRepository,
)
from renku_data_services.data_connectors.models import DataConnectorSecret, DataConnectorWithSecrets
from renku_data_services.errors import errors
from renku_data_services.k8s.models import K8sSecret, sanitizer
from renku_data_services.notebooks import apispec, core
from renku_data_services.notebooks.api.amalthea_patches import git_proxy, init_containers
from renku_data_services.notebooks.api.amalthea_patches.init_containers import user_secrets_extras
from renku_data_services.notebooks.api.classes.image import Image
from renku_data_services.notebooks.api.classes.repository import GitProvider, Repository
from renku_data_services.notebooks.api.schemas.cloud_storage import RCloneStorage
from renku_data_services.notebooks.config import GitProviderHelperProto, NotebooksConfig
from renku_data_services.notebooks.crs import (
    AmaltheaSessionSpec,
    AmaltheaSessionV1Alpha1,
    AmaltheaSessionV1Alpha1MetadataPatch,
    AmaltheaSessionV1Alpha1Patch,
    AmaltheaSessionV1Alpha1SpecPatch,
    AmaltheaSessionV1Alpha1SpecSessionPatch,
    Authentication,
    AuthenticationType,
    Culling,
    CullingPatch,
    DataSource,
    ExtraContainer,
    ExtraVolume,
    ExtraVolumeMount,
    ImagePullPolicy,
    ImagePullSecret,
    Ingress,
    InitContainer,
    Limits,
    LimitsStr,
    Metadata,
    ReconcileStrategy,
    Requests,
    RequestsStr,
    Resources,
    ResourcesPatch,
    SecretAsVolume,
    SecretAsVolumeItem,
    Session,
    SessionEnvItem,
    SessionLocation,
    ShmSizeStr,
    SizeStr,
    State,
    Storage,
)
from renku_data_services.notebooks.models import (
    ExtraSecret,
    SessionDataConnectorOverride,
    SessionEnvVar,
    SessionExtraResources,
    SessionLaunchRequest,
)
from renku_data_services.notebooks.util.kubernetes_ import (
    renku_2_make_server_name,
)
from renku_data_services.notebooks.utils import (
    node_affinity_from_resource_class,
    node_affinity_patch_from_resource_class,
    tolerations_from_resource_class,
)
from renku_data_services.project.db import ProjectRepository, ProjectSessionSecretRepository
from renku_data_services.project.models import Project, SessionSecret
from renku_data_services.session.db import SessionRepository
from renku_data_services.session.models import SessionLauncher
from renku_data_services.users.db import UserRepo
from renku_data_services.utils.cryptography import get_encryption_key

logger = logging.getLogger(__name__)


async def get_extra_init_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
    storage_mount: PurePosixPath,
    work_dir: PurePosixPath,
    uid: int = 1000,
    gid: int = 1000,
) -> SessionExtraResources:
    """Get all extra init containers that should be added to an amalthea session."""
    # TODO: The above statement is not correct: the init container for user secrets is not included here
    cert_init, cert_vols = init_containers.certificates_container(nb_config)
    session_init_containers = [InitContainer.model_validate(sanitizer(cert_init))]
    extra_volumes = [ExtraVolume.model_validate(sanitizer(volume)) for volume in cert_vols]
    git_clone = await init_containers.git_clone_container_v2(
        user=user,
        config=nb_config,
        repositories=repositories,
        git_providers=git_providers,
        workspace_mount_path=storage_mount,
        work_dir=work_dir,
        uid=uid,
        gid=gid,
    )
    if git_clone is not None:
        session_init_containers.append(InitContainer.model_validate(git_clone))
    return SessionExtraResources(
        init_containers=session_init_containers,
        volumes=extra_volumes,
    )


async def get_extra_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
) -> SessionExtraResources:
    """Get the extra containers added to amalthea sessions."""
    conts: list[ExtraContainer] = []
    git_proxy_container = await git_proxy.main_container(
        user=user, config=nb_config, repositories=repositories, git_providers=git_providers
    )
    if git_proxy_container:
        conts.append(ExtraContainer.model_validate(sanitizer(git_proxy_container)))
    return SessionExtraResources(containers=conts)


async def get_auth_secret_authenticated(
    nb_config: NotebooksConfig,
    user: AuthenticatedAPIUser,
    server_name: str,
    base_server_url: str,
    base_server_https_url: str,
    base_server_path: str,
) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an authenticated user."""
    secret_data = {}

    parsed_proxy_url = urlparse(urljoin(base_server_url + "/", "oauth2"))
    vol = ExtraVolume(
        name="renku-authorized-emails",
        secret=SecretAsVolume(
            secretName=server_name,
            items=[SecretAsVolumeItem(key="authorized_emails", path="authorized_emails")],
        ),
    )
    secret_data["auth"] = dumps(
        {
            "provider": "oidc",
            "client_id": nb_config.sessions.oidc.client_id,
            "oidc_issuer_url": nb_config.sessions.oidc.issuer_url,
            "session_cookie_minimal": True,
            "skip_provider_button": True,
            # NOTE: If the redirect url is not HTTPS then some identity providers will fail.
            "redirect_url": urljoin(base_server_https_url + "/", "oauth2/callback"),
            "cookie_path": base_server_path,
            "proxy_prefix": parsed_proxy_url.path,
            "authenticated_emails_file": "/authorized_emails",
            "client_secret": nb_config.sessions.oidc.client_secret,
            "cookie_secret": base64.urlsafe_b64encode(os.urandom(32)).decode(),
            "insecure_oidc_allow_unverified_email": nb_config.sessions.oidc.allow_unverified_email,
        }
    )
    secret_data["authorized_emails"] = user.email
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    vol_mount = ExtraVolumeMount(
        name="renku-authorized-emails",
        mountPath="/authorized_emails",
        subPath="authorized_emails",
    )
    return ExtraSecret(secret, vol, vol_mount)


def get_auth_secret_anonymous(nb_config: NotebooksConfig, server_name: str, request: Request) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an anonymous user."""
    # NOTE: We extract the session cookie value here in order to avoid creating a cookie.
    # The gateway encrypts and signs cookies so the user ID injected in the request headers does not
    # match the value of the session cookie.
    session_id = cast(str | None, request.cookies.get(nb_config.session_id_cookie_name))
    if not session_id:
        raise errors.UnauthorizedError(
            message=f"You have to have a renku session cookie at {nb_config.session_id_cookie_name} "
            "in order to launch an anonymous session."
        )
    # NOTE: Amalthea looks for the token value first in the cookie and then in the authorization header
    secret_data = {
        "auth": safe_dump(
            {
                "authproxy": {
                    "token": session_id,
                    "cookie_key": nb_config.session_id_cookie_name,
                    "verbose": True,
                }
            }
        )
    }
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    return ExtraSecret(secret)


async def __get_gitlab_image_pull_secret(
    nb_config: NotebooksConfig, user: AuthenticatedAPIUser, image_pull_secret_name: str, access_token: str
) -> ExtraSecret:
    """Create a Kubernetes secret for private GitLab registry authentication."""

    k8s_namespace = await nb_config.k8s_client.namespace()

    registry_secret = {
        "auths": {
            nb_config.git.registry: {
                "Username": "oauth2",
                "Password": access_token,
                "Email": user.email,
            }
        }
    }
    registry_secret = json.dumps(registry_secret)

    secret_data = {".dockerconfigjson": registry_secret}
    secret = V1Secret(
        metadata=V1ObjectMeta(name=image_pull_secret_name, namespace=k8s_namespace),
        string_data=secret_data,
        type="kubernetes.io/dockerconfigjson",
    )

    return ExtraSecret(secret)


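# NOTE: A sketch of the ".dockerconfigjson" payload built above, with
# illustrative values only (registry domain, token, and email are hypothetical):
#
#     {"auths": {"registry.gitlab.example.com": {
#         "Username": "oauth2", "Password": "<access-token>", "Email": "jane@example.com"}}}
#
# Because the secret is created via `string_data`, Kubernetes base64-encodes it
# on our behalf; __format_image_pull_secret() below instead fills the `data`
# field and therefore base64-encodes the JSON itself.

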
async def get_data_sources(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    server_name: str,
    data_connectors_stream: AsyncIterator[DataConnectorWithSecrets],
    work_dir: PurePosixPath,
    data_connectors_overrides: list[SessionDataConnectorOverride],
    user_repo: UserRepo,
) -> SessionExtraResources:
    """Generate cloud storage related resources."""
    data_sources: list[DataSource] = []
    secrets: list[ExtraSecret] = []
    dcs: dict[str, RCloneStorage] = {}
    dcs_secrets: dict[str, list[DataConnectorSecret]] = {}
    user_secret_key: str | None = None
    async for dc in data_connectors_stream:
        mount_folder = (
            dc.data_connector.storage.target_path
            if PurePosixPath(dc.data_connector.storage.target_path).is_absolute()
            else (work_dir / dc.data_connector.storage.target_path).as_posix()
        )
        dcs[str(dc.data_connector.id)] = RCloneStorage(
            source_path=dc.data_connector.storage.source_path,
            mount_folder=mount_folder,
            configuration=dc.data_connector.storage.configuration,
            readonly=dc.data_connector.storage.readonly,
            name=dc.data_connector.name,
            secrets={str(secret.secret_id): secret.name for secret in dc.secrets},
            storage_class=nb_config.cloud_storage.storage_class,
        )
        if len(dc.secrets) > 0:
            dcs_secrets[str(dc.data_connector.id)] = dc.secrets
    if isinstance(user, AuthenticatedAPIUser) and len(dcs_secrets) > 0:
        secret_key = await user_repo.get_or_create_user_secret_key(user)
        user_secret_key = get_encryption_key(secret_key.encode(), user.id.encode()).decode("utf-8")
    # NOTE: Check the cloud storage overrides from the request body and if any match
    # then overwrite the project's cloud storages
    # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error
    # TODO: Is this correct? -> NOTE: Overriding the configuration when a saved secret is there will cause a 422 error
    for dco in data_connectors_overrides:
        dc_id = str(dco.data_connector_id)
        if dc_id not in dcs:
            raise errors.MissingResourceError(
                message=f"You have requested a data connector with ID {dc_id} which does not exist "
                "or which you do not have access to."
            )
        # NOTE: if 'skip' is true, we do not mount that data connector
        if dco.skip:
            del dcs[dc_id]
            continue
        if dco.target_path is not None and not PurePosixPath(dco.target_path).is_absolute():
            dco.target_path = (work_dir / dco.target_path).as_posix()
        dcs[dc_id] = dcs[dc_id].with_override(dco)

    # Handle potential duplicate target_path
    dcs = _deduplicate_target_paths(dcs)

    for cs_id, cs in dcs.items():
        secret_name = f"{server_name}-ds-{cs_id.lower()}"
        secret_key_needed = len(dcs_secrets.get(cs_id, [])) > 0
        if secret_key_needed and user_secret_key is None:
            raise errors.ProgrammingError(
                message=f"You have saved storage secrets for data connector {cs_id} "
                f"associated with your user ID {user.id} but no key to decrypt them, "
                "therefore we cannot mount the requested data connector. "
                "Please report this to the renku administrators."
            )
        secret = ExtraSecret(
            cs.secret(
                secret_name,
                await nb_config.k8s_client.namespace(),
                user_secret_key=user_secret_key if secret_key_needed else None,
            )
        )
        secrets.append(secret)
        data_sources.append(
            DataSource(
                mountPath=cs.mount_folder,
                secretRef=secret.ref(),
                accessMode="ReadOnlyMany" if cs.readonly else "ReadWriteOnce",
            )
        )
    return SessionExtraResources(
        data_sources=data_sources,
        secrets=secrets,
        data_connector_secrets=dcs_secrets,
    )


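# NOTE: Mount folder resolution above, with hypothetical paths: a data
# connector whose storage has target_path "input/data" under a work_dir of
# "/home/jovyan/work" is mounted at "/home/jovyan/work/input/data", while an
# absolute target_path such as "/mnt/data" is used verbatim. The same rule is
# applied to target_path values supplied via request overrides.

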
async def request_dc_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    dc_secrets: dict[str, list[DataConnectorSecret]],
) -> None:
    """Request the specified data connector secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    for s_id, secrets in dc_secrets.items():
        if len(secrets) == 0:
            continue
        request_data = {
            "name": f"{manifest.metadata.name}-ds-{s_id.lower()}-secrets",
            "namespace": namespace,
            "secret_ids": [str(secret.secret_id) for secret in secrets],
            "owner_references": [owner_reference],
            "key_mapping": {str(secret.secret_id): secret.name for secret in secrets},
            "cluster_id": str(cluster_id),
        }
        async with httpx.AsyncClient(timeout=10) as client:
            res = await client.post(secrets_url, headers=headers, json=request_data)
            if res.status_code >= 300 or res.status_code < 200:
                raise errors.ProgrammingError(
                    message=f"The secret for data connector with ID {s_id} could not be "
                    f"successfully created, the status code was {res.status_code}. "
                    "Please contact a Renku administrator.",
                    detail=res.text,
                )


def get_launcher_env_variables(launcher: SessionLauncher, launch_request: SessionLaunchRequest) -> list[SessionEnvItem]:
    """Get the environment variables from the launcher, with overrides from the request."""
    output: list[SessionEnvItem] = []
    env_overrides = {i.name: i.value for i in launch_request.env_variable_overrides or []}
    for env in launcher.env_variables or []:
        if env.name in env_overrides:
            output.append(SessionEnvItem(name=env.name, value=env_overrides[env.name]))
        else:
            output.append(SessionEnvItem(name=env.name, value=env.value))
    return output


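# NOTE: Override semantics, with hypothetical values: a launcher defining
# MY_VAR="default" combined with a request override MY_VAR="custom" yields
# [SessionEnvItem(name="MY_VAR", value="custom")]. Overrides for names the
# launcher does not define are ignored by the loop above; they are rejected
# separately by verify_launcher_env_variable_overrides() below.

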
def verify_launcher_env_variable_overrides(launcher: SessionLauncher, launch_request: SessionLaunchRequest) -> None:
    """Raise an error if there are env variables that are not defined in the launcher."""
    env_overrides = {i.name: i.value for i in launch_request.env_variable_overrides or []}
    known_env_names = {i.name for i in launcher.env_variables or []}
    unknown_env_names = set(env_overrides.keys()) - known_env_names
    if unknown_env_names:
        message = f"""The following environment variables are not defined in the session launcher: {unknown_env_names}.
            Please remove them from the launch request or add them to the session launcher."""
        raise errors.ValidationError(message=message)


async def request_session_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    session_secrets: list[SessionSecret],
) -> None:
    """Request the specified user session secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    if not session_secrets:
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    key_mapping: dict[str, list[str]] = dict()
    for s in session_secrets:
        secret_id = str(s.secret_id)
        if secret_id not in key_mapping:
            key_mapping[secret_id] = list()
        key_mapping[secret_id].append(s.secret_slot.filename)

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    request_data = {
        "name": f"{manifest.metadata.name}-secrets",
        "namespace": namespace,
        "secret_ids": [str(s.secret_id) for s in session_secrets],
        "owner_references": [owner_reference],
        "key_mapping": key_mapping,
        "cluster_id": str(cluster_id),
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}
    async with httpx.AsyncClient(timeout=10) as client:
        res = await client.post(secrets_url, headers=headers, json=request_data)
        if res.status_code >= 300 or res.status_code < 200:
            raise errors.ProgrammingError(
                message="The session secrets could not be successfully created, "
                f"the status code was {res.status_code}. "
                "Please contact a Renku administrator.",
                detail=res.text,
            )


def resources_patch_from_resource_class(resource_class: ResourceClass) -> ResourcesPatch:
    """Convert the resource class to a k8s resources patch."""
    gpu_name = GpuKind.NVIDIA.value + "/gpu"
    resources = resources_from_resource_class(resource_class)
    requests: Mapping[str, Requests | RequestsStr | ResetType] | ResetType | None = None
    limits: Mapping[str, Limits | LimitsStr | ResetType] | ResetType | None = None
    default_requests = {"memory": RESET, "cpu": RESET, gpu_name: RESET}
    default_limits = {"memory": RESET, "cpu": RESET, gpu_name: RESET}
    if resources.requests is not None:
        requests = RESET if len(resources.requests.keys()) == 0 else {**default_requests, **resources.requests}
    if resources.limits is not None:
        limits = RESET if len(resources.limits.keys()) == 0 else {**default_limits, **resources.limits}
    return ResourcesPatch(requests=requests, limits=limits)


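# NOTE: A sketch of the patch built above for a hypothetical class with
# cpu=0.5, memory=2 and gpu=0 (assuming GpuKind.NVIDIA.value == "nvidia.com"):
#
#     requests == {"memory": "2Gi", "cpu": "500m", "nvidia.com/gpu": RESET}
#     limits   == {"memory": "2Gi", "cpu": RESET, "nvidia.com/gpu": RESET}
#
# Resources that the new class does not set are explicitly RESET instead of
# being left over from the previous resource class in the live manifest.

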
def resources_from_resource_class(resource_class: ResourceClass) -> Resources:
    """Convert the resource class to a k8s resources spec."""
    requests: dict[str, Requests | RequestsStr] = {
        "cpu": RequestsStr(str(round(resource_class.cpu * 1000)) + "m"),
        "memory": RequestsStr(f"{resource_class.memory}Gi"),
    }
    limits: dict[str, Limits | LimitsStr] = {"memory": LimitsStr(f"{resource_class.memory}Gi")}
    if resource_class.gpu > 0:
        gpu_name = GpuKind.NVIDIA.value + "/gpu"
        requests[gpu_name] = Requests(resource_class.gpu)
        # NOTE: GPUs have to be set in limits too since GPUs cannot be overcommitted;
        # if they are not, on some clusters the session will fully fail to start.
        limits[gpu_name] = Limits(resource_class.gpu)
    return Resources(requests=requests, limits=limits if len(limits) > 0 else None)


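# NOTE: For example, a hypothetical resource class with cpu=0.5 and memory=2
# yields requests {"cpu": "500m", "memory": "2Gi"} and limits {"memory": "2Gi"}:
# CPU is converted to rounded millicores, memory to gibibytes, and no CPU limit
# is set, so sessions may burst above their CPU request.

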
def repositories_from_project(project: Project, git_providers: list[GitProvider]) -> list[Repository]:
    """Get the list of git repositories from a project."""
    repositories: list[Repository] = []
    for repo in project.repositories:
        found_provider_id: str | None = None
        for provider in git_providers:
            if urlparse(provider.url).netloc == urlparse(repo).netloc:
                found_provider_id = provider.id
                break
        repositories.append(Repository(url=repo, provider=found_provider_id))
    return repositories


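# NOTE: Provider matching above is by network location only. For a hypothetical
# provider with url "https://gitlab.example.com", the repository
# "https://gitlab.example.com/group/project.git" matches (both netlocs are
# "gitlab.example.com"), while a repository on any other host is kept with
# provider=None.

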
async def repositories_from_session(
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    session: AmaltheaSessionV1Alpha1,
    project_repo: ProjectRepository,
    git_providers: list[GitProvider],
) -> list[Repository]:
    """Get the list of git repositories from a session."""
    try:
        project = await project_repo.get_project(user, session.project_id)
    except errors.MissingResourceError:
        return []
    return repositories_from_project(project, git_providers)


def get_culling(
    user: AuthenticatedAPIUser | AnonymousAPIUser, resource_pool: ResourcePool, nb_config: NotebooksConfig
) -> Culling:
    """Create the culling specification for an AmaltheaSession."""
    if user.is_anonymous:
        # NOTE: Anonymous sessions should not be hibernated at all, but there is no such option in Amalthea.
        # So in this case we set a very low hibernation threshold so the session is deleted quickly after
        # it is hibernated.
        hibernation_threshold: timedelta | None = timedelta(seconds=1)
    else:
        hibernation_threshold = (
            timedelta(seconds=resource_pool.hibernation_threshold)
            if resource_pool.hibernation_threshold is not None
            else None
        )
    return Culling(
        maxAge=timedelta(seconds=nb_config.sessions.culling.registered.max_age_seconds),
        maxFailedDuration=timedelta(seconds=nb_config.sessions.culling.registered.failed_seconds),
        maxHibernatedDuration=hibernation_threshold,
        maxIdleDuration=timedelta(seconds=resource_pool.idle_threshold)
        if resource_pool.idle_threshold is not None
        else None,
        maxStartingDuration=timedelta(seconds=nb_config.sessions.culling.registered.pending_seconds),
    )


def get_culling_patch(
    user: AuthenticatedAPIUser | AnonymousAPIUser, resource_pool: ResourcePool, nb_config: NotebooksConfig
) -> CullingPatch:
    """Get the patch for the culling durations of a session."""
    culling = get_culling(user, resource_pool, nb_config)
    return CullingPatch(
        maxAge=culling.maxAge or RESET,
        maxFailedDuration=culling.maxFailedDuration or RESET,
        maxHibernatedDuration=culling.maxHibernatedDuration or RESET,
        maxIdleDuration=culling.maxIdleDuration or RESET,
        maxStartingDuration=culling.maxStartingDuration or RESET,
    )


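# NOTE: Every culling duration that get_culling() leaves unset is sent as RESET
# in the patch. For example, a resource pool without an idle_threshold yields
# maxIdleDuration=RESET, clearing any idle limit the session may have carried
# over from its previous resource class.

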
async def __requires_image_pull_secret(nb_config: NotebooksConfig, image: str, internal_gitlab_user: APIUser) -> bool:
    """Determine if an image requires a pull secret based on its visibility and the user's GitLab access token."""

    parsed_image = Image.from_path(image)
    image_repo = parsed_image.repo_api()

    image_exists_publicly = await image_repo.image_exists(parsed_image)
    if image_exists_publicly:
        return False

    if parsed_image.hostname == nb_config.git.registry and internal_gitlab_user.access_token:
        image_repo = image_repo.with_oauth2_token(internal_gitlab_user.access_token)
        image_exists_privately = await image_repo.image_exists(parsed_image)
        if image_exists_privately:
            return True
    # No pull secret needed if the image is private and the user cannot access it
    return False


def __format_image_pull_secret(secret_name: str, access_token: str, registry_domain: str) -> ExtraSecret:
    """Format a Kubernetes dockerconfigjson pull secret for the given registry."""
    registry_secret = {
        "auths": {registry_domain: {"auth": base64.b64encode(f"oauth2:{access_token}".encode()).decode()}}
    }
    registry_secret = json.dumps(registry_secret)
    registry_secret = base64.b64encode(registry_secret.encode()).decode()
    return ExtraSecret(
        V1Secret(
            data={".dockerconfigjson": registry_secret},
            metadata=V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
        )
    )


async def __get_connected_services_image_pull_secret(
    secret_name: str, connected_svcs_repo: ConnectedServicesRepository, image: str, user: APIUser
) -> ExtraSecret | None:
    """Return a secret for accessing the image if one is available for the given user."""
    image_parsed = Image.from_path(image)
    image_check_result = await ic.check_image(image_parsed, user, connected_svcs_repo, None)
    logger.debug(f"Set pull secret for {image} to connection {image_check_result.image_provider}")
    if not image_check_result.token:
        return None

    if not image_check_result.image_provider:
        return None

    return __format_image_pull_secret(
        secret_name=secret_name,
        access_token=image_check_result.token,
        registry_domain=image_check_result.image_provider.registry_url,
    )


async def get_image_pull_secret(
    image: str,
    server_name: str,
    nb_config: NotebooksConfig,
    user: APIUser,
    internal_gitlab_user: APIUser,
    connected_svcs_repo: ConnectedServicesRepository,
) -> ExtraSecret | None:
    """Get an image pull secret."""

    v2_secret = await __get_connected_services_image_pull_secret(
        f"{server_name}-image-secret", connected_svcs_repo, image, user
    )
    if v2_secret:
        return v2_secret

    if (
        nb_config.enable_internal_gitlab
        and isinstance(user, AuthenticatedAPIUser)
        and internal_gitlab_user.access_token is not None
    ):
        needs_pull_secret = await __requires_image_pull_secret(nb_config, image, internal_gitlab_user)
        if needs_pull_secret:
            v1_secret = await __get_gitlab_image_pull_secret(
                nb_config, user, f"{server_name}-image-secret-v1", internal_gitlab_user.access_token
            )
            return v1_secret

    return None


def get_remote_secret(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    config: NotebooksConfig,
    server_name: str,
    remote_provider_id: str,
    git_providers: list[GitProvider],
) -> ExtraSecret | None:
    """Returns the secret containing the configuration for the remote session controller."""
    if not user.is_authenticated or user.access_token is None or user.refresh_token is None:
        return None
    remote_provider = next(filter(lambda p: p.id == remote_provider_id, git_providers), None)
    if not remote_provider:
        return None
    renku_base_url = "https://" + config.sessions.ingress.host
    renku_base_url = renku_base_url.rstrip("/")
    renku_auth_token_uri = f"{renku_base_url}/auth/realms/{config.keycloak_realm}/protocol/openid-connect/token"
    secret_data = {
        "RSC_AUTH_KIND": "renku",
        "RSC_AUTH_TOKEN_URI": remote_provider.access_token_url,
        "RSC_AUTH_RENKU_ACCESS_TOKEN": user.access_token,
        "RSC_AUTH_RENKU_REFRESH_TOKEN": user.refresh_token,
        "RSC_AUTH_RENKU_TOKEN_URI": renku_auth_token_uri,
        "RSC_AUTH_RENKU_CLIENT_ID": config.sessions.git_proxy.renku_client_id,
        "RSC_AUTH_RENKU_CLIENT_SECRET": config.sessions.git_proxy.renku_client_secret,
    }
    secret_name = f"{server_name}-remote-secret"
    secret = V1Secret(metadata=V1ObjectMeta(name=secret_name), string_data=secret_data)
    return ExtraSecret(secret)


def get_remote_env(
    remote: RemoteConfigurationFirecrest,
) -> list[SessionEnvItem]:
    """Returns env variables used for remote sessions."""
    env = [
        SessionEnvItem(name="RSC_REMOTE_KIND", value=remote.kind.value),
        SessionEnvItem(name="RSC_FIRECREST_API_URL", value=remote.api_url),
        SessionEnvItem(name="RSC_FIRECREST_SYSTEM_NAME", value=remote.system_name),
    ]
    if remote.partition:
        env.append(SessionEnvItem(name="RSC_FIRECREST_PARTITION", value=remote.partition))
    return env


async def start_session(
    request: Request,
    launch_request: SessionLaunchRequest,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    git_provider_helper: GitProviderHelperProto,
    cluster_repo: ClusterRepository,
    data_connector_secret_repo: DataConnectorSecretRepository,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    user_repo: UserRepo,
    metrics: MetricsService,
    connected_svcs_repo: ConnectedServicesRepository,
) -> tuple[AmaltheaSessionV1Alpha1, bool]:
    """Start an Amalthea session.

    Returns a tuple where the first item is an instance of an Amalthea session
    and the second item is a boolean set to true iff a new session was created.
    """
    launcher = await session_repo.get_launcher(user=user, launcher_id=launch_request.launcher_id)
    launcher_id = launcher.id
    project = await project_repo.get_project(user=user, project_id=launcher.project_id)

    # Determine resource_class_id: the class can be overwritten at the user's request
    resource_class_id = launch_request.resource_class_id or launcher.resource_class_id

    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(resource_class_id, user)

    server_name = renku_2_make_server_name(
        user=user, project_id=str(launcher.project_id), launcher_id=str(launcher_id), cluster_id=str(cluster.id)
    )
    existing_session = await nb_config.k8s_v2_client.get_session(name=server_name, safe_username=user.id)
    if existing_session is not None and existing_session.spec is not None:
        return existing_session, False

    # Fully determine the resource pool and resource class
    if resource_class_id is None:
        resource_pool = await rp_repo.get_default_resource_pool()
        resource_class = resource_pool.get_default_resource_class()
        if not resource_class and len(resource_pool.classes) > 0:
            resource_class = resource_pool.classes[0]
        if not resource_class or not resource_class.id:
            raise errors.ProgrammingError(message="Cannot find any resource classes in the default pool.")
        resource_class_id = resource_class.id
    else:
        resource_pool = await rp_repo.get_resource_pool_from_class(user, resource_class_id)
        resource_class = resource_pool.get_resource_class(resource_class_id)
        if not resource_class or not resource_class.id:
            raise errors.MissingResourceError(message=f"The resource class with ID {resource_class_id} does not exist.")
    await nb_config.crc_validator.validate_class_storage(user, resource_class.id, launch_request.disk_storage)
    disk_storage = launch_request.disk_storage or resource_class.default_storage

    # Determine session location
    session_location = SessionLocation.remote if resource_pool.remote else SessionLocation.local
    if session_location == SessionLocation.remote and not user.is_authenticated:
        raise errors.ValidationError(message="Anonymous users cannot start remote sessions.")

    environment = launcher.environment
    image = environment.container_image
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    data_connectors_stream = data_connector_secret_repo.get_data_connectors_with_secrets(user, project.id)
    git_providers = await git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors
    session_extras = session_extras.concat(
        await get_data_sources(
            nb_config=nb_config,
            server_name=server_name,
            user=user,
            data_connectors_stream=data_connectors_stream,
            work_dir=work_dir,
            data_connectors_overrides=launch_request.data_connectors_overrides or [],
            user_repo=user_repo,
        )
    )

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Cluster settings (ingress, storage class, etc)
    cluster_settings: ClusterSettings
    try:
        cluster_settings = await cluster_repo.select(cluster.id)
    except errors.MissingResourceError:
        # Fallback to global, main cluster parameters
        cluster_settings = nb_config.local_cluster_settings()

    (
        base_server_path,
        base_server_url,
        base_server_https_url,
        host,
        tls_secret,
        ingress_class_name,
        ingress_annotations,
    ) = cluster_settings.get_ingress_parameters(server_name)
    storage_class = cluster_settings.get_storage_class()
    service_account_name = cluster_settings.service_account_name

    ui_path = f"{base_server_path}/{environment.default_url.lstrip('/')}"

    ingress = Ingress(
        host=host,
        ingressClassName=ingress_class_name,
        annotations=ingress_annotations,
        tlsSecret=tls_secret,
        pathPrefix=base_server_path,
    )

    # Annotations
    annotations: dict[str, str] = {
        "renku.io/project_id": str(launcher.project_id),
        "renku.io/launcher_id": str(launcher_id),
        "renku.io/resource_class_id": str(resource_class_id),
    }

    # Authentication
    if isinstance(user, AuthenticatedAPIUser):
        auth_secret = await get_auth_secret_authenticated(
            nb_config, user, server_name, base_server_url, base_server_https_url, base_server_path
        )
    else:
        auth_secret = get_auth_secret_anonymous(nb_config, server_name, request)
    session_extras = session_extras.concat(
        SessionExtraResources(
            secrets=[auth_secret],
            volumes=[auth_secret.volume] if auth_secret.volume else [],
        )
    )
    authn_extra_volume_mounts: list[ExtraVolumeMount] = []
    if auth_secret.volume_mount:
        authn_extra_volume_mounts.append(auth_secret.volume_mount)

    cert_vol_mounts = init_containers.certificates_volume_mounts(nb_config)
    if cert_vol_mounts:
        authn_extra_volume_mounts.extend(cert_vol_mounts)

    image_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
        connected_svcs_repo=connected_svcs_repo,
    )
    if image_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_secret]))

    # Remote session configuration
    remote_secret = None
    if session_location == SessionLocation.remote:
        assert resource_pool.remote is not None
        if resource_pool.remote.provider_id is None:
            raise errors.ProgrammingError(
                message=f"The resource pool {resource_pool.id} configuration is not valid (missing field 'remote_provider_id')."  # noqa E501
            )
        remote_secret = get_remote_secret(
            user=user,
            config=nb_config,
            server_name=server_name,
            remote_provider_id=resource_pool.remote.provider_id,
            git_providers=git_providers,
        )
    if remote_secret is not None:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[remote_secret]))

    # Raise an error if there are invalid environment variables in the request body
    verify_launcher_env_variable_overrides(launcher, launch_request)
    env = [
        SessionEnvItem(name="RENKU_BASE_URL_PATH", value=base_server_path),
        SessionEnvItem(name="RENKU_BASE_URL", value=base_server_url),
        SessionEnvItem(name="RENKU_MOUNT_DIR", value=storage_mount.as_posix()),
        SessionEnvItem(name="RENKU_SESSION", value="1"),
        SessionEnvItem(name="RENKU_SESSION_IP", value="0.0.0.0"),  # nosec B104
        SessionEnvItem(name="RENKU_SESSION_PORT", value=f"{environment.port}"),
        SessionEnvItem(name="RENKU_WORKING_DIR", value=work_dir.as_posix()),
        SessionEnvItem(name="RENKU_SECRETS_PATH", value=project.secrets_mount_directory.as_posix()),
        SessionEnvItem(name="RENKU_PROJECT_ID", value=str(project.id)),
        SessionEnvItem(name="RENKU_PROJECT_PATH", value=project.path.serialize()),
        SessionEnvItem(name="RENKU_LAUNCHER_ID", value=str(launcher.id)),
    ]
    if session_location == SessionLocation.remote:
        assert resource_pool.remote is not None
        env.extend(
            get_remote_env(
                remote=resource_pool.remote,
            )
        )
    launcher_env_variables = get_launcher_env_variables(launcher, launch_request)
    env.extend(launcher_env_variables)

    session = AmaltheaSessionV1Alpha1(
        metadata=Metadata(name=server_name, annotations=annotations),
        spec=AmaltheaSessionSpec(
            location=session_location,
            imagePullSecrets=[ImagePullSecret(name=image_secret.name, adopt=True)] if image_secret else [],
            codeRepositories=[],
            hibernated=False,
            reconcileStrategy=ReconcileStrategy.whenFailedOrHibernated,
            priorityClassName=resource_class.quota,
            session=Session(
                image=image,
                imagePullPolicy=ImagePullPolicy.Always,
                urlPath=ui_path,
                port=environment.port,
                storage=Storage(
                    className=storage_class,
                    size=SizeStr(str(disk_storage) + "G"),
                    mountPath=storage_mount.as_posix(),
                ),
                workingDir=work_dir.as_posix(),
                runAsUser=environment.uid,
                runAsGroup=environment.gid,
                resources=resources_from_resource_class(resource_class),
                extraVolumeMounts=session_extras.volume_mounts,
                command=environment.command,
                args=environment.args,
                shmSize=ShmSizeStr("1G"),
                stripURLPath=environment.strip_path_prefix,
                env=env,
                remoteSecretRef=remote_secret.ref() if remote_secret else None,
            ),
            ingress=ingress,
            extraContainers=session_extras.containers,
            initContainers=session_extras.init_containers,
            extraVolumes=session_extras.volumes,
            culling=get_culling(user, resource_pool, nb_config),
            authentication=Authentication(
                enabled=True,
                type=AuthenticationType.oauth2proxy
                if isinstance(user, AuthenticatedAPIUser)
                else AuthenticationType.token,
                secretRef=auth_secret.key_ref("auth"),
                extraVolumeMounts=authn_extra_volume_mounts,
            ),
            dataSources=session_extras.data_sources,
            tolerations=tolerations_from_resource_class(resource_class, nb_config.sessions.tolerations_model),
            affinity=node_affinity_from_resource_class(resource_class, nb_config.sessions.affinity_model),
            serviceAccountName=service_account_name,
        ),
    )
    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_secret(K8sSecret.from_v1_secret(s.secret, cluster))
    try:
        session = await nb_config.k8s_v2_client.create_session(session, user)
    except Exception as err:
        for s in secrets_to_create:
            await nb_config.k8s_v2_client.delete_secret(K8sSecret.from_v1_secret(s.secret, cluster))
        raise errors.ProgrammingError(message="Could not start the amalthea session") from err
    else:
        try:
            await request_session_secret_creation(user, nb_config, session, session_secrets)
            data_connector_secrets = session_extras.data_connector_secrets or dict()
            await request_dc_secret_creation(user, nb_config, session, data_connector_secrets)
        except Exception:
            await nb_config.k8s_v2_client.delete_session(server_name, user.id)
            raise

    await metrics.user_requested_session_launch(
        user=user,
        metadata={
            "cpu": int(resource_class.cpu * 1000),
            "memory": resource_class.memory,
            "gpu": resource_class.gpu,
            "storage": disk_storage,
            "resource_class_id": resource_class.id,
            "resource_pool_id": resource_pool.id or "",
            "resource_class_name": f"{resource_pool.name}.{resource_class.name}",
            "session_id": server_name,
        },
    )
    return session, True


async def patch_session(
    body: apispec.SessionPatchRequest,
    session_id: str,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    git_provider_helper: GitProviderHelperProto,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    connected_svcs_repo: ConnectedServicesRepository,
    metrics: MetricsService,
) -> AmaltheaSessionV1Alpha1:
    """Patch an Amalthea session."""
    session = await nb_config.k8s_v2_client.get_session(session_id, user.id)
    if session is None:
        raise errors.MissingResourceError(message=f"The session with ID {session_id} does not exist")
    if session.spec is None:
        raise errors.ProgrammingError(
            message=f"The session {session_id} being patched is missing the expected 'spec' field.", quiet=True
        )
    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(session.resource_class_id(), user)

    patch = AmaltheaSessionV1Alpha1Patch(spec=AmaltheaSessionV1Alpha1SpecPatch())
    is_getting_hibernated: bool = False

    # Hibernation
    # TODO: Some patching should only be done when the session is in some states to avoid inadvertent restarts
    # Refresh tokens for git proxy
    if (
        body.state is not None
        and body.state.value.lower() == State.Hibernated.value.lower()
        and body.state.value.lower() != session.status.state.value.lower()
    ):
        # Session is being hibernated
        patch.spec.hibernated = True
        is_getting_hibernated = True
    elif (
        body.state is not None
        and body.state.value.lower() == State.Running.value.lower()
        and session.status.state.value.lower() != body.state.value.lower()
    ):
        # Session is being resumed
        patch.spec.hibernated = False
        await metrics.user_requested_session_resume(user, metadata={"session_id": session_id})

    # Resource class
    if body.resource_class_id is not None:
        new_cluster = await nb_config.k8s_v2_client.cluster_by_class_id(body.resource_class_id, user)
        if new_cluster.id != cluster.id:
            raise errors.ValidationError(
                message=(
                    f"The requested resource class {body.resource_class_id} is not in the "
                    f"same cluster {cluster.id} as the current resource class {session.resource_class_id()}."
                )
            )
        rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
        rc = rp.get_resource_class(body.resource_class_id)
        if not rc:
            raise errors.MissingResourceError(
                message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
            )
        if not patch.metadata:
            patch.metadata = AmaltheaSessionV1Alpha1MetadataPatch()
        # Patch the resource pool and class ID in the annotations
        patch.metadata.annotations = {
            "renku.io/resource_pool_id": str(rp.id),
            "renku.io/resource_class_id": str(body.resource_class_id),
        }
        if not patch.spec.session:
            patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
        patch.spec.session.resources = resources_patch_from_resource_class(rc)
        # Tolerations
        tolerations = tolerations_from_resource_class(rc, nb_config.sessions.tolerations_model)
        patch.spec.tolerations = tolerations
        # Affinities
        patch.spec.affinity = node_affinity_patch_from_resource_class(rc, nb_config.sessions.affinity_model)
        # Priority class (if a quota is being used)
        if rc.quota is None:
            patch.spec.priorityClassName = RESET
        patch.spec.culling = get_culling_patch(user, rp, nb_config)
        # Service account name
        if rp.cluster is not None:
            patch.spec.service_account_name = (
                rp.cluster.service_account_name if rp.cluster.service_account_name is not None else RESET
            )

    # If the session is being hibernated we do not need to patch anything else that is
    # not specifically called for in the request body; we can refresh things when the user resumes.
    if is_getting_hibernated:
        return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch.to_rfc7386())

    server_name = session.metadata.name
    launcher = await session_repo.get_launcher(user, session.launcher_id)
    project = await project_repo.get_project(user=user, project_id=session.project_id)
    environment = launcher.environment
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    git_providers = await git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors: skip
    # TODO: How can we patch data connectors? Should we even patch them?
    # TODO: The fact that `start_session()` accepts overrides for data connectors
    # TODO: but that we do not save these overrides (e.g. as annotations) means that
    # TODO: we cannot patch data connectors upon resume.
    # TODO: If we did, we would lose the user's provided overrides (e.g. unsaved credentials).

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Patching the image pull secret
    image = session.spec.session.image
    image_pull_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        connected_svcs_repo=connected_svcs_repo,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
    )
    if image_pull_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_pull_secret]))
        patch.spec.imagePullSecrets = [ImagePullSecret(name=image_pull_secret.name, adopt=image_pull_secret.adopt)]
    else:
        patch.spec.imagePullSecrets = RESET

    # Construct session patch
    patch.spec.extraContainers = _make_patch_spec_list(
        existing=session.spec.extraContainers or [], updated=session_extras.containers
    )
    patch.spec.initContainers = _make_patch_spec_list(
        existing=session.spec.initContainers or [], updated=session_extras.init_containers
    )
    patch.spec.extraVolumes = _make_patch_spec_list(
        existing=session.spec.extraVolumes or [], updated=session_extras.volumes
    )
    if not patch.spec.session:
        patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
    patch.spec.session.extraVolumeMounts = _make_patch_spec_list(
        existing=session.spec.session.extraVolumeMounts or [], updated=session_extras.volume_mounts
    )

    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_secret(K8sSecret.from_v1_secret(s.secret, cluster))

    patch_serialized = patch.to_rfc7386()
    if len(patch_serialized) == 0:
        return session

    return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch_serialized)


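# NOTE: AmaltheaSessionV1Alpha1Patch.to_rfc7386() serializes the accumulated
# changes as a JSON merge patch (RFC 7386). A minimal sketch for a hibernation
# request would be {"spec": {"hibernated": true}}; an empty document means
# there is nothing to change, which is why patch_session() above returns the
# session unmodified when the serialized patch is empty.

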
def _deduplicate_target_paths(dcs: dict[str, RCloneStorage]) -> dict[str, RCloneStorage]:
    """Ensures that the target paths for all storages are unique.

    This method will attempt to de-duplicate the target_path for all items passed in,
    and raise an error if it fails to generate a unique target_path.
    """
    result_dcs: dict[str, RCloneStorage] = {}
    mount_folders: dict[str, list[str]] = {}

    def _find_mount_folder(dc: RCloneStorage) -> str:
        mount_folder = dc.mount_folder
        if mount_folder not in mount_folders:
            return mount_folder
        # 1. Try with a "-1", "-2", etc. suffix
        mount_folder_try = f"{mount_folder}-{len(mount_folders[mount_folder])}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        # 2. Try with a random suffix
        suffix = "".join([random.choice(string.ascii_lowercase + string.digits) for _ in range(4)])  # nosec B311
        mount_folder_try = f"{mount_folder}-{suffix}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        raise errors.ValidationError(
            message=f"Could not start session because two or more data connectors ({', '.join(mount_folders[mount_folder])}) share the same mount point '{mount_folder}'"  # noqa E501
        )

    for dc_id, dc in dcs.items():
        original_mount_folder = dc.mount_folder
        new_mount_folder = _find_mount_folder(dc)
        # Keep track of the original mount folder here
        if new_mount_folder != original_mount_folder:
            logger.warning(f"Re-assigning data connector {dc_id} to mount point '{new_mount_folder}'")
            dc_ids = mount_folders.get(original_mount_folder, [])
            dc_ids.append(dc_id)
            mount_folders[original_mount_folder] = dc_ids
        # Keep track of the assigned mount folder here
        dc_ids = mount_folders.get(new_mount_folder, [])
        dc_ids.append(dc_id)
        mount_folders[new_mount_folder] = dc_ids
        result_dcs[dc_id] = dc.with_override(
            override=SessionDataConnectorOverride(
                skip=False,
                data_connector_id=ULID.from_str(dc_id),
                target_path=new_mount_folder,
                configuration=None,
                source_path=None,
                readonly=None,
            )
        )

    return result_dcs


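# NOTE: A sketch of the de-duplication with three hypothetical connectors that
# all resolve to "/home/jovyan/work/data": the first keeps the folder as-is,
# the second becomes "/home/jovyan/work/data-1" (one existing claim on the
# folder), the third "/home/jovyan/work/data-2". The random 4-character suffix
# is only attempted when the numbered candidate is itself already taken, and a
# ValidationError is raised if that also collides.

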
class _NamedResource(Protocol):
    """Represents a resource with a name."""

    name: str


_T = TypeVar("_T", bound=_NamedResource)


def _make_patch_spec_list(existing: Sequence[_T], updated: Sequence[_T]) -> list[_T] | None:
    """Merges updated into existing by upserting items identified by their name.

    This method is used to construct session patches, merging session resources by name (containers, volumes, etc.).
    """
    patch_list = None
    if updated:
        patch_list = list(existing)
        upsert_list = list(updated)
        for upsert_item in upsert_list:
            # Find out if the upsert_item needs to be added or updated
            found = next(filter(lambda t: t[1].name == upsert_item.name, enumerate(patch_list)), None)
            if found is not None:
                idx, _ = found
                patch_list[idx] = upsert_item
            else:
                patch_list.append(upsert_item)
    return patch_list


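# NOTE: Upsert behaviour with hypothetical volumes: merging existing
# [ExtraVolume(name="certs"), ExtraVolume(name="data")] with updated
# [ExtraVolume(name="data"), ExtraVolume(name="scratch")] keeps "certs",
# replaces "data" in place, and appends "scratch". With an empty `updated` the
# function returns None, so the field is presumably left out of the serialized
# merge patch (actively clearing a field is done elsewhere with RESET, not None).

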
def validate_session_post_request(body: apispec.SessionPostRequest) -> SessionLaunchRequest:
    """Validate a session launch request."""
    data_connectors_overrides = (
        [
            SessionDataConnectorOverride(
                skip=dc.skip,
                data_connector_id=ULID.from_str(dc.data_connector_id),
                configuration=dc.configuration,
                source_path=dc.source_path,
                target_path=dc.target_path,
                readonly=dc.readonly,
            )
            for dc in body.data_connectors_overrides
        ]
        if body.data_connectors_overrides
        else None
    )
    env_variable_overrides = (
        [SessionEnvVar(name=ev.name, value=ev.value) for ev in body.env_variable_overrides]
        if body.env_variable_overrides
        else None
    )
    return SessionLaunchRequest(
        launcher_id=ULID.from_str(body.launcher_id),
        disk_storage=body.disk_storage,
        resource_class_id=body.resource_class_id,
        data_connectors_overrides=data_connectors_overrides,
        env_variable_overrides=env_variable_overrides,
    )