• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 16369691288

18 Jul 2025 11:43AM UTC coverage: 87.186% (-0.04%) from 87.229%
16369691288

Pull #929

github

web-flow
Merge 03ce98685 into 8be58eb79
Pull Request #929: exp: session api proxy

8 of 21 new or added lines in 3 files covered. (38.1%)

13 existing lines in 5 files now uncovered.

21486 of 24644 relevant lines covered (87.19%)

1.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.21
/components/renku_data_services/notebooks/core_sessions.py
1
"""A selection of core functions for AmaltheaSessions."""
2

3
import base64
2✔
4
import json
2✔
5
import os
2✔
6
import random
2✔
7
import string
2✔
8
from collections.abc import AsyncIterator
2✔
9
from datetime import timedelta
2✔
10
from pathlib import PurePosixPath
2✔
11
from typing import cast
2✔
12
from urllib.parse import urljoin, urlparse
2✔
13

14
import httpx
2✔
15
from kubernetes.client import V1ObjectMeta, V1Secret
2✔
16
from kubernetes.utils.duration import format_duration
2✔
17
from sanic import Request
2✔
18
from toml import dumps
2✔
19
from yaml import safe_dump
2✔
20

21
from renku_data_services.app_config import logging
2✔
22
from renku_data_services.base_models import APIUser
2✔
23
from renku_data_services.base_models.core import AnonymousAPIUser, AuthenticatedAPIUser
2✔
24
from renku_data_services.base_models.metrics import MetricsService
2✔
25
from renku_data_services.crc.db import ResourcePoolRepository
2✔
26
from renku_data_services.crc.models import GpuKind, ResourceClass, ResourcePool
2✔
27
from renku_data_services.data_connectors.models import DataConnectorSecret, DataConnectorWithSecrets
2✔
28
from renku_data_services.errors import errors
2✔
29
from renku_data_services.notebooks import apispec
2✔
30
from renku_data_services.notebooks.api.amalthea_patches import api_proxy, git_proxy, init_containers
2✔
31
from renku_data_services.notebooks.api.classes.image import Image
2✔
32
from renku_data_services.notebooks.api.classes.k8s_client import sanitizer
2✔
33
from renku_data_services.notebooks.api.classes.repository import GitProvider, Repository
2✔
34
from renku_data_services.notebooks.api.schemas.cloud_storage import RCloneStorage
2✔
35
from renku_data_services.notebooks.config import NotebooksConfig
2✔
36
from renku_data_services.notebooks.crs import (
2✔
37
    AmaltheaSessionV1Alpha1,
38
    AmaltheaSessionV1Alpha1Patch,
39
    AmaltheaSessionV1Alpha1SpecPatch,
40
    AmaltheaSessionV1Alpha1SpecSessionPatch,
41
    Culling,
42
    DataSource,
43
    ExtraContainer,
44
    ExtraVolume,
45
    ExtraVolumeMount,
46
    ImagePullSecret,
47
    InitContainer,
48
    Resources,
49
    SecretAsVolume,
50
    SecretAsVolumeItem,
51
    SessionEnvItem,
52
    State,
53
)
54
from renku_data_services.notebooks.models import ExtraSecret
2✔
55
from renku_data_services.notebooks.utils import (
2✔
56
    node_affinity_from_resource_class,
57
    tolerations_from_resource_class,
58
)
59
from renku_data_services.project.db import ProjectRepository
2✔
60
from renku_data_services.project.models import Project, SessionSecret
2✔
61
from renku_data_services.session.models import SessionLauncher
2✔
62
from renku_data_services.users.db import UserRepo
2✔
63
from renku_data_services.utils.cryptography import get_encryption_key
2✔
64

65
logger = logging.getLogger(__name__)
2✔
66

67

68
async def get_extra_init_containers(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
    storage_mount: PurePosixPath,
    work_dir: PurePosixPath,
    uid: int = 1000,
    gid: int = 1000,
) -> tuple[list[InitContainer], list[ExtraVolume]]:
    """Get all extra init containers that should be added to an amalthea session.

    Returns a tuple of (init containers, extra volumes): the certificates setup
    container with its volumes, plus the git clone container when there is
    anything to clone.
    """
    # Certificate setup always runs, so that later containers trust any custom CAs.
    cert_init, cert_vols = init_containers.certificates_container(nb_config)
    session_init_containers = [InitContainer.model_validate(sanitizer(cert_init))]
    extra_volumes = [ExtraVolume.model_validate(sanitizer(volume)) for volume in cert_vols]
    git_clone = await init_containers.git_clone_container_v2(
        user=user,
        config=nb_config,
        repositories=repositories,
        git_providers=git_providers,
        workspace_mount_path=storage_mount,
        work_dir=work_dir,
        uid=uid,
        gid=gid,
    )
    # The helper returns None when there is nothing to clone.
    if git_clone is not None:
        session_init_containers.append(InitContainer.model_validate(git_clone))
    return session_init_containers, extra_volumes
95

96

97
async def get_extra_containers(
    nb_config: NotebooksConfig,
    session_id: str,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    repositories: list[Repository],
    git_providers: list[GitProvider],
) -> list[ExtraContainer]:
    """Get the extra containers added to amalthea sessions.

    Currently these are the git proxy and the session API proxy; either may be
    absent depending on the user, repositories and configuration.
    """
    conts: list[ExtraContainer] = []
    git_proxy_container = await git_proxy.main_container(
        user=user, config=nb_config, repositories=repositories, git_providers=git_providers
    )
    if git_proxy_container:
        conts.append(ExtraContainer.model_validate(sanitizer(git_proxy_container)))
    # NOTE(review): the API proxy container is newly added and currently not
    # covered by tests — confirm behavior for anonymous users.
    api_proxy_container = api_proxy.main_container(session_id=session_id, user=user, config=nb_config)
    if api_proxy_container:
        conts.append(ExtraContainer.model_validate(sanitizer(api_proxy_container)))
    return conts
115

116

117
async def get_auth_secret_authenticated(
    nb_config: NotebooksConfig,
    user: AuthenticatedAPIUser,
    server_name: str,
    base_server_url: str,
    base_server_https_url: str,
    base_server_path: str,
) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an authenticated user.

    Builds the oauth2-proxy configuration (as TOML under the "auth" key) and an
    authorized-emails file restricting session access to the launching user's email.
    """
    secret_data = {}

    # The oauth2 proxy is served under <base_server_url>/oauth2.
    parsed_proxy_url = urlparse(urljoin(base_server_url + "/", "oauth2"))
    vol = ExtraVolume(
        name="renku-authorized-emails",
        secret=SecretAsVolume(
            secretName=server_name,
            items=[SecretAsVolumeItem(key="authorized_emails", path="authorized_emails")],
        ),
    )
    secret_data["auth"] = dumps(
        {
            "provider": "oidc",
            "client_id": nb_config.sessions.oidc.client_id,
            "oidc_issuer_url": nb_config.sessions.oidc.issuer_url,
            "session_cookie_minimal": True,
            "skip_provider_button": True,
            # NOTE: If the redirect url is not HTTPS then some identity providers will fail.
            "redirect_url": urljoin(base_server_https_url + "/", "oauth2/callback"),
            "cookie_path": base_server_path,
            "proxy_prefix": parsed_proxy_url.path,
            "authenticated_emails_file": "/authorized_emails",
            "client_secret": nb_config.sessions.oidc.client_secret,
            # Fresh random cookie secret per session secret creation.
            "cookie_secret": base64.urlsafe_b64encode(os.urandom(32)).decode(),
            "insecure_oidc_allow_unverified_email": nb_config.sessions.oidc.allow_unverified_email,
        }
    )
    # Only the launching user's email may access this session.
    secret_data["authorized_emails"] = user.email
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    vol_mount = ExtraVolumeMount(
        name="renku-authorized-emails",
        mountPath="/authorized_emails",
        subPath="authorized_emails",
    )
    return ExtraSecret(secret, vol, vol_mount)
161

162

163
async def get_auth_secret_anonymous(nb_config: NotebooksConfig, server_name: str, request: Request) -> ExtraSecret:
    """Get the extra secrets that need to be added to the session for an anonymous user.

    Raises:
        errors.UnauthorizedError: when the request has no renku session cookie.
    """
    # NOTE: We extract the session cookie value here in order to avoid creating a cookie.
    # The gateway encrypts and signs cookies so the user ID injected in the request headers does not
    # match the value of the session cookie.
    session_id = cast(str | None, request.cookies.get(nb_config.session_id_cookie_name))
    if not session_id:
        raise errors.UnauthorizedError(
            message=f"You have to have a renku session cookie at {nb_config.session_id_cookie_name} "
            "in order to launch an anonymous session."
        )
    # NOTE: Amalthea looks for the token value first in the cookie and then in the authorization header
    secret_data = {
        "auth": safe_dump(
            {
                "authproxy": {
                    "token": session_id,
                    "cookie_key": nb_config.session_id_cookie_name,
                    "verbose": True,
                }
            }
        )
    }
    secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
    return ExtraSecret(secret)
188

189

190
def get_gitlab_image_pull_secret(
    nb_config: NotebooksConfig, user: AuthenticatedAPIUser, image_pull_secret_name: str, access_token: str
) -> ExtraSecret:
    """Create a Kubernetes secret for private GitLab registry authentication.

    The secret is a standard `kubernetes.io/dockerconfigjson` secret holding
    OAuth2 credentials for the GitLab container registry.
    """
    namespace = nb_config.k8s_client.namespace()

    # Docker-config payload keyed by the GitLab registry host.
    dockerconfig_json = json.dumps(
        {
            "auths": {
                nb_config.git.registry: {
                    "Username": "oauth2",
                    "Password": access_token,
                    "Email": user.email,
                }
            }
        }
    )

    pull_secret = V1Secret(
        metadata=V1ObjectMeta(name=image_pull_secret_name, namespace=namespace),
        string_data={".dockerconfigjson": dockerconfig_json},
        type="kubernetes.io/dockerconfigjson",
    )
    return ExtraSecret(pull_secret)
216

217

218
async def get_data_sources(
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    server_name: str,
    data_connectors_stream: AsyncIterator[DataConnectorWithSecrets],
    work_dir: PurePosixPath,
    cloud_storage_overrides: list[apispec.SessionCloudStoragePost],
    user_repo: UserRepo,
) -> tuple[list[DataSource], list[ExtraSecret], dict[str, list[DataConnectorSecret]]]:
    """Generate cloud storage related resources.

    Returns a tuple of (data sources for the session spec, secrets to create for
    them, mapping of data connector ID to its saved storage secrets).

    Raises:
        errors.MissingResourceError: when an override refers to an unknown storage.
        errors.ProgrammingError: when saved secrets exist but no decryption key is available.
    """
    data_sources: list[DataSource] = []
    secrets: list[ExtraSecret] = []
    dcs: dict[str, RCloneStorage] = {}
    dcs_secrets: dict[str, list[DataConnectorSecret]] = {}
    # FIX: initialize up front; previously this name was only bound for authenticated
    # users with saved secrets, so the `user_secret_key is None` check below raised
    # UnboundLocalError (instead of the intended ProgrammingError) in all other cases.
    user_secret_key: str | None = None
    async for dc in data_connectors_stream:
        # Relative target paths are mounted under the session working directory.
        mount_folder = (
            dc.data_connector.storage.target_path
            if PurePosixPath(dc.data_connector.storage.target_path).is_absolute()
            else (work_dir / dc.data_connector.storage.target_path).as_posix()
        )
        dcs[str(dc.data_connector.id)] = RCloneStorage(
            source_path=dc.data_connector.storage.source_path,
            mount_folder=mount_folder,
            configuration=dc.data_connector.storage.configuration,
            readonly=dc.data_connector.storage.readonly,
            name=dc.data_connector.name,
            secrets={str(secret.secret_id): secret.name for secret in dc.secrets},
            storage_class=nb_config.cloud_storage.storage_class,
        )
        if len(dc.secrets) > 0:
            dcs_secrets[str(dc.data_connector.id)] = dc.secrets
    if isinstance(user, AuthenticatedAPIUser) and len(dcs_secrets) > 0:
        secret_key = await user_repo.get_or_create_user_secret_key(user)
        user_secret_key = get_encryption_key(secret_key.encode(), user.id.encode()).decode("utf-8")
    # NOTE: Check the cloud storage overrides from the request body and if any match
    # then overwrite the projects cloud storages
    # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error
    # NOTE: Overriding the configuration when a saved secret is there will cause a 422 error
    for csr in cloud_storage_overrides:
        csr_id = csr.storage_id
        if csr_id not in dcs:
            raise errors.MissingResourceError(
                message=f"You have requested a cloud storage with ID {csr_id} which does not exist "
                "or you don't have access to."
            )
        if csr.target_path is not None and not PurePosixPath(csr.target_path).is_absolute():
            csr.target_path = (work_dir / csr.target_path).as_posix()
        dcs[csr_id] = dcs[csr_id].with_override(csr)

    # Handle potential duplicate target_path
    dcs = _deduplicate_target_paths(dcs)

    for cs_id, cs in dcs.items():
        secret_name = f"{server_name}-ds-{cs_id.lower()}"
        secret_key_needed = len(dcs_secrets.get(cs_id, [])) > 0
        if secret_key_needed and user_secret_key is None:
            raise errors.ProgrammingError(
                message=f"You have saved storage secrets for data connector {cs_id} "
                f"associated with your user ID {user.id} but no key to decrypt them, "
                "therefore we cannot mount the requested data connector. "
                "Please report this to the renku administrators."
            )
        secret = ExtraSecret(
            cs.secret(
                secret_name,
                nb_config.k8s_client.namespace(),
                user_secret_key=user_secret_key if secret_key_needed else None,
            )
        )
        secrets.append(secret)
        data_sources.append(
            DataSource(
                mountPath=cs.mount_folder,
                secretRef=secret.ref(),
                accessMode="ReadOnlyMany" if cs.readonly else "ReadWriteOnce",
            )
        )
    return data_sources, secrets, dcs_secrets
296

297

298
async def request_dc_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    dc_secrets: dict[str, list[DataConnectorSecret]],
) -> None:
    """Request the specified data connector secrets to be created by the secret service.

    No-op for anonymous users (they have no stored secrets). The created k8s
    secrets carry an owner reference to the session so they are garbage-collected
    with it.

    Raises:
        errors.ProgrammingError: when the secret service responds with a non-2xx status.
    """
    if isinstance(user, AnonymousAPIUser):
        return
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}
    # PERF: reuse a single HTTP client (and its connection pool) for all data
    # connectors instead of creating a new client per loop iteration.
    async with httpx.AsyncClient(timeout=10) as client:
        for s_id, secrets in dc_secrets.items():
            if len(secrets) == 0:
                continue
            request_data = {
                "name": f"{manifest.metadata.name}-ds-{s_id.lower()}-secrets",
                "namespace": nb_config.k8s_v2_client.namespace(),
                "secret_ids": [str(secret.secret_id) for secret in secrets],
                "owner_references": [owner_reference],
                "key_mapping": {str(secret.secret_id): secret.name for secret in secrets},
            }
            res = await client.post(secrets_url, headers=headers, json=request_data)
            if res.status_code >= 300 or res.status_code < 200:
                raise errors.ProgrammingError(
                    message=f"The secret for data connector with {s_id} could not be "
                    f"successfully created, the status code was {res.status_code}."
                    "Please contact a Renku administrator.",
                    detail=res.text,
                )
334

335

336
def get_launcher_env_variables(launcher: SessionLauncher, body: apispec.SessionPostRequest) -> list[SessionEnvItem]:
    """Get the environment variables from the launcher, with overrides from the request."""
    # A request override, when present, takes precedence over the launcher value.
    overrides = {item.name: item.value for item in body.env_variable_overrides or []}
    return [
        SessionEnvItem(name=env.name, value=overrides.get(env.name, env.value))
        for env in launcher.env_variables or []
    ]
346

347

348
def verify_launcher_env_variable_overrides(launcher: SessionLauncher, body: apispec.SessionPostRequest) -> None:
    """Raise an error if there are env variables that are not defined in the launcher."""
    requested_names = {override.name for override in body.env_variable_overrides or []}
    declared_names = {env.name for env in launcher.env_variables or []}
    unknown_env_names = requested_names - declared_names
    if unknown_env_names:
        message = f"""The following environment variables are not defined in the session launcher: {unknown_env_names}.
            Please remove them from the launch request or add them to the session launcher."""
        raise errors.ValidationError(message=message)
357

358

359
async def request_session_secret_creation(
    user: AuthenticatedAPIUser | AnonymousAPIUser,
    nb_config: NotebooksConfig,
    manifest: AmaltheaSessionV1Alpha1,
    session_secrets: list[SessionSecret],
) -> None:
    """Request the specified user session secrets to be created by the secret service."""
    # Anonymous users have no stored secrets, and with no secrets there is nothing to do.
    if isinstance(user, AnonymousAPIUser):
        return
    if not session_secrets:
        return
    # Owner reference so the created k8s secret is tied to the session's lifetime.
    owner_reference = {
        "apiVersion": manifest.apiVersion,
        "kind": manifest.kind,
        "name": manifest.metadata.name,
        "uid": manifest.metadata.uid,
    }
    # Group the requested filenames by secret ID.
    key_mapping: dict[str, list[str]] = {}
    for session_secret in session_secrets:
        key_mapping.setdefault(str(session_secret.secret_id), []).append(session_secret.secret_slot.filename)
    request_data = {
        "name": f"{manifest.metadata.name}-secrets",
        "namespace": nb_config.k8s_v2_client.namespace(),
        "secret_ids": [str(s.secret_id) for s in session_secrets],
        "owner_references": [owner_reference],
        "key_mapping": key_mapping,
    }
    secrets_url = nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
    headers = {"Authorization": f"bearer {user.access_token}"}
    async with httpx.AsyncClient(timeout=10) as client:
        res = await client.post(secrets_url, headers=headers, json=request_data)
        if res.status_code >= 300 or res.status_code < 200:
            raise errors.ProgrammingError(
                message="The session secrets could not be successfully created, "
                f"the status code was {res.status_code}."
                "Please contact a Renku administrator.",
                detail=res.text,
            )
400

401

402
def resources_from_resource_class(resource_class: ResourceClass) -> Resources:
    """Convert the resource class to a k8s resources spec."""
    memory = f"{resource_class.memory}Gi"
    requests: dict[str, str | int] = {
        "cpu": f"{round(resource_class.cpu * 1000)}m",
        "memory": memory,
    }
    limits: dict[str, str | int] = {"memory": memory}
    if resource_class.gpu > 0:
        gpu_key = GpuKind.NVIDIA.value + "/gpu"
        # NOTE: GPUs have to be set in limits too since GPUs cannot be overcommited, if
        # not on some clusters this will cause the session to fully fail to start.
        requests[gpu_key] = resource_class.gpu
        limits[gpu_key] = resource_class.gpu
    return Resources(requests=requests, limits=limits if limits else None)
416

417

418
def repositories_from_project(project: Project, git_providers: list[GitProvider]) -> list[Repository]:
    """Get the list of git repositories from a project."""
    # Map each provider's host to its ID; setdefault keeps the FIRST provider
    # for a given host, matching a first-match scan of the provider list.
    provider_by_netloc: dict[str, str] = {}
    for provider in git_providers:
        provider_by_netloc.setdefault(urlparse(provider.url).netloc, provider.id)
    return [
        Repository(url=repo, provider=provider_by_netloc.get(urlparse(repo).netloc))
        for repo in project.repositories
    ]
429

430

431
async def repositories_from_session(
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    session: AmaltheaSessionV1Alpha1,
    project_repo: ProjectRepository,
    git_providers: list[GitProvider],
) -> list[Repository]:
    """Get the list of git repositories from a session."""
    try:
        project = await project_repo.get_project(user, session.project_id)
    except errors.MissingResourceError:
        # Project deleted or inaccessible to the user: treat as no repositories.
        return []
    else:
        return repositories_from_project(project, git_providers)
443

444

445
def get_culling(
    user: AuthenticatedAPIUser | AnonymousAPIUser, resource_pool: ResourcePool, nb_config: NotebooksConfig
) -> Culling:
    """Create the culling specification for an AmaltheaSession."""
    culling_config = nb_config.sessions.culling.registered
    idle_threshold_seconds = resource_pool.idle_threshold or culling_config.idle_seconds
    if user.is_anonymous:
        # NOTE: Anonymous sessions should not be hibernated at all, but there is no such option in Amalthea
        # So in this case we set a very low hibernation threshold so the session is deleted quickly after
        # it is hibernated.
        hibernation_threshold_seconds = 1
    else:
        hibernation_threshold_seconds = resource_pool.hibernation_threshold or culling_config.hibernated_seconds

    def _k8s_duration(seconds: int) -> str:
        # Render a seconds count as a k8s duration string.
        return format_duration(timedelta(seconds=seconds))

    return Culling(
        maxAge=_k8s_duration(culling_config.max_age_seconds),
        maxFailedDuration=_k8s_duration(culling_config.failed_seconds),
        maxHibernatedDuration=_k8s_duration(hibernation_threshold_seconds),
        maxIdleDuration=_k8s_duration(idle_threshold_seconds),
        maxStartingDuration=_k8s_duration(culling_config.pending_seconds),
    )
466

467

468
async def requires_image_pull_secret(nb_config: NotebooksConfig, image: str, internal_gitlab_user: APIUser) -> bool:
    """Determines if an image requires a pull secret based on its visibility and their GitLab access token.

    Returns True only when the image is not publicly visible, is hosted on the
    configured GitLab registry and is accessible with the user's GitLab token.
    """
    parsed_image = Image.from_path(image)
    image_repo = parsed_image.repo_api()

    # Publicly pullable images never need a pull secret.
    image_exists_publicly = await image_repo.image_exists(parsed_image)
    if image_exists_publicly:
        return False

    if parsed_image.hostname == nb_config.git.registry and internal_gitlab_user.access_token:
        # Retry the lookup with the user's GitLab token to detect a private-but-accessible image.
        image_repo = image_repo.with_oauth2_token(internal_gitlab_user.access_token)
        image_exists_privately = await image_repo.image_exists(parsed_image)
        if image_exists_privately:
            return True
    # No pull secret needed if the image is private and the user cannot access it
    return False
485

486

487
async def patch_session(
    body: apispec.SessionPatchRequest,
    session_id: str,
    nb_config: NotebooksConfig,
    user: AnonymousAPIUser | AuthenticatedAPIUser,
    internal_gitlab_user: APIUser,
    rp_repo: ResourcePoolRepository,
    project_repo: ProjectRepository,
    metrics: MetricsService,
) -> AmaltheaSessionV1Alpha1:
    """Patch an Amalthea session.

    Builds an RFC 7386 merge patch covering (in order): hibernation state,
    resource class changes, extra containers (git/api proxies) and the GitLab
    image pull secret, then applies it via the k8s client. Returns the session
    unchanged when the resulting patch is empty.

    Raises:
        errors.MissingResourceError: unknown session or resource class.
        errors.ProgrammingError: malformed session or unavailable image credentials.
    """
    session = await nb_config.k8s_v2_client.get_session(session_id, user.id)
    if session is None:
        raise errors.MissingResourceError(message=f"The session with ID {session_id} does not exist")
    if session.spec is None:
        raise errors.ProgrammingError(
            message=f"The session {session_id} being patched is missing the expected 'spec' field.", quiet=True
        )
    cluster = await nb_config.k8s_v2_client.cluster_by_class_id(session.resource_class_id(), user)

    patch = AmaltheaSessionV1Alpha1Patch(spec=AmaltheaSessionV1Alpha1SpecPatch())
    is_getting_hibernated: bool = False

    # Hibernation
    # TODO: Some patching should only be done when the session is in some states to avoid inadvertent restarts
    # Refresh tokens for git proxy
    # NOTE(review): `session.status` is accessed without a None check here, unlike
    # `session.spec` above — confirm status is always populated for fetched sessions.
    if (
        body.state is not None
        and body.state.value.lower() == State.Hibernated.value.lower()
        and body.state.value.lower() != session.status.state.value.lower()
    ):
        # Session is being hibernated
        patch.spec.hibernated = True
        is_getting_hibernated = True
    elif (
        body.state is not None
        and body.state.value.lower() == State.Running.value.lower()
        and session.status.state.value.lower() != body.state.value.lower()
    ):
        # Session is being resumed
        patch.spec.hibernated = False
        await metrics.user_requested_session_resume(user, metadata={"session_id": session_id})

    # Resource class
    if body.resource_class_id is not None:
        rp = await rp_repo.get_resource_pool_from_class(user, body.resource_class_id)
        rc = rp.get_resource_class(body.resource_class_id)
        if not rc:
            raise errors.MissingResourceError(
                message=f"The resource class you requested with ID {body.resource_class_id} does not exist"
            )
        if not patch.spec.session:
            patch.spec.session = AmaltheaSessionV1Alpha1SpecSessionPatch()
        patch.spec.session.resources = resources_from_resource_class(rc)
        # Tolerations
        tolerations = tolerations_from_resource_class(rc, nb_config.sessions.tolerations_model)
        patch.spec.tolerations = tolerations
        # Affinities
        patch.spec.affinity = node_affinity_from_resource_class(rc, nb_config.sessions.affinity_model)
        # Priority class (if a quota is being used)
        patch.spec.priorityClassName = rc.quota
        # Culling thresholds can change with the resource pool.
        patch.spec.culling = get_culling(user, rp, nb_config)

    # If the session is being hibernated we do not need to patch anything else that is
    # not specifically called for in the request body, we can refresh things when the user resumes.
    if is_getting_hibernated:
        return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch.to_rfc7386())

    # Patching the extra containers (includes the git proxy)
    git_providers = await nb_config.git_provider_helper.get_providers(user)
    repositories = await repositories_from_session(user, session, project_repo, git_providers)
    extra_containers = await get_extra_containers(
        nb_config,
        session_id,
        user,
        repositories,
        git_providers,
    )
    if extra_containers:
        patch.spec.extraContainers = extra_containers

    # Patching the image pull secret
    if isinstance(user, AuthenticatedAPIUser) and internal_gitlab_user.access_token is not None:
        image = session.spec.session.image
        server_name = session.metadata.name
        needs_pull_secret = await requires_image_pull_secret(nb_config, image, internal_gitlab_user)
        logger.info(f"Session with ID {session_id} needs pull secret for image {image}: {needs_pull_secret}")

        if needs_pull_secret:
            image_pull_secret_name = f"{server_name}-image-secret"

            # Always create a fresh secret to ensure we have the latest token
            image_secret = get_gitlab_image_pull_secret(
                nb_config, user, image_pull_secret_name, internal_gitlab_user.access_token
            )

            # NOTE(review): get_gitlab_image_pull_secret always returns an ExtraSecret,
            # so this falsy check looks unreachable — confirm whether it can ever trigger.
            # The user-facing message below also contains a typo ("retrive").
            if not image_secret:
                logger.error(f"Failed to create image pull secret for session ID {session_id} with image {image}")
                raise errors.ProgrammingError(
                    message=f"We cannot retrive credentials for your private image {image}. "
                    "In order to resolve this problem, you can try to log out and back in "
                    "and/or check that you still have permissions for the image repository."
                )
            # Ensure the secret is created in the cluster
            await nb_config.k8s_v2_client.create_secret(image_secret.secret, cluster)

            # Replace any previous image pull secret for this session with the fresh one.
            updated_secrets = [
                secret for secret in (session.spec.imagePullSecrets or []) if not secret.name.endswith("-image-secret")
            ]
            updated_secrets.append(ImagePullSecret(name=image_pull_secret_name, adopt=True))
            patch.spec.imagePullSecrets = updated_secrets

    patch_serialized = patch.to_rfc7386()
    # Nothing changed: avoid a no-op call to the k8s API.
    if len(patch_serialized) == 0:
        return session

    return await nb_config.k8s_v2_client.patch_session(session_id, user.id, patch_serialized)
604

605

606
def _deduplicate_target_paths(dcs: dict[str, RCloneStorage]) -> dict[str, RCloneStorage]:
    """Ensures that the target paths for all storages are unique.

    This method will attempt to de-duplicate the target_path for all items passed in,
    and raise an error if it fails to generate unique target_path.
    """
    result_dcs: dict[str, RCloneStorage] = {}
    # Maps every mount folder seen so far (original or reassigned) to the
    # data connector IDs recorded against it.
    mount_folders: dict[str, list[str]] = {}

    def _find_mount_folder(dc: RCloneStorage) -> str:
        # Pick a mount folder for this storage that is not taken yet.
        mount_folder = dc.mount_folder
        if mount_folder not in mount_folders:
            return mount_folder
        # 1. Try with a "-1", "-2", etc. suffix
        mount_folder_try = f"{mount_folder}-{len(mount_folders[mount_folder])}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        # 2. Try with a random suffix
        suffix = "".join([random.choice(string.ascii_lowercase + string.digits) for _ in range(4)])  # nosec B311
        mount_folder_try = f"{mount_folder}-{suffix}"
        if mount_folder_try not in mount_folders:
            return mount_folder_try
        # NOTE(review): the nested double quotes inside this f-string require
        # Python 3.12+ — confirm the project's minimum supported version.
        raise errors.ValidationError(
            message=f"Could not start session because two or more data connectors ({", ".join(mount_folders[mount_folder])}) share the same mount point '{mount_folder}'"  # noqa E501
        )

    for dc_id, dc in dcs.items():
        original_mount_folder = dc.mount_folder
        new_mount_folder = _find_mount_folder(dc)
        # Keep track of the original mount folder here
        if new_mount_folder != original_mount_folder:
            logger.warning(f"Re-assigning data connector {dc_id} to mount point '{new_mount_folder}'")
            dc_ids = mount_folders.get(original_mount_folder, [])
            dc_ids.append(dc_id)
            mount_folders[original_mount_folder] = dc_ids
        # Keep track of the assigned mount folder here
        dc_ids = mount_folders.get(new_mount_folder, [])
        dc_ids.append(dc_id)
        mount_folders[new_mount_folder] = dc_ids
        result_dcs[dc_id] = dc.with_override(
            override=apispec.SessionCloudStoragePost(storage_id=dc_id, target_path=new_mount_folder)
        )

    return result_dcs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc