• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 10353957042

12 Aug 2024 02:35PM UTC coverage: 90.758% (+0.4%) from 90.398%
10353957042

Pull #338

github

web-flow
Merge 3a49eb6c2 into 8afb94949
Pull Request #338: feat!: expand environments specification

227 of 237 new or added lines in 7 files covered. (95.78%)

48 existing lines in 9 files now uncovered.

9202 of 10139 relevant lines covered (90.76%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.27
/components/renku_data_services/session/db.py
1
"""Adapters for session database classes."""
2✔
2

3
from __future__ import annotations
2✔
4

5
from collections.abc import Callable
2✔
6
from datetime import UTC, datetime
2✔
7
from pathlib import PurePosixPath
2✔
8
from typing import Any
2✔
9

10
from sqlalchemy import select
2✔
11
from sqlalchemy.ext.asyncio import AsyncSession
2✔
12
from ulid import ULID
2✔
13

14
import renku_data_services.base_models as base_models
2✔
15
from renku_data_services import errors
2✔
16
from renku_data_services.authz.authz import Authz, ResourceType
2✔
17
from renku_data_services.authz.models import Scope
2✔
18
from renku_data_services.crc.db import ResourcePoolRepository
2✔
19
from renku_data_services.session import apispec, models
2✔
20
from renku_data_services.session import orm as schemas
2✔
21

22

23
class SessionRepository:
2✔
24
    """Repository for sessions."""
2✔
25

26
    def __init__(
2✔
27
        self, session_maker: Callable[..., AsyncSession], project_authz: Authz, resource_pools: ResourcePoolRepository
28
    ) -> None:
29
        self.session_maker = session_maker
2✔
30
        self.project_authz: Authz = project_authz
2✔
31
        self.resource_pools: ResourcePoolRepository = resource_pools
2✔
32

33
    async def get_environments(self) -> list[models.Environment]:
2✔
34
        """Get all global session environments from the database."""
35
        async with self.session_maker() as session:
2✔
36
            res = await session.scalars(
2✔
37
                select(schemas.EnvironmentORM).where(
38
                    schemas.EnvironmentORM.environment_kind == models.EnvironmentKind.GLOBAL.value
39
                )
40
            )
41
            environments = res.all()
2✔
42
            return [e.dump() for e in environments]
2✔
43

44
    async def get_environment(self, environment_id: ULID) -> models.Environment:
2✔
45
        """Get one global session environment from the database."""
46
        async with self.session_maker() as session:
2✔
47
            res = await session.scalars(
2✔
48
                select(schemas.EnvironmentORM)
49
                .where(schemas.EnvironmentORM.id == str(environment_id))
50
                .where(schemas.EnvironmentORM.environment_kind == models.EnvironmentKind.GLOBAL.value)
51
            )
52
            environment = res.one_or_none()
2✔
53
            if environment is None:
2✔
54
                raise errors.MissingResourceError(
1✔
55
                    message=f"Session environment with id '{environment_id}' does not exist or you do not have access to it."  # noqa: E501
56
                )
57
            return environment.dump()
1✔
58

59
    async def __insert_environment(
2✔
60
        self,
61
        user: base_models.APIUser,
62
        session: AsyncSession,
63
        new_environment: models.UnsavedEnvironment,
64
    ) -> schemas.EnvironmentORM:
65
        if user.id is None:
2✔
NEW
66
            raise errors.UnauthorizedError(
×
67
                message="You have to be authenticated to insert an environment in the DB.", quiet=True
68
            )
69
        environment = schemas.EnvironmentORM(
2✔
70
            name=new_environment.name,
71
            created_by_id=user.id,
72
            creation_date=datetime.now(UTC),
73
            description=new_environment.description,
74
            container_image=new_environment.container_image,
75
            default_url=new_environment.default_url,
76
            port=new_environment.port,
77
            working_directory=new_environment.working_directory.as_posix(),
78
            mount_directory=new_environment.mount_directory.as_posix(),
79
            uid=new_environment.uid,
80
            gid=new_environment.gid,
81
            environment_kind=new_environment.environment_kind,
82
        )
83

84
        session.add(environment)
2✔
85
        return environment
2✔
86

87
    async def insert_environment(
2✔
88
        self, user: base_models.APIUser, new_environment: models.UnsavedEnvironment
89
    ) -> models.Environment:
90
        """Insert a new global session environment."""
91
        if user.id is None or not user.is_admin:
2✔
92
            raise errors.UnauthorizedError(
1✔
93
                message="You do not have the required permissions for this operation.", quiet=True
94
            )
95
        if new_environment.environment_kind != models.EnvironmentKind.GLOBAL:
2✔
NEW
96
            raise errors.ValidationError(message="This endpoint only supports adding global environments", quiet=True)
×
97

98
        async with self.session_maker() as session, session.begin():
2✔
99
            env = await self.__insert_environment(user, session, new_environment)
2✔
100
            return env.dump()
2✔
101

102
    async def __update_environment(
2✔
103
        self,
104
        user: base_models.APIUser,
105
        session: AsyncSession,
106
        environment_id: ULID,
107
        kind: models.EnvironmentKind,
108
        **kwargs: dict,
109
    ) -> models.Environment:
110
        res = await session.scalars(
2✔
111
            select(schemas.EnvironmentORM)
112
            .where(schemas.EnvironmentORM.id == str(environment_id))
113
            .where(schemas.EnvironmentORM.environment_kind == kind.value)
114
        )
115
        environment = res.one_or_none()
2✔
116
        if environment is None:
2✔
117
            raise errors.MissingResourceError(message=f"Session environment with id '{environment_id}' does not exist.")
1✔
118

119
        for key, value in kwargs.items():
1✔
120
            # NOTE: Only some fields can be edited
121
            if key in [
1✔
122
                "name",
123
                "description",
124
                "container_image",
125
                "default_url",
126
                "port",
127
                "working_directory",
128
                "mount_directory",
129
                "uid",
130
                "gid",
131
            ]:
132
                setattr(environment, key, value)
1✔
133

134
        return environment.dump()
1✔
135

136
    async def update_environment(
2✔
137
        self, user: base_models.APIUser, environment_id: ULID, **kwargs: dict
138
    ) -> models.Environment:
139
        """Update a global session environment entry."""
140
        if not user.is_admin:
2✔
141
            raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
1✔
142

143
        async with self.session_maker() as session, session.begin():
2✔
144
            return await self.__update_environment(
2✔
145
                user, session, environment_id, models.EnvironmentKind.GLOBAL, **kwargs
146
            )
147

148
    async def delete_environment(self, user: base_models.APIUser, environment_id: ULID) -> None:
2✔
149
        """Delete a global session environment entry."""
150
        if not user.is_admin:
2✔
151
            raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
1✔
152

153
        async with self.session_maker() as session, session.begin():
2✔
154
            res = await session.scalars(
2✔
155
                select(schemas.EnvironmentORM)
156
                .where(schemas.EnvironmentORM.id == str(environment_id))
157
                .where(schemas.EnvironmentORM.environment_kind == models.EnvironmentKind.GLOBAL.value)
158
            )
159
            environment = res.one_or_none()
2✔
160

161
            if environment is None:
2✔
162
                return
1✔
163

164
            await session.delete(environment)
1✔
165

166
    async def get_launchers(self, user: base_models.APIUser) -> list[models.SessionLauncher]:
2✔
167
        """Get all session launchers visible for a specific user from the database."""
168
        project_ids = await self.project_authz.resources_with_permission(
2✔
169
            user, user.id, ResourceType.project, scope=Scope.READ
170
        )
171

172
        async with self.session_maker() as session:
2✔
173
            res = await session.scalars(
2✔
174
                select(schemas.SessionLauncherORM)
175
                .where(schemas.SessionLauncherORM.project_id.in_(project_ids))
176
                .order_by(schemas.SessionLauncherORM.creation_date.desc())
177
            )
178
            launcher = res.all()
2✔
179
            return [item.dump() for item in launcher]
2✔
180

181
    async def get_project_launchers(self, user: base_models.APIUser, project_id: str) -> list[models.SessionLauncher]:
2✔
182
        """Get all session launchers in a project from the database."""
183
        authorized = await self.project_authz.has_permission(user, ResourceType.project, project_id, Scope.READ)
2✔
184
        if not authorized:
2✔
185
            raise errors.MissingResourceError(
1✔
186
                message=f"Project with id '{project_id}' does not exist or you do not have access to it."
187
            )
188

189
        async with self.session_maker() as session:
1✔
190
            res = await session.scalars(
1✔
191
                select(schemas.SessionLauncherORM)
192
                .where(schemas.SessionLauncherORM.project_id == project_id)
193
                .order_by(schemas.SessionLauncherORM.creation_date.desc())
194
            )
195
            launcher = res.all()
1✔
196
            return [item.dump() for item in launcher]
1✔
197

198
    async def get_launcher(self, user: base_models.APIUser, launcher_id: ULID) -> models.SessionLauncher:
2✔
199
        """Get one session launcher from the database."""
200
        async with self.session_maker() as session:
2✔
201
            res = await session.scalars(
2✔
202
                select(schemas.SessionLauncherORM).where(schemas.SessionLauncherORM.id == str(launcher_id))
203
            )
204
            launcher = res.one_or_none()
2✔
205

206
            authorized = (
2✔
207
                await self.project_authz.has_permission(user, ResourceType.project, launcher.project_id, Scope.READ)
208
                if launcher is not None
209
                else False
210
            )
211
            if not authorized or launcher is None:
2✔
212
                raise errors.MissingResourceError(
1✔
213
                    message=f"Session launcher with id '{launcher_id}' does not exist or you do not have access to it."
214
                )
215

216
            return launcher.dump()
1✔
217

218
    async def insert_launcher(
2✔
219
        self, user: base_models.APIUser, new_launcher: models.UnsavedSessionLauncher
220
    ) -> models.SessionLauncher:
221
        """Insert a new session launcher."""
222
        if not user.is_authenticated or user.id is None:
2✔
223
            raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")
×
224

225
        project_id = new_launcher.project_id
2✔
226
        authorized = await self.project_authz.has_permission(user, ResourceType.project, project_id, Scope.WRITE)
2✔
227
        if not authorized:
2✔
228
            raise errors.MissingResourceError(
1✔
229
                message=f"Project with id '{project_id}' does not exist or you do not have access to it."
230
            )
231

232
        async with self.session_maker() as session, session.begin():
1✔
233
            res = await session.scalars(select(schemas.ProjectORM).where(schemas.ProjectORM.id == project_id))
1✔
234
            project = res.one_or_none()
1✔
235
            if project is None:
1✔
236
                raise errors.MissingResourceError(
×
237
                    message=f"Project with id '{project_id}' does not exist or you do not have access to it."
238
                )
239

240
            environment_id: str
241
            environment: models.Environment
242
            environment_orm: schemas.EnvironmentORM | None
243
            if isinstance(new_launcher.environment, models.UnsavedEnvironment):
1✔
244
                environment_orm = await self.__insert_environment(user, session, new_launcher.environment)
1✔
245
                environment = environment_orm.dump()
1✔
246
                environment_id = environment.id
1✔
247
            else:
248
                environment_id = new_launcher.environment
1✔
249
                res_env = await session.scalars(
1✔
250
                    select(schemas.EnvironmentORM)
251
                    .where(schemas.EnvironmentORM.id == environment_id)
252
                    .where(schemas.EnvironmentORM.environment_kind == models.EnvironmentKind.GLOBAL.value)
253
                )
254
                environment_orm = res_env.one_or_none()
1✔
255
                if environment_orm is None:
1✔
UNCOV
256
                    raise errors.MissingResourceError(
×
257
                        message=f"Session environment with id '{environment_id}' does not exist or you do not have access to it."  # noqa: E501
258
                    )
259
                environment = environment_orm.dump()
1✔
260

261
            resource_class_id = new_launcher.resource_class_id
1✔
262
            if resource_class_id is not None:
1✔
263
                res = await session.scalars(
1✔
264
                    select(schemas.ResourceClassORM).where(schemas.ResourceClassORM.id == resource_class_id)
265
                )
266
                resource_class = res.one_or_none()
1✔
267
                if resource_class is None:
1✔
268
                    raise errors.MissingResourceError(
×
269
                        message=f"Resource class with id '{resource_class_id}' does not exist."
270
                    )
271

272
                res_classes = await self.resource_pools.get_classes(api_user=user, id=resource_class_id)
1✔
273
                resource_class_by_user = next((rc for rc in res_classes if rc.id == resource_class_id), None)
1✔
274
                if resource_class_by_user is None:
1✔
275
                    raise errors.ForbiddenError(
1✔
276
                        message=f"You do not have access to resource class with id '{resource_class_id}'."
277
                    )
278

279
            launcher = schemas.SessionLauncherORM(
1✔
280
                name=new_launcher.name,
281
                created_by_id=user.id,
282
                creation_date=datetime.now(UTC),
283
                description=new_launcher.description,
284
                project_id=new_launcher.project_id,
285
                environment_id=environment_id,
286
                resource_class_id=new_launcher.resource_class_id,
287
            )
288
            session.add(launcher)
1✔
289
            await session.flush()
1✔
290
            await session.refresh(launcher)
1✔
291
            return launcher.dump()
1✔
292

293
    async def update_launcher(
2✔
294
        self, user: base_models.APIUser, launcher_id: ULID, **kwargs: Any
295
    ) -> models.SessionLauncher:
296
        """Update a session launcher entry."""
297
        if not user.is_authenticated or user.id is None:
2✔
298
            raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")
×
299

300
        async with self.session_maker() as session, session.begin():
2✔
301
            res = await session.scalars(
2✔
302
                select(schemas.SessionLauncherORM).where(schemas.SessionLauncherORM.id == str(launcher_id))
303
            )
304
            launcher = res.one_or_none()
2✔
305
            if launcher is None:
2✔
306
                raise errors.MissingResourceError(
1✔
307
                    message=f"Session launcher with id '{launcher_id}' does not exist or you do not have access to it."  # noqa: E501
308
                )
309

310
            authorized = await self.project_authz.has_permission(
1✔
311
                user,
312
                ResourceType.project,
313
                launcher.project_id,
314
                Scope.WRITE,
315
            )
316
            if not authorized:
1✔
317
                raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
×
318

319
            resource_class_id = kwargs.get("resource_class_id")
1✔
320
            if resource_class_id is not None:
1✔
321
                res = await session.scalars(
1✔
322
                    select(schemas.ResourceClassORM).where(schemas.ResourceClassORM.id == resource_class_id)
323
                )
324
                resource_class = res.one_or_none()
1✔
325
                if resource_class is None:
1✔
326
                    raise errors.MissingResourceError(
×
327
                        message=f"Resource class with id '{resource_class_id}' does not exist."
328
                    )
329

330
                res_classes = await self.resource_pools.get_classes(api_user=user, id=resource_class_id)
1✔
331
                resource_class_by_user = next((rc for rc in res_classes if rc.id == resource_class_id), None)
1✔
332
                if resource_class_by_user is None:
1✔
333
                    raise errors.ForbiddenError(
×
334
                        message=f"You do not have access to resource class with id '{resource_class_id}'."
335
                    )
336

337
            for key, value in kwargs.items():
1✔
338
                # NOTE: Only some fields can be updated.
339
                if key in [
1✔
340
                    "name",
341
                    "description",
342
                    "resource_class_id",
343
                ]:
344
                    setattr(launcher, key, value)
1✔
345

346
            env_payload: dict = kwargs.get("environment", {})
1✔
347
            if len(env_payload.keys()) == 1 and "id" in env_payload and isinstance(env_payload["id"], str):
1✔
348
                # The environment ID is being changed or set
349
                old_environment = launcher.environment
1✔
350
                new_environment_id = env_payload["id"]
1✔
351
                res_env = await session.scalars(
1✔
352
                    select(schemas.EnvironmentORM).where(schemas.EnvironmentORM.id == new_environment_id)
353
                )
354
                new_environment = res_env.one_or_none()
1✔
355
                if new_environment is None:
1✔
NEW
356
                    raise errors.MissingResourceError(
×
357
                        message=f"Session environment with id '{new_environment_id}' does not exist or "
358
                        "you do not have access to it."
359
                    )
360
                if new_environment.environment_kind != models.EnvironmentKind.GLOBAL:
1✔
NEW
361
                    raise errors.ValidationError(
×
362
                        message="Cannot set the environment for a launcher to an existing environment if that "
363
                        "existing environment is not global",
364
                        quiet=True,
365
                    )
366
                launcher.environment_id = new_environment_id
1✔
367
                launcher.environment = new_environment
1✔
368
                if old_environment.environment_kind == models.EnvironmentKind.CUSTOM:
1✔
369
                    # A custom environment exists but it is being updated to a global one
370
                    # We remove the custom environment to avoid accumulating custom environments that are not associated
371
                    # with any launchers.
372
                    await session.delete(old_environment)
1✔
373
            else:
374
                # Fields other than the environment ID are being updated
375
                if launcher.environment.environment_kind == models.EnvironmentKind.GLOBAL:
1✔
376
                    # A global environment is being replaced with a custom one
377
                    if env_payload.get("environment_kind") == models.EnvironmentKind.GLOBAL:
1✔
NEW
378
                        raise errors.ValidationError(
×
379
                            message="When one global environment is being replaced with another in a "
380
                            "launcher only the new global environment ID should be specfied",
381
                            quiet=True,
382
                        )
383
                    env_payload["environment_kind"] = models.EnvironmentKind.CUSTOM.value
1✔
384
                    env_payload_valid = apispec.EnvironmentPostInLauncher.model_validate(env_payload)
1✔
385
                    new_unsaved_env = models.UnsavedEnvironment(
1✔
386
                        name=env_payload_valid.name,
387
                        description=env_payload_valid.description,
388
                        container_image=env_payload_valid.container_image,
389
                        default_url=env_payload_valid.default_url,
390
                        port=env_payload_valid.port,
391
                        working_directory=PurePosixPath(env_payload_valid.working_directory),
392
                        mount_directory=PurePosixPath(env_payload_valid.mount_directory),
393
                        uid=env_payload_valid.uid,
394
                        gid=env_payload_valid.gid,
395
                        environment_kind=models.EnvironmentKind(env_payload_valid.environment_kind.value),
396
                    )
397
                    new_env = await self.__insert_environment(user, session, new_unsaved_env)
1✔
398
                    launcher.environment = new_env
1✔
399
                else:
400
                    # Fields on the environment attached to the launcher are being changed.
401
                    for key, val in env_payload.items():
1✔
402
                        # NOTE: Only some fields can be updated.
403
                        if key in [
1✔
404
                            "name",
405
                            "description",
406
                            "container_image",
407
                            "default_url",
408
                            "port",
409
                            "working_directory",
410
                            "mount_directory",
411
                            "uid",
412
                            "gid",
413
                        ]:
414
                            setattr(launcher.environment, key, val)
1✔
415

416
            return launcher.dump()
1✔
417

418
    async def delete_launcher(self, user: base_models.APIUser, launcher_id: ULID) -> None:
2✔
419
        """Delete a session launcher entry."""
420
        if not user.is_authenticated or user.id is None:
2✔
421
            raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")
×
422

423
        async with self.session_maker() as session, session.begin():
2✔
424
            res = await session.scalars(
2✔
425
                select(schemas.SessionLauncherORM).where(schemas.SessionLauncherORM.id == str(launcher_id))
426
            )
427
            launcher = res.one_or_none()
2✔
428

429
            if launcher is None:
2✔
430
                return
1✔
431

432
            authorized = await self.project_authz.has_permission(
1✔
433
                user,
434
                ResourceType.project,
435
                launcher.project_id,
436
                Scope.WRITE,
437
            )
438
            if not authorized:
1✔
439
                raise errors.ForbiddenError(message="You do not have the required permissions for this operation.")
×
440

441
            await session.delete(launcher)
1✔
442
            if launcher.environment.environment_kind == models.EnvironmentKind.CUSTOM:
1✔
443
                await session.delete(launcher.environment)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc