
SwissDataScienceCenter / renku-data-services / 20339056754

18 Dec 2025 01:46PM UTC coverage: 86.009%. First build: 20339056754

Pull Request #1128 (github / web-flow): feat: set lastInteraction time via session patch and return willHibernateAt
Merge fe560b56c into c5f960729

84 of 96 new or added lines in 11 files covered (87.5%)
24037 of 27947 relevant lines covered (86.01%)
1.51 hits per line

Source file: /components/renku_data_services/notebooks/core_sessions.py (62.43% of lines covered)
"""A selection of core functions for AmaltheaSessions."""

import base64
import json
import os
import random
import string
from collections.abc import AsyncIterator, Mapping, Sequence
from datetime import UTC, datetime, timedelta
from pathlib import PurePosixPath
from typing import Protocol, TypeVar, cast
from urllib.parse import urljoin, urlparse

import httpx
from kubernetes.client import V1ObjectMeta, V1Secret
from sanic import Request
from toml import dumps
from ulid import ULID
from yaml import safe_dump

from renku_data_services.app_config import logging
from renku_data_services.base_models import RESET, AnonymousAPIUser, APIUser, AuthenticatedAPIUser, ResetType
from renku_data_services.base_models.metrics import MetricsService
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository
from renku_data_services.crc.models import (
    ClusterSettings,
    GpuKind,
    RemoteConfigurationFirecrest,
    ResourceClass,
    ResourcePool,
)
from renku_data_services.data_connectors.db import (
    DataConnectorSecretRepository,
)
from renku_data_services.data_connectors.models import DataConnectorSecret, DataConnectorWithSecrets
from renku_data_services.errors import ValidationError, errors
from renku_data_services.k8s.models import K8sSecret, sanitizer
from renku_data_services.notebooks import apispec, core
from renku_data_services.notebooks.api.amalthea_patches import git_proxy, init_containers
from renku_data_services.notebooks.api.amalthea_patches.init_containers import user_secrets_extras
from renku_data_services.notebooks.api.classes.image import Image
from renku_data_services.notebooks.api.classes.repository import GitProvider, Repository
from renku_data_services.notebooks.api.schemas.cloud_storage import RCloneStorage
from renku_data_services.notebooks.config import GitProviderHelperProto, NotebooksConfig
from renku_data_services.notebooks.crs import (
    AmaltheaSessionSpec,
    AmaltheaSessionV1Alpha1,
    AmaltheaSessionV1Alpha1MetadataPatch,
    AmaltheaSessionV1Alpha1Patch,
    AmaltheaSessionV1Alpha1SpecPatch,
    AmaltheaSessionV1Alpha1SpecSessionPatch,
    Authentication,
    AuthenticationType,
    Culling,
    CullingPatch,
    DataSource,
    ExtraContainer,
    ExtraVolume,
    ExtraVolumeMount,
    ImagePullPolicy,
    ImagePullSecret,
    Ingress,
    InitContainer,
    Limits,
    LimitsStr,
    Metadata,
    ReconcileStrategy,
    Requests,
    RequestsStr,
    Resources,
    ResourcesPatch,
    SecretAsVolume,
    SecretAsVolumeItem,
    Session,
    SessionEnvItem,
    SessionLocation,
    ShmSizeStr,
    SizeStr,
    State,
    Storage,
)
from renku_data_services.notebooks.image_check import ImageCheckRepository
from renku_data_services.notebooks.models import (
    ExtraSecret,
    SessionDataConnectorOverride,
    SessionEnvVar,
    SessionExtraResources,
    SessionLaunchRequest,
)
from renku_data_services.notebooks.util.kubernetes_ import (
    renku_2_make_server_name,
)
from renku_data_services.notebooks.utils import (
    node_affinity_from_resource_class,
    node_affinity_patch_from_resource_class,
    tolerations_from_resource_class,
)
from renku_data_services.project.db import ProjectRepository, ProjectSessionSecretRepository
from renku_data_services.project.models import Project, SessionSecret
from renku_data_services.session.db import SessionRepository
from renku_data_services.session.models import SessionLauncher
from renku_data_services.users.db import UserRepo
from renku_data_services.utils.cryptography import get_encryption_key

logger = logging.getLogger(__name__)

async def get_extra_init_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
    storage_mount: PurePosixPath,
    work_dir: PurePosixPath,
    uid: int = 1000,
    gid: int = 1000,
) -> SessionExtraResources:
    """Get all extra init containers that should be added to an amalthea session."""
    # TODO: The above statement is not correct: the init container for user secrets is not included here
    cert_init, cert_vols = init_containers.certificates_container(nb_config)
    session_init_containers = [InitContainer.model_validate(sanitizer(cert_init))]
    extra_volumes = [ExtraVolume.model_validate(sanitizer(volume)) for volume in cert_vols]
    git_clone = await init_containers.git_clone_container_v2(
        user=user,
        config=nb_config,
        repositories=repositories,
        git_providers=git_providers,
        workspace_mount_path=storage_mount,
        work_dir=work_dir,
        uid=uid,
        gid=gid,
    )
    if git_clone is not None:
        session_init_containers.append(InitContainer.model_validate(git_clone))
    return SessionExtraResources(
        init_containers=session_init_containers,
        volumes=extra_volumes,
    )


async def get_extra_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
) -> SessionExtraResources:
    """Get the extra containers added to amalthea sessions."""
    conts: list[ExtraContainer] = []
    git_proxy_container = await git_proxy.main_container(
        user=user, config=nb_config, repositories=repositories, git_providers=git_providers
    )
    if git_proxy_container:
        conts.append(ExtraContainer.model_validate(sanitizer(git_proxy_container)))
    return SessionExtraResources(containers=conts)


async def get_auth_secret_authenticated(
    nb_config: NotebooksConfig,
    user: AuthenticatedAPIUser,
    server_name: str,
    base_server_url: str,
    base_server_https_url: str,
    base_server_path: str,
) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an authenticated user."""
    secret_data = {}

    parsed_proxy_url = urlparse(urljoin(base_server_url + "/", "oauth2"))
    vol = ExtraVolume(
        name="renku-authorized-emails",
        secret=SecretAsVolume(
            secretName=server_name,
            items=[SecretAsVolumeItem(key="authorized_emails", path="authorized_emails")],
        ),
    )
    secret_data["auth"] = dumps(
        {
            "provider": "oidc",
            "client_id": nb_config.sessions.oidc.client_id,
            "oidc_issuer_url": nb_config.sessions.oidc.issuer_url,
            "session_cookie_minimal": True,
            "skip_provider_button": True,
            # NOTE: If the redirect url is not HTTPS then some identity providers will fail.
            "redirect_url": urljoin(base_server_https_url + "/", "oauth2/callback"),
            "cookie_path": base_server_path,
            "proxy_prefix": parsed_proxy_url.path,
            "authenticated_emails_file": "/authorized_emails",
            "client_secret": nb_config.sessions.oidc.client_secret,
            "cookie_secret": base64.urlsafe_b64encode(os.urandom(32)).decode(),
            "insecure_oidc_allow_unverified_email": nb_config.sessions.oidc.allow_unverified_email,
        }
    )
    secret_data["authorized_emails"] = user.email
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    vol_mount = ExtraVolumeMount(
        name="renku-authorized-emails",
        mountPath="/authorized_emails",
        subPath="authorized_emails",
    )
    return ExtraSecret(secret, vol, vol_mount)


def get_auth_secret_anonymous(nb_config: NotebooksConfig, server_name: str, request: Request) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an anonymous user."""
    # NOTE: We extract the session cookie value here in order to avoid creating a cookie.
    # The gateway encrypts and signs cookies so the user ID injected in the request headers does not
    # match the value of the session cookie.
    session_id = cast(str | None, request.cookies.get(nb_config.session_id_cookie_name))
    if not session_id:
        raise errors.UnauthorizedError(
            message=f"You have to have a renku session cookie at {nb_config.session_id_cookie_name} "
            "in order to launch an anonymous session."
        )
    # NOTE: Amalthea looks for the token value first in the cookie and then in the authorization header
    secret_data = {
        "auth": safe_dump(
            {
                "authproxy": {
                    "token": session_id,
                    "cookie_key": nb_config.session_id_cookie_name,
                    "verbose": True,
                }
            }
        )
    }
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    return ExtraSecret(secret)

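# Illustrative sketch (not part of the module): the anonymous-session secret above stores a small
# YAML document under the "auth" key that Amalthea's auth proxy reads. Assuming a hypothetical
# cookie name "_renku_session" and cookie value "abc123", yaml.safe_dump would yield roughly:
#
#   authproxy:
#     cookie_key: _renku_session
#     token: abc123
#     verbose: true
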
async def __get_gitlab_image_pull_secret(
    nb_config: NotebooksConfig, user: AuthenticatedAPIUser, image_pull_secret_name: str, access_token: str
) -> ExtraSecret:
    """Create a Kubernetes secret for private GitLab registry authentication."""

    k8s_namespace = await nb_config.k8s_client.namespace()

    registry_secret = {
        "auths": {
            nb_config.git.registry: {
                "Username": "oauth2",
                "Password": access_token,
                "Email": user.email,
            }
        }
    }
    registry_secret = json.dumps(registry_secret)

    secret_data = {".dockerconfigjson": registry_secret}
    secret = V1Secret(
        metadata=V1ObjectMeta(name=image_pull_secret_name, namespace=k8s_namespace),
        string_data=secret_data,
        type="kubernetes.io/dockerconfigjson",
    )

    return ExtraSecret(secret)


async def get_data_sources(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    server_name: str,
    data_connectors_stream: AsyncIterator[DataConnectorWithSecrets],
    work_dir: PurePosixPath,
    data_connectors_overrides: list[SessionDataConnectorOverride],
    user_repo: UserRepo,
) -> SessionExtraResources:
    """Generate cloud storage related resources."""
    data_sources: list[DataSource] = []
    secrets: list[ExtraSecret] = []
    dcs: dict[str, RCloneStorage] = {}
    dcs_secrets: dict[str, list[DataConnectorSecret]] = {}
    user_secret_key: str | None = None
    async for dc in data_connectors_stream:
        mount_folder = (
            dc.data_connector.storage.target_path
            if PurePosixPath(dc.data_connector.storage.target_path).is_absolute()
            else (work_dir / dc.data_connector.storage.target_path).as_posix()
        )
        dcs[str(dc.data_connector.id)] = RCloneStorage(
            source_path=dc.data_connector.storage.source_path,
            mount_folder=mount_folder,
            configuration=dc.data_connector.storage.configuration,
            readonly=dc.data_connector.storage.readonly,
            name=dc.data_connector.name,
            secrets={str(secret.secret_id): secret.name for secret in dc.secrets},
            storage_class=nb_config.cloud_storage.storage_class,
        )
        if len(dc.secrets) > 0:
            dcs_secrets[str(dc.data_connector.id)] = dc.secrets
    if isinstance(user, AuthenticatedAPIUser) and len(dcs_secrets) > 0:
        secret_key = await user_repo.get_or_create_user_secret_key(user)
        user_secret_key = get_encryption_key(secret_key.encode(), user.id.encode()).decode("utf-8")
    # NOTE: Check the cloud storage overrides from the request body and if any match
    # then overwrite the project's cloud storages
    # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error
    # TODO: Is this correct? -> NOTE: Overriding the configuration when a saved secret is there will cause a 422 error
    for dco in data_connectors_overrides:
        dc_id = str(dco.data_connector_id)
        if dc_id not in dcs:
            raise errors.MissingResourceError(
                message=f"You have requested a data connector with ID {dc_id} which does not exist "
                "or which you do not have access to."
            )
        # NOTE: if 'skip' is true, we do not mount that data connector
        if dco.skip:
            del dcs[dc_id]
            continue
        if dco.target_path is not None and not PurePosixPath(dco.target_path).is_absolute():
            dco.target_path = (work_dir / dco.target_path).as_posix()
        dcs[dc_id] = dcs[dc_id].with_override(dco)

    # Handle potential duplicate target_path
    dcs = _deduplicate_target_paths(dcs)

    for cs_id, cs in dcs.items():
        secret_name = f"{server_name}-ds-{cs_id.lower()}"
        secret_key_needed = len(dcs_secrets.get(cs_id, [])) > 0
        if secret_key_needed and user_secret_key is None:
            raise errors.ProgrammingError(
                message=f"You have saved storage secrets for data connector {cs_id} "
                f"associated with your user ID {user.id} but no key to decrypt them, "
                "therefore we cannot mount the requested data connector. "
                "Please report this to the renku administrators."
            )
        secret = ExtraSecret(
            cs.secret(
                secret_name,
                await nb_config.k8s_client.namespace(),
                user_secret_key=user_secret_key if secret_key_needed else None,
            )
        )
        secrets.append(secret)
        data_sources.append(
            DataSource(
                mountPath=cs.mount_folder,
                secretRef=secret.ref(),
                accessMode="ReadOnlyMany" if cs.readonly else "ReadWriteOnce",
            )
        )
    return SessionExtraResources(
        data_sources=data_sources,
        secrets=secrets,
        data_connector_secrets=dcs_secrets,
    )


async def request_dc_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    dc_secrets: dict[str, list[DataConnectorSecret]],
) -> None:
    """Request the specified data connector secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    for s_id, secrets in dc_secrets.items():
        if len(secrets) == 0:
            continue
        request_data = {
            "name": f"{manifest.metadata.name}-ds-{s_id.lower()}-secrets",
            "namespace": namespace,
            "secret_ids": [str(secret.secret_id) for secret in secrets],
            "owner_references": [owner_reference],
            "key_mapping": {str(secret.secret_id): secret.name for secret in secrets},
            "cluster_id": str(cluster_id),
        }
        async with httpx.AsyncClient(timeout=10) as client:
            res = await client.post(secrets_url, headers=headers, json=request_data)
            if res.status_code >= 300 or res.status_code < 200:
                raise errors.ProgrammingError(
                    message=f"The secret for data connector with ID {s_id} could not be "
                    f"successfully created, the status code was {res.status_code}. "
                    "Please contact a Renku administrator.",
                    detail=res.text,
                )


def get_launcher_env_variables(launcher: SessionLauncher, launch_request: SessionLaunchRequest) -> list[SessionEnvItem]:
    """Get the environment variables from the launcher, with overrides from the request."""
    output: list[SessionEnvItem] = []
    env_overrides = {i.name: i.value for i in launch_request.env_variable_overrides or []}
    for env in launcher.env_variables or []:
        if env.name in env_overrides:
            output.append(SessionEnvItem(name=env.name, value=env_overrides[env.name]))
        else:
            output.append(SessionEnvItem(name=env.name, value=env.value))
    return output


def verify_launcher_env_variable_overrides(launcher: SessionLauncher, launch_request: SessionLaunchRequest) -> None:
    """Raise an error if there are env variables that are not defined in the launcher."""
    env_overrides = {i.name: i.value for i in launch_request.env_variable_overrides or []}
    known_env_names = {i.name for i in launcher.env_variables or []}
    unknown_env_names = set(env_overrides.keys()) - known_env_names
    if unknown_env_names:
        message = f"""The following environment variables are not defined in the session launcher: {unknown_env_names}.
            Please remove them from the launch request or add them to the session launcher."""
        raise errors.ValidationError(message=message)

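# Illustrative sketch (not part of the module): with a hypothetical launcher defining
# MY_VAR="default" and a launch request overriding MY_VAR="custom",
# get_launcher_env_variables() returns [SessionEnvItem(name="MY_VAR", value="custom")],
# while an override for a name the launcher does not define (e.g. OTHER_VAR) makes
# verify_launcher_env_variable_overrides() raise a ValidationError.
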
async def request_session_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    session_secrets: list[SessionSecret],
) -> None:
    """Request the specified user session secrets to be created by the secret service."""
    if isinstance(user, AnonymousAPIUser):
        return
    if not session_secrets:
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    key_mapping: dict[str, list[str]] = dict()
    for s in session_secrets:
        secret_id = str(s.secret_id)
        if secret_id not in key_mapping:
            key_mapping[secret_id] = list()
        key_mapping[secret_id].append(s.secret_slot.filename)

    cluster_id = None
    namespace = await nb_config.k8s_v2_client.namespace()
    if (cluster := await nb_config.k8s_v2_client.cluster_by_class_id(manifest.resource_class_id(), user)) is not None:
        cluster_id = cluster.id
        namespace = cluster.namespace

    request_data = {
        "name": f"{manifest.metadata.name}-secrets",
        "namespace": namespace,
        "secret_ids": [str(s.secret_id) for s in session_secrets],
        "owner_references": [owner_reference],
        "key_mapping": key_mapping,
        "cluster_id": str(cluster_id),
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}
    async with httpx.AsyncClient(timeout=10) as client:
        res = await client.post(secrets_url, headers=headers, json=request_data)
        if res.status_code >= 300 or res.status_code < 200:
            raise errors.ProgrammingError(
                message="The session secrets could not be successfully created, "
                f"the status code was {res.status_code}. "
                "Please contact a Renku administrator.",
                detail=res.text,
            )


def resources_patch_from_resource_class(resource_class: ResourceClass) -> ResourcesPatch:
    """Convert the resource class to a k8s resources patch."""
    gpu_name = GpuKind.NVIDIA.value + "/gpu"
    resources = resources_from_resource_class(resource_class)
    requests: Mapping[str, Requests | RequestsStr | ResetType] | ResetType | None = None
    limits: Mapping[str, Limits | LimitsStr | ResetType] | ResetType | None = None
    default_requests = {"memory": RESET, "cpu": RESET, gpu_name: RESET}
    default_limits = {"memory": RESET, "cpu": RESET, gpu_name: RESET}
    if resources.requests is not None:
        requests = RESET if len(resources.requests.keys()) == 0 else {**default_requests, **resources.requests}
    if resources.limits is not None:
        limits = RESET if len(resources.limits.keys()) == 0 else {**default_limits, **resources.limits}
    return ResourcesPatch(requests=requests, limits=limits)


def resources_from_resource_class(resource_class: ResourceClass) -> Resources:
    """Convert the resource class to a k8s resources spec."""
    requests: dict[str, Requests | RequestsStr] = {
        "cpu": RequestsStr(str(round(resource_class.cpu * 1000)) + "m"),
        "memory": RequestsStr(f"{resource_class.memory}Gi"),
    }
    limits: dict[str, Limits | LimitsStr] = {"memory": LimitsStr(f"{resource_class.memory}Gi")}
    if resource_class.gpu > 0:
        gpu_name = GpuKind.NVIDIA.value + "/gpu"
        requests[gpu_name] = Requests(resource_class.gpu)
        # NOTE: GPUs have to be set in limits too since GPUs cannot be overcommitted; if
        # not, on some clusters this will cause the session to fail to start.
        limits[gpu_name] = Limits(resource_class.gpu)
    return Resources(requests=requests, limits=limits if len(limits) > 0 else None)

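# Illustrative sketch (not part of the module): assuming a resource class with cpu=2.0,
# memory=8 (GiB) and gpu=1, and assuming GpuKind.NVIDIA.value == "nvidia.com",
# resources_from_resource_class() would produce roughly:
#   requests = {"cpu": "2000m", "memory": "8Gi", "nvidia.com/gpu": 1}
#   limits   = {"memory": "8Gi", "nvidia.com/gpu": 1}
# resources_patch_from_resource_class() wraps the same values in a patch that also RESETs any
# request/limit keys the new class no longer sets.
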
def repositories_from_project(project: Project, git_providers: list[GitProvider]) -> list[Repository]:
    """Get the list of git repositories from a project."""
    repositories: list[Repository] = []
    for repo in project.repositories:
        found_provider_id: str | None = None
        for provider in git_providers:
            if urlparse(provider.url).netloc == urlparse(repo).netloc:
                found_provider_id = provider.id
                break
        repositories.append(Repository(url=repo, provider=found_provider_id))
    return repositories

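# Illustrative sketch (not part of the module): a repository URL is matched to a provider purely by
# host name, e.g. a hypothetical repo "https://gitlab.example.org/group/proj.git" is paired with the
# provider whose url has netloc "gitlab.example.org"; if no provider matches, the Repository is
# returned with provider=None.
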
async def repositories_from_session(
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    session: AmaltheaSessionV1Alpha1,
    project_repo: ProjectRepository,
    git_providers: list[GitProvider],
) -> list[Repository]:
    """Get the list of git repositories from a session."""
    try:
        project = await project_repo.get_project(user, session.project_id)
    except errors.MissingResourceError:
        return []
    return repositories_from_project(project, git_providers)


def get_culling(
    user: AuthenticatedAPIUser | AnonymousAPIUser, resource_pool: ResourcePool, nb_config: NotebooksConfig
) -> Culling:
    """Create the culling specification for an AmaltheaSession."""
    hibernation_threshold: timedelta | None = None
    # NOTE: A value of zero on the resource_pool hibernation threshold
    # is interpreted by Amalthea as "never automatically delete after hibernation"
    match (user.is_anonymous, resource_pool.hibernation_threshold):
        case True, _:
            # NOTE: Anonymous sessions should not be hibernated at all, but there is no such option in Amalthea
            # So in this case we set a very low hibernation threshold so the session is deleted quickly after
            # it is hibernated.
            hibernation_threshold = timedelta(seconds=1)
        case False, int():
            hibernation_threshold = timedelta(seconds=resource_pool.hibernation_threshold)
        case False, None:
            hibernation_threshold = timedelta(seconds=nb_config.sessions.culling.registered.hibernated_seconds)

    idle_duration: timedelta | None = None
    # NOTE: A value of zero on the resource_pool idle threshold
    # is interpreted by Amalthea as "never automatically hibernate for idleness"
    match (user.is_anonymous, resource_pool.idle_threshold):
        case True, None:
            idle_duration = timedelta(seconds=nb_config.sessions.culling.anonymous.idle_seconds)
        case _, int():
            idle_duration = timedelta(seconds=resource_pool.idle_threshold)
        case False, None:
            idle_duration = timedelta(seconds=nb_config.sessions.culling.registered.idle_seconds)

    return Culling(
        maxAge=timedelta(seconds=nb_config.sessions.culling.registered.max_age_seconds),
        maxFailedDuration=timedelta(seconds=nb_config.sessions.culling.registered.failed_seconds),
        maxHibernatedDuration=hibernation_threshold,
        maxIdleDuration=idle_duration,
        maxStartingDuration=timedelta(seconds=nb_config.sessions.culling.registered.pending_seconds),
    )

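# Illustrative sketch (not part of the module): for a registered user and a hypothetical resource
# pool with idle_threshold=3600 and hibernation_threshold=86400, get_culling() yields
# maxIdleDuration=timedelta(hours=1) and maxHibernatedDuration=timedelta(days=1); the remaining
# durations (maxAge, maxFailedDuration, maxStartingDuration) come from
# nb_config.sessions.culling.registered.
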
def get_culling_patch(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    resource_pool: ResourcePool | None,
    nb_config: NotebooksConfig,
    lastInteraction: datetime | apispec.CurrentTime | None,
) -> CullingPatch:
    """Get the patch for the culling durations of a session."""
    lastInteractionDT: datetime | None = None
    match lastInteraction:
        case apispec.CurrentTime():
            lastInteractionDT = datetime.now(UTC).replace(microsecond=0)
        case datetime() as dt:
            if not dt.tzinfo:
                raise ValidationError(message=f"The timestamp has no timezone information: {dt}")
            if datetime.now(UTC) < dt:
                raise ValidationError(message=f"The timestamp is in the future: {dt}")
            lastInteractionDT = min(dt, datetime.now(UTC)).replace(microsecond=0)

    match resource_pool:
        case None:
            # only update lastInteraction
            return CullingPatch(lastInteraction=lastInteractionDT or RESET)
        case rp:
            culling = get_culling(user, rp, nb_config) if resource_pool else Culling()
            return CullingPatch(
                maxAge=culling.maxAge or RESET,
                maxFailedDuration=culling.maxFailedDuration or RESET,
                maxHibernatedDuration=culling.maxHibernatedDuration or RESET,
                maxIdleDuration=culling.maxIdleDuration or RESET,
                maxStartingDuration=culling.maxStartingDuration or RESET,
                lastInteraction=lastInteractionDT or RESET,
            )

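# Illustrative sketch (not part of the module): this is the piece that lets PR #1128 set the
# lastInteraction time via a session patch. Passing apispec.CurrentTime() stamps "now" (UTC,
# without microseconds), a timezone-aware datetime in the past is used as-is, while a naive or
# future datetime is rejected with a ValidationError; when no resource pool change is requested
# only lastInteraction is patched and the other culling durations are left untouched.
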
async def __requires_image_pull_secret(nb_config: NotebooksConfig, image: str, internal_gitlab_user: APIUser) -> bool:
    """Determine if an image requires a pull secret based on its visibility and the user's GitLab access token."""

    parsed_image = Image.from_path(image)
    image_repo = parsed_image.repo_api()

    image_exists_publicly = await image_repo.image_exists(parsed_image)
    if image_exists_publicly:
        return False

    if parsed_image.hostname == nb_config.git.registry and internal_gitlab_user.access_token:
        image_repo = image_repo.with_oauth2_token(internal_gitlab_user.access_token)
        image_exists_privately = await image_repo.image_exists(parsed_image)
        if image_exists_privately:
            return True
    # No pull secret needed if the image is private and the user cannot access it
    return False


def __format_image_pull_secret(secret_name: str, access_token: str, registry_domain: str) -> ExtraSecret:
    registry_secret = {
        "auths": {registry_domain: {"auth": base64.b64encode(f"oauth2:{access_token}".encode()).decode()}}
    }
    registry_secret = json.dumps(registry_secret)
    registry_secret = base64.b64encode(registry_secret.encode()).decode()
    return ExtraSecret(
        V1Secret(
            data={".dockerconfigjson": registry_secret},
            metadata=V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
        )
    )

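# Illustrative sketch (not part of the module): the secret above is a standard Kubernetes
# dockerconfigjson. Before the outer base64 encoding, the JSON for a hypothetical registry
# "registry.example.org" and token "tok" looks roughly like:
#   {"auths": {"registry.example.org": {"auth": base64("oauth2:tok")}}}
# Kubernetes mounts it as an imagePullSecret so the kubelet can pull the private image.
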
async def __get_connected_services_image_pull_secret(
    secret_name: str, image_check_repo: ImageCheckRepository, image: str, user: APIUser
) -> ExtraSecret | None:
    """Return a secret for accessing the image if one is available for the given user."""
    image_parsed = Image.from_path(image)
    image_check_result = await image_check_repo.check_image(user=user, gitlab_user=None, image=image_parsed)
    logger.debug(f"Set pull secret for {image} to connection {image_check_result.image_provider}")
    if not image_check_result.token:
        return None

    if not image_check_result.image_provider:
        return None

    return __format_image_pull_secret(
        secret_name=secret_name,
        access_token=image_check_result.token,
        registry_domain=image_check_result.image_provider.registry_url,
    )


async def get_image_pull_secret(
    image: str,
    server_name: str,
    nb_config: NotebooksConfig,
    user: APIUser,
    internal_gitlab_user: APIUser,
    image_check_repo: ImageCheckRepository,
) -> ExtraSecret | None:
    """Get an image pull secret."""

    v2_secret = await __get_connected_services_image_pull_secret(
        f"{server_name}-image-secret", image_check_repo, image, user
    )
    if v2_secret:
        return v2_secret

    if (
        nb_config.enable_internal_gitlab
        and isinstance(user, AuthenticatedAPIUser)
        and internal_gitlab_user.access_token is not None
    ):
        needs_pull_secret = await __requires_image_pull_secret(nb_config, image, internal_gitlab_user)
        if needs_pull_secret:
            v1_secret = await __get_gitlab_image_pull_secret(
                nb_config, user, f"{server_name}-image-secret-v1", internal_gitlab_user.access_token
            )
            return v1_secret

    return None


def get_remote_secret(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    config: NotebooksConfig,
    server_name: str,
    remote_provider_id: str,
    git_providers: list[GitProvider],
) -> ExtraSecret | None:
    """Returns the secret containing the configuration for the remote session controller."""
    if not user.is_authenticated or user.access_token is None or user.refresh_token is None:
        return None
    remote_provider = next(filter(lambda p: p.id == remote_provider_id, git_providers), None)
    if not remote_provider:
        return None
    renku_base_url = "https://" + config.sessions.ingress.host
    renku_base_url = renku_base_url.rstrip("/")
    renku_auth_token_uri = f"{renku_base_url}/auth/realms/{config.keycloak_realm}/protocol/openid-connect/token"
    secret_data = {
        "RSC_AUTH_KIND": "renku",
        "RSC_AUTH_TOKEN_URI": remote_provider.access_token_url,
        "RSC_AUTH_RENKU_ACCESS_TOKEN": user.access_token,
        "RSC_AUTH_RENKU_REFRESH_TOKEN": user.refresh_token,
        "RSC_AUTH_RENKU_TOKEN_URI": renku_auth_token_uri,
        "RSC_AUTH_RENKU_CLIENT_ID": config.sessions.git_proxy.renku_client_id,
        "RSC_AUTH_RENKU_CLIENT_SECRET": config.sessions.git_proxy.renku_client_secret,
    }
    secret_name = f"{server_name}-remote-secret"
    secret = V1Secret(metadata=V1ObjectMeta(name=secret_name), string_data=secret_data)
    return ExtraSecret(secret)


def get_remote_env(
    remote: RemoteConfigurationFirecrest,
) -> list[SessionEnvItem]:
    """Returns env variables used for remote sessions."""
    env = [
        SessionEnvItem(name="RSC_REMOTE_KIND", value=remote.kind.value),
        SessionEnvItem(name="RSC_FIRECREST_API_URL", value=remote.api_url),
        SessionEnvItem(name="RSC_FIRECREST_SYSTEM_NAME", value=remote.system_name),
    ]
    if remote.partition:
        env.append(SessionEnvItem(name="RSC_FIRECREST_PARTITION", value=remote.partition))
    return env

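# Illustrative sketch (not part of the module): for a hypothetical Firecrest remote with
# api_url="https://firecrest.example.org" and system_name="daint", get_remote_env() yields
# RSC_REMOTE_KIND, RSC_FIRECREST_API_URL and RSC_FIRECREST_SYSTEM_NAME entries, plus
# RSC_FIRECREST_PARTITION only when a partition is configured; these complement the RSC_AUTH_*
# values stored in the secret built by get_remote_secret().
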
async def start_session(
    request: Request,
    launch_request: SessionLaunchRequest,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    nb_config: NotebooksConfig,
    git_provider_helper: GitProviderHelperProto,
    cluster_repo: ClusterRepository,
    data_connector_secret_repo: DataConnectorSecretRepository,
    project_repo: ProjectRepository,
    project_session_secret_repo: ProjectSessionSecretRepository,
    rp_repo: ResourcePoolRepository,
    session_repo: SessionRepository,
    user_repo: UserRepo,
    metrics: MetricsService,
    image_check_repo: ImageCheckRepository,
) -> tuple[AmaltheaSessionV1Alpha1, bool]:
    """Start an Amalthea session.

    Returns a tuple where the first item is an instance of an Amalthea session
    and the second item is a boolean set to true iff a new session was created.
    """
    launcher = await session_repo.get_launcher(user=user, launcher_id=launch_request.launcher_id)
    launcher_id = launcher.id
    project = await project_repo.get_project(user=user, project_id=launcher.project_id)

    # Determine resource_class_id: the class can be overwritten at the user's request
    resource_class_id = launch_request.resource_class_id or launcher.resource_class_id

    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(resource_class_id, user)

    server_name = renku_2_make_server_name(
        user=user, project_id=str(launcher.project_id), launcher_id=str(launcher_id), cluster_id=str(cluster.id)
    )
    existing_session = await nb_config.k8s_v2_client.get_session(name=server_name, safe_username=user.id)
    if existing_session is not None and existing_session.spec is not None:
        return existing_session, False

    # Fully determine the resource pool and resource class
    if resource_class_id is None:
        resource_pool = await rp_repo.get_default_resource_pool()
        resource_class = resource_pool.get_default_resource_class()
        if not resource_class and len(resource_pool.classes) > 0:
            resource_class = resource_pool.classes[0]
        if not resource_class or not resource_class.id:
            raise errors.ProgrammingError(message="Cannot find any resource classes in the default pool.")
        resource_class_id = resource_class.id
    else:
        resource_pool = await rp_repo.get_resource_pool_from_class(user, resource_class_id)
        resource_class = resource_pool.get_resource_class(resource_class_id)
        if not resource_class or not resource_class.id:
            raise errors.MissingResourceError(message=f"The resource class with ID {resource_class_id} does not exist.")
    await nb_config.crc_validator.validate_class_storage(user, resource_class.id, launch_request.disk_storage)
    disk_storage = launch_request.disk_storage or resource_class.default_storage

    # Determine session location
    session_location = SessionLocation.remote if resource_pool.remote else SessionLocation.local
    if session_location == SessionLocation.remote and not user.is_authenticated:
        raise errors.ValidationError(message="Anonymous users cannot start remote sessions.")

    environment = launcher.environment
    image = environment.container_image
    work_dir = environment.working_directory
    if not work_dir:
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
        work_dir_fallback = PurePosixPath("/home/jovyan")
        work_dir = image_workdir or work_dir_fallback
    storage_mount_fallback = work_dir / "work"
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    data_connectors_stream = data_connector_secret_repo.get_data_connectors_with_secrets(user, project.id)
    git_providers = await git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors
    session_extras = session_extras.concat(
        await get_data_sources(
            nb_config=nb_config,
            server_name=server_name,
            user=user,
            data_connectors_stream=data_connectors_stream,
            work_dir=work_dir,
            data_connectors_overrides=launch_request.data_connectors_overrides or [],
            user_repo=user_repo,
        )
    )

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Cluster settings (ingress, storage class, etc)
    cluster_settings: ClusterSettings
    try:
        cluster_settings = await cluster_repo.select(cluster.id)
    except errors.MissingResourceError:
        # Fallback to global, main cluster parameters
        cluster_settings = nb_config.local_cluster_settings()

    (
        base_server_path,
        base_server_url,
        base_server_https_url,
        host,
        tls_secret,
        ingress_class_name,
        ingress_annotations,
    ) = cluster_settings.get_ingress_parameters(server_name)
    storage_class = cluster_settings.get_storage_class()
    service_account_name = cluster_settings.service_account_name

    ui_path = f"{base_server_path}/{environment.default_url.lstrip('/')}"

    ingress = Ingress(
        host=host,
        ingressClassName=ingress_class_name,
        annotations=ingress_annotations,
        tlsSecret=tls_secret,
        pathPrefix=base_server_path,
    )

    # Annotations
    annotations: dict[str, str] = {
        "renku.io/project_id": str(launcher.project_id),
        "renku.io/launcher_id": str(launcher_id),
        "renku.io/resource_class_id": str(resource_class_id),
    }

    # Authentication
    if isinstance(user, AuthenticatedAPIUser):
        auth_secret = await get_auth_secret_authenticated(
            nb_config, user, server_name, base_server_url, base_server_https_url, base_server_path
        )
    else:
        auth_secret = get_auth_secret_anonymous(nb_config, server_name, request)
    session_extras = session_extras.concat(
        SessionExtraResources(
            secrets=[auth_secret],
            volumes=[auth_secret.volume] if auth_secret.volume else [],
        )
    )
    authn_extra_volume_mounts: list[ExtraVolumeMount] = []
    if auth_secret.volume_mount:
        authn_extra_volume_mounts.append(auth_secret.volume_mount)

    cert_vol_mounts = init_containers.certificates_volume_mounts(nb_config)
    if cert_vol_mounts:
        authn_extra_volume_mounts.extend(cert_vol_mounts)

    image_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
        image_check_repo=image_check_repo,
    )
    if image_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_secret]))

    # Remote session configuration
    remote_secret = None
    if session_location == SessionLocation.remote:
        assert resource_pool.remote is not None
        if resource_pool.remote.provider_id is None:
            raise errors.ProgrammingError(
                message=f"The resource pool {resource_pool.id} configuration is not valid (missing field 'remote_provider_id')."  # noqa E501
            )
        remote_secret = get_remote_secret(
            user=user,
            config=nb_config,
            server_name=server_name,
            remote_provider_id=resource_pool.remote.provider_id,
            git_providers=git_providers,
        )
    if remote_secret is not None:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[remote_secret]))

    # Raise an error if there are invalid environment variables in the request body
    verify_launcher_env_variable_overrides(launcher, launch_request)
    env = [
        SessionEnvItem(name="RENKU_BASE_URL_PATH", value=base_server_path),
        SessionEnvItem(name="RENKU_BASE_URL", value=base_server_url),
        SessionEnvItem(name="RENKU_MOUNT_DIR", value=storage_mount.as_posix()),
        SessionEnvItem(name="RENKU_SESSION", value="1"),
        SessionEnvItem(name="RENKU_SESSION_IP", value="0.0.0.0"),  # nosec B104
        SessionEnvItem(name="RENKU_SESSION_PORT", value=f"{environment.port}"),
        SessionEnvItem(name="RENKU_WORKING_DIR", value=work_dir.as_posix()),
        SessionEnvItem(name="RENKU_SECRETS_PATH", value=project.secrets_mount_directory.as_posix()),
        SessionEnvItem(name="RENKU_PROJECT_ID", value=str(project.id)),
        SessionEnvItem(name="RENKU_PROJECT_PATH", value=project.path.serialize()),
        SessionEnvItem(name="RENKU_LAUNCHER_ID", value=str(launcher.id)),
    ]
    if session_location == SessionLocation.remote:
        assert resource_pool.remote is not None
        env.extend(
            get_remote_env(
                remote=resource_pool.remote,
            )
        )
    launcher_env_variables = get_launcher_env_variables(launcher, launch_request)
    env.extend(launcher_env_variables)

    session = AmaltheaSessionV1Alpha1(
        metadata=Metadata(name=server_name, annotations=annotations),
        spec=AmaltheaSessionSpec(
            location=session_location,
            imagePullSecrets=[ImagePullSecret(name=image_secret.name, adopt=True)] if image_secret else [],
            codeRepositories=[],
            hibernated=False,
            reconcileStrategy=ReconcileStrategy.whenFailedOrHibernated,
            priorityClassName=resource_class.quota,
            session=Session(
                image=image,
                imagePullPolicy=ImagePullPolicy.Always,
                urlPath=ui_path,
                port=environment.port,
                storage=Storage(
                    className=storage_class,
                    size=SizeStr(str(disk_storage) + "G"),
                    mountPath=storage_mount.as_posix(),
                ),
                workingDir=work_dir.as_posix(),
                runAsUser=environment.uid,
                runAsGroup=environment.gid,
                resources=resources_from_resource_class(resource_class),
                extraVolumeMounts=session_extras.volume_mounts,
                command=environment.command,
                args=environment.args,
                shmSize=ShmSizeStr("1G"),
                stripURLPath=environment.strip_path_prefix,
                env=env,
                remoteSecretRef=remote_secret.ref() if remote_secret else None,
            ),
            ingress=ingress,
            extraContainers=session_extras.containers,
            initContainers=session_extras.init_containers,
            extraVolumes=session_extras.volumes,
            culling=get_culling(user, resource_pool, nb_config),
            authentication=Authentication(
                enabled=True,
                type=AuthenticationType.oauth2proxy
                if isinstance(user, AuthenticatedAPIUser)
                else AuthenticationType.token,
                secretRef=auth_secret.key_ref("auth"),
                extraVolumeMounts=authn_extra_volume_mounts,
            ),
            dataSources=session_extras.data_sources,
            tolerations=tolerations_from_resource_class(resource_class, nb_config.sessions.tolerations_model),
            affinity=node_affinity_from_resource_class(resource_class, nb_config.sessions.affinity_model),
            serviceAccountName=service_account_name,
        ),
    )
    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_or_patch_secret(K8sSecret.from_v1_secret(s.secret, cluster))
    try:
        session = await nb_config.k8s_v2_client.create_session(session, user)
    except Exception as err:
        for s in secrets_to_create:
            await nb_config.k8s_v2_client.delete_secret(K8sSecret.from_v1_secret(s.secret, cluster))
        raise errors.ProgrammingError(message="Could not start the amalthea session") from err
    else:
        try:
            await request_session_secret_creation(user, nb_config, session, session_secrets)
            data_connector_secrets = session_extras.data_connector_secrets or dict()
            await request_dc_secret_creation(user, nb_config, session, data_connector_secrets)
        except Exception:
            await nb_config.k8s_v2_client.delete_session(server_name, user.id)
            raise

    await metrics.user_requested_session_launch(
        user=user,
        metadata={
            "cpu": int(resource_class.cpu * 1000),
            "memory": resource_class.memory,
            "gpu": resource_class.gpu,
            "storage": disk_storage,
            "resource_class_id": resource_class.id,
            "resource_pool_id": resource_pool.id or "",
            "resource_class_name": f"{resource_pool.name}.{resource_class.name}",
            "session_id": server_name,
        },
    )
    return session, True

async def patch_session(
2✔
1042
    body: apispec.SessionPatchRequest,
1043
    session_id: str,
1044
    user: AnonymousAPIUser | AuthenticatedAPIUser,
1045
    internal_gitlab_user: APIUser,
1046
    nb_config: NotebooksConfig,
1047
    git_provider_helper: GitProviderHelperProto,
1048
    project_repo: ProjectRepository,
1049
    project_session_secret_repo: ProjectSessionSecretRepository,
1050
    rp_repo: ResourcePoolRepository,
1051
    session_repo: SessionRepository,
1052
    image_check_repo: ImageCheckRepository,
1053
    metrics: MetricsService,
1054
) -> AmaltheaSessionV1Alpha1:
1055
    """Patch an Amalthea session."""
1056
    session = await nb_config.k8s_v2_client.get_session(session_id, user.id)
1✔
1057
    if session is None:
1✔
1058
        raise errors.MissingResourceError(message=f"The session with ID {session_id} does not exist")
1✔
1059
    if session.spec is None:
1✔
1060
        raise errors.ProgrammingError(
×
1061
            message=f"The session {session_id} being patched is missing the expected 'spec' field.", quiet=True
1062
        )
1063
    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(session.resource_class_id(), user)
1✔
1064

1065
    patch = AmaltheaSessionV1Alpha1Patch(spec=AmaltheaSessionV1Alpha1SpecPatch())
1✔
1066
    is_getting_hibernated: bool = False
1✔
1067

1068
    # Hibernation
1069
    # TODO: Some patching should only be done when the session is in some states to avoid inadvertent restarts
1070
    # Refresh tokens for git proxy
1071
    if (
1✔
1072
        body.state is not None
1073
        and body.state.value.lower() == State.Hibernated.value.lower()
1074
        and body.state.value.lower() != session.status.state.value.lower()
1075
    ):
1076
        # Session is being hibernated
1077
        patch.spec.hibernated = True
1✔
1078
        is_getting_hibernated = True
1✔
1079
    elif (
1✔
1080
        body.state is not None
1081
        and body.state.value.lower() == State.Running.value.lower()
1082
        and session.status.state.value.lower() != body.state.value.lower()
1083
    ):
1084
        # Session is being resumed
1085
        patch.spec.hibernated = False
×
1086
        await metrics.user_requested_session_resume(user, metadata={"session_id": session_id})
×
1087

1088
    rp: ResourcePool | None = None
1✔
1089
    # Resource class
1090
    if body.resource_class_id is not None:
1✔
1091
        new_cluster = await nb_config.k8s_v2_client.cluster_by_class_id(body.resource_class_id, user)
1✔
1092
        if new_cluster.id != cluster.id:
1✔
1093
            raise errors.ValidationError(
×
1094
                message=(
1095
                    f"The requested resource class {body.resource_class_id} is not in the "
1096
                    f"same cluster {cluster.id} as the current resource class {session.resource_class_id()}."
1097
                )
1098
            )
1099
        rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
1✔
1100
        rc = rp.get_resource_class(body.resource_class_id)
1✔
1101
        if not rc:
1✔
1102
            raise errors.MissingResourceError(
×
1103
                message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
1104
            )
1105
        if not patch.metadata:
1✔
1106
            patch.metadata = AmaltheaSessionV1Alpha1MetadataPatch()
1✔
1107
        # Patch the resource pool and class ID in the annotations
1108
        patch.metadata.annotations = {"renku.io/resource_pool_id": str(rp.id)}
1✔
1109
        patch.metadata.annotations = {"renku.io/resource_class_id": str(body.resource_class_id)}
1✔
1110
        if not patch.spec.session:
1✔
1111
            patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
1✔
1112
        patch.spec.session.resources = resources_patch_from_resource_class(rc)
1✔
1113
        # Tolerations
1114
        tolerations = tolerations_from_resource_class(rc, nb_config.sessions.tolerations_model)
1✔
1115
        patch.spec.tolerations = tolerations
1✔
1116
        # Affinities
1117
        patch.spec.affinity = node_affinity_patch_from_resource_class(rc, nb_config.sessions.affinity_model)
1✔
1118
        # Priority class (if a quota is being used)
1119
        if rc.quota is None:
1✔
1120
            patch.spec.priorityClassName = RESET
×
1121
        # Service account name
1122
        if rp.cluster is not None:
1✔
1123
            patch.spec.service_account_name = (
×
1124
                rp.cluster.service_account_name if rp.cluster.service_account_name is not None else RESET
1125
            )
1126

1127
    patch.spec.culling = get_culling_patch(user, rp, nb_config, body.lastInteraction)
1✔
1128

1129
    # If the session is being hibernated we do not need to patch anything else that is
1130
    # not specifically called for in the request body, we can refresh things when the user resumes.
1131
    if is_getting_hibernated:
1✔
1132
        return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch.to_rfc7386())
1✔
1133

1134
    server_name = session.metadata.name
1✔
1135
    launcher = await session_repo.get_launcher(user, session.launcher_id)
1✔
1136
    project = await project_repo.get_project(user=user, project_id=session.project_id)
1✔
1137
    environment = launcher.environment
1✔
1138
    work_dir = environment.working_directory
1✔
1139
    if not work_dir:
1✔
1140
        image_workdir = await core.docker_image_workdir(nb_config, environment.container_image, internal_gitlab_user)
1✔
1141
        work_dir_fallback = PurePosixPath("/home/jovyan")
1✔
1142
        work_dir = image_workdir or work_dir_fallback
1✔
1143
    storage_mount_fallback = work_dir / "work"
1✔
1144
    storage_mount = launcher.environment.mount_directory or storage_mount_fallback
1✔
1145
    secrets_mount_directory = storage_mount / project.secrets_mount_directory
1✔
    session_secrets = await project_session_secret_repo.get_all_session_secrets_from_project(
        user=user, project_id=project.id
    )
    git_providers = await git_provider_helper.get_providers(user=user)
    repositories = repositories_from_project(project, git_providers)

    # User secrets
    session_extras = SessionExtraResources()
    session_extras = session_extras.concat(
        user_secrets_extras(
            user=user,
            config=nb_config,
            secrets_mount_directory=secrets_mount_directory.as_posix(),
            k8s_secret_name=f"{server_name}-secrets",
            session_secrets=session_secrets,
        )
    )

    # Data connectors: skip
    # TODO: How can we patch data connectors? Should we even patch them?
    # TODO: The fact that `start_session()` accepts overrides for data connectors,
    # TODO: but we do not save these overrides (e.g. as annotations), means that
    # TODO: we cannot patch data connectors upon resume.
    # TODO: If we did, we would lose the user's provided overrides (e.g. unsaved credentials).

    # More init containers
    session_extras = session_extras.concat(
        await get_extra_init_containers(
            nb_config,
            user,
            repositories,
            git_providers,
            storage_mount,
            work_dir,
            uid=environment.uid,
            gid=environment.gid,
        )
    )

    # Extra containers
    session_extras = session_extras.concat(await get_extra_containers(nb_config, user, repositories, git_providers))

    # Patching the image pull secret
    image = session.spec.session.image
    image_pull_secret = await get_image_pull_secret(
        image=image,
        server_name=server_name,
        nb_config=nb_config,
        image_check_repo=image_check_repo,
        user=user,
        internal_gitlab_user=internal_gitlab_user,
    )
    if image_pull_secret:
        session_extras = session_extras.concat(SessionExtraResources(secrets=[image_pull_secret]))
        patch.spec.imagePullSecrets = [ImagePullSecret(name=image_pull_secret.name, adopt=image_pull_secret.adopt)]
    else:
        patch.spec.imagePullSecrets = RESET

    # Construct session patch
    patch.spec.extraContainers = _make_patch_spec_list(
        existing=session.spec.extraContainers or [], updated=session_extras.containers
    )
    patch.spec.initContainers = _make_patch_spec_list(
        existing=session.spec.initContainers or [], updated=session_extras.init_containers
    )
    patch.spec.extraVolumes = _make_patch_spec_list(
        existing=session.spec.extraVolumes or [], updated=session_extras.volumes
    )
    if not patch.spec.session:
        patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
    patch.spec.session.extraVolumeMounts = _make_patch_spec_list(
        existing=session.spec.session.extraVolumeMounts or [], updated=session_extras.volume_mounts
    )

    secrets_to_create = session_extras.secrets or []
    for s in secrets_to_create:
        await nb_config.k8s_v2_client.create_or_patch_secret(K8sSecret.from_v1_secret(s.secret, cluster))

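    # to_rfc7386() is assumed to produce a JSON merge patch (RFC 7386): keys that were never
    # set on the patch object are omitted and therefore left untouched on the server, while
    # fields set to RESET are expected to serialize to null, which deletes them. A minimal,
    # hypothetical example of the serialized payload:
    #   {"metadata": {"annotations": {"renku.io/resource_class_id": "5"}},
    #    "spec": {"imagePullSecrets": null}}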
    patch_serialized = patch.to_rfc7386()
    if len(patch_serialized) == 0:
        return session

    return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch_serialized)


def _deduplicate_target_paths(dcs: dict[str, RCloneStorage]) -> dict[str, RCloneStorage]:
    """Ensures that the target paths for all storages are unique.

    This function attempts to de-duplicate the target_path for all items passed in
    and raises an error if it fails to generate a unique target_path.
    """
    result_dcs: dict[str, RCloneStorage] = {}
    mount_folders: dict[str, list[str]] = {}

    def _find_mount_folder(dc: RCloneStorage) -> str:
        mount_folder = dc.mount_folder
        if mount_folder not in mount_folders:
            return mount_folder
        # 1. Try with a "-1", "-2", etc. suffix
        mount_folder_try = f"{mount_folder}-{len(mount_folders[mount_folder])}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        # 2. Try with a random suffix
        suffix = "".join([random.choice(string.ascii_lowercase + string.digits) for _ in range(4)])  # nosec B311
        mount_folder_try = f"{mount_folder}-{suffix}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        raise errors.ValidationError(
            message=f"Could not start session because two or more data connectors ({', '.join(mount_folders[mount_folder])}) share the same mount point '{mount_folder}'"  # noqa E501
        )

    for dc_id, dc in dcs.items():
        original_mount_folder = dc.mount_folder
        new_mount_folder = _find_mount_folder(dc)
        # Keep track of the original mount folder here
        if new_mount_folder != original_mount_folder:
            logger.warning(f"Re-assigning data connector {dc_id} to mount point '{new_mount_folder}'")
            dc_ids = mount_folders.get(original_mount_folder, [])
            dc_ids.append(dc_id)
            mount_folders[original_mount_folder] = dc_ids
        # Keep track of the assigned mount folder here
        dc_ids = mount_folders.get(new_mount_folder, [])
        dc_ids.append(dc_id)
        mount_folders[new_mount_folder] = dc_ids
        result_dcs[dc_id] = dc.with_override(
            override=SessionDataConnectorOverride(
                skip=False,
                data_connector_id=ULID.from_str(dc_id),
                target_path=new_mount_folder,
                configuration=None,
                source_path=None,
                readonly=None,
            )
        )

    return result_dcs
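
# Worked example of the de-duplication above, with hypothetical connector IDs: if three data
# connectors all request the mount folder "/data", they are assigned "/data", "/data-1" and
# "/data-2" in input order; a random 4-character suffix is only tried when those candidates
# are already taken, and a ValidationError is raised as a last resort.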


class _NamedResource(Protocol):
    """Represents a resource with a name."""

    name: str


_T = TypeVar("_T", bound=_NamedResource)


def _make_patch_spec_list(existing: Sequence[_T], updated: Sequence[_T]) -> list[_T] | None:
    """Merges updated into existing by upserting items identified by their name.
1295

1296
    This method is used to construct session patches, merging session resources by name (containers, volumes, etc.).
1297
    """
1298
    patch_list = None
1✔
1299
    if updated:
1✔
1300
        patch_list = list(existing)
1✔
1301
        upsert_list = list(updated)
1✔
1302
        for upsert_item in upsert_list:
1✔
1303
            # Find out if the upsert_item needs to be added or updated
1304
            # found = next(enumerate(filter(lambda item: item.name == upsert_item.name, patch_list)), None)
1305
            found = next(filter(lambda t: t[1].name == upsert_item.name, enumerate(patch_list)), None)
1✔
1306
            if found is not None:
1✔
1307
                idx, _ = found
1✔
1308
                patch_list[idx] = upsert_item
1✔
1309
            else:
1310
                patch_list.append(upsert_item)
1✔
1311
    return patch_list
1✔
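
# Sketch of the merge-by-name behaviour above, using made-up container names: merging
# existing=[git_proxy, sidecar] with updated=[sidecar_v2 (named "sidecar"), metrics] yields
# [git_proxy, sidecar_v2, metrics]. Items are matched on .name, matches are replaced in
# place, and unmatched updated items are appended. An empty `updated` returns None,
# presumably so that the corresponding field is left out of the serialized merge patch.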


def validate_session_post_request(body: apispec.SessionPostRequest) -> SessionLaunchRequest:
    """Validate a session launch request."""
    data_connectors_overrides = (
        [
            SessionDataConnectorOverride(
                skip=dc.skip,
                data_connector_id=ULID.from_str(dc.data_connector_id),
                configuration=dc.configuration,
                source_path=dc.source_path,
                target_path=dc.target_path,
                readonly=dc.readonly,
            )
            for dc in body.data_connectors_overrides
        ]
        if body.data_connectors_overrides
        else None
    )
    env_variable_overrides = (
        [SessionEnvVar(name=ev.name, value=ev.value) for ev in body.env_variable_overrides]
        if body.env_variable_overrides
        else None
    )
    return SessionLaunchRequest(
        launcher_id=ULID.from_str(body.launcher_id),
        disk_storage=body.disk_storage,
        resource_class_id=body.resource_class_id,
        data_connectors_overrides=data_connectors_overrides,
        env_variable_overrides=env_variable_overrides,
    )
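
# Hypothetical example of the mapping above: a POST body such as
#   {"launcher_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV", "resource_class_id": 3,
#    "env_variable_overrides": [{"name": "MY_VAR", "value": "1"}]}
# becomes a SessionLaunchRequest with a ULID launcher_id, the overrides converted to
# SessionEnvVar objects, and data_connectors_overrides left as None because it is absent.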