• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 18585923528

17 Oct 2025 07:36AM UTC coverage: 83.21% (-0.2%) from 83.365%
18585923528

Pull #1068

github

web-flow
Merge 5051fe076 into 6cc42274c
Pull Request #1068: feat: update data connectors when resuming a session

47 of 118 new or added lines in 4 files covered. (39.83%)

11 existing lines in 5 files now uncovered.

21787 of 26183 relevant lines covered (83.21%)

1.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.02
/components/renku_data_services/notebooks/blueprints.py
1
"""Notebooks service API."""
2

3
from dataclasses import dataclass
2✔
4

5
from sanic import Request, empty, exceptions, json
2✔
6
from sanic.response import HTTPResponse, JSONResponse
2✔
7
from sanic_ext import validate
2✔
8

9
from renku_data_services import base_models
2✔
10
from renku_data_services.app_config import logging
2✔
11
from renku_data_services.base_api.auth import authenticate, authenticate_2
2✔
12
from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint
2✔
13
from renku_data_services.base_models import AnonymousAPIUser, APIUser, AuthenticatedAPIUser, Authenticator
2✔
14
from renku_data_services.base_models.metrics import MetricsService
2✔
15
from renku_data_services.connected_services.db import ConnectedServicesRepository
2✔
16
from renku_data_services.connected_services.models import ConnectionStatus
2✔
17
from renku_data_services.crc.db import ClusterRepository, ResourcePoolRepository
2✔
18
from renku_data_services.data_connectors.db import (
2✔
19
    DataConnectorRepository,
20
    DataConnectorSecretRepository,
21
)
22
from renku_data_services.errors import errors
2✔
23
from renku_data_services.notebooks import apispec, core, image_check
2✔
24
from renku_data_services.notebooks.api.classes.image import Image
2✔
25
from renku_data_services.notebooks.api.schemas.config_server_options import ServerOptionsEndpointResponse
2✔
26
from renku_data_services.notebooks.api.schemas.logs import ServerLogs
2✔
27
from renku_data_services.notebooks.config import GitProviderHelperProto, NotebooksConfig
2✔
28
from renku_data_services.notebooks.core_sessions import (
2✔
29
    patch_session,
30
    start_session,
31
    validate_session_patch_request,
32
    validate_session_post_request,
33
)
34
from renku_data_services.notebooks.errors.intermittent import AnonymousUserPatchError
2✔
35
from renku_data_services.project.db import ProjectRepository, ProjectSessionSecretRepository
2✔
36
from renku_data_services.session.db import SessionRepository
2✔
37
from renku_data_services.storage.db import StorageRepository
2✔
38
from renku_data_services.users.db import UserRepo
2✔
39

40
logger = logging.getLogger(__name__)
2✔
41

42

43
@dataclass(kw_only=True)
2✔
44
class NotebooksBP(CustomBlueprint):
2✔
45
    """Handlers for manipulating notebooks."""
46

47
    authenticator: Authenticator
2✔
48
    nb_config: NotebooksConfig
2✔
49
    internal_gitlab_authenticator: base_models.Authenticator
2✔
50
    rp_repo: ResourcePoolRepository
2✔
51
    user_repo: UserRepo
2✔
52
    storage_repo: StorageRepository
2✔
53
    git_provider_helper: GitProviderHelperProto
2✔
54

55
    def version(self) -> BlueprintFactoryResponse:
        """Expose the notebook service information at ``GET /notebooks/version``.

        The handler is not wrapped in any authentication decorator; it returns the
        output of ``core.notebooks_info`` for the configured ``nb_config`` as JSON.
        """

        async def _version(_: Request) -> JSONResponse:
            info = core.notebooks_info(self.nb_config)
            return json(info)

        route = "/notebooks/version"
        return route, ["GET"], _version
2✔
62

63
    def user_servers(self) -> BlueprintFactoryResponse:
        """List the servers of the requesting user at ``GET /notebooks/servers``.

        Query arguments whose second element is ``None`` are discarded; the rest are
        handed to ``core.user_servers`` as filters.
        """

        @authenticate(self.authenticator)
        async def _user_servers(
            request: Request, user: AnonymousAPIUser | AuthenticatedAPIUser, **query_params: dict
        ) -> JSONResponse:
            # Index 1 is presumably the value of a (name, value) pair - confirm against Sanic.
            filter_attrs = [arg for arg in request.get_query_args() if arg[1] is not None]
            servers = await core.user_servers(self.nb_config, user, filter_attrs)
            return core.serialize_v1_servers(servers, self.nb_config)

        route = "/notebooks/servers"
        return route, ["GET"], _user_servers
2✔
75

76
    def user_server(self) -> BlueprintFactoryResponse:
        """Return a single server of the requesting user at ``GET /notebooks/servers/<server_name>``.

        The server is looked up through ``core.user_server`` using the name from the URL path.
        """

        @authenticate(self.authenticator)
        async def _user_server(
            request: Request, user: AnonymousAPIUser | AuthenticatedAPIUser, server_name: str
        ) -> JSONResponse:
            found = await core.user_server(self.nb_config, user, server_name)
            return core.serialize_v1_server(found, self.nb_config)

        route = "/notebooks/servers/<server_name>"
        return route, ["GET"], _user_server
2✔
87

88
    def launch_notebook(self) -> BlueprintFactoryResponse:
        """Launch a renku session at ``POST /notebooks/servers``.

        The JSON body is validated as ``apispec.LaunchNotebookRequestOld``. The status code
        returned by ``core.launch_notebook`` is forwarded to the serialized response.
        """

        @authenticate_2(self.authenticator, self.internal_gitlab_authenticator)
        @validate(json=apispec.LaunchNotebookRequestOld)
        async def _launch_notebook(
            request: Request,
            user: AnonymousAPIUser | AuthenticatedAPIUser,
            internal_gitlab_user: APIUser,
            body: apispec.LaunchNotebookRequestOld,
        ) -> JSONResponse:
            launched = await core.launch_notebook(
                self.nb_config,
                user,
                internal_gitlab_user,
                body,
                user_repo=self.user_repo,
                storage_repo=self.storage_repo,
                git_provider_helper=self.git_provider_helper,
            )
            server, status_code = launched
            return core.serialize_v1_server(server, self.nb_config, status_code)

        route = "/notebooks/servers"
        return route, ["POST"], _launch_notebook
2✔
111

112
    def patch_server(self) -> BlueprintFactoryResponse:
        """Patch the server named in the URL path at ``PATCH /notebooks/servers/<server_name>``.

        The JSON body is validated as ``apispec.PatchServerRequest``. Anonymous users are
        rejected with ``AnonymousUserPatchError`` before any patching is attempted.
        """

        @authenticate_2(self.authenticator, self.internal_gitlab_authenticator)
        @validate(json=apispec.PatchServerRequest)
        async def _patch_server(
            request: Request,
            user: AnonymousAPIUser | AuthenticatedAPIUser,
            internal_gitlab_user: APIUser,
            server_name: str,
            body: apispec.PatchServerRequest,
        ) -> JSONResponse:
            is_anonymous = isinstance(user, AnonymousAPIUser)
            if is_anonymous:
                raise AnonymousUserPatchError()

            patched = await core.patch_server(self.nb_config, user, internal_gitlab_user, server_name, body)
            return core.serialize_v1_server(patched, self.nb_config)

        route = "/notebooks/servers/<server_name>"
        return route, ["PATCH"], _patch_server
2✔
131

132
    def stop_server(self) -> BlueprintFactoryResponse:
        """Stop the server named in the URL path at ``DELETE /notebooks/servers/<server_name>``.

        Responds with an empty 204 on success. An ``errors.MissingResourceError`` from
        ``core.stop_server`` is re-raised as Sanic's ``NotFound`` with the same message.
        """

        @authenticate(self.authenticator)
        async def _stop_server(
            _: Request, user: AnonymousAPIUser | AuthenticatedAPIUser, server_name: str
        ) -> HTTPResponse:
            try:
                await core.stop_server(self.nb_config, user, server_name)
            except errors.MissingResourceError as err:
                raise exceptions.NotFound(message=err.message) from err
            else:
                return HTTPResponse(status=204)

        route = "/notebooks/servers/<server_name>"
        return route, ["DELETE"], _stop_server
2✔
146

147
    def server_options(self) -> BlueprintFactoryResponse:
        """Expose the configurable server options at ``GET /notebooks/server_options``.

        The result of ``core.server_options`` is dumped through
        ``ServerOptionsEndpointResponse`` and returned as JSON.
        """

        async def _server_options(request: Request) -> JSONResponse:
            options = core.server_options(self.nb_config)
            payload = ServerOptionsEndpointResponse().dump(options)
            return json(payload)

        route = "/notebooks/server_options"
        return route, ["GET"], _server_options
2✔
154

155
    def server_logs(self) -> BlueprintFactoryResponse:
        """Return the logs of the running server.

        Registers ``GET /notebooks/logs/<server_name>``. The optional ``max_lines`` query
        argument (default 250) is converted to an integer and forwarded to ``core.server_logs``.

        The handler raises:
            errors.ValidationError: when ``max_lines`` cannot be converted to an integer.
            exceptions.NotFound: when ``core.server_logs`` raises ``errors.MissingResourceError``.
        """

        @authenticate(self.authenticator)
        async def _server_logs(
            request: Request, user: AnonymousAPIUser | AuthenticatedAPIUser, server_name: str
        ) -> JSONResponse:
            args: dict[str, str | int] = request.get_args()
            raw_max_lines = args.get("max_lines", 250)
            try:
                max_lines = int(raw_max_lines)
            except (TypeError, ValueError) as err:
                # Malformed client input is reported as a validation failure rather than
                # letting a bare ValueError escape the handler.
                raise errors.ValidationError(
                    message=f"The query parameter 'max_lines' must be an integer, got '{raw_max_lines}'."
                ) from err

            try:
                logs = await core.server_logs(self.nb_config, user, server_name, max_lines)
            except errors.MissingResourceError as err:
                raise exceptions.NotFound(message=err.message) from err
            return json(ServerLogs().dump(logs))

        return "/notebooks/logs/<server_name>", ["GET"], _server_logs
2✔
171

172
    def check_docker_image(self) -> BlueprintFactoryResponse:
        """Report whether a docker image exists at ``GET /notebooks/images``.

        The query is validated as ``apispec.NotebooksImagesGetParametersQuery``. The handler
        answers with a body-less 200 when ``core.docker_image_exists`` is truthy and 404
        otherwise. A non-string ``image_url`` query argument raises ``ValueError``.
        """

        @authenticate_2(self.authenticator, self.internal_gitlab_authenticator)
        @validate(query=apispec.NotebooksImagesGetParametersQuery)
        async def _check_docker_image(
            request: Request,
            user: AnonymousAPIUser | AuthenticatedAPIUser,
            internal_gitlab_user: APIUser,
            query: apispec.NotebooksImagesGetParametersQuery,
        ) -> HTTPResponse:
            image_url = request.get_args().get("image_url")
            if isinstance(image_url, str):
                exists = await core.docker_image_exists(self.nb_config, image_url, internal_gitlab_user)
                return HTTPResponse(status=200 if exists else 404)
            raise ValueError("required string of image url")

        route = "/notebooks/images"
        return route, ["GET"], _check_docker_image
2✔
191

192

193
@dataclass(kw_only=True)
2✔
194
class NotebooksNewBP(CustomBlueprint):
2✔
195
    """Handlers for manipulating notebooks for the new Amalthea operator."""
196

197
    authenticator: base_models.Authenticator
2✔
198
    internal_gitlab_authenticator: base_models.Authenticator
2✔
199
    nb_config: NotebooksConfig
2✔
200
    project_repo: ProjectRepository
2✔
201
    project_session_secret_repo: ProjectSessionSecretRepository
2✔
202
    session_repo: SessionRepository
2✔
203
    rp_repo: ResourcePoolRepository
2✔
204
    storage_repo: StorageRepository
2✔
205
    user_repo: UserRepo
2✔
206
    data_connector_repo: DataConnectorRepository
2✔
207
    data_connector_secret_repo: DataConnectorSecretRepository
2✔
208
    metrics: MetricsService
2✔
209
    cluster_repo: ClusterRepository
2✔
210
    connected_svcs_repo: ConnectedServicesRepository
2✔
211
    git_provider_helper: GitProviderHelperProto
2✔
212

213
    def start(self) -> BlueprintFactoryResponse:
        """Start a session with the new operator at ``POST /sessions``.

        The JSON body is validated as ``apispec.SessionPostRequest`` and converted with
        ``validate_session_post_request`` before ``start_session`` is called. The response
        status is 201 when ``start_session`` reports the session as created, 200 otherwise.
        """

        @authenticate_2(self.authenticator, self.internal_gitlab_authenticator)
        @validate(json=apispec.SessionPostRequest)
        async def _handler(
            request: Request,
            user: AuthenticatedAPIUser | AnonymousAPIUser,
            internal_gitlab_user: APIUser,
            body: apispec.SessionPostRequest,
        ) -> JSONResponse:
            launch_request = validate_session_post_request(body=body)
            outcome = await start_session(
                request=request,
                launch_request=launch_request,
                user=user,
                internal_gitlab_user=internal_gitlab_user,
                nb_config=self.nb_config,
                git_provider_helper=self.git_provider_helper,
                cluster_repo=self.cluster_repo,
                data_connector_secret_repo=self.data_connector_secret_repo,
                project_repo=self.project_repo,
                project_session_secret_repo=self.project_session_secret_repo,
                rp_repo=self.rp_repo,
                session_repo=self.session_repo,
                user_repo=self.user_repo,
                metrics=self.metrics,
                connected_svcs_repo=self.connected_svcs_repo,
            )
            session, created = outcome
            payload = session.as_apispec().model_dump(exclude_none=True, mode="json")
            if created:
                return json(payload, 201)
            return json(payload, 200)

        route = "/sessions"
        return route, ["POST"], _handler
2✔
246

247
    def get_all(self) -> BlueprintFactoryResponse:
        """List the sessions of the requesting user at ``GET /sessions``.

        Sessions are fetched from ``nb_config.k8s_v2_client`` by user ID and each one is
        serialized through its apispec model, omitting ``None`` fields.
        """

        @authenticate(self.authenticator)
        async def _handler(_: Request, user: AuthenticatedAPIUser | AnonymousAPIUser) -> HTTPResponse:
            sessions = await self.nb_config.k8s_v2_client.list_sessions(user.id)
            output: list[dict] = [
                session.as_apispec().model_dump(exclude_none=True, mode="json") for session in sessions
            ]
            return json(output)

        route = "/sessions"
        return route, ["GET"], _handler
2✔
259

260
    def get_one(self) -> BlueprintFactoryResponse:
        """Return one session of the requesting user at ``GET /sessions/<session_id>``.

        The handler raises ``errors.ValidationError`` (with ``quiet=True``) when the k8s
        client returns ``None`` for the given session ID and user.
        """

        @authenticate(self.authenticator)
        async def _handler(_: Request, user: AuthenticatedAPIUser | AnonymousAPIUser, session_id: str) -> HTTPResponse:
            session = await self.nb_config.k8s_v2_client.get_session(session_id, user.id)
            if session is not None:
                return json(session.as_apispec().model_dump(exclude_none=True, mode="json"))
            # NOTE(review): a missing session is reported as a validation error rather than a
            # missing-resource error - confirm this is the intended API contract.
            raise errors.ValidationError(message=f"The session with ID {session_id} does not exist.", quiet=True)

        route = "/sessions/<session_id>"
        return route, ["GET"], _handler
2✔
271

272
    def delete(self) -> BlueprintFactoryResponse:
        """Delete a session with the new operator at ``DELETE /sessions/<session_id>``.

        After the k8s client deletes the session, a ``session_stopped`` event carrying the
        session ID is sent to the metrics service and an empty response is returned.
        """

        @authenticate(self.authenticator)
        async def _handler(_: Request, user: AuthenticatedAPIUser | AnonymousAPIUser, session_id: str) -> HTTPResponse:
            k8s_client = self.nb_config.k8s_v2_client
            await k8s_client.delete_session(session_id, user.id)
            event_metadata = {"session_id": session_id}
            await self.metrics.session_stopped(user, metadata=event_metadata)
            return empty()

        route = "/sessions/<session_id>"
        return route, ["DELETE"], _handler
2✔
282

283
    def patch(self) -> BlueprintFactoryResponse:
        """Patch a session at ``PATCH /sessions/<session_id>``.

        The JSON body is validated as ``apispec.SessionPatchRequest`` and converted with
        ``validate_session_patch_request``; the session returned by ``patch_session`` is
        serialized through its apispec model, omitting ``None`` fields.
        """

        @authenticate_2(self.authenticator, self.internal_gitlab_authenticator)
        @validate(json=apispec.SessionPatchRequest)
        async def _handler(
            _: Request,
            user: AuthenticatedAPIUser | AnonymousAPIUser,
            internal_gitlab_user: APIUser,
            session_id: str,
            body: apispec.SessionPatchRequest,
        ) -> HTTPResponse:
            patch_request = validate_session_patch_request(body=body)
            updated = await patch_session(
                patch_request=patch_request,
                session_id=session_id,
                user=user,
                internal_gitlab_user=internal_gitlab_user,
                nb_config=self.nb_config,
                git_provider_helper=self.git_provider_helper,
                connected_svcs_repo=self.connected_svcs_repo,
                data_connector_secret_repo=self.data_connector_secret_repo,
                project_repo=self.project_repo,
                project_session_secret_repo=self.project_session_secret_repo,
                rp_repo=self.rp_repo,
                session_repo=self.session_repo,
                user_repo=self.user_repo,
                metrics=self.metrics,
            )
            payload = updated.as_apispec().model_dump(exclude_none=True, mode="json")
            return json(payload)

        route = "/sessions/<session_id>"
        return route, ["PATCH"], _handler
2✔
315

316
    def logs(self) -> BlueprintFactoryResponse:
        """Return the logs of a session at ``GET /sessions/<session_id>/logs``.

        The query is validated as ``apispec.SessionsSessionIdLogsGetParametersQuery`` and its
        ``max_lines`` value is passed to the k8s client. The raw logs are validated against
        ``apispec.SessionLogsResponse`` before being returned as JSON.
        """

        @authenticate(self.authenticator)
        @validate(query=apispec.SessionsSessionIdLogsGetParametersQuery)
        async def _handler(
            _: Request,
            user: AuthenticatedAPIUser | AnonymousAPIUser,
            session_id: str,
            query: apispec.SessionsSessionIdLogsGetParametersQuery,
        ) -> HTTPResponse:
            k8s_client = self.nb_config.k8s_v2_client
            raw_logs = await k8s_client.get_session_logs(session_id, user.id, query.max_lines)
            validated = apispec.SessionLogsResponse.model_validate(raw_logs)
            return json(validated.model_dump(exclude_none=True))

        route = "/sessions/<session_id>/logs"
        return route, ["GET"], _handler
2✔
331

332
    def check_docker_image(self) -> BlueprintFactoryResponse:
        """Report the availability of a docker image at ``GET /sessions/images``.

        The query is validated as ``apispec.SessionsImagesGetParametersQuery``. The response is
        an ``apispec.ImageCheckResponse`` holding the accessibility flag plus, when present in
        the check result, the related connection and provider.
        """

        @authenticate_2(self.authenticator, self.internal_gitlab_authenticator)
        @validate(query=apispec.SessionsImagesGetParametersQuery)
        async def _check_docker_image(
            request: Request,
            user: AnonymousAPIUser | AuthenticatedAPIUser,
            internal_gitlab_user: APIUser,
            query: apispec.SessionsImagesGetParametersQuery,
        ) -> JSONResponse:
            image = Image.from_path(query.image_url)
            gitlab_config = image_check.InternalGitLabConfig(internal_gitlab_user, self.nb_config)
            result = await image_check.check_image(image, user, self.connected_svcs_repo, gitlab_config)
            logger.info(f"Checked image {query.image_url}: {result}")

            conn = None
            if result.connection:
                connection_status = result.connection.status
                if connection_status == ConnectionStatus.connected:
                    # A connected integration that still produced an error is reported as
                    # having invalid credentials.
                    if result.error is None:
                        status = apispec.ImageConnectionStatus.connected
                    else:
                        status = apispec.ImageConnectionStatus.invalid_credentials
                elif connection_status == ConnectionStatus.pending:
                    status = apispec.ImageConnectionStatus.pending
                # NOTE(review): any other connection status leaves `status` unbound and the
                # construction below fails - confirm the enum only has these two members.

                conn = apispec.ImageConnection(
                    id=str(result.connection.id), provider_id=result.connection.provider_id, status=status
                )

            provider: apispec.ImageProvider | None = None
            if result.client:
                provider = apispec.ImageProvider(
                    id=result.client.id, name=result.client.display_name, url=result.client.url
                )

            resp = apispec.ImageCheckResponse(accessible=result.accessible, connection=conn, provider=provider)
            payload = resp.model_dump(exclude_none=True, mode="json")
            return json(payload)

        route = "/sessions/images"
        return route, ["GET"], _check_docker_image
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc